> 文章列表 > 从零开发短视频电商 Java Websocket方案之Java_Websocket(二)

从零开发短视频电商 Java Websocket方案之Java_Websocket(二)

从零开发短视频电商 Java Websocket方案之Java_Websocket(二)

文章目录

    • 保活策略
    • 集群问题
    • 流量整形/流控
    • 消息压缩
    • 堆外内存优化
    • 限制连接数
    • IP黑白名单过滤
    • 认证授权过滤
    • 框架内部日志

保活策略

通常建议在客户端来做心跳,减少服务端压力.

客户端定时发送 ping操作帧 即可

当服务端接收到ping操作帧后,会自动发送pong帧。Java_websocket的客户端默认心跳数据包每 60 秒自动发送一次

服务端接收到Ping后会自动发送Pong相关核心代码

// 服务端接收到不同的帧做不同的处理 
public void processFrame(WebSocketImpl webSocketImpl, Framedata frame)throws InvalidDataException {Opcode curop = frame.getOpcode();if (curop == Opcode.CLOSING) {// 接收到 关闭 ,关闭连接processFrameClosing(webSocketImpl, frame);} else if (curop == Opcode.PING) {// 接收到 ping ,传递事件 会自动发个 pongwebSocketImpl.getWebSocketListener().onWebsocketPing(webSocketImpl, frame);} else if (curop == Opcode.PONG) {// 接收到 pong , 更新下对应连接的最新pong时间并 传递事件webSocketImpl.updateLastPong();// 传递事件webSocketImpl.getWebSocketListener().onWebsocketPong(webSocketImpl, frame);} else if (!frame.isFin() || curop == Opcode.CONTINUOUS) {processFrameContinuousAndNonFin(webSocketImpl, frame, curop);} else if (currentContinuousFrame != null) {log.error("Protocol error: Continuous frame sequence not completed.");throw new InvalidDataException(CloseFrame.PROTOCOL_ERROR,"Continuous frame sequence not completed.");} else if (curop == Opcode.TEXT) {// 文本帧 帮我们转了一手 Charsetfunctions.stringUtf8(frame.getPayloadData())processFrameText(webSocketImpl, frame);} else if (curop == Opcode.BINARY) {// 二进制帧processFrameBinary(webSocketImpl, frame);} else {log.error("non control or continious frame expected");throw new InvalidDataException(CloseFrame.PROTOCOL_ERROR,"non control or continious frame expected");}}// 当服务端接收到`ping操作帧`后,会自动发送`pong帧`。
public abstract class WebSocketAdapter implements WebSocketListener {  @Overridepublic void onWebsocketPing(WebSocket conn, Framedata f) {conn.sendFrame(new PongFrame((PingFrame) f));}

集群问题

  • 广播(发布订阅)模式方案
    • 用rocketmq广播。
    • 用redis订阅发布。
  • 高可用方案
    • 客户端配置多个server地址。
    • 每次都跟多个server通信,即每个server多少平等的。
  • 共享状态+转发方案
    • redis记录会话id:serverId(serverIp+serverPort)
    • 每次发送的时候根据会话id判断是自己server node发,还是其他node,如果是自己直接发就行,其他node就给它发送个rpc请求。

流量整形/流控

流量整形(Traffic Shaping)是一种主动调整流量输出速率的措施。流量整形与流量监管的主要区别在于,流量整形对流量监管中需要丢弃的报文进行缓存——通常是将它们放入缓冲区或队列内,也称流量整形(Traffic Shaping,简称TS)。当报文的发送速度过快时,首先在缓冲区进行缓存;再通过流量计量算法的控制下“均匀”地发送这些被缓冲的报文。流量整形与流量监管的另一区别是,整形可能会增加延迟,而监管几乎不引入额外的延迟。

消息发送保护机制
通过流量整形可以控制发送速度,但是它的控制原理是将待发送的消息封装成Task放入消息队列,等待执行时间到达后继续发送,所以如果业务发送线程不判断channle的可以状态,就可能会导致OOM问题。

java_websocket不支持。。。Netty支持哈,高低水位是需要你在send之前判断isWrite,具体请参考netty

消息压缩

只体会到了 server响应的消息压缩。

请求示例

    GET ws://127.0.0.1:8085/ HTTP/1.1\\r\\nHost: 127.0.0.1:8085\\r\\nConnection: Upgrade\\r\\nPragma: no-cache\\r\\nCache-Control: no-cache\\r\\nUpgrade: websocket\\r\\nOrigin: http://www.websocket-test.com\\r\\nSec-WebSocket-Version: 13\\r\\nUser-Agent: Mozilla/5.0 (Linux; Android 6.0; H60-L01 Build/HDH60-L01) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.124 Mobile Safari/537.36\\r\\nAccept-Encoding: gzip, deflate, sdch\\r\\nAccept-Language: zh-CN,zh;q=0.8\\r\\nSec-WebSocket-Key: S4iljLdlI5qk3jpx2fHU4A==\\r\\n// 这里Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits\\r\\n

响应示例

    HTTP/1.1 101 \\r\\nServer: TooTallNate Java-WebSocket\\r\\nUpgrade: websocket\\r\\nConnection: upgrade\\r\\nSec-WebSocket-Accept: xwLDQrb5kzxpZDdeTcUd+7diXXU=\\r\\n// 这里Sec-WebSocket-Extensions: permessage-deflate;client_max_window_bits=15\\r\\nDate: Sun, 09 Oct 2016 23:07:39 GMT\\r\\n

根据请求中的 Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits来协商是否对传输数据进行deflate压缩。

重点响应头字段 Sec-WebSocket-Extensions: permessage-deflate;client_max_window_bits=15

// 客户端
private static final Draft perMessageDeflateDraft = new Draft_6455(new PerMessageDeflateExtension());  
private static class DeflateClient extends WebSocketClient { public DeflateClient() throws URISyntaxException {super(new URI("ws://localhost:" + PORT), perMessageDeflateDraft);
}
// 服务端        
private static class DeflateServer extends WebSocketServer {public DeflateServer() { super(new InetSocketAddress(PORT), Collections.singletonList(perMessageDeflateDraft));
}

Netty类似如下

        .childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new HttpServerCodec());pipeline.addLast(new HttpObjectAggregator(65536));if (config.isUseCompressionHandler()) {// 这里使用netty内置的WebSocketServerCompressionHandlerpipeline.addLast(new WebSocketServerCompressionHandler());}pipeline.addLast(new HttpServerHandler(xx, config));}});

堆外内存优化

当发送量非常大 (如:100000/s),可以使用堆外内存(零拷贝提升性能)来群发进行优化

        // sessions 为 List<Session>String str = "Hello Netty!";byte[] bytes = str.getBytes();ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(bytes.length).writeBytes(bytes);try {for (Session session : sessions) {if (session.isWritable()) {session.sendText(buf.retainedDuplicate());}}} catch (Exception e) {e.printStackTrace();} finally {ReferenceCountUtil.release(buf);} 

限制连接数

如果需要严格限制的话,可以直接使用AtomicInteger类对连接数进行统计,当超过限制时关闭连接

IP黑白名单过滤

在握手时只做一次即可,跟http server一个逻辑。

认证授权过滤

在握手时只做一次即可,跟http server鉴权一个逻辑。

框架内部日志

将此添加到 logback.xml

<logger name="org.java_websocket" level="TRACE"> <appender-ref ref="CONSOLE"/> 
</logger>