核心通信模块代码优化

This commit is contained in:
attiya 2025-11-07 16:20:24 +08:00
parent aa1e9fc3a2
commit f4ce280a4c
2 changed files with 22 additions and 9 deletions

View File

@ -7,6 +7,8 @@ import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject; import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/** /**
* MQTT 连接工厂 * MQTT 连接工厂
*/ */
@ -54,14 +56,23 @@ public class MqttConnectionFactory extends BasePooledObjectFactory<Channel> {
} }
@Override @Override
public void destroyObject(PooledObject<Channel> p){ public void destroyObject(PooledObject<Channel> p) {
Channel channel = p.getObject(); Channel channel = p.getObject();
if (channel != null) { if (channel != null) {
try { try {
String remoteAddress = channel.remoteAddress() != null ?
channel.remoteAddress().toString() : "unknown";
// 记录详细的销毁原因
log.info("销毁MQTT连接 - 远程地址: {}, 活跃: {}, 开启: {}, 可写: {}",
remoteAddress,
channel.isActive(),
channel.isOpen(),
channel.isWritable()); // 需要实现这个方法
if (channel.isActive()) { if (channel.isActive()) {
channel.close().await(5, java.util.concurrent.TimeUnit.SECONDS); channel.close().await(5, TimeUnit.SECONDS);
} }
log.debug("MQTT连接已销毁");
} catch (Exception e) { } catch (Exception e) {
log.warn("销毁MQTT连接异常", e); log.warn("销毁MQTT连接异常", e);
} }

View File

@ -46,11 +46,13 @@ public class MqttConnectionPool implements DisposableBean, InitializingBean {
config.setMinIdle(MqttConfig.getPoolMinIdle()); config.setMinIdle(MqttConfig.getPoolMinIdle());
// 启用空闲连接维护 // 启用空闲连接维护
config.setTimeBetweenEvictionRuns(Duration.ofSeconds(30)); config.setTimeBetweenEvictionRuns(Duration.ofMinutes(5));
config.setSoftMinEvictableIdleTime(Duration.ofMinutes(2)); config.setSoftMinEvictableIdleTime(Duration.ofMinutes(30));
config.setNumTestsPerEvictionRun(3); config.setMinEvictableIdleTime(Duration.ofMinutes(30));
config.setTestWhileIdle(true); config.setNumTestsPerEvictionRun(-1);
config.setTestOnBorrow(true);
config.setTestOnBorrow(false);
config.setTestOnReturn(false);
config.setTestOnReturn(false); config.setTestOnReturn(false);
connectionPool = new GenericObjectPool<>(mqttConnectionFactory, config); connectionPool = new GenericObjectPool<>(mqttConnectionFactory, config);
@ -101,7 +103,7 @@ public class MqttConnectionPool implements DisposableBean, InitializingBean {
} catch (Exception e) { } catch (Exception e) {
log.error("维护MQTT连接池异常", e); log.error("维护MQTT连接池异常", e);
} }
}, 10, 60, TimeUnit.SECONDS); }, 10, 60*5, TimeUnit.SECONDS);
} }
/** /**