> 文章列表 > Socket 心跳管理器

Socket 心跳管理器

Socket 心跳管理器

1. 添加配置参数,IOSocketOptions.java

  1.1 心跳频率

  1.2 心跳最大的丢失次数

/*** @Description: Socket 配置参数*/
public class IOSocketOptions {/*** 是否为调试模式,默认为 true*/private static boolean isDebug = true;/*** Socket 主机地址*/private SocketAddress socketAddress;/*** Socket 备用主机地址*/private SocketAddress backupAddress;/*** 连接超时时间(单位毫秒)*/private int connectTimeout;/*** Socket 工厂*/private SocketFactory socketFactory;/*** Socket 安全套接字协议相关配置*/private SocketSSLConfig socketSSLConfig;/*** 是否重连 Socket,默认为 true*/private boolean isReConnection;/*** Socket 重连管理器*/private AbsReConnection reConnectionManager;/*** 写入 Socket 字节时的字节序*/private ByteOrder writeOrder;/*** 写数据时单个数据包的最大值 默认 100*/private int maxWriteBytes;/*** 读取 Socket 字节时的字节序*/private ByteOrder readOrder;/*** 读取数据时,单次读取最大缓存值,默认 50,数值越大效率越高,但是系统消耗也越大*/private int maxReadBytes;/*** 从 Socket 读取数据时,遵从的数据包结构协议,在业务层进行定义*/private IMessageProtocol messageProtocol;/*** 服务器返回数据的最大值 (单位Mb) ,防止客服端内存溢出*/private int maxResponseDataMb;/*** 是否开启请求超时检测*/private boolean isOpenRequestTimeOut;/*** 请求超时时间,单位毫秒,默认 十秒*/private long requestTimeOut;/*** 实现回调功能需要 callBackID,而 callBackID 是保存在发送消息和应答消息中的,此工厂用来获取 Socket 消息中* 保存 callBackID 值的 key,比如 JSON 格式中的 key-value 中的 key*/private CallBackIDFactory callBackIDFactory;/*** 心跳频率/毫秒,默认 五秒*/private long heartBeatFreq;/*** 心跳最大的丢失次数,大于这个数时,将断开 Socket 连接,默认 五次*/private long maxHeartBeatLoseTimes;/*** IO 字节流的编码方式,默认 UTF-8*/private Charset charsetName;/*** 静态内部类*/public static class Builder {IOSocketOptions socketOptions;// 首先获取一个默认的配置public Builder() {this(getDefaultOptions());}public Builder(IOSocketOptions defaultOptions) {socketOptions = defaultOptions;}/*** 设置 Socket 主机地址** @param socketAddress* @return*/public Builder setSocketAddress(SocketAddress socketAddress) {socketOptions.socketAddress = socketAddress;return this;}/*** 设置 Socket 备用主机地址** @param backupAddress* @return*/public Builder setBackupAddress(SocketAddress backupAddress) {socketOptions.backupAddress = backupAddress;return this;}/*** 设置连接超时时间(单位毫秒)** @param connectTimeout* @return*/public Builder setConnectTimeout(int connectTimeout) {socketOptions.connectTimeout = connectTimeout;return this;}/*** 自定义创建 socket 工厂** @param socketFactory*/public Builder setSocketFactory(SocketFactory socketFactory) {socketOptions.socketFactory = socketFactory;return this;}/*** 安全套接字协议的配置** @param socketSSLConfig* @return*/public Builder setSocketSSLConfig(SocketSSLConfig socketSSLConfig) {socketOptions.socketSSLConfig = socketSSLConfig;return this;}/*** 是否重连 Socket** @param reConnection* @return*/public Builder setReConnection(boolean reConnection) {socketOptions.isReConnection = reConnection;return this;}/*** 设置 Socket 重连管理器** @param reConnectionManager* @return*/public Builder setReConnectionManager(AbsReConnection reConnectionManager) {socketOptions.reConnectionManager = reConnectionManager;return this;}/*** 设置 Socket 写字节时的字节序** @param writeOrder*/public Builder setWriteOrder(ByteOrder writeOrder) {socketOptions.writeOrder = writeOrder;return this;}/*** 设置写数据时,单个数据包的最大值** @param maxWriteBytes* @return*/public Builder setMaxWriteBytes(int maxWriteBytes) {socketOptions.maxWriteBytes = maxWriteBytes;return this;}/*** 设置 Socket 读取字节时的字节序** @param readOrder* @return*/public Builder setReadOrder(ByteOrder readOrder) {socketOptions.readOrder = readOrder;return this;}/*** 设置读取数据时,单次读取最大缓存值** @param maxReadBytes* @return*/public Builder setMaxReadBytes(int maxReadBytes) {socketOptions.maxReadBytes = maxReadBytes;return this;}/*** 设置读取数据的数据结构协议** @param messageProtocol*/public Builder setMessageProtocol(IMessageProtocol messageProtocol) {socketOptions.messageProtocol = messageProtocol;return this;}/*** 设置服务器返回数据的允许的最大值,单位兆/Mb** @param maxResponseDataMb* @return*/public Builder setMaxResponseDataMb(int maxResponseDataMb) {socketOptions.maxResponseDataMb = maxResponseDataMb;return this;}/*** 设置是否开启请求超时的检测** @param openRequestTimeOut*/public Builder setOpenRequestTimeOut(boolean openRequestTimeOut) {socketOptions.isOpenRequestTimeOut = openRequestTimeOut;return this;}/*** 设置请求超时时间** @param requestTimeOut 毫秒*/public Builder setRequestTimeOut(long requestTimeOut) {socketOptions.requestTimeOut = requestTimeOut;return this;}/*** 设置请求 ack 回调 callBackID 功能的工厂** @param callBackIDFactory* @return*/public Builder setCallBackIDFactory(CallBackIDFactory callBackIDFactory) {socketOptions.callBackIDFactory = callBackIDFactory;return this;}/*** 设置心跳发送频率,单位毫秒** @param heartBeatFreq 毫秒* @return*/public Builder setHeartBeatFreq(long heartBeatFreq) {socketOptions.heartBeatFreq = heartBeatFreq;return this;}/*** 设置心跳丢失的最大允许数,如果超过这个最大数时,就断开 Socket 连接** @param maxHeartBeatLoseTimes* @return*/public Builder setMaxHeartBeatLoseTimes(long maxHeartBeatLoseTimes) {socketOptions.maxHeartBeatLoseTimes = maxHeartBeatLoseTimes;return this;}/*** 设置 IO 字节流的编码方式,默认 UTF-8** @param charsetName* @return*/public Builder setCharsetName(Charset charsetName) {socketOptions.charsetName = charsetName;return this;}public IOSocketOptions build() {return socketOptions;}}/*** 获取默认的配置*/public static IOSocketOptions getDefaultOptions() {IOSocketOptions options = new IOSocketOptions();options.socketAddress = null;options.backupAddress = null;options.connectTimeout = 10 * 1000;// 连接超时默认 5 秒options.socketFactory = null;options.socketSSLConfig = null;options.isReConnection = true;// 是否重连主机,默认为 trueoptions.reConnectionManager = new DefaultReConnection(); // 默认 Socket 重连器options.writeOrder = ByteOrder.BIG_ENDIAN;// 大端序options.maxWriteBytes = 100;options.readOrder = ByteOrder.BIG_ENDIAN;// 大端序options.maxReadBytes = 50;options.messageProtocol = null;// 默认数据包结构协议为 nulloptions.maxResponseDataMb = 5;// 默认接收最大值 5MBoptions.isOpenRequestTimeOut = true;// 是否开启请求超时检测,默认 true 开启options.requestTimeOut = 10 * 1000;// 请求超时时间,默认10秒options.callBackIDFactory = null;// 实现回调功能需要的 callBackID 工厂options.heartBeatFreq = 5 * 1000;// 心跳频率,默认 5 秒options.maxHeartBeatLoseTimes = 5;// 心跳丢失的最大次数,大于这个数,将断开 Socket 连接options.charsetName = StandardCharsets.UTF_8;return options;}public boolean isIsDebug() {return isDebug;}public SocketAddress getSocketAddress() {return socketAddress;}public SocketAddress getBackupAddress() {return backupAddress;}public int getConnectTimeout() {return connectTimeout;}public SocketFactory getSocketFactory() {return socketFactory;}public SocketSSLConfig getSocketSSLConfig() {return socketSSLConfig;}public boolean isReConnection() {return isReConnection;}public AbsReConnection getReConnectionManager() {return reConnectionManager;}public ByteOrder getWriteOrder() {return writeOrder;}public int getMaxWriteBytes() {return maxWriteBytes;}public ByteOrder getReadOrder() {return readOrder;}public int getMaxReadBytes() {return maxReadBytes;}public IMessageProtocol getMessageProtocol() {return messageProtocol;}public int getMaxResponseDataMb() {return maxResponseDataMb;}public boolean isOpenRequestTimeOut() {return isOpenRequestTimeOut;}public long getRequestTimeOut() {return requestTimeOut;}public CallBackIDFactory getCallBackIDFactory() {return callBackIDFactory;}public long getHeartBeatFreq() {return heartBeatFreq;}public long getMaxHeartBeatLoseTimes() {return maxHeartBeatLoseTimes;}public Charset getCharsetName() {return charsetName;}public static void setIsDebug(boolean isDebug) {IOSocketOptions.isDebug = isDebug;}
}

2. 创建心跳数据接收监听类,HeartBeatListener.java

/*** @Description: 心跳数据接收监听*/
public interface HeartBeatListener {/*** 是否为服务器心跳** @param originReadData* @return*/boolean isServerHeartBeat(OriginReadData originReadData);
}

3. 创建心跳数据检测接口,IHeartBeatManager.java

/*** @Description: 心跳数据检测接口*/
public interface IHeartBeatManager extends IOptions {/*** 开始发送心跳** @param clientHeartBeat   心跳数据* @param heartBeatListener 心跳数据接收监听*/void startHeartBeat(byte[] clientHeartBeat, HeartBeatListener heartBeatListener);/*** 接收到心跳数据*/void onReceiveHeartBeat();/*** 停止发送心跳*/void stopHeartBeat();
}

4. 实现心跳数据检测接口,创建心跳管理器,HeartBeatManager.java

/*** @Description: 心跳管理器*/
public class HeartBeatManager extends SocketActionListener implements IHeartBeatManager {/*** Socket 连接器*/private IConnectionManager connectionManager;/*** Socket 配置参数*/private IOSocketOptions socketOptions;/*** 客户端心跳包*/private byte[] clientHeartBeat;/*** 心跳包发送线程*/private ScheduledExecutorService heartBeatExecutor;/*** 记录心跳的失联次数*/private AtomicInteger loseTimes = new AtomicInteger(-1);/*** 心跳频率*/private long freq;/*** 是否激活了心跳*/private boolean isActivate;/*** 心跳包接收监听*/private HeartBeatListener heartBeatListener;public HeartBeatManager(IConnectionManager connectionManager) {this.connectionManager = connectionManager;socketOptions = connectionManager.getOptions();//添加监听 Socket 的行为this.connectionManager.subscribeSocketAction(this);}@Overridepublic Object setOptions(IOSocketOptions socketOptions) {this.socketOptions = socketOptions;freq = socketOptions.getHeartBeatFreq();// 心跳频率不能小于一秒freq = freq < 1000 ? 1000 : freq;return this;}@Overridepublic IOSocketOptions getOptions() {return socketOptions;}@Overridepublic void startHeartBeat(byte[] clientHeartBeat, HeartBeatListener heartBeatListener) {this.clientHeartBeat = clientHeartBeat;this.heartBeatListener = heartBeatListener;isActivate = true;startThread();}/*** 启动心跳线程*/private void startThread() {// 心跳频率freq = socketOptions.getHeartBeatFreq();// 启动线程发送心跳if (heartBeatExecutor == null || heartBeatExecutor.isShutdown()) {heartBeatExecutor = Executors.newSingleThreadScheduledExecutor();heartBeatExecutor.scheduleWithFixedDelay(heartBeatTask, 0, freq, TimeUnit.MILLISECONDS);}}/*** 心跳发送任务*/private Runnable heartBeatTask = new Runnable() {@Overridepublic void run() {// 心跳丢失次数判断,心跳包丢失了一定的次数,则会进行 Socket 的断开重连if (socketOptions.getMaxHeartBeatLoseTimes() != -1 && loseTimes.incrementAndGet() >= socketOptions.getMaxHeartBeatLoseTimes()) {connectionManager.disConnect(socketOptions.isReConnection());// 重置失联次数resetLoseTimes();} else {// 发送心跳包connectionManager.upBytes(clientHeartBeat);}}};@Overridepublic void onReceiveHeartBeat() {// 重置失联次数resetLoseTimes();}@Overridepublic void stopHeartBeat() {isActivate = false;closeThread();}/*** 关闭发送心跳线程*/private void closeThread() {if (heartBeatExecutor != null && !heartBeatExecutor.isShutdown()) {heartBeatExecutor.shutdownNow();heartBeatExecutor = null;// 重置失联次数resetLoseTimes();//LogUtil.i("heartBeatExecutor closeThread");}}/*** 重置心跳的失联次数*/private void resetLoseTimes() {loseTimes.set(-1);}@Overridepublic void onSocketConnectSuccess(SocketAddress socketAddress) {if (isActivate) {startThread();}}@Overridepublic void onSocketConnectFail(SocketAddress socketAddress, boolean isNeedReconnect) {// 如果不需要重连,则关闭心跳频率线程if (!isNeedReconnect) {closeThread();}}@Overridepublic void onSocketDisconnect(SocketAddress socketAddress, boolean isNeedReconnect) {// 如果不需要重连,则关闭心跳检测if (!isNeedReconnect) {closeThread();}}@Overridepublic void onSocketResponse(SocketAddress socketAddress, OriginReadData originReadData) {if (heartBeatListener != null && heartBeatListener.isServerHeartBeat(originReadData)) {//收到服务器心跳数据onReceiveHeartBeat();}}
}

5. 修改 Socket 连接管理器

