> 文章列表 > Socket 心跳管理器

Socket 心跳管理器

Socket 心跳管理器

1. 添加配置参数,IOSocketOptions.java

  1.1 心跳频率

  1.2 心跳最大的丢失次数

/* @Description: Socket 配置参数*/
public class IOSocketOptions {/* 是否为调试模式,默认为 true*/private static boolean isDebug = true;/* Socket 主机地址*/private SocketAddress socketAddress;/* Socket 备用主机地址*/private SocketAddress backupAddress;/* 连接超时时间(单位毫秒)*/private int connectTimeout;/* Socket 工厂*/private SocketFactory socketFactory;/* Socket 安全套接字协议相关配置*/private SocketSSLConfig socketSSLConfig;/* 是否重连 Socket,默认为 true*/private boolean isReConnection;/* Socket 重连管理器*/private AbsReConnection reConnectionManager;/* 写入 Socket 字节时的字节序*/private ByteOrder writeOrder;/* 写数据时单个数据包的最大值 默认 100*/private int maxWriteBytes;/* 读取 Socket 字节时的字节序*/private ByteOrder readOrder;/* 读取数据时,单次读取最大缓存值,默认 50,数值越大效率越高,但是系统消耗也越大*/private int maxReadBytes;/* 从 Socket 读取数据时,遵从的数据包结构协议,在业务层进行定义*/private IMessageProtocol messageProtocol;/* 服务器返回数据的最大值 (单位Mb) ,防止客服端内存溢出*/private int maxResponseDataMb;/* 是否开启请求超时检测*/private boolean isOpenRequestTimeOut;/* 请求超时时间,单位毫秒,默认 十秒*/private long requestTimeOut;/* 实现回调功能需要 callBackID,而 callBackID 是保存在发送消息和应答消息中的,此工厂用来获取 Socket 消息中* 保存 callBackID 值的 key,比如 JSON 格式中的 key-value 中的 key*/private CallBackIDFactory callBackIDFactory;/* 心跳频率/毫秒,默认 五秒*/private long heartBeatFreq;/* 心跳最大的丢失次数,大于这个数时,将断开 Socket 连接,默认 五次*/private long maxHeartBeatLoseTimes;/* IO 字节流的编码方式,默认 UTF-8*/private Charset charsetName;/* 静态内部类*/public static class Builder {IOSocketOptions socketOptions;// 首先获取一个默认的配置public Builder() {this(getDefaultOptions());}public Builder(IOSocketOptions defaultOptions) {socketOptions = defaultOptions;}/* 设置 Socket 主机地址 @param socketAddress* @return*/public Builder setSocketAddress(SocketAddress socketAddress) {socketOptions.socketAddress = socketAddress;return this;}/* 设置 Socket 备用主机地址 @param backupAddress* @return*/public Builder setBackupAddress(SocketAddress backupAddress) {socketOptions.backupAddress = backupAddress;return this;}/* 设置连接超时时间(单位毫秒) @param connectTimeout* @return*/public Builder setConnectTimeout(int connectTimeout) {socketOptions.connectTimeout = connectTimeout;return this;}/* 自定义创建 socket 工厂 @param socketFactory*/public Builder setSocketFactory(SocketFactory socketFactory) {socketOptions.socketFactory = socketFactory;return this;}/* 安全套接字协议的配置 @param socketSSLConfig* @return*/public Builder setSocketSSLConfig(SocketSSLConfig socketSSLConfig) {socketOptions.socketSSLConfig = socketSSLConfig;return this;}/* 是否重连 Socket @param reConnection* @return*/public Builder setReConnection(boolean reConnection) {socketOptions.isReConnection = reConnection;return this;}/* 设置 Socket 重连管理器 @param reConnectionManager* @return*/public Builder setReConnectionManager(AbsReConnection reConnectionManager) {socketOptions.reConnectionManager = reConnectionManager;return this;}/* 设置 Socket 写字节时的字节序 @param writeOrder*/public Builder setWriteOrder(ByteOrder writeOrder) {socketOptions.writeOrder = writeOrder;return this;}/* 设置写数据时,单个数据包的最大值 @param maxWriteBytes* @return*/public Builder setMaxWriteBytes(int maxWriteBytes) {socketOptions.maxWriteBytes = maxWriteBytes;return this;}/* 设置 Socket 读取字节时的字节序 @param readOrder* @return*/public Builder setReadOrder(ByteOrder readOrder) {socketOptions.readOrder = readOrder;return this;}/* 设置读取数据时,单次读取最大缓存值 @param maxReadBytes* @return*/public Builder setMaxReadBytes(int maxReadBytes) {socketOptions.maxReadBytes = maxReadBytes;return this;}/* 设置读取数据的数据结构协议 @param messageProtocol*/public Builder setMessageProtocol(IMessageProtocol messageProtocol) {socketOptions.messageProtocol = messageProtocol;return this;}/* 设置服务器返回数据的允许的最大值,单位兆/Mb @param maxResponseDataMb* @return*/public Builder setMaxResponseDataMb(int maxResponseDataMb) {socketOptions.maxResponseDataMb = maxResponseDataMb;return this;}/* 设置是否开启请求超时的检测 @param openRequestTimeOut*/public Builder setOpenRequestTimeOut(boolean openRequestTimeOut) {socketOptions.isOpenRequestTimeOut = openRequestTimeOut;return this;}/* 设置请求超时时间 @param requestTimeOut 毫秒*/public Builder setRequestTimeOut(long requestTimeOut) {socketOptions.requestTimeOut = requestTimeOut;return this;}/* 设置请求 ack 回调 callBackID 功能的工厂 @param callBackIDFactory* @return*/public Builder setCallBackIDFactory(CallBackIDFactory callBackIDFactory) {socketOptions.callBackIDFactory = callBackIDFactory;return this;}/* 设置心跳发送频率,单位毫秒 @param heartBeatFreq 毫秒* @return*/public Builder setHeartBeatFreq(long heartBeatFreq) {socketOptions.heartBeatFreq = heartBeatFreq;return this;}/* 设置心跳丢失的最大允许数,如果超过这个最大数时,就断开 Socket 连接 @param maxHeartBeatLoseTimes* @return*/public Builder setMaxHeartBeatLoseTimes(long maxHeartBeatLoseTimes) {socketOptions.maxHeartBeatLoseTimes = maxHeartBeatLoseTimes;return this;}/* 设置 IO 字节流的编码方式,默认 UTF-8 @param charsetName* @return*/public Builder setCharsetName(Charset charsetName) {socketOptions.charsetName = charsetName;return this;}public IOSocketOptions build() {return socketOptions;}}/* 获取默认的配置*/public static IOSocketOptions getDefaultOptions() {IOSocketOptions options = new IOSocketOptions();options.socketAddress = null;options.backupAddress = null;options.connectTimeout = 10 * 1000;// 连接超时默认 5 秒options.socketFactory = null;options.socketSSLConfig = null;options.isReConnection = true;// 是否重连主机,默认为 trueoptions.reConnectionManager = new DefaultReConnection(); // 默认 Socket 重连器options.writeOrder = ByteOrder.BIG_ENDIAN;// 大端序options.maxWriteBytes = 100;options.readOrder = ByteOrder.BIG_ENDIAN;// 大端序options.maxReadBytes = 50;options.messageProtocol = null;// 默认数据包结构协议为 nulloptions.maxResponseDataMb = 5;// 默认接收最大值 5MBoptions.isOpenRequestTimeOut = true;// 是否开启请求超时检测,默认 true 开启options.requestTimeOut = 10 * 1000;// 请求超时时间,默认10秒options.callBackIDFactory = null;// 实现回调功能需要的 callBackID 工厂options.heartBeatFreq = 5 * 1000;// 心跳频率,默认 5 秒options.maxHeartBeatLoseTimes = 5;// 心跳丢失的最大次数,大于这个数,将断开 Socket 连接options.charsetName = StandardCharsets.UTF_8;return options;}public boolean isIsDebug() {return isDebug;}public SocketAddress getSocketAddress() {return socketAddress;}public SocketAddress getBackupAddress() {return backupAddress;}public int getConnectTimeout() {return connectTimeout;}public SocketFactory getSocketFactory() {return socketFactory;}public SocketSSLConfig getSocketSSLConfig() {return socketSSLConfig;}public boolean isReConnection() {return isReConnection;}public AbsReConnection getReConnectionManager() {return reConnectionManager;}public ByteOrder getWriteOrder() {return writeOrder;}public int getMaxWriteBytes() {return maxWriteBytes;}public ByteOrder getReadOrder() {return readOrder;}public int getMaxReadBytes() {return maxReadBytes;}public IMessageProtocol getMessageProtocol() {return messageProtocol;}public int getMaxResponseDataMb() {return maxResponseDataMb;}public boolean isOpenRequestTimeOut() {return isOpenRequestTimeOut;}public long getRequestTimeOut() {return requestTimeOut;}public CallBackIDFactory getCallBackIDFactory() {return callBackIDFactory;}public long getHeartBeatFreq() {return heartBeatFreq;}public long getMaxHeartBeatLoseTimes() {return maxHeartBeatLoseTimes;}public Charset getCharsetName() {return charsetName;}public static void setIsDebug(boolean isDebug) {IOSocketOptions.isDebug = isDebug;}
}

2. 创建心跳数据接收监听类,HeartBeatListener.java

/* @Description: 心跳数据接收监听*/
public interface HeartBeatListener {/* 是否为服务器心跳 @param originReadData* @return*/boolean isServerHeartBeat(OriginReadData originReadData);
}

3. 创建心跳数据检测接口,IHeartBeatManager.java

/* @Description: 心跳数据检测接口*/
public interface IHeartBeatManager extends IOptions {/* 开始发送心跳 @param clientHeartBeat   心跳数据* @param heartBeatListener 心跳数据接收监听*/void startHeartBeat(byte[] clientHeartBeat, HeartBeatListener heartBeatListener);/* 接收到心跳数据*/void onReceiveHeartBeat();/* 停止发送心跳*/void stopHeartBeat();
}

4. 实现心跳数据检测接口,创建心跳管理器,HeartBeatManager.java

/* @Description: 心跳管理器*/
public class HeartBeatManager extends SocketActionListener implements IHeartBeatManager {/* Socket 连接器*/private IConnectionManager connectionManager;/* Socket 配置参数*/private IOSocketOptions socketOptions;/* 客户端心跳包*/private byte[] clientHeartBeat;/* 心跳包发送线程*/private ScheduledExecutorService heartBeatExecutor;/* 记录心跳的失联次数*/private AtomicInteger loseTimes = new AtomicInteger(-1);/* 心跳频率*/private long freq;/* 是否激活了心跳*/private boolean isActivate;/* 心跳包接收监听*/private HeartBeatListener heartBeatListener;public HeartBeatManager(IConnectionManager connectionManager) {this.connectionManager = connectionManager;socketOptions = connectionManager.getOptions();//添加监听 Socket 的行为this.connectionManager.subscribeSocketAction(this);}@Overridepublic Object setOptions(IOSocketOptions socketOptions) {this.socketOptions = socketOptions;freq = socketOptions.getHeartBeatFreq();// 心跳频率不能小于一秒freq = freq < 1000 ? 1000 : freq;return this;}@Overridepublic IOSocketOptions getOptions() {return socketOptions;}@Overridepublic void startHeartBeat(byte[] clientHeartBeat, HeartBeatListener heartBeatListener) {this.clientHeartBeat = clientHeartBeat;this.heartBeatListener = heartBeatListener;isActivate = true;startThread();}/* 启动心跳线程*/private void startThread() {// 心跳频率freq = socketOptions.getHeartBeatFreq();// 启动线程发送心跳if (heartBeatExecutor == null || heartBeatExecutor.isShutdown()) {heartBeatExecutor = Executors.newSingleThreadScheduledExecutor();heartBeatExecutor.scheduleWithFixedDelay(heartBeatTask, 0, freq, TimeUnit.MILLISECONDS);}}/* 心跳发送任务*/private Runnable heartBeatTask = new Runnable() {@Overridepublic void run() {// 心跳丢失次数判断,心跳包丢失了一定的次数,则会进行 Socket 的断开重连if (socketOptions.getMaxHeartBeatLoseTimes() != -1 && loseTimes.incrementAndGet() >= socketOptions.getMaxHeartBeatLoseTimes()) {connectionManager.disConnect(socketOptions.isReConnection());// 重置失联次数resetLoseTimes();} else {// 发送心跳包connectionManager.upBytes(clientHeartBeat);}}};@Overridepublic void onReceiveHeartBeat() {// 重置失联次数resetLoseTimes();}@Overridepublic void stopHeartBeat() {isActivate = false;closeThread();}/* 关闭发送心跳线程*/private void closeThread() {if (heartBeatExecutor != null && !heartBeatExecutor.isShutdown()) {heartBeatExecutor.shutdownNow();heartBeatExecutor = null;// 重置失联次数resetLoseTimes();//LogUtil.i("heartBeatExecutor closeThread");}}/* 重置心跳的失联次数*/private void resetLoseTimes() {loseTimes.set(-1);}@Overridepublic void onSocketConnectSuccess(SocketAddress socketAddress) {if (isActivate) {startThread();}}@Overridepublic void onSocketConnectFail(SocketAddress socketAddress, boolean isNeedReconnect) {// 如果不需要重连,则关闭心跳频率线程if (!isNeedReconnect) {closeThread();}}@Overridepublic void onSocketDisconnect(SocketAddress socketAddress, boolean isNeedReconnect) {// 如果不需要重连,则关闭心跳检测if (!isNeedReconnect) {closeThread();}}@Overridepublic void onSocketResponse(SocketAddress socketAddress, OriginReadData originReadData) {if (heartBeatListener != null && heartBeatListener.isServerHeartBeat(originReadData)) {//收到服务器心跳数据onReceiveHeartBeat();}}
}

5. 修改 Socket 连接管理器

