Netty入门案例,实现简单地服务端到客户端的数据发送和读取
案例使用Netty实现简单地服务端到客户端的数据发送和读取
一、导入依赖
Netty 是由 JBOSS 提供的一个 Java 开源框架,所以在使用得时候首先得导入Netty的maven坐标。
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.69.Final</version>
</dependency>
二、案例代码
NettyServerDemo : 服务端Demo依赖 NettyServerHandlerDemo
NettyServerHandlerDemo:服务端的一个ChannelHandler的实现类,用来承接业务逻辑
2.1 服务端代码
public class NettyServerDemo {/* 服务端Demo实现目标* 1.创建BossGroup线程组,处理网络连接事件* 2.创建workerGroup线程组 处理网络读写事件* 3.创建服务端启动助手,serverBootStrap* 4.设置服务端通道实现方式为NIO* 5.设置服务端options* 6.创建通道初始化对象* 8.向pipeline中添加自定义业务初处理逻辑handler* 9.启动服务端并绑定端口,将异步改为同步* 10.关闭通道和连接池*/public static void main(String[] args) throws InterruptedException {//1.创建BossGroup线程组,处理网络连接事件,默认线程数量与电脑处理器相关,处理器线程数*2EventLoopGroup bossGroup = new NioEventLoopGroup(1);//2.创建workerGroup线程组 处理网络读写事件EventLoopGroup wrokerGroup = new NioEventLoopGroup();//3.创建服务端启动助手,serverBootStrapServerBootstrap serverBootstrap = new ServerBootstrap();//配置线程组serverBootstrap.group(bossGroup, wrokerGroup).channel(NioServerSocketChannel.class)//配置Channel的实现,这里使用NIO的TCP服务端Channel.option(ChannelOption.SO_BACKLOG, 128)//对于阻塞的连接队列大小的配置.childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)//开启keep-alive 会根据操作系统的设定保持长连接并检测连接可用性.childHandler(new ChannelInitializer<SocketChannel>() {//通过一个特殊的ChannelInboundHandler 来初始化注册到EventLoop的Channel@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new NettyServerHandlerDemo());}});//启动服务并绑定端口,并且将异步改成同步ChannelFuture channelFuture = serverBootstrap.bind(9999).sync();System.out.println("服务器启动成功。。。");//关闭通道(停止接受新的连接)监听通道关闭的状态,这里改为同步后通道关闭才会返回channelFuture.channel().closeFuture().sync();bossGroup.shutdownGracefully();wrokerGroup.shutdownGracefully();}}
/* 自定义处理Handler*/
public class NettyServerHandlerDemo implements ChannelInboundHandler {/* 通道读取事件 @param ctx* @param msg* @throws Exception*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf) msg;System.out.println("客户端发送过来的消息:" + byteBuf.toString(CharsetUtil.UTF_8));}/* 通道读取完毕事件 @param ctx* @throws Exception*/@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.writeAndFlush(Unpooled.copiedBuffer("你好.我是Netty服务端",CharsetUtil.UTF_8));//消息出站}/* 通道异常事件 @param ctx* @param cause* @throws Exception*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {}@Overridepublic void channelUnregistered(ChannelHandlerContext ctx) throws Exception {}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {}@Overridepublic void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {}}
2.2 客户端代码
NettyClientDemo 客户端启动器
NettyClientHandlerDemo 承载客户端业务代码的 channelhandler
public class NettyClientDemo {/* 1.创建线程组* 2.设置线程组启动助手 bootstrap* 3.设置客户端通道为NIO* 4.创建通道初始化对象* 5.向pipeline中添加自定义业务处理的handler* 6.启动客户端,等待链接服务端,同时将异步改为同步* 8.关闭通道和连接池*/public static void main(String[] args) throws InterruptedException {//1.创建线程组EventLoopGroup group=new NioEventLoopGroup();//2.创建bootstrapBootstrap bootstrap=new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new NettyClientHandlerDemo());}});//启动客户端ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9999).sync();//关闭通道和连接池//监听通道关闭的状态事件channelFuture.channel().closeFuture().sync();group.shutdownGracefully();}}
public class NettyClientHandlerDemo implements ChannelInboundHandler {@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {}@Overridepublic void channelUnregistered(ChannelHandlerContext ctx) throws Exception {}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.writeAndFlush(Unpooled.copiedBuffer("Hello! 我是Netty客户端!", CharsetUtil.UTF_8));}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf) msg;System.out.println("服务端发来消息:" + byteBuf.toString(CharsetUtil.UTF_8));}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {}@Overridepublic void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {}
}
2.3 关于ChannelInitializer
- 当一个新的客户端连接到服务器时,Netty 会创建一个新的 Channel 和对应的 ChannelPipeline,并将 ChannelInitializer 添加为该 Pipeline 的第一个处理器。ChannelInitializer 主要任务是负责向 Pipeline 中添加其他需要的处理器,以完成数据的编解码、业务逻辑处理等操作。
- ChannelInitializer 是一个入站处理器,用于初始化新连接的 ChannelPipeline,并不直接对出站和入站消息进行处理。当 ChannelInitializer 的 initChannel() 方法被调用时,它会向该 Pipeline 中添加其他的 ChannelHandler,包括入站和出站处理器,以完成数据的编解码、业务逻辑处理等操作。
- 一旦完成 Pipeline 的初始化工作后,ChannelInitializer 就没有了实质性的作用,不会再对出站和入站消息进行处理。而是交由其他的具体的处理器来完成具体的业务逻辑。
- ChannelInitializer 会一直存在于 Pipeline 中,直到连接关闭或者手动从 Pipeline 中移除。
三、Future和Future-Listener
JAVA中原本的Future参考:异步Future模式
表示异步的执行结果, 可以通过它提供的方法来检测执行是否完成,ChannelFuture 是他的一个子接口. ChannelFuture 是一个接口 ,可以添加监听器,当监听的事件发生时,就会通知到监听器当 Future 对象刚刚创建时,处于非完成状态,调用者可以通过返回的 ChannelFuture 来获取操作执行的状态, 注册监听函数来执行完成后的操作。
- sync 方法, 阻塞等待程序结果反回
- isDone 方法来判断当前操作是否完成;
- isSuccess 方法来判断已完成的当前操作是否成功;
- getCause 方法来获取已完成的当前操作失败的原因;
- isCancelled 方法来判断已完成的当前操作是否被取消;
- addListener 方法来注册监听器,当操作已完成(isDone 方法返回完成),将会通知指定的监听器;如果Future 对象已完成,则通知指定的监听器
ChannelFuture future = bootstrap.bind(9999);
future.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (future.isSuccess()) {System.out.println("端口绑定成功!");} else {System.out.println("端口绑定失败!");}}
});ChannelFuture channelFuture = ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀,我是Netty客户端", CharsetUtil.UTF_8));
channelFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (future.isSuccess()) {System.out.println("数据发送成功.");} else {System.out.println("数据发送失败.");}}
});