> 文章列表 > mqttx read econnreset异常排查

mqttx read econnreset异常排查

mqttx read econnreset异常排查

mqtt 会话 read econnreset
mqttx read econnreset异常排查

使用mqttx连接mqtt服务器时出现READ ECONNRESET的排查
前段时间公司新增了mqtt服务器,在我们初步测试的时候没有问题,但是随着连接数量增多,后续几天连续间隔出现READ ECONNRESET,导致项目无法正常使用,于是排查了一下问题,
根据网上的答案,找到以下文章
https://blog.csdn.net/slxz001/article/details/123368088

  1. 分析可能是mqtt会话队列满了,然后修改了mqtt会话队列参数
  2. 也有可能是项目代码中的连接太多,这就要修改项目代码的连接逻辑了,用同一个mqtt client连接mqtt,然后同时监听多个主题,这样可以避免出现太多连接数连接到mqtt,从而避免造成这个问题
    建议在正式部署之前进行一定连接数量测试,避免出现问题

以下为一点测试代码

/
* 需要自己添加application.yml配置
*/
@AllArgsConstructor
@NoArgsConstructor
@Data
@ConfigurationProperties(prefix = "mqtt")
@Component
public class MqttProperties {// mqtt服务器urlprivate String uri;//登录用户名private String username;// 密码private String password;// 暂时没用,主题会在代码里动态获取private String topic;}
@RunWith(SpringRunner.class)
@SpringBootTest
@Log
public class MqttProducerTest {@Autowiredprivate MqttProperties mqttProperties;private static final String WATCH_PREFIX = "TEST";/* 设备心跳客户端列表*/static Map<String, MqttClient> clientMap;private static final ThreadPoolExecutor POOL_EXECUTOR;static {POOL_EXECUTOR = new ThreadPoolExecutor(2, 2, 30 * 1000,TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(10000),new CustomizableThreadFactory("pool"));clientMap = new HashMap<>();}@Testpublic void testConnect() {int count = 10000;int success = 0;AtomicInteger fail = new AtomicInteger();for (int i = 0; i < count; i++) {log.info(String.format("准备创建第%d个client", i + 1));// MQTTString urlFrontSuffix = mqttProperties.getUri();String clientId = "CLIENT-" + (System.currentTimeMillis() + "").substring(6);/* TOPIC:TEST */String watchTopic = WATCH_PREFIX + (i * 100);System.out.println("clientId: " + clientId + " watchTopic: " + watchTopic);try {MemoryPersistence memoryPersistence = new MemoryPersistence();MqttClient client = new MqttClient(urlFrontSuffix, clientId, memoryPersistence);MqttConnectOptions options = getOptions();// 设置回调函数,当订阅到信息时调用此方法client.setCallback(new MqttCallback() {@Overridepublic void connectionLost(Throwable throwable) {System.out.println(watchTopic + " Connection Lost.");fail.incrementAndGet();}@Overridepublic void messageArrived(String str, MqttMessage mqttMessage) throws Exception {// 订阅成功,并接受信息时调用String payload = new String(mqttMessage.getPayload()); // 获取消息内容log.info(String.format("accepted: %s channel:%s", payload, watchTopic));}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {System.out.println("deliveryComplete" + iMqttDeliveryToken);}});client.connect(options);client.subscribe(watchTopic);log.info("connected the mqtt client" + clientId);// mqtt客户端clientMap.put(watchTopic, client);if (client.isConnected()) {success++;log.info(String.format("第%d个client连接成功", success));}} catch (Exception e) {e.printStackTrace();}}log.info(String.format("总共启动%d, 成功%d, 失败%d", count, success, fail.get()));}private MqttConnectOptions getOptions() {MqttConnectOptions options = new MqttConnectOptions();// 设置客户端和服务器是否应在重新启动和重新连接期间记住状态 默认falseoptions.setCleanSession(true);// 设置超时时间options.setConnectionTimeout(10);// 设置会话心跳时间options.setKeepAliveInterval(20);// 设置超时时间options.setConnectionTimeout(10);// 设置会话心跳时间options.setKeepAliveInterval(20);options.setUserName(mqttProperties.getUsername());options.setPassword(mqttProperties.getPassword().toCharArray());return options;}
}

运行结果:

控制台- 10:35:55.099 [main]  INFO   c.f.a.c.m.MqttProducerTest - [testConnect,58] -准备创建第700个client
clientId: JAVA-CLIENT-4155099 watchTopic: xxx
已断开连接 (32109) - java.io.EOFExceptionat org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:197)at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFExceptionat java.io.DataInputStream.readByte(DataInputStream.java:267)at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:92)at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:137)... 1 more
控制台- 10:35:55.119 [main]  INFO   c.f.a.c.m.MqttProducerTest - [testConnect,58] -准备创建第701个client
clientId: JAVA-CLIENT-4155119 watchTopic: xxx
已断开连接 (32109) - java.io.EOFException

使用Java代码创建了测试用例,运行测试发现,当连接数建立到700左右时,该问题出现,并抛出了java.io.EOFException异常错误信息
java.io.EOFException

EOFException:当输入过程中意外到达文件或流的末尾时,抛出此异常。

此异常主要被数据输入流用来表明到达流的末尾。注意,其他许多输入操作返回一个特殊值表示到达流的末尾,而不是抛出异常。

产生原因:

  1. 数据流中写入数据时的顺序和读取时的顺序不一致
  2. UTF是双字节编码,而writeChars方法写入的是按照字符格式写入的,在文件中的占位要小于以Unicode编码的同样字符串,所以,使用readUTF方法读取时,会出现EOF错误