  5.1 创建发送数据接口

/*** @Description: 发送数据接口*/
public interface ISend {/*** 发送一个有回调的消息** @param superCallBackSender 消息结构* @return*/IConnectionManager upCallBackMessage(SuperCallBackSender superCallBackSender);/*** 发送 byte 数组** @param bytes byte 类型消息* @return*/IConnectionManager upBytes(byte[] bytes);
}

  5.2 修改连接管理器接口规范,IConnectionManager.java

/*** @Description: 连接管理器的接口规范*/
public interface IConnectionManager extends ISubscribeSocketAction, IOptions<IConnectionManager>, ISend {/*** 开始连接*/void connect();/*** 关闭连接** @param isNeedReconnect 是否需要重连*/void disConnect(boolean isNeedReconnect);/*** 获取连接状态** @return*/int getConnectionStatus();/*** 是否可连接的** @return*/boolean isConnectViable();/*** 切换 host** @param socketAddress*/void switchHost(SocketAddress socketAddress);/*** 获取输入流** @return*/InputStream getInputStream();/*** 获取输出流** @return*/OutputStream getOutputStream();/*** 获取心跳管理器** @return*/IHeartBeatManager getHeartBeatManager();
}

  5.3 在Socket 连接超类中添加心跳管理器,SuperConnection.java

/*** @Description: Socket 连接的超类*/
public abstract class SuperConnection implements IConnectionManager {/*** 连接的状态,初始值为断开连接*/protected final AtomicInteger connectionStatus = new AtomicInteger(SocketStatus.SOCKET_DISCONNECTED);/*** Socket 配置参数*/protected IOSocketOptions socketOptions;/*** 连接线程*/private ExecutorService connectionExecutor;/*** Socket 主机地址*/protected SocketAddress socketAddress;/*** Socket 行为分发器*/private ISocketActionDispatch actionDispatcher;/*** Socket 重连管理器*/private AbsReConnection reConnection;/*** Socket 读写数据管理器*/private IIOManager ioManager;/*** Socket 回调消息分发器*/private CallBackResponseDispatcher callBackResponseDispatcher;/*** Socket 心跳管理器*/private IHeartBeatManager heartBeatManager;public SuperConnection(SocketAddress socketAddress) {this.socketAddress = socketAddress;this.actionDispatcher = new SocketActionDispatcher(this, socketAddress);}@Overridepublic void subscribeSocketAction(ISocketActionListener iSocketActionListener) {// 订阅 Socket 行为监听this.actionDispatcher.subscribe(iSocketActionListener);}@Overridepublic void unSubscribeSocketAction(ISocketActionListener iSocketActionListener) {// 解除 Socket 行为监听this.actionDispatcher.unSubscribe(iSocketActionListener);}@Overridepublic IConnectionManager setOptions(IOSocketOptions socketOptions) {if (socketOptions == null) return this;this.socketOptions = socketOptions;// 是否更改重连管理器if (reConnection != null && !reConnection.equals(socketOptions.getReConnectionManager())) {// 停止分离重连管理器reConnection.detach();reConnection = socketOptions.getReConnectionManager();// 关联连接器reConnection.attach(this);}// 设置读写数据管理器的配置参数if (ioManager != null) ioManager.setOptions(socketOptions);// 设置回调消息分发器的配置参数if (callBackResponseDispatcher != null)callBackResponseDispatcher.setOptions(socketOptions);// 设置心跳包管理器的配置参数if (heartBeatManager != null) heartBeatManager.setOptions(socketOptions);return this;}@Overridepublic IOSocketOptions getOptions() {return socketOptions;}/*** 连接成功*/protected void onConnectionOpened() {LogUtil.i("Socket 连接成功: " + socketAddress.toString());// Socket 行为分发: 连接成功actionDispatcher.dispatchAction(SocketAction.ACTION_CONNECT_SUCCESS);// Socket 连接状态: 成功connectionStatus.set(SocketStatus.SOCKET_CONNECTED);// 打开 Socket 处理数据的管理器openSocketManager();}/*** 打开 Socket 处理数据的管理器*/private void openSocketManager() {if (callBackResponseDispatcher == null) {// 创建回调消息分发器callBackResponseDispatcher = new CallBackResponseDispatcher(this);}if (ioManager == null) {// 创建 io 管理器ioManager = new IOManager(this, actionDispatcher);}// 启动回调消息分发器线程callBackResponseDispatcher.engineThread();// 启动 IO 管理器线程ioManager.startIO();}/*** 连接 Socket 任务*/private Runnable connectTask = () -> {try {openConnection();} catch (Exception e) {LogUtil.e("Socket 连接失败: " + socketAddress.toString());e.printStackTrace();// Socket 连接状态: 已断开连接connectionStatus.set(SocketStatus.SOCKET_DISCONNECTED);// Socket 行为分发: 1.连接失败 2.是否重连,默认为 trueactionDispatcher.dispatchAction(SocketAction.ACTION_CONNECT_FAIL, socketOptions.isReConnection());}};@Overridepublic synchronized void connect() {LogUtil.i("Socket 开始连接: " + socketAddress.toString());if (socketAddress.getIp() == null) {throw new NotNullException("请检查是否设置了IP地址.");}// Socket 状态: 正在连接connectionStatus.set(SocketStatus.SOCKET_CONNECTING);// 重连管理器if (reConnection != null) {reConnection.detach(); // 停止分离重连管理器}reConnection = socketOptions.getReConnectionManager();if (reConnection != null) {reConnection.attach(this);//关联连接器}// 开始 Socket 行为分发线程if (actionDispatcher != null) {actionDispatcher.startDispatchThread();}// 心跳管理器if (heartBeatManager == null) {heartBeatManager = new HeartBeatManager(this);}// 开启线程,进行连接if (connectionExecutor == null || connectionExecutor.isShutdown()) {// 核心线程数为 0,非核心线程数可以有 Integer.MAX_VALUE 个,存活时间为 60 秒,适合于在不断的进行连接情况下,避免重复创建和销毁线程connectionExecutor = Executors.newCachedThreadPool();//LogUtil.i("Executors newCachedThreadPool");}// 执行连接任务connectionExecutor.execute(connectTask);}@Overridepublic synchronized void disConnect(boolean isNeedReconnect) {// 只有在已经连接的状态下才能断开连接if (connectionStatus.get() != SocketStatus.SOCKET_CONNECTED) {return;}// 正在重连中, 则不断开 Socketif (isNeedReconnect && reConnection.isReConnecting()) {return;}// Socket 连接状态: 正在断开连接connectionStatus.set(SocketStatus.SOCKET_DISCONNECTING);// 开启断开连接线程String info = socketAddress.toString();Thread disConnectThread = new DisConnectThread("DisConnection thread: " + info, isNeedReconnect);// setDaemon: true 既设置该线程为守护线程,表示该线程是不重要的,进程退出时,不需要等待这个线程执行完成,目的,避免子线程无限死循环,导致退不出程序disConnectThread.setDaemon(true);disConnectThread.start();}/*** 断开连接线程*/private class DisConnectThread extends Thread {// 当前连接断开后,是否需要自动重连boolean isNeedReconnect;public DisConnectThread(@NonNull String name, boolean isNeedReconnect) {super(name);this.isNeedReconnect = isNeedReconnect;}@Overridepublic void run() {try {// 关闭 io 管理器中线程if (ioManager != null) ioManager.closeIO();// 关闭回调分发器线程if (callBackResponseDispatcher != null) callBackResponseDispatcher.shutdownThread();// 关闭连接线程if (connectionExecutor != null && !connectionExecutor.isShutdown()) {connectionExecutor.shutdown();connectionExecutor = null;}closeConnection();// getName()LogUtil.i("关闭Socket连接: " + socketAddress.toString());// Socket 连接状态: 已断开连接connectionStatus.set(SocketStatus.SOCKET_DISCONNECTED);// Socket 行为分发: 1.断开连接 2.是否重连actionDispatcher.dispatchAction(SocketAction.ACTION_DISCONNECTION, isNeedReconnect);} catch (Exception e) {// 断开连接时,发生异常e.printStackTrace();LogUtil.e(e.getMessage());}}}@Overridepublic synchronized void switchHost(SocketAddress socketAddress) {if (socketAddress != null) {SocketAddress oldAddress = this.socketAddress;this.socketAddress = socketAddress;//Socket 分发行为更改主机地址if (actionDispatcher != null) {actionDispatcher.setSocketAddress(socketAddress);}}}@Overridepublic int getConnectionStatus() {return connectionStatus.get();}@Overridepublic boolean isConnectViable() {// 当前 socket 是否处于可连接的状态 Util.isNetConnected() &&//TODOreturn connectionStatus.get() == SocketStatus.SOCKET_DISCONNECTED;}/*** 打开连接** @throws Exception*/protected abstract void openConnection() throws Exception;/*** 关闭连接** @throws IOException*/abstract void closeConnection() throws Exception;@Overridepublic synchronized IConnectionManager upCallBackMessage(SuperCallBackSender superCallBackSender) {// 检测发送数据消息体中 callBackID 是不是一样callBackResponseDispatcher.checkCallBackSender(superCallBackSender);//根据自己的协议打包消息,发送数据sendBytes(superCallBackSender.pack());return this;}@Overridepublic synchronized IConnectionManager upBytes(byte[] bytes) {//发送数据sendBytes(bytes);return this;}/*** 发送 byte 数组数据** @param bytes* @return*/private IConnectionManager sendBytes(byte[] bytes) {// 判断 Socket 是否已连接if (ioManager == null || connectionStatus.get() != SocketStatus.SOCKET_CONNECTED) {return this;}ioManager.sendBytes(bytes);return this;}@Overridepublic IHeartBeatManager getHeartBeatManager() {return heartBeatManager;}
}

6. 发送数据定义

