From 2253508efba8df46ab6a7dd0136bd27f94c4df98a80f4ad95636df1bddc13280 Mon Sep 17 00:00:00 2001 From: attiya <2413103649@qq.com> Date: Fri, 19 Dec 2025 09:55:41 +0800 Subject: [PATCH] =?UTF-8?q?mqtt=E8=BF=9E=E6=8E=A5=E6=B1=A0=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E3=80=81=E5=8E=BB=E9=99=A4=E5=A4=9A=E4=BA=8E=E4=BE=9D?= =?UTF-8?q?=E8=B5=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/cdzy/gather/mqtt/MqttConnectionPool.java | 5 ++++- .../src/main/java/org/cdzy/gather/mqtt/MqttHandler.java | 3 ++- pom.xml | 9 --------- 3 files changed, 6 insertions(+), 11 deletions(-) diff --git a/ebike-gather/src/main/java/org/cdzy/gather/mqtt/MqttConnectionPool.java b/ebike-gather/src/main/java/org/cdzy/gather/mqtt/MqttConnectionPool.java index b393b0c..55f3af7 100644 --- a/ebike-gather/src/main/java/org/cdzy/gather/mqtt/MqttConnectionPool.java +++ b/ebike-gather/src/main/java/org/cdzy/gather/mqtt/MqttConnectionPool.java @@ -44,6 +44,9 @@ public class MqttConnectionPool implements DisposableBean, InitializingBean { config.setMaxTotal(MqttConfig.getPoolMaxTotal()); config.setMaxIdle(MqttConfig.getPoolMaxIdle()); config.setMinIdle(MqttConfig.getPoolMinIdle()); + + config.setBlockWhenExhausted(true); // 必须设置为 true + config.setMaxWaitMillis(5000); // 等待5秒,不能无限等待 // 启用空闲连接维护 config.setTimeBetweenEvictionRuns(Duration.ofMinutes(5)); @@ -53,7 +56,7 @@ public class MqttConnectionPool implements DisposableBean, InitializingBean { config.setTestOnBorrow(false); config.setTestOnReturn(false); - config.setTestOnReturn(false); + config.setTestWhileIdle(true); // 空闲时验证(保持连接健康) connectionPool = new GenericObjectPool<>(mqttConnectionFactory, config); diff --git a/ebike-gather/src/main/java/org/cdzy/gather/mqtt/MqttHandler.java b/ebike-gather/src/main/java/org/cdzy/gather/mqtt/MqttHandler.java index d2e426e..ca0d677 100644 --- a/ebike-gather/src/main/java/org/cdzy/gather/mqtt/MqttHandler.java +++ b/ebike-gather/src/main/java/org/cdzy/gather/mqtt/MqttHandler.java @@ -38,6 +38,7 @@ public class MqttHandler extends ChannelInboundHandlerAdapter { case CONNACK: // CONNACK已经在专门的handler中处理,这里不应该收到 log.warn("在MqttHandler中收到CONNACK,可能配置有误"); + handleConnAck(ctx,(MqttConnAckMessage)mqttMessage); break; case PUBLISH: handlePublish(ctx, (MqttPublishMessage) mqttMessage); @@ -46,7 +47,7 @@ public class MqttHandler extends ChannelInboundHandlerAdapter { handlePubAck((MqttPubAckMessage) mqttMessage); break; case PINGRESP: - log.debug("收到PING响应"); +// log.debug("收到PING响应"); break; case SUBACK: handleSubAck((MqttSubAckMessage) mqttMessage); diff --git a/pom.xml b/pom.xml index 999f208..89c5725 100644 --- a/pom.xml +++ b/pom.xml @@ -35,7 +35,6 @@ 1.11.3 3.0.4 1.40.0 - 8.0.32 5.0.1 4.12.0 8.5.7 @@ -177,14 +176,6 @@ import - - com.mysql - mysql-connector-j - ${mysql.version} - pom - import - - org.springframework.data spring-data-bom