  5.1 创建发送数据接口

/* @Description: 发送数据接口*/
public interface ISend {/* 发送一个有回调的消息 @param superCallBackSender 消息结构* @return*/IConnectionManager upCallBackMessage(SuperCallBackSender superCallBackSender);/* 发送 byte 数组 @param bytes byte 类型消息* @return*/IConnectionManager upBytes(byte[] bytes);
}

  5.2 修改连接管理器接口规范,IConnectionManager.java

/* @Description: 连接管理器的接口规范*/
public interface IConnectionManager extends ISubscribeSocketAction, IOptions<IConnectionManager>, ISend {/* 开始连接*/void connect();/* 关闭连接 @param isNeedReconnect 是否需要重连*/void disConnect(boolean isNeedReconnect);/* 获取连接状态 @return*/int getConnectionStatus();/* 是否可连接的 @return*/boolean isConnectViable();/* 切换 host @param socketAddress*/void switchHost(SocketAddress socketAddress);/* 获取输入流 @return*/InputStream getInputStream();/* 获取输出流 @return*/OutputStream getOutputStream();/* 获取心跳管理器 @return*/IHeartBeatManager getHeartBeatManager();
}

  5.3 在Socket 连接超类中添加心跳管理器,SuperConnection.java

/* @Description: Socket 连接的超类*/
public abstract class SuperConnection implements IConnectionManager {/* 连接的状态,初始值为断开连接*/protected final AtomicInteger connectionStatus = new AtomicInteger(SocketStatus.SOCKET_DISCONNECTED);/* Socket 配置参数*/protected IOSocketOptions socketOptions;/* 连接线程*/private ExecutorService connectionExecutor;/* Socket 主机地址*/protected SocketAddress socketAddress;/* Socket 行为分发器*/private ISocketActionDispatch actionDispatcher;/* Socket 重连管理器*/private AbsReConnection reConnection;/* Socket 读写数据管理器*/private IIOManager ioManager;/* Socket 回调消息分发器*/private CallBackResponseDispatcher callBackResponseDispatcher;/* Socket 心跳管理器*/private IHeartBeatManager heartBeatManager;public SuperConnection(SocketAddress socketAddress) {this.socketAddress = socketAddress;this.actionDispatcher = new SocketActionDispatcher(this, socketAddress);}@Overridepublic void subscribeSocketAction(ISocketActionListener iSocketActionListener) {// 订阅 Socket 行为监听this.actionDispatcher.subscribe(iSocketActionListener);}@Overridepublic void unSubscribeSocketAction(ISocketActionListener iSocketActionListener) {// 解除 Socket 行为监听this.actionDispatcher.unSubscribe(iSocketActionListener);}@Overridepublic IConnectionManager setOptions(IOSocketOptions socketOptions) {if (socketOptions == null) return this;this.socketOptions = socketOptions;// 是否更改重连管理器if (reConnection != null && !reConnection.equals(socketOptions.getReConnectionManager())) {// 停止分离重连管理器reConnection.detach();reConnection = socketOptions.getReConnectionManager();// 关联连接器reConnection.attach(this);}// 设置读写数据管理器的配置参数if (ioManager != null) ioManager.setOptions(socketOptions);// 设置回调消息分发器的配置参数if (callBackResponseDispatcher != null)callBackResponseDispatcher.setOptions(socketOptions);// 设置心跳包管理器的配置参数if (heartBeatManager != null) heartBeatManager.setOptions(socketOptions);return this;}@Overridepublic IOSocketOptions getOptions() {return socketOptions;}/* 连接成功*/protected void onConnectionOpened() {LogUtil.i("Socket 连接成功: " + socketAddress.toString());// Socket 行为分发: 连接成功actionDispatcher.dispatchAction(SocketAction.ACTION_CONNECT_SUCCESS);// Socket 连接状态: 成功connectionStatus.set(SocketStatus.SOCKET_CONNECTED);// 打开 Socket 处理数据的管理器openSocketManager();}/* 打开 Socket 处理数据的管理器*/private void openSocketManager() {if (callBackResponseDispatcher == null) {// 创建回调消息分发器callBackResponseDispatcher = new CallBackResponseDispatcher(this);}if (ioManager == null) {// 创建 io 管理器ioManager = new IOManager(this, actionDispatcher);}// 启动回调消息分发器线程callBackResponseDispatcher.engineThread();// 启动 IO 管理器线程ioManager.startIO();}/* 连接 Socket 任务*/private Runnable connectTask = () -> {try {openConnection();} catch (Exception e) {LogUtil.e("Socket 连接失败: " + socketAddress.toString());e.printStackTrace();// Socket 连接状态: 已断开连接connectionStatus.set(SocketStatus.SOCKET_DISCONNECTED);// Socket 行为分发: 1.连接失败 2.是否重连,默认为 trueactionDispatcher.dispatchAction(SocketAction.ACTION_CONNECT_FAIL, socketOptions.isReConnection());}};@Overridepublic synchronized void connect() {LogUtil.i("Socket 开始连接: " + socketAddress.toString());if (socketAddress.getIp() == null) {throw new NotNullException("请检查是否设置了IP地址.");}// Socket 状态: 正在连接connectionStatus.set(SocketStatus.SOCKET_CONNECTING);// 重连管理器if (reConnection != null) {reConnection.detach(); // 停止分离重连管理器}reConnection = socketOptions.getReConnectionManager();if (reConnection != null) {reConnection.attach(this);//关联连接器}// 开始 Socket 行为分发线程if (actionDispatcher != null) {actionDispatcher.startDispatchThread();}// 心跳管理器if (heartBeatManager == null) {heartBeatManager = new HeartBeatManager(this);}// 开启线程,进行连接if (connectionExecutor == null || connectionExecutor.isShutdown()) {// 核心线程数为 0,非核心线程数可以有 Integer.MAX_VALUE 个,存活时间为 60 秒,适合于在不断的进行连接情况下,避免重复创建和销毁线程connectionExecutor = Executors.newCachedThreadPool();//LogUtil.i("Executors newCachedThreadPool");}// 执行连接任务connectionExecutor.execute(connectTask);}@Overridepublic synchronized void disConnect(boolean isNeedReconnect) {// 只有在已经连接的状态下才能断开连接if (connectionStatus.get() != SocketStatus.SOCKET_CONNECTED) {return;}// 正在重连中, 则不断开 Socketif (isNeedReconnect && reConnection.isReConnecting()) {return;}// Socket 连接状态: 正在断开连接connectionStatus.set(SocketStatus.SOCKET_DISCONNECTING);// 开启断开连接线程String info = socketAddress.toString();Thread disConnectThread = new DisConnectThread("DisConnection thread: " + info, isNeedReconnect);// setDaemon: true 既设置该线程为守护线程,表示该线程是不重要的,进程退出时,不需要等待这个线程执行完成,目的,避免子线程无限死循环,导致退不出程序disConnectThread.setDaemon(true);disConnectThread.start();}/* 断开连接线程*/private class DisConnectThread extends Thread {// 当前连接断开后,是否需要自动重连boolean isNeedReconnect;public DisConnectThread(@NonNull String name, boolean isNeedReconnect) {super(name);this.isNeedReconnect = isNeedReconnect;}@Overridepublic void run() {try {// 关闭 io 管理器中线程if (ioManager != null) ioManager.closeIO();// 关闭回调分发器线程if (callBackResponseDispatcher != null) callBackResponseDispatcher.shutdownThread();// 关闭连接线程if (connectionExecutor != null && !connectionExecutor.isShutdown()) {connectionExecutor.shutdown();connectionExecutor = null;}closeConnection();// getName()LogUtil.i("关闭Socket连接: " + socketAddress.toString());// Socket 连接状态: 已断开连接connectionStatus.set(SocketStatus.SOCKET_DISCONNECTED);// Socket 行为分发: 1.断开连接 2.是否重连actionDispatcher.dispatchAction(SocketAction.ACTION_DISCONNECTION, isNeedReconnect);} catch (Exception e) {// 断开连接时,发生异常e.printStackTrace();LogUtil.e(e.getMessage());}}}@Overridepublic synchronized void switchHost(SocketAddress socketAddress) {if (socketAddress != null) {SocketAddress oldAddress = this.socketAddress;this.socketAddress = socketAddress;//Socket 分发行为更改主机地址if (actionDispatcher != null) {actionDispatcher.setSocketAddress(socketAddress);}}}@Overridepublic int getConnectionStatus() {return connectionStatus.get();}@Overridepublic boolean isConnectViable() {// 当前 socket 是否处于可连接的状态 Util.isNetConnected() &&//TODOreturn connectionStatus.get() == SocketStatus.SOCKET_DISCONNECTED;}/* 打开连接 @throws Exception*/protected abstract void openConnection() throws Exception;/* 关闭连接 @throws IOException*/abstract void closeConnection() throws Exception;@Overridepublic synchronized IConnectionManager upCallBackMessage(SuperCallBackSender superCallBackSender) {// 检测发送数据消息体中 callBackID 是不是一样callBackResponseDispatcher.checkCallBackSender(superCallBackSender);//根据自己的协议打包消息,发送数据sendBytes(superCallBackSender.pack());return this;}@Overridepublic synchronized IConnectionManager upBytes(byte[] bytes) {//发送数据sendBytes(bytes);return this;}/* 发送 byte 数组数据 @param bytes* @return*/private IConnectionManager sendBytes(byte[] bytes) {// 判断 Socket 是否已连接if (ioManager == null || connectionStatus.get() != SocketStatus.SOCKET_CONNECTED) {return this;}ioManager.sendBytes(bytes);return this;}@Overridepublic IHeartBeatManager getHeartBeatManager() {return heartBeatManager;}
}

6. 发送数据定义

