> 文章列表 > 学习风`宇blog的websocket模块

学习风`宇blog的websocket模块

学习风`宇blog的websocket模块

文章目录

    • 后端
      • 代码
        • 引入依赖
        • WebSocketConfig
        • WebSocketServiceImpl
      • 分析
        • tb_chat_record表
        • WebSocketServiceImpl
          • ChatConfigurator
        • 聊天消息
          • ChatTypeEnums
          • WebsocketMessageDTO

后端

代码

引入依赖

仅需引入以下依赖

<!-- websocket依赖 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId>
</dependency><!--fastJson-->
<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.7</version>
</dependency>

WebSocketConfig

/* websocket配置类 @author yezhiqiu* @date 2021/07/29*/
@Configuration
public class WebSocketConfig {@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}}

WebSocketServiceImpl

/* websocket服务 @author yezhiqiu* @date 2021/07/28*/
@Data
@Service
@ServerEndpoint(value = "/websocket", configurator = WebSocketServiceImpl.ChatConfigurator.class)
public class WebSocketServiceImpl {/* 用户session*/private Session session;/* 用户session集合*/private static CopyOnWriteArraySet<WebSocketServiceImpl> webSocketSet = new CopyOnWriteArraySet<>();@Autowiredpublic void setChatRecordDao(ChatRecordDao chatRecordDao) {WebSocketServiceImpl.chatRecordDao = chatRecordDao;}@Autowiredpublic void setUploadStrategyContext(UploadStrategyContext uploadStrategyContext) {WebSocketServiceImpl.uploadStrategyContext = uploadStrategyContext;}private static ChatRecordDao chatRecordDao;private static UploadStrategyContext uploadStrategyContext;/* 获取客户端真实ip*/public static class ChatConfigurator extends ServerEndpointConfig.Configurator {public static String HEADER_NAME = "X-Real-IP";@Overridepublic void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {try {String firstFoundHeader = request.getHeaders().get(HEADER_NAME.toLowerCase()).get(0);sec.getUserProperties().put(HEADER_NAME, firstFoundHeader);} catch (Exception e) {sec.getUserProperties().put(HEADER_NAME, "未知ip");}}}/* 连接建立成功调用的方法*/@OnOpenpublic void onOpen(Session session, EndpointConfig endpointConfig) throws IOException {// 加入连接this.session = session;webSocketSet.add(this);// 更新在线人数updateOnlineCount();// 加载历史聊天记录ChatRecordDTO chatRecordDTO = listChartRecords(endpointConfig);// 发送消息WebsocketMessageDTO messageDTO = WebsocketMessageDTO.builder().type(HISTORY_RECORD.getType()).data(chatRecordDTO).build();synchronized (session) {session.getBasicRemote().sendText(JSON.toJSONString(messageDTO));}}/* 收到客户端消息后调用的方法 @param message 客户端发送过来的消息*/@OnMessagepublic void onMessage(String message, Session session) throws IOException {WebsocketMessageDTO messageDTO = JSON.parseObject(message, WebsocketMessageDTO.class);switch (Objects.requireNonNull(getChatType(messageDTO.getType()))) {case SEND_MESSAGE:// 发送消息ChatRecord chatRecord = JSON.parseObject(JSON.toJSONString(messageDTO.getData()), ChatRecord.class);// 过滤html标签chatRecord.setContent(HTMLUtils.filter(chatRecord.getContent()));chatRecordDao.insert(chatRecord);messageDTO.setData(chatRecord);// 广播消息broadcastMessage(messageDTO);break;case RECALL_MESSAGE:// 撤回消息RecallMessageDTO recallMessage = JSON.parseObject(JSON.toJSONString(messageDTO.getData()), RecallMessageDTO.class);// 删除记录chatRecordDao.deleteById(recallMessage.getId());// 广播消息broadcastMessage(messageDTO);break;case HEART_BEAT:// 心跳消息messageDTO.setData("pong");session.getBasicRemote().sendText(JSON.toJSONString(JSON.toJSONString(messageDTO)));default:break;}}/* 连接关闭调用的方法*/@OnClosepublic void onClose() throws IOException {// 更新在线人数webSocketSet.remove(this);updateOnlineCount();}/* 加载历史聊天记录 @param endpointConfig 配置* @return 加载历史聊天记录*/private ChatRecordDTO listChartRecords(EndpointConfig endpointConfig) {// 获取聊天历史记录List<ChatRecord> chatRecordList = chatRecordDao.selectList(new LambdaQueryWrapper<ChatRecord>().ge(ChatRecord::getCreateTime, DateUtil.offsetHour(new Date(), -12)));// 获取当前用户ipString ipAddress = endpointConfig.getUserProperties().get(ChatConfigurator.HEADER_NAME).toString();return ChatRecordDTO.builder().chatRecordList(chatRecordList).ipAddress(ipAddress).ipSource(IpUtils.getIpSource(ipAddress)).build();}/* 更新在线人数 @throws IOException io异常*/@Asyncpublic void updateOnlineCount() throws IOException {// 获取当前在线人数WebsocketMessageDTO messageDTO = WebsocketMessageDTO.builder().type(ONLINE_COUNT.getType()).data(webSocketSet.size()).build();// 广播消息broadcastMessage(messageDTO);}/* 发送语音 @param voiceVO 语音路径*/public void sendVoice(VoiceVO voiceVO) {// 上传语音文件String content = uploadStrategyContext.executeUploadStrategy(voiceVO.getFile(), FilePathEnum.VOICE.getPath());voiceVO.setContent(content);// 保存记录ChatRecord chatRecord = BeanCopyUtils.copyObject(voiceVO, ChatRecord.class);chatRecordDao.insert(chatRecord);// 发送消息WebsocketMessageDTO messageDTO = WebsocketMessageDTO.builder().type(VOICE_MESSAGE.getType()).data(chatRecord).build();// 广播消息try {broadcastMessage(messageDTO);} catch (IOException e) {e.printStackTrace();}}/* 广播消息 @param messageDTO 消息dto* @throws IOException io异常*/private void broadcastMessage(WebsocketMessageDTO messageDTO) throws IOException {for (WebSocketServiceImpl webSocketService : webSocketSet) {synchronized (webSocketService.session) {webSocketService.session.getBasicRemote().sendText(JSON.toJSONString(messageDTO));}}}}

分析

tb_chat_record表

CREATE TABLE `tb_chat_record` (`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',`user_id` int(11) DEFAULT NULL COMMENT '用户id',`nickname` varchar(50) NOT NULL COMMENT '昵称',`avatar` varchar(255) NOT NULL COMMENT '头像',`content` varchar(1000) NOT NULL COMMENT '聊天内容',`ip_address` varchar(50) NOT NULL COMMENT 'ip地址',`ip_source` varchar(255) NOT NULL COMMENT 'ip来源',`type` tinyint(4) NOT NULL COMMENT '类型',`create_time` datetime NOT NULL COMMENT '创建时间',`update_time` datetime DEFAULT NULL COMMENT '更新时间',PRIMARY KEY (`id`) USING BTREE) ENGINE=InnoDB AUTO_INCREMENT=2991 DEFAULT CHARSET=utf8mb4;

