> 文章列表 > Netty 教程 – 初窥Netty编程

Netty 教程 – 初窥Netty编程

Netty 教程 – 初窥Netty编程

什么是Netty

Netty是业界有名且最流行的NIO框架之一,健壮,稳定,高性能,可定制,可扩展在同类框架都是首屈一指,而且成功的运用在各大商业项目中,比如HadoopRPC框架avro,当当接盘的DubboX都在使用…

Netty 的优点

  • API使用简单,开发门槛低
  • 功能强大,多种解码与编码器
  • 支持多种主流的通讯协议
  • 定制能力强大,可以通过ChannelHandler对通讯框架灵活的扩展
  • 相比业界主流NIO框架,Netty综合评价更高
  • 成熟稳定,社区活跃

Netty 缺点

  • 5.x 模型存在问题,已被废弃

编译

GIT:GitHub - netty/netty: Netty project - an event-driven asynchronous network application framework

如果需要编译Netty,需要最低JDK1.7,在运行时Netty3.x只需要JDK1.5,同时博客参考 李林峰 大神的 《Netty权威指南第二版》…

因为主要是学习Netty,而不是实战,同时为了更好的适配即将推出的Netty6,用Netty5的API也许会更好点,就当为Netty6做技术储备吧…

如果使用Maven,在项目中需要添加

<dependencies><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>5.0.0.Alpha2</version></dependency>
</dependencies>

Hello Netty

继续用TimeServer 与 TimeClient 为例,改造代码使用Netty实现服务端客户端的通讯,以及上一章博客遗留的数据丢失问题,在使用Netty后都是不在的…

TimeServer

public static void bind(int port) {EventLoopGroup masterGroup = new NioEventLoopGroup();//创建线程组EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap();//创建NIO服务端启动辅助类bootstrap.group(masterGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)//连接数.option(ChannelOption.TCP_NODELAY, true)//不延迟,立即推送.childHandler(new ChildChannelHandler());//绑定端口,同步等待成功,System.out.println("绑定端口,同步等待成功......");ChannelFuture future = bootstrap.bind(port).sync();//等待服务端监听端口关闭future.channel().closeFuture().sync();System.out.println("等待服务端监听端口关闭......");} catch (Exception e) {e.printStackTrace();} finally {//优雅退出释放线程池masterGroup.shutdownGracefully();workerGroup.shutdownGracefully();System.out.println("优雅退出释放线程池......");}
}

1.实例化个含NIO线程的线程组,专门用来处理网络事件,管理线程
2.使用ServerBootstrap类来初始化Netty服务器,并且开始监听端口的socket请求
3.根据ServerBootstrap內封装好的方法设置服务器基础信息
4.设置服务端的连接数 ChannelOption.SO_BACKLOG 为 1024
5.设置服务端消息不延迟,立即推送 ChannelOption.TCP_NODELAY
6.配置好服务器,在服务器启动时绑定闯入的port端口,等待同步
7.优雅退出,释放资源

数据操作类ServerHandler:在该类中对客户端发来的数据进行操作,发送给客户端数据

private static class ChildChannelHandler extends ChannelInitializer {@Overrideprotected void initChannel(Channel channel) throws Exception {//创建Channel通道channel.pipeline().addLast(new TimeServerHandler());//往通道中添加i/o事件处理类}private static class TimeServerHandler extends ChannelHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//channelRead方法中的msg参数(服务器接收到的客户端发送的消息)强制转换成ByteBuf类型ByteBuf buf = (ByteBuf) msg;byte[] req = new byte[buf.readableBytes()];buf.readBytes(req);String body = new String(req, "UTF-8");//返回String,指定编码(有可能出现不支持的编码类型异常,所以要trycatchSystem.out.println("TimeServer 接收到的消息 :" + body);ByteBuf resp = Unpooled.copiedBuffer("你在说什么呢...".getBytes());ctx.write(resp);// 将消息放入缓冲数组}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {//将消息队列中信息写入到SocketChannel中去,解决了频繁唤醒Selector所带来不必要的性能开销//Netty的 write 只是将消息放入缓冲数组,再通过调用 flush 才会把缓冲区的数据写入到 SocketChannelctx.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();//发生异常时候,执行重写后的 exceptionCaught 进行资源关闭}}
}public static void main(String[] args) {TimeServer.bind(4040);
}

ChildChannelHandler可以看到,只需重写对应的方法,即可简单的实现一个Netty通讯的服务端,这里的ByteBuf可以看成是NettyByteBuffer的增强实现…

接下来编写TimeClient程序

TimeClient

public static void connect(String host, int port) {EventLoopGroup group = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();ChannelFuture future = null;try {bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)//不延迟,消息立即推送.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new TimeClientHandler());}});//发起异步请求future = bootstrap.connect(host, port).sync();//等待客户端链路关闭future.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {group.shutdownGracefully();}
}

一个很简单的connect连接操作,在服务端我们使用的是ServerBootstrap 客户端使用 Bootstrap就可以了,是不是很像NIO编程中的ServerSocketChannel 与 SocketChannel,这里与服务端不同的是它的Channel需要设置为NioSocketChannel,然后为其添加一个handler…..

private static class TimeClientHandler extends ChannelHandlerAdapter {private final ByteBuf firstMessage;public TimeClientHandler() {byte[] req = "QUERY TIME ORDER ".getBytes();firstMessage = Unpooled.buffer(req.length);firstMessage.writeBytes(req);}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.writeAndFlush(firstMessage);//推送消息}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;byte[] req = new byte[buf.readableBytes()];buf.readBytes(req);String body = new String(req, "UTF-8");System.out.println("TimeClient 接收到的消息 :" + body);//ctx.close();//接受完消息关闭连接,注释掉可以看到释放资源,否则请求完后就关闭连接是看不到异常情况的}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("释放资源:" + cause.getMessage());//不重写将会看到堆栈信息以及资源无法关闭ctx.close();}
}public static void main(String[] args) {TimeClient.connect("127.0.0.1", 4040);
}

1.首先跟服务器操作类一样继承ChannelHandlerAdapter类,重写channelReadchannelActiveexceptionCaught三个方法
2.其中channelActive方法是用来发送客户端信息的,channelRead方法客户端是接收服务器数据的
3.先声明一个全局变量firstMessage,用来接收客户端发出去的信息的值
4.在channelActive方法中,把要传输的数据转化为字节数组
5.在channelRead方法中,通过ByteBuf接收服务端回复的内容,然后关闭当前连接(当然具体是否关闭取决于业务了,如果请求完就关闭连接是看不到释放资源的日志)
6.在exceptionCaught方法中,可以处理一些异常情况,比如服务器关闭,该方法会监听到…

测试一下

启动 TimeServer

绑定端口,同步等待成功......
TimeServer 接收到的消息 :QUERY TIME ORDER

启动 TimeClient 然后关闭 TimeServer

TimeClient 接收到的消息 :你在说什么呢...
释放资源:远程主机强迫关闭了一个现有的连接。