Netty实战与调优
Netty实战与调优
聊天室业务介绍
代码参考
/* 用户管理接口*/
public interface UserService {/* 登录* @param username 用户名* @param password 密码* @return 登录成功返回 true, 否则返回 false*/boolean login(String username, String password);
}
/* 会话管理接口*/
public interface Session {/* 绑定会话* @param channel 哪个 channel 要绑定会话* @param username 会话绑定用户*/void bind(Channel channel, String username);/* 解绑会话* @param channel 哪个 channel 要解绑会话*/void unbind(Channel channel);/* 获取属性* @param channel 哪个 channel* @param name 属性名* @return 属性值*/Object getAttribute(Channel channel, String name);/* 设置属性* @param channel 哪个 channel* @param name 属性名* @param value 属性值*/void setAttribute(Channel channel, String name, Object value);/* 根据用户名获取 channel* @param username 用户名* @return channel*/Channel getChannel(String username);
}
/* 聊天组会话管理接口*/
public interface GroupSession {/* 创建一个聊天组, 如果不存在才能创建成功, 否则返回 null* @param name 组名* @param members 成员* @return 成功时返回组对象, 失败返回 null*/Group createGroup(String name, Set<String> members);/* 加入聊天组* @param name 组名* @param member 成员名* @return 如果组不存在返回 null, 否则返回组对象*/Group joinMember(String name, String member);/* 移除组成员* @param name 组名* @param member 成员名* @return 如果组不存在返回 null, 否则返回组对象*/Group removeMember(String name, String member);/* 移除聊天组* @param name 组名* @return 如果组不存在返回 null, 否则返回组对象*/Group removeGroup(String name);/* 获取组成员* @param name 组名* @return 成员集合, 没有成员会返回 empty set*/Set<String> getMembers(String name);/* 获取组成员的 channel 集合, 只有在线的 channel 才会返回* @param name 组名* @return 成员 channel 集合*/List<Channel> getMembersChannel(String name);
}
注册事件处理器
@Slf4j
public class ChatServer {public static void main(String[] args) {NioEventLoopGroup boss = new NioEventLoopGroup();NioEventLoopGroup worker = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();LoginRequestMessageHandler LOGIN_HANDLER = new LoginRequestMessageHandler();ChatRequestMessageHandler CHAT_HANDLER = new ChatRequestMessageHandler();GroupCreateRequestMessageHandler GROUP_CREATE_HANDLER = new GroupCreateRequestMessageHandler();GroupChatMessageHandler GROUP_CHAT_HANDLER = new GroupChatMessageHandler();QuitHandler QUIT_HANDLER = new QuitHandler();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {//空闲检测 arg1:读空闲 arg2:写空闲 arg3:读写空闲 单位:s 0代表不关注ch.pipeline().addLast(new IdleStateHandler(5,0,0));//处理空闲事件ch.pipeline().addLast(new ChannelDuplexHandler(){@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent){IdleStateEvent event = (IdleStateEvent) evt;if (event.state().equals(IdleState.READER_IDLE)){log.info("读空闲超过5s!");}}}});ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);ch.pipeline().addLast(LOGIN_HANDLER);ch.pipeline().addLast(CHAT_HANDLER);ch.pipeline().addLast(GROUP_CREATE_HANDLER);ch.pipeline().addLast(GROUP_CHAT_HANDLER);ch.pipeline().addLast(QUIT_HANDLER);}});Channel channel = serverBootstrap.bind(8080).sync().channel();channel.closeFuture().sync();} catch (InterruptedException e) {log.error("server error", e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();}}
}
@Slf4j
public class ChatClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch(1);AtomicBoolean LOGIN = new AtomicBoolean(false);try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(group);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());
// ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(MESSAGE_CODEC);//添加心跳机制ch.pipeline().addLast(new IdleStateHandler(0,3,0));ch.pipeline().addLast(new ChannelDuplexHandler(){@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent){IdleStateEvent event = (IdleStateEvent) evt;//超过3s没有写入数据则触发WRITER_IDLE事件if (IdleState.WRITER_IDLE.equals(event.state())){
// log.info("发送心跳数据包...");ctx.writeAndFlush(new PingMessage());}}}});ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {new Thread(() -> {Scanner scanner = new Scanner(System.in);System.out.print("username:");String username = scanner.nextLine();System.out.print("password:");String password = scanner.nextLine();LoginRequestMessage request = new LoginRequestMessage(username, password);ctx.channel().writeAndFlush(request);//阻塞try {WAIT_FOR_LOGIN.await();} catch (InterruptedException e) {throw new RuntimeException(e);}if (LOGIN.get()) {while (true) {System.out.println("==================================");System.out.println("send [username] [content]");System.out.println("gsend [group name] [content]");System.out.println("gcreate [group name] [m1,m2,m3...]");System.out.println("gmembers [group name]");System.out.println("gjoin [group name]");System.out.println("gquit [group name]");System.out.println("quit");System.out.println("==================================");String command = scanner.nextLine();if (command != null && command.length() > 0) {String[] commandList = command.split(" ");switch (commandList[0]) {case "send":ChatRequestMessage message = new ChatRequestMessage(username, commandList[1], commandList[2]);ctx.channel().writeAndFlush(message);break;case "gsend":GroupChatRequestMessage groupMessage = new GroupChatRequestMessage(username, commandList[1], commandList[2]);ctx.channel().writeAndFlush(groupMessage);break;case "gcreate":String members = commandList[2];if (members != null && members.length() > 0) {String[] memberList = members.split(",");Set<String> memberSet = Arrays.stream(memberList).collect(Collectors.toSet());memberSet.add(username);GroupCreateRequestMessage groupCreateRequestMessage = new GroupCreateRequestMessage(commandList[1], memberSet);ctx.channel().writeAndFlush(groupCreateRequestMessage);}break;case "gmembers":GroupMembersRequestMessage groupMembersRequestMessage = new GroupMembersRequestMessage(commandList[1]);ctx.channel().writeAndFlush(groupMembersRequestMessage);break;case "gjoin":GroupJoinRequestMessage groupJoinRequestMessage = new GroupJoinRequestMessage(username, commandList[1]);ctx.channel().writeAndFlush(groupJoinRequestMessage);break;case "gquit":GroupQuitRequestMessage groupQuitRequestMessage = new GroupQuitRequestMessage(username, commandList[1]);ctx.channel().writeAndFlush(groupQuitRequestMessage);break;case "quit":ctx.channel().close();return;}}}} else {ctx.channel().close();}}).start();}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("msg:{}", msg);if (msg instanceof LoginResponseMessage response) {LOGIN.set(response.isSuccess());WAIT_FOR_LOGIN.countDown();}}});}});Channel channel = bootstrap.connect("localhost", 8080).sync().channel();channel.closeFuture().sync();} catch (Exception e) {log.error("client error", e);} finally {group.shutdownGracefully();}}
}
登录
@ChannelHandler.Sharable
public class LoginRequestMessageHandler extends SimpleChannelInboundHandler<LoginRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, LoginRequestMessage msg) throws Exception {String username = msg.getUsername();String password = msg.getPassword();boolean login = UserServiceFactory.getUserService().login(username, password);if (login) {SessionFactory.getSession().bind(ctx.channel(), username);}ctx.channel().writeAndFlush(new LoginResponseMessage(login, login ? "登录成功!" : "登录失败,用户名或密码有误!"));}
}
单聊
@ChannelHandler.Sharable
public class ChatRequestMessageHandler extends SimpleChannelInboundHandler<ChatRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, ChatRequestMessage chatRequestMessage) throws Exception {String to = chatRequestMessage.getTo();String from = chatRequestMessage.getFrom();String content = chatRequestMessage.getContent();Channel channel = SessionFactory.getSession().getChannel(to);if (channel!=null){//转发消息channel.writeAndFlush(new ChatResponseMessage(from,content));}else {channelHandlerContext.writeAndFlush(new ChatResponseMessage(false,"发送失败,用户不在线!"));}}
}
创建群聊
@ChannelHandler.Sharable
@Slf4j
public class GroupCreateRequestMessageHandler extends SimpleChannelInboundHandler<GroupCreateRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, GroupCreateRequestMessage groupCreateRequestMessage) throws Exception {Set<String> members = groupCreateRequestMessage.getMembers();String groupName = groupCreateRequestMessage.getGroupName();GroupSession session = GroupSessionFactory.getGroupSession();Group group = session.createGroup(groupName, members);GroupCreateResponseMessage message = null;if (group != null) {message = new GroupCreateResponseMessage(false, "创建群聊失败!");} else {message = new GroupCreateResponseMessage(true, "创建群聊成功!");List<Channel> channels = session.getMembersChannel(groupName);channels.forEach(channel -> {GroupJoinResponseMessage joinResponseMessage = new GroupJoinResponseMessage(true, "您已被拉入群聊:" + groupName);channel.writeAndFlush(joinResponseMessage);});}channelHandlerContext.writeAndFlush(message);}
}
发送群聊消息
@ChannelHandler.Sharable
public class GroupChatMessageHandler extends SimpleChannelInboundHandler<GroupChatRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, GroupChatRequestMessage groupChatRequestMessage) throws Exception {String groupName = groupChatRequestMessage.getGroupName();String msg = groupChatRequestMessage.getContent();String from = groupChatRequestMessage.getFrom();List<Channel> channels = GroupSessionFactory.getGroupSession().getMembersChannel(groupName);channels.forEach(channel -> {channel.writeAndFlush(new GroupChatResponseMessage(from, msg));});}
}
退出
@ChannelHandler.Sharable
@Slf4j
public class QuitHandler extends ChannelInboundHandlerAdapter {//处理channel正常关闭事件@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {//移除channelSessionFactory.getSession().unbind(ctx.channel());log.info("channel:{}已断开!",ctx.channel());}//处理channel异常关闭事件@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {SessionFactory.getSession().unbind(ctx.channel());log.info("channel:{}异常断开!",ctx.channel());}
}
空闲检测
连接假死
原因
- 网络设备出现故障,例如网卡,机房等,底层的 TCP 连接已经断开了,但应用程序没有感知到,仍然占用着资源。
- 公网网络不稳定,出现丢包。如果连续出现丢包,这时现象就是客户端数据发不出去,服务端也一直收不到数据,就这么一直耗着
- 应用程序线程阻塞,无法进行数据读写
问题
- 假死的连接占用的资源不能自动释放
- 向假死的连接发送数据,得到的反馈是发送超时
服务器端解决
- 怎么判断客户端连接是否假死呢?如果能收到客户端数据,说明没有假死。因此策略就可以定为,每隔一段时间就检查这段时间内是否接收到客户端数据,没有就可以判定为连接假死
//空闲检测 arg1:读空闲 arg2:写空闲 arg3:读写空闲 单位:s 0代表不关注ch.pipeline().addLast(new IdleStateHandler(5,0,0));//处理空闲事件ch.pipeline().addLast(new ChannelDuplexHandler(){@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent){IdleStateEvent event = (IdleStateEvent) evt;if (event.state().equals(IdleState.READER_IDLE)){log.info("读空闲超过5s!");//关闭channel}}}});
客户端定时心跳
- 客户端可以定时向服务器端发送数据,只要这个时间间隔小于服务器定义的空闲检测的时间间隔,那么就能防止前面提到的误判,客户端可以定义如下心跳处理器
//添加心跳机制ch.pipeline().addLast(new IdleStateHandler(0,3,0));ch.pipeline().addLast(new ChannelDuplexHandler(){@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent){IdleStateEvent event = (IdleStateEvent) evt;//超过3s没有写入数据则触发WRITER_IDLE事件if (IdleState.WRITER_IDLE.equals(event.state())){log.info("发送心跳数据包...");ctx.writeAndFlush(new PingMessage());}}}});
优化
拓展序列化算法
序列化,反序列化主要用在消息正文的转换上
- 序列化时,需要将 Java 对象变为要传输的数据(可以是 byte[],或 json 等,最终都需要变成 byte[])
- 反序列化时,需要将传入的正文数据还原成 Java 对象,便于处理
为了支持更多序列化算法,抽象一个 Serializer 接口
public interface Serializer {// 反序列化方法<T> T deserialize(Class<T> clazz, byte[] bytes);// 序列化方法<T> byte[] serialize(T object);}
提供两个实现,我这里直接将实现加入了枚举类 Serializer.Algorithm 中
enum Algorithm implements Serializer {Java {@Overridepublic <T> T deserialize(Class<T> clazz, byte[] bytes) {try {ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));Object object = in.readObject();return (T) object;} catch (IOException | ClassNotFoundException e) {throw new RuntimeException("SerializerAlgorithm.Java 反序列化错误", e);}}@Overridepublic <T> byte[] serialize(T object) {try {ByteArrayOutputStream out = new ByteArrayOutputStream();new ObjectOutputStream(out).writeObject(object);return out.toByteArray();} catch (IOException e) {throw new RuntimeException("SerializerAlgorithm.Java 序列化错误", e);}}},// Json 实现(引入了 Gson 依赖)Json {@Overridepublic <T> T deserialize(Class<T> clazz, byte[] bytes) {return new Gson().fromJson(new String(bytes, StandardCharsets.UTF_8), clazz);}@Overridepublic <T> byte[] serialize(T object) {return new Gson().toJson(object).getBytes(StandardCharsets.UTF_8);}};// 需要从协议的字节中得到是哪种序列化算法public static Algorithm getByInt(int type) {Algorithm[] array = Algorithm.values();if (type < 0 || type > array.length - 1) {throw new IllegalArgumentException("Out of index!");}return array[type];}}
增加配置类和配置文件
public abstract class Config {static Properties properties;static {try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {properties = new Properties();properties.load(in);} catch (IOException e) {throw new ExceptionInInitializerError(e);}}public static int getServerPort() {String value = properties.getProperty("server.port");if(value == null) {return 8080;} else {return Integer.parseInt(value);}}public static Serializer.Algorithm getSerializerAlgorithm() {String value = properties.getProperty("serializer.algorithm");if(value == null) {return Serializer.Algorithm.Java;} else {return Serializer.Algorithm.valueOf(value);}}
}
配置文件
serializer.algorithm=Json
修改编解码器
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {@Overrideprotected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {ByteBuf out = ctx.alloc().buffer();// 1. 4 字节的魔数out.writeBytes(new byte[]{1, 2, 3, 4});// 2. 1 字节的版本,out.writeByte(1);// 3. 1 字节的序列化方式 jdk 0 , json 1out.writeByte(0);// 4. 1 字节的指令类型out.writeByte(msg.getMessageType());// 5. 4 个字节out.writeInt(msg.getSequenceId());// 无意义,对齐填充out.writeByte(0xff);// 6. 获取内容的字节数组ByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(bos);oos.writeObject(msg);byte[] bytes = bos.toByteArray();// 7. 长度out.writeInt(bytes.length);// 8. 写入内容out.writeBytes(bytes);outList.add(out);}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {int magicNum = in.readInt();byte version = in.readByte();byte serializerType = in.readByte();byte messageType = in.readByte();int sequenceId = in.readInt();in.readByte();int length = in.readInt();byte[] bytes = new byte[length];in.readBytes(bytes, 0, length);ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));Message message = (Message) ois.readObject();log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);log.debug("{}", message);out.add(message);}
}
参数调优
CONNECT_TIMEOUT_MILLIS
- 属于 SocketChannal的参数
- 用在客户端建立连接时,如果在指定毫秒内无法连接,会抛出 timeout 异常
- 注意:Netty 中不要用成了SO_TIMEOUT 主要用在阻塞 IO,而 Netty 是非阻塞 IO
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,300);// SocketChannel0.3s内未建立连接就抛出异常
源码部分 io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#connect
// Schedule connect timeout.int connectTimeoutMillis = config().getConnectTimeoutMillis();//获取超时时间if (connectTimeoutMillis > 0) { //创建定时任务connectTimeoutFuture = eventLoop().schedule(new Runnable() {@Overridepublic void run() { //如果超时后Promise依然存在,则向Promise标记异常ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;ConnectTimeoutException cause =new ConnectTimeoutException("connection timed out: " + remoteAddress);if (connectPromise != null && connectPromise.tryFailure(cause)) {close(voidPromise());}}}, connectTimeoutMillis, TimeUnit.MILLISECONDS);//在connectTimeoutMillis后开始执行任务}