  6.1 创建发送数据接口类,IMessage.java

/* @Description: 发送数据接口*/
public interface IMessage {/* 根据自己的协议打包消息 @return*/byte[] pack();
}

  6.2 创建发送消息基类,AbsMessage.java

/* @Description: 发送消息基类*/
public class AbsMessage implements IMessage {@Overridepublic byte[] pack() {byte[] body = new Gson().toJson(this).getBytes();ByteBuffer byteBuffer = ByteBuffer.allocate(body.length + 4);byteBuffer.order(ByteOrder.BIG_ENDIAN);byteBuffer.putInt(body.length);byteBuffer.put(body);return byteBuffer.array();}
}

  6.3 创建心跳数据包示例,ClientHeartBeat.java

/* @Description: 心跳数据包示例*/
public class ClientHeartBeat extends AbsMessage {private String msgId;private String from;public String getMsgId() {return msgId;}public void setMsgId(String msgId) {this.msgId = msgId;}public String getFrom() {return from;}public void setFrom(String from) {this.from = from;}@Overridepublic String toString() {return "ClientHeartBeat{" + "msgId='" + msgId + '\\'' + ", from='" + from + '\\'' + '}';}
}

7. 测试方法

    //测试方法public void initViewHeartBeat(View view) {SocketAddress socketAddress = new SocketAddress("192.168.1.52", 6688);IOSocketOptions socketOptions = new IOSocketOptions.Builder().setSocketAddress(socketAddress).setMessageProtocol(new DefaultMessageProtocol()).build();TcpConnection connection = new TcpConnection(socketAddress);connection.setOptions(socketOptions);view.findViewById(R.id.but_connect).setOnClickListener(v -> connection.connect());//心跳示例ClientHeartBeat heartBeat = new ClientHeartBeat();heartBeat.setMsgId("heart_beat");heartBeat.setFrom("client");view.findViewById(R.id.but_add).setOnClickListener(new View.OnClickListener() {@Overridepublic void onClick(View v) {connection.getHeartBeatManager().startHeartBeat(heartBeat.pack(), new HeartBeatListener() {@Overridepublic boolean isServerHeartBeat(OriginReadData originReadData) {try {String s = originReadData.getBodyString();JSONObject jsonObject = new JSONObject(s);if ("heart_beat".equals(jsonObject.getString("msgId"))) {LogUtil.i("收到服务器心跳");return true;}} catch (JSONException e) {e.printStackTrace();}return false;}});}});view.findViewById(R.id.but_stop).setOnClickListener(v -> connection.getHeartBeatManager().stopHeartBeat());}

大连人才网