从零开发短视频电商 Java Websocket方案之Java_Websocket(二)
保活策略
通常建议在客户端来做心跳,减少服务端压力.
客户端定时发送 ping操作帧
即可
当服务端接收到ping操作帧
后,会自动发送pong帧
。Java_websocket的客户端默认心跳数据包每 60 秒自动发送一次
服务端接收到Ping后会自动发送Pong相关核心代码
// 服务端接收到不同的帧做不同的处理
public void processFrame(WebSocketImpl webSocketImpl, Framedata frame)throws InvalidDataException {Opcode curop = frame.getOpcode();if (curop == Opcode.CLOSING) {// 接收到 关闭 ,关闭连接processFrameClosing(webSocketImpl, frame);} else if (curop == Opcode.PING) {// 接收到 ping ,传递事件 会自动发个 pongwebSocketImpl.getWebSocketListener().onWebsocketPing(webSocketImpl, frame);} else if (curop == Opcode.PONG) {// 接收到 pong , 更新下对应连接的最新pong时间并 传递事件webSocketImpl.updateLastPong();// 传递事件webSocketImpl.getWebSocketListener().onWebsocketPong(webSocketImpl, frame);} else if (!frame.isFin() || curop == Opcode.CONTINUOUS) {processFrameContinuousAndNonFin(webSocketImpl, frame, curop);} else if (currentContinuousFrame != null) {log.error("Protocol error: Continuous frame sequence not completed.");throw new InvalidDataException(CloseFrame.PROTOCOL_ERROR,"Continuous frame sequence not completed.");} else if (curop == Opcode.TEXT) {// 文本帧 帮我们转了一手 Charsetfunctions.stringUtf8(frame.getPayloadData())processFrameText(webSocketImpl, frame);} else if (curop == Opcode.BINARY) {// 二进制帧processFrameBinary(webSocketImpl, frame);} else {log.error("non control or continious frame expected");throw new InvalidDataException(CloseFrame.PROTOCOL_ERROR,"non control or continious frame expected");}}// 当服务端接收到`ping操作帧`后,会自动发送`pong帧`。
public abstract class WebSocketAdapter implements WebSocketListener { @Overridepublic void onWebsocketPing(WebSocket conn, Framedata f) {conn.sendFrame(new PongFrame((PingFrame) f));}
集群问题
- 广播(发布订阅)模式方案
- 用rocketmq广播。
- 用redis订阅发布。
- 高可用方案
- 客户端配置多个server地址。
- 每次都跟多个server通信,即每个server多少平等的。
- 共享状态+转发方案
- redis记录会话id:serverId(serverIp+serverPort)
- 每次发送的时候根据会话id判断是自己server node发,还是其他node,如果是自己直接发就行,其他node就给它发送个rpc请求。
流量整形/流控
流量整形(Traffic Shaping)是一种主动调整流量输出速率的措施。流量整形与流量监管的主要区别在于,流量整形对流量监管中需要丢弃的报文进行缓存——通常是将它们放入缓冲区或队列内,也称流量整形(Traffic Shaping,简称TS)。当报文的发送速度过快时,首先在缓冲区进行缓存;再通过流量计量算法的控制下“均匀”地发送这些被缓冲的报文。流量整形与流量监管的另一区别是,整形可能会增加延迟,而监管几乎不引入额外的延迟。
消息发送保护机制
通过流量整形可以控制发送速度,但是它的控制原理是将待发送的消息封装成Task放入消息队列,等待执行时间到达后继续发送,所以如果业务发送线程不判断channle的可以状态,就可能会导致OOM问题。
java_websocket不支持。。。Netty支持哈,高低水位是需要你在send之前判断isWrite,具体请参考netty
消息压缩
只体会到了 server响应的消息压缩。
请求示例
GET ws://127.0.0.1:8085/ HTTP/1.1\\r\\nHost: 127.0.0.1:8085\\r\\nConnection: Upgrade\\r\\nPragma: no-cache\\r\\nCache-Control: no-cache\\r\\nUpgrade: websocket\\r\\nOrigin: http://www.websocket-test.com\\r\\nSec-WebSocket-Version: 13\\r\\nUser-Agent: Mozilla/5.0 (Linux; Android 6.0; H60-L01 Build/HDH60-L01) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.124 Mobile Safari/537.36\\r\\nAccept-Encoding: gzip, deflate, sdch\\r\\nAccept-Language: zh-CN,zh;q=0.8\\r\\nSec-WebSocket-Key: S4iljLdlI5qk3jpx2fHU4A==\\r\\n// 这里Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits\\r\\n
响应示例
HTTP/1.1 101 \\r\\nServer: TooTallNate Java-WebSocket\\r\\nUpgrade: websocket\\r\\nConnection: upgrade\\r\\nSec-WebSocket-Accept: xwLDQrb5kzxpZDdeTcUd+7diXXU=\\r\\n// 这里Sec-WebSocket-Extensions: permessage-deflate;client_max_window_bits=15\\r\\nDate: Sun, 09 Oct 2016 23:07:39 GMT\\r\\n
根据请求中的 Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
来协商是否对传输数据进行deflate压缩。
重点响应头字段 Sec-WebSocket-Extensions: permessage-deflate;client_max_window_bits=15
。
// 客户端
private static final Draft perMessageDeflateDraft = new Draft_6455(new PerMessageDeflateExtension());
private static class DeflateClient extends WebSocketClient { public DeflateClient() throws URISyntaxException {super(new URI("ws://localhost:" + PORT), perMessageDeflateDraft);
}
// 服务端
private static class DeflateServer extends WebSocketServer {public DeflateServer() { super(new InetSocketAddress(PORT), Collections.singletonList(perMessageDeflateDraft));
}
Netty类似如下
.childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new HttpServerCodec());pipeline.addLast(new HttpObjectAggregator(65536));if (config.isUseCompressionHandler()) {// 这里使用netty内置的WebSocketServerCompressionHandlerpipeline.addLast(new WebSocketServerCompressionHandler());}pipeline.addLast(new HttpServerHandler(xx, config));}});
堆外内存优化
当发送量非常大 (如:100000/s),可以使用堆外内存(零拷贝提升性能)来群发进行优化
// sessions 为 List<Session>String str = "Hello Netty!";byte[] bytes = str.getBytes();ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(bytes.length).writeBytes(bytes);try {for (Session session : sessions) {if (session.isWritable()) {session.sendText(buf.retainedDuplicate());}}} catch (Exception e) {e.printStackTrace();} finally {ReferenceCountUtil.release(buf);}
限制连接数
如果需要严格限制的话,可以直接使用AtomicInteger类对连接数进行统计,当超过限制时关闭连接
IP黑白名单过滤
在握手时只做一次即可,跟http server一个逻辑。
认证授权过滤
在握手时只做一次即可,跟http server鉴权一个逻辑。
框架内部日志
将此添加到 logback.xml
<logger name="org.java_websocket" level="TRACE"> <appender-ref ref="CONSOLE"/>
</logger>