  6.1 创建发送数据接口类,IMessage.java

/*** @Description: 发送数据接口*/
public interface IMessage {/*** 根据自己的协议打包消息** @return*/byte[] pack();
}

  6.2 创建发送消息基类,AbsMessage.java

/*** @Description: 发送消息基类*/
public class AbsMessage implements IMessage {@Overridepublic byte[] pack() {byte[] body = new Gson().toJson(this).getBytes();ByteBuffer byteBuffer = ByteBuffer.allocate(body.length + 4);byteBuffer.order(ByteOrder.BIG_ENDIAN);byteBuffer.putInt(body.length);byteBuffer.put(body);return byteBuffer.array();}
}

  6.3 创建心跳数据包示例,ClientHeartBeat.java

/*** @Description: 心跳数据包示例*/
public class ClientHeartBeat extends AbsMessage {private String msgId;private String from;public String getMsgId() {return msgId;}public void setMsgId(String msgId) {this.msgId = msgId;}public String getFrom() {return from;}public void setFrom(String from) {this.from = from;}@Overridepublic String toString() {return "ClientHeartBeat{" + "msgId='" + msgId + '\\'' + ", from='" + from + '\\'' + '}';}
}

7. 测试方法

    //测试方法public void initViewHeartBeat(View view) {SocketAddress socketAddress = new SocketAddress("192.168.1.52", 6688);IOSocketOptions socketOptions = new IOSocketOptions.Builder().setSocketAddress(socketAddress).setMessageProtocol(new DefaultMessageProtocol()).build();TcpConnection connection = new TcpConnection(socketAddress);connection.setOptions(socketOptions);view.findViewById(R.id.but_connect).setOnClickListener(v -> connection.connect());//心跳示例ClientHeartBeat heartBeat = new ClientHeartBeat();heartBeat.setMsgId("heart_beat");heartBeat.setFrom("client");view.findViewById(R.id.but_add).setOnClickListener(new View.OnClickListener() {@Overridepublic void onClick(View v) {connection.getHeartBeatManager().startHeartBeat(heartBeat.pack(), new HeartBeatListener() {@Overridepublic boolean isServerHeartBeat(OriginReadData originReadData) {try {String s = originReadData.getBodyString();JSONObject jsonObject = new JSONObject(s);if ("heart_beat".equals(jsonObject.getString("msgId"))) {LogUtil.i("收到服务器心跳");return true;}} catch (JSONException e) {e.printStackTrace();}return false;}});}});view.findViewById(R.id.but_stop).setOnClickListener(v -> connection.getHeartBeatManager().stopHeartBeat());}

大连人才网