WebSocketServiceImpl

  • 使用@ServerEndpoint标记一个websocket服务器端点类,提供该websocket服务端点的连接路径,并可以使用configurator属性指定一个配置器,该配置器可以介入握手过程。

  • 这个类基本上处理了websocket的几乎所有逻辑,每当有一个新的连接进来时,都会创建一个新的WebSocketServiceImpl对象,并且回调@OnOpen标识的方法,@OnOpen方法可以声明Session 和 EndpointConfig 类型参数,Session将会被存储起来,用于后面与客户端进行双向通信。

学习风`宇blog的websocket模块

ChatConfigurator
  • 在握手(即在modifyHandShake方法中)时,获取客户端的ip,存入ServerEndpointConfig的userProperties属性中。等到在@OnOpen表示的方法中可以声明EndpointConfig参数类型,拿到userProperties,从而拿到存到里面的客户端的ip。

  • 属于ServerEndpointConfig.Configurator类型,可追溯到UpgradeUtil#doUpgrade升级协议时的处理,在Configurator类的modifyHandShake方法中,可以拿到握手请求对象,握手成功之后,@OnOpen方法才会调用执行。

    • 还有一点就是,不能每个客户端想连接websocket服务端的时候,就来连接吧?!参考:【JavaScript】在websocket里面添加Token

      • 至少需要携带一个凭证,放在请求头里面,就可以在这个modifyHandShake方法里做手脚,前端通过let websocket = new WebSocket('ws://localhost:8084/websocket/user001/username001,“eyxxxx-yyyy”),第二个参数就是"Sec-WebSocket-Protocol"请求头,服务端需要返回一摸一样的响应头,并且值也要跟客户端发过来的值一样,websocket才会连接成功,否则,不会建立websocket连接。但是这样把协议头变成了token,不知道合不合适。
      • 还有一些变通的方法,比如:
        1. 可以在ws://…后面拼接查询参数,然后再在modifyHandShake里面校验。
        2. 也可以websocket连接完成后,再让客户端把token发过来,如果token不对,立即断掉websocket连接
public static class ChatConfigurator extends ServerEndpointConfig.Configurator {public static String HEADER_NAME = "X-Real-IP";@Overridepublic void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {try {String firstFoundHeader = request.getHeaders().get(HEADER_NAME.toLowerCase()).get(0);sec.getUserProperties().put(HEADER_NAME, firstFoundHeader);} catch (Exception e) {sec.getUserProperties().put(HEADER_NAME, "未知ip");}}
}

聊天消息

ChatTypeEnums

websocket服务端和客户端之间发送的消息内容,使用json格式,它必须先指明消息类型,然后对方得到消息类型后,就能根据该消息类型做相应的处理。
学习风`宇blog的websocket模块

WebsocketMessageDTO

不管什么消息,都能转为WebsocketMessageDTO类型。浏览器客户端发过来的消息必须是json格式,并且有type标识消息类型,然后data标识消息内容。根据不同的消息类型type,消息内容中的数据会有不同。
学习风`宇blog的websocket模块

// 其中data是个字符串
WebsocketMessageDTO wsMsgDto = JSON.parseObject("{\\"type\\":100,\\"data\\":\\"very good~\\"}", WebsocketMessageDTO.class);
System.out.println(wsMsgDto.getData()); // very good~ // String类型// 其中data是个json格式字符串
WebsocketMessageDTO wsMsgDto2 = JSON.parseObject("{\\"type\\":100,\\"data\\":{\\"name\\":\\"zzhua\\",\\"sex\\":1}}",WebsocketMessageDTO.class);
System.out.println(wsMsgDto2.getData()); // {"sex":1,"name":"zzhua"} // JSONObject类型, 里面使用map存储了name->zzhua,sex->1// 其中data是个多层级的json格式字符串
WebsocketMessageDTO wsMsgDto3 = JSON.parseObject("{\\"type\\":100,\\"data\\":{\\"name\\":\\"zzhua\\",\\"sex\\":1, \\"info\\":{\\"idcard\\":\\"430xxx\\",\\"hobbies\\":[\\"java\\",\\"spring\\",\\"vue\\"]}}}",WebsocketMessageDTO.class);
System.out.println(wsMsgDto3.getData());