Merge branch 'gather-1.0'

This commit is contained in:
attiya 2025-11-06 18:26:59 +08:00
commit 2e45df1893
10 changed files with 1182 additions and 411 deletions

View File

@ -1,73 +1,111 @@
package org.cdzy.gather.config;
import jakarta.annotation.PostConstruct;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.List;
/**
* @author attiya
* MQTT 配置类
*/
@Slf4j
@Component
public class MqttConfig {
// 静态获取方法
@Getter
private static String host;
@Getter
private static int port;
@Getter
private static String username;
@Getter
private static List<String> topic;
@Getter
private static String password;
@Getter
private static String clientId;
// 连接池参数
@Getter
private static final int poolMaxTotal = 20;
private static String username;
@Getter
private static String password;
@Getter
private static int timeout;
@Getter
private static boolean cleanSession;
@Getter
private static int keepAlive;
@Getter
private static final int poolMaxIdle = 10;
private static int poolMaxTotal;
@Getter
private static int poolMaxIdle;
@Getter
private static int poolMinIdle;
@Getter
private static final int poolMinIdle = 5;
private static String autoSubscribeTopics;
@Value("${mqtt.host}")
@Value("${mqtt.host:localhost}")
public void setHost(String host) {
MqttConfig.host = host;
}
@Value("${mqtt.port}")
@Value("${mqtt.port:1883}")
public void setPort(int port) {
MqttConfig.port = port;
}
@Value("${mqtt.username}")
@Value("${mqtt.client-id:}")
public void setClientId(String clientId) {
MqttConfig.clientId = clientId;
}
@Value("${mqtt.username:}")
public void setUsername(String username) {
MqttConfig.username = username;
}
@Value("${mqtt.password}")
@Value("${mqtt.password:}")
public void setPassword(String password) {
MqttConfig.password = password;
}
@Value("${mqtt.topic}")
public void setTopic(String topic) {
MqttConfig.topic = Arrays.stream(topic.split(",")).toList();
@Value("${mqtt.auto-subscribe-topics:}")
public void setAutoSubscribeTopics(String autoSubscribeTopics) {
MqttConfig.autoSubscribeTopics = autoSubscribeTopics;
}
@Value("${mqtt.clientId}")
public void setClientId(String clientId) {
MqttConfig.clientId = clientId;
@Value("${mqtt.timeout:30}")
public void setTimeout(int timeout) {
MqttConfig.timeout = timeout;
}
}
@Value("${mqtt.clean-session:true}")
public void setCleanSession(boolean cleanSession) {
MqttConfig.cleanSession = cleanSession;
}
@Value("${mqtt.keep-alive:60}")
public void setKeepAlive(int keepAlive) {
MqttConfig.keepAlive = keepAlive;
}
@Value("${mqtt.pool.max-total:20}")
public void setPoolMaxTotal(int poolMaxTotal) {
MqttConfig.poolMaxTotal = poolMaxTotal;
}
@Value("${mqtt.pool.max-idle:10}")
public void setPoolMaxIdle(int poolMaxIdle) {
MqttConfig.poolMaxIdle = poolMaxIdle;
}
@Value("${mqtt.pool.min-idle:5}")
public void setPoolMinIdle(int poolMinIdle) {
MqttConfig.poolMinIdle = poolMinIdle;
}
@PostConstruct
public void init() {
log.info("MQTT配置初始化 - 地址: {}:{}", host, port);
log.info("MQTT连接池配置 - maxTotal: {}, maxIdle: {}, minIdle: {}",
poolMaxTotal, poolMaxIdle, poolMinIdle);
}
}

View File

@ -1,9 +1,9 @@
package org.cdzy.gather.kafka;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.netty.handler.codec.mqtt.MqttQoS;
import jakarta.annotation.PostConstruct;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.cdzy.gather.mqtt.MqttPoolClient;
import org.slf4j.Logger;
@ -19,22 +19,80 @@ import org.springframework.stereotype.Component;
@Component
public class KafkaConsumer {
private static final Logger log= LoggerFactory.getLogger(KafkaConsumer.class);
private static final ObjectMapper objectMapper = new ObjectMapper();
private final MqttPoolClient mqttPoolClient;
public KafkaConsumer(MqttPoolClient mqttPoolClient) {
this.mqttPoolClient = mqttPoolClient;
}
@PostConstruct
public void init() {
log.info("KafkaConsumer初始化完成MQTT连接池状态: {}", mqttPoolClient.getPoolStatus());
}
/**
* 消费者监听消息
*
* @param record 消息
*/
@KafkaListener(topics = {"command"})
public void onMessage(ConsumerRecord<?, ?> record) throws JsonProcessingException {
log.info("[KAFKA接收] 主题: {}, 内容: {}", record.topic(), record.value());
String jsonStr = String.valueOf(record.value());
ObjectNode objectNode = objectMapper.readValue(jsonStr, ObjectNode.class);
String topic = objectNode.get("topic").asText();
String command = objectNode.get("command").asText();
MqttPoolClient.sendMessage(topic, MqttQoS.AT_LEAST_ONCE, command);
public void onMessage(ConsumerRecord<?, ?> record) {
log.info("[KAFKA接收] 主题: {}, 内容: {}", record.topic(), record.value());
try {
String jsonStr = String.valueOf(record.value());
ObjectNode objectNode = objectMapper.readValue(jsonStr, ObjectNode.class);
// 提取MQTT主题和消息内容
String topic = objectNode.get("topic").asText();
String command = objectNode.get("command").asText();
// 使用注入的 MqttPoolClient 发送消息
boolean sent = mqttPoolClient.sendMessage(topic, MqttQoS.AT_LEAST_ONCE, command);
log.info("[MQTT发送结果] 主题: {}, 成功: {}", topic, sent);
// 如果发送失败可以添加重试逻辑或告警
if (!sent) {
log.error("[MQTT发送失败] 主题: {}, 内容: {}", topic, command);
// 这里可以添加重试逻辑或发送到死信队列
handleSendFailure(topic, command);
}
} catch (Exception e) {
log.error("[KAFKA处理异常] 消息处理失败: {}", record.value(), e);
// 处理解析或发送异常
handleProcessingError(record.value(), e);
}
}
/**
* 处理发送失败的情况
*/
private void handleSendFailure(String topic, String command) {
// 这里可以实现重试逻辑
log.warn("[重试发送] 主题: {}, 内容: {}", topic, command);
// 示例重试一次
boolean retrySuccess = mqttPoolClient.sendMessage(topic, MqttQoS.AT_LEAST_ONCE, command, 1, 3000);
if (retrySuccess) {
log.info("[重试成功] 主题: {}", topic);
} else {
log.error("[重试失败] 主题: {}, 内容{},需要人工干预", topic, command);
// 可以发送到死信队列或记录到数据库
}
}
/**
* 处理消息处理异常
*/
private void handleProcessingError(Object message, Exception e) {
log.error("[消息处理异常] 原始消息: {}, 异常: {}", message, e.getMessage());
// 这里可以记录错误日志发送告警等
}
}

View File

@ -0,0 +1,487 @@
package org.cdzy.gather.mqtt;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.mqtt.*;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import org.cdzy.gather.config.MqttConfig;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* MQTT 客户端工厂
*/
@Slf4j
@Component
public class MqttClientFactory {
private final EventLoopGroup workerGroup;
private final AtomicInteger messageIdCounter = new AtomicInteger(1);
public MqttClientFactory() {
this.workerGroup = new NioEventLoopGroup();
}
/**
* 创建MQTT连接通道
*/
public Channel createChannel() throws Exception {
return createChannelWithRetry(3, 2000);
}
/**
* 创建MQTT连接通道带重试
*/
public Channel createChannelWithRetry(int maxRetries, long retryIntervalMs) throws Exception {
int attempts = 0;
Exception lastException = null;
while (attempts <= maxRetries) {
try {
Channel channel = createChannelInternal();
if (channel != null && channel.isActive()) {
log.info("MQTT连接创建成功 (尝试 {}/{})", attempts + 1, maxRetries + 1);
return channel;
}
} catch (Exception e) {
lastException = e;
log.warn("MQTT连接创建失败 (尝试 {}/{}): {}",
attempts + 1, maxRetries + 1, e.getMessage());
}
attempts++;
if (attempts <= maxRetries) {
log.info("等待 {}ms 后重试连接...", retryIntervalMs);
Thread.sleep(retryIntervalMs);
retryIntervalMs = Math.min(retryIntervalMs * 2, 10000);
}
}
throw new Exception("MQTT连接创建失败已达到最大重试次数: " + maxRetries, lastException);
}
/**
* 内部创建通道方法
*/
private Channel createChannelInternal() throws Exception {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, MqttConfig.getTimeout() * 1000)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// MQTT编解码器
pipeline.addLast("encoder", MqttEncoder.INSTANCE);
pipeline.addLast("decoder", new MqttDecoder());
// 心跳处理
pipeline.addLast("idleStateHandler", new IdleStateHandler(0, 45, 60));
// 业务处理器 - 使用固定名称便于前序插入
pipeline.addLast("mqttHandler", new MqttHandler());
}
});
// 异步连接
CompletableFuture<Channel> connectFuture = new CompletableFuture<>();
// 添加连接超时监控
ScheduledExecutorService timeoutScheduler = Executors.newSingleThreadScheduledExecutor();
timeoutScheduler.schedule(() -> {
if (!connectFuture.isDone()) {
connectFuture.completeExceptionally(new TimeoutException("MQTT连接超时"));
}
}, MqttConfig.getTimeout(), TimeUnit.SECONDS);
bootstrap.connect(MqttConfig.getHost(), MqttConfig.getPort())
.addListener((ChannelFuture future) -> {
timeoutScheduler.shutdown(); // 关闭超时监控
if (future.isSuccess()) {
Channel channel = future.channel();
log.debug("TCP连接建立成功开始MQTT握手...");
sendConnectMessage(channel, connectFuture);
} else {
connectFuture.completeExceptionally(
new Exception("TCP连接失败: " + future.cause().getMessage(), future.cause())
);
}
});
// 等待连接完成
try {
return connectFuture.get(MqttConfig.getTimeout() + 2, TimeUnit.SECONDS);
} catch (TimeoutException e) {
throw new Exception("MQTT连接超时: " + e.getMessage(), e);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof Exception) {
throw (Exception) cause;
} else {
throw new Exception("MQTT连接失败: " + cause.getMessage(), cause);
}
} finally {
timeoutScheduler.shutdown();
}
}
/**
* 发送MQTT连接消息并等待CONNACK然后自动订阅主题
*/
private void sendConnectMessage(Channel channel, CompletableFuture<Channel> connectFuture) {
try {
String clientId = generateUniqueClientId();
MqttConnectMessage connectMessage = buildConnectMessage(clientId);
// 创建连接响应处理器 - 使用有名称的handler便于移除
ConnAckHandler connAckHandler = new ConnAckHandler(connectFuture, clientId);
// 添加连接响应处理器到pipeline在MqttHandler之前
channel.pipeline().addBefore("mqttHandler", "connAckHandler", connAckHandler);
// 发送连接消息
log.debug("发送MQTT CONNECT消息ClientId: {}", clientId);
channel.writeAndFlush(connectMessage).addListener(future -> {
if (!future.isSuccess()) {
log.error("发送MQTT CONNECT消息失败ClientId: {}", clientId, future.cause());
// 确保移除handler
safelyRemoveHandler(channel.pipeline(), "connAckHandler");
connectFuture.completeExceptionally(
new Exception("发送CONNECT消息失败: " + future.cause().getMessage(), future.cause())
);
}
});
} catch (Exception e) {
log.error("构建MQTT连接消息失败", e);
connectFuture.completeExceptionally(e);
}
}
/**
* 专门的CONNACK处理器
*/
private class ConnAckHandler extends ChannelInboundHandlerAdapter {
private final CompletableFuture<Channel> connectFuture;
private final String clientId;
public ConnAckHandler(CompletableFuture<Channel> connectFuture, String clientId) {
this.connectFuture = connectFuture;
this.clientId = clientId;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof MqttConnAckMessage connAck) {
MqttConnectReturnCode returnCode = connAck.variableHeader().connectReturnCode();
// 立即从pipeline中移除自己防止后续消息继续传递到这里
ctx.pipeline().remove(this);
if (returnCode == MqttConnectReturnCode.CONNECTION_ACCEPTED) {
log.info("MQTT握手成功ClientId: {}", clientId);
// 连接成功执行自动订阅
autoSubscribeTopics(ctx, clientId).whenComplete((result, ex) -> {
if (ex != null) {
log.warn("自动订阅失败ClientId: {},但连接已建立", clientId, ex);
// 订阅失败不影响连接建立
connectFuture.complete(ctx.channel());
} else {
log.info("自动订阅完成ClientId: {},订阅主题: {}",
clientId, MqttConfig.getAutoSubscribeTopics());
connectFuture.complete(ctx.channel());
}
});
} else {
// 连接被拒绝
String errorMsg = String.format("MQTT连接被拒绝ClientId: %s, 原因: %s",
clientId, returnCode);
log.error(errorMsg);
connectFuture.completeExceptionally(new Exception(errorMsg));
ctx.close();
}
// 重要不调用super.channelRead因为我们已经处理了这个消息
return;
}
// 对于非CONNACK消息传递给下一个handler
super.channelRead(ctx, msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("MQTT握手异常ClientId: {}", clientId, cause);
safelyRemoveHandler(ctx.pipeline(), "connAckHandler");
connectFuture.completeExceptionally(cause);
super.exceptionCaught(ctx, cause);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.warn("MQTT连接在握手过程中断开ClientId: {}", clientId);
safelyRemoveHandler(ctx.pipeline(), "connAckHandler");
connectFuture.completeExceptionally(new Exception("连接在握手过程中断开"));
super.channelInactive(ctx);
}
}
/**
* 自动订阅配置的主题
*/
private CompletableFuture<Boolean> autoSubscribeTopics(ChannelHandlerContext ctx, String clientId) {
CompletableFuture<Boolean> future = new CompletableFuture<>();
List<String> topics = List.of(MqttConfig.getAutoSubscribeTopics().split(","));
if (topics.isEmpty()) {
// 没有配置自动订阅主题直接返回成功
future.complete(true);
return future;
}
try {
// 准备订阅参数
String[] topicArray = topics.toArray(new String[0]);
for (String topic : topicArray) {
int messageId = generateMessageId();
MqttSubscribeMessage subscribeMessage = MqttMessageBuilders.subscribe()
.messageId(messageId)
.addSubscription( MqttQoS.AT_LEAST_ONCE,topic)
.build();
// 添加订阅响应处理器
SubAckHandler subAckHandler = new SubAckHandler(messageId, future, clientId, topicArray);
// 添加订阅响应处理器到pipeline
String handlerName = "subAckHandler-" + messageId;
ctx.pipeline().addBefore("mqttHandler", handlerName, subAckHandler);
// 发送订阅消息
log.debug("发送自动订阅请求ClientId: {}消息ID: {},主题: {}",
clientId, messageId, Arrays.toString(topicArray));
ctx.channel().writeAndFlush(subscribeMessage).addListener(sendFuture -> {
if (!sendFuture.isSuccess()) {
log.error("发送订阅消息失败ClientId: {}消息ID: {}", clientId, messageId, sendFuture.cause());
safelyRemoveHandler(ctx.pipeline(), handlerName);
future.completeExceptionally(
new Exception("发送订阅消息失败: " + sendFuture.cause().getMessage(), sendFuture.cause())
);
}
});
}
} catch (Exception e) {
log.error("构建订阅消息失败ClientId: {}", clientId, e);
future.completeExceptionally(e);
}
return future;
}
/**
* 专门的SUBACK处理器
*/
private static class SubAckHandler extends ChannelInboundHandlerAdapter {
private final int messageId;
private final CompletableFuture<Boolean> future;
private final String clientId;
private final String[] topics;
public SubAckHandler(int messageId, CompletableFuture<Boolean> future, String clientId, String[] topics) {
this.messageId = messageId;
this.future = future;
this.clientId = clientId;
this.topics = topics;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof MqttSubAckMessage subAck) {
if (subAck.variableHeader().messageId() == messageId) {
// 立即从pipeline中移除自己
String handlerName = "subAckHandler-" + messageId;
safelyRemoveHandler(ctx.pipeline(), handlerName);
// 处理订阅结果
handleSubscriptionResult(subAck);
// 不调用super.channelRead因为我们已经处理了这个消息
return;
}
}
// 对于非匹配的SUBACK消息传递给下一个handler
super.channelRead(ctx, msg);
}
private void handleSubscriptionResult(MqttSubAckMessage subAck) {
try {
List<Integer> grantedQosList = subAck.payload().grantedQoSLevels();
boolean allSuccess = true;
List<String> failedTopics = new ArrayList<>();
for (int i = 0; i < grantedQosList.size(); i++) {
int grantedQos = grantedQosList.get(i);
String topic = topics[i];
if (grantedQos == 0x80) { // 0x80 表示订阅失败
allSuccess = false;
failedTopics.add(topic);
log.warn("主题订阅失败ClientId: {},主题: {},错误码: {}",
clientId, topic, grantedQos);
} else {
log.debug("主题订阅成功ClientId: {},主题: {}授予QoS: {}",
clientId, topic, grantedQos);
}
}
if (allSuccess) {
log.info("所有主题订阅成功ClientId: {},主题数量: {}", clientId, topics.length);
future.complete(true);
} else {
log.warn("部分主题订阅失败ClientId: {},失败主题: {}", clientId, failedTopics);
// 部分失败也算成功建立连接
future.complete(true);
}
} catch (Exception e) {
log.error("处理订阅结果异常ClientId: {}", clientId, e);
future.completeExceptionally(e);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("自动订阅异常ClientId: {}", clientId, cause);
String handlerName = "subAckHandler-" + messageId;
safelyRemoveHandler(ctx.pipeline(), handlerName);
future.completeExceptionally(cause);
super.exceptionCaught(ctx, cause);
}
}
/**
* 构建MQTT连接消息
*/
private MqttConnectMessage buildConnectMessage(String clientId) {
MqttMessageBuilders.ConnectBuilder connectBuilder = MqttMessageBuilders.connect()
.clientId(clientId)
.protocolVersion(MqttVersion.MQTT_3_1_1)
.cleanSession(MqttConfig.isCleanSession())
.keepAlive(MqttConfig.getKeepAlive());
// 添加用户名密码
if (MqttConfig.getUsername() != null && !MqttConfig.getUsername().isEmpty()) {
connectBuilder.username(MqttConfig.getUsername());
if (MqttConfig.getPassword() != null && !MqttConfig.getPassword().isEmpty()) {
connectBuilder.password(MqttConfig.getPassword().getBytes());
}
}
connectBuilder.willFlag(false);
return connectBuilder.build();
}
/**
* 生成唯一的ClientId
*/
private String generateUniqueClientId() {
// 基于配置前缀 + 时间戳 + 随机数
String baseClientId = MqttConfig.getClientId();
if (baseClientId == null || baseClientId.trim().isEmpty()) {
baseClientId = "java-client";
}
// 移除可能存在的非法字符只保留字母数字和连字符
baseClientId = baseClientId.replaceAll("[^a-zA-Z0-9-]", "");
// 生成唯一后缀时间戳 + 随机数
String timestamp = String.valueOf(System.currentTimeMillis());
String randomSuffix = String.valueOf(ThreadLocalRandom.current().nextInt(1000, 9999));
// 组合成最终的clientId确保不超过23字符(MQTT限制)
String uniqueId = baseClientId + "-" + timestamp.substring(7) + "-" + randomSuffix;
// MQTT协议规定clientId最大长度为23字符
if (uniqueId.length() > 23) {
uniqueId = uniqueId.substring(0, 23);
}
return uniqueId;
}
/**
* 生成消息ID
*/
private int generateMessageId() {
int messageId = messageIdCounter.getAndIncrement();
if (messageId > 65535) {
messageIdCounter.set(1);
messageId = 1;
}
return messageId;
}
/**
* 安全移除handler
*/
private static void safelyRemoveHandler(ChannelPipeline pipeline, String handlerName) {
try {
if (pipeline.get(handlerName) != null) {
pipeline.remove(handlerName);
}
} catch (Exception e) {
log.debug("移除handler失败: {},可能已被移除", handlerName);
}
}
/**
* 关闭工厂释放资源
*/
public void shutdown() {
if (!workerGroup.isShutdown()) {
workerGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS);
log.info("MQTT客户端工厂已关闭");
}
}
/**
* 诊断MQTT连接问题
*/
public void diagnoseConnectionIssue() {
log.info("=== MQTT连接诊断 ===");
log.info("MQTT服务器: {}:{}", MqttConfig.getHost(), MqttConfig.getPort());
log.info("连接超时: {}秒", MqttConfig.getTimeout());
log.info("ClientId基础: {}", MqttConfig.getClientId());
log.info("用户名: {}", MqttConfig.getUsername() != null ? "已设置" : "未设置");
log.info("自动订阅主题: {}", MqttConfig.getAutoSubscribeTopics());
// 测试网络连通性
testNetworkConnectivity();
}
private void testNetworkConnectivity() {
try (java.net.Socket socket = new java.net.Socket()) {
socket.connect(new java.net.InetSocketAddress(MqttConfig.getHost(), MqttConfig.getPort()), 5000);
log.info("网络连通性: ✓ 可以连接到MQTT服务器");
} catch (Exception e) {
log.error("网络连通性: ✗ 无法连接到MQTT服务器 - {}", e.getMessage());
}
}
}

View File

@ -1,51 +1,38 @@
package org.cdzy.gather.mqtt;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.mqtt.*;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.cdzy.gather.config.MqttConfig;
import java.util.UUID;
import org.springframework.stereotype.Component;
/**
* MQTT 连接工厂负责创建/销毁连接
* MQTT 连接工厂
*/
@Slf4j
@Component
public class MqttConnectionFactory extends BasePooledObjectFactory<Channel> {
private final EventLoopGroup workerGroup = new NioEventLoopGroup();
private final MqttClientFactory mqttClientFactory;
public MqttConnectionFactory(MqttClientFactory mqttClientFactory) {
this.mqttClientFactory = mqttClientFactory;
}
@Override
public Channel create() throws Exception {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(MqttEncoder.INSTANCE);
pipeline.addLast(new MqttDecoder());
pipeline.addLast(new IdleStateHandler(0, 45, 60));
pipeline.addLast(new MqttHandler());
}
});
ChannelFuture future = bootstrap.connect(MqttConfig.getHost(), MqttConfig.getPort()).sync();
if (!future.isSuccess()) {
throw new IllegalStateException("连接创建失败: " + future.cause());
try {
Channel channel = mqttClientFactory.createChannel();
if (channel != null && channel.isActive()) {
log.debug("创建MQTT连接成功");
return channel;
} else {
throw new RuntimeException("创建的MQTT连接无效");
}
} catch (Exception e) {
log.error("创建MQTT连接失败", e);
throw e;
}
Channel channel = future.channel();
sendConnectMessage(channel); // 发送 CONNECT 消息
return channel;
}
@Override
@ -53,31 +40,32 @@ public class MqttConnectionFactory extends BasePooledObjectFactory<Channel> {
return new DefaultPooledObject<>(channel);
}
// 销毁连接
@Override
public void destroyObject(PooledObject<Channel> p) {
Channel channel = p.getObject();
if (channel.isActive()) {
channel.close().addListener(future -> log.info("连接已关闭"));
}
}
// 验证连接是否有效
@Override
public boolean validateObject(PooledObject<Channel> p) {
Channel channel = p.getObject();
return channel.isActive() && channel.isWritable();
boolean isValid = channel != null &&
channel.isActive() &&
channel.isOpen() &&
channel.isWritable();
if (!isValid && log.isDebugEnabled()) {
log.debug("MQTT连接验证失败");
}
return isValid;
}
private void sendConnectMessage(Channel channel) {
MqttConnectMessage connectMsg = MqttMessageBuilders.connect()
.clientId("manager_" + UUID.randomUUID().toString().substring(0,5))
.username(MqttConfig.getUsername())
.password(MqttConfig.getPassword().getBytes())
.cleanSession(true)
.keepAlive(60)
.protocolVersion(MqttVersion.MQTT_3_1_1)
.build();
channel.writeAndFlush(connectMsg);
@Override
public void destroyObject(PooledObject<Channel> p){
Channel channel = p.getObject();
if (channel != null) {
try {
if (channel.isActive()) {
channel.close().await(5, java.util.concurrent.TimeUnit.SECONDS);
}
log.debug("MQTT连接已销毁");
} catch (Exception e) {
log.warn("销毁MQTT连接异常", e);
}
}
}
}

View File

@ -6,62 +6,203 @@ import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.cdzy.gather.config.MqttConfig;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* MQTT 连接池管理器
*/
@Slf4j
public class MqttConnectionPool implements DisposableBean {
private static GenericObjectPool<Channel> connectionPool;
// 通过构造函数注入配置
static {
@Component
public class MqttConnectionPool implements DisposableBean, InitializingBean {
private GenericObjectPool<Channel> connectionPool;
private final MqttConnectionFactory mqttConnectionFactory;
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
public MqttConnectionPool(MqttConnectionFactory mqttConnectionFactory) {
this.mqttConnectionFactory = mqttConnectionFactory;
log.info("MqttPoolClient 初始化完成");
}
@Override
public void afterPropertiesSet(){
initPool();
preCreateConnections();
startConnectionMaintenance();
log.info("MQTT连接池初始化完成");
}
private static void initPool() {
MqttConnectionFactory factory = new MqttConnectionFactory();
private void initPool() {
GenericObjectPoolConfig<Channel> config = new GenericObjectPoolConfig<>();
// 从配置读取参数
config.setMaxTotal(MqttConfig.getPoolMaxTotal());// 最大总连接数
config.setMaxIdle(MqttConfig.getPoolMaxIdle());//超过此数会立即回收
config.setMinIdle(MqttConfig.getPoolMinIdle());// 最小空闲连接数必须保留的连接数
// config.setInitialSize(3);
config.setMaxTotal(MqttConfig.getPoolMaxTotal());
config.setMaxIdle(MqttConfig.getPoolMaxIdle());
config.setMinIdle(MqttConfig.getPoolMinIdle());
// 启用空闲连接维护
config.setTimeBetweenEvictionRuns(Duration.ofSeconds(60));
config.setSoftMinEvictableIdleTime(Duration.ofMinutes(5)); // 空闲>30分钟且超过minIdle时回收
config.setNumTestsPerEvictionRun(3); // 限制每次检查数量
config.setTestWhileIdle(true); // 驱逐前先检测有效性
config.setTimeBetweenEvictionRuns(Duration.ofSeconds(30));
config.setSoftMinEvictableIdleTime(Duration.ofMinutes(2));
config.setNumTestsPerEvictionRun(3);
config.setTestWhileIdle(true);
config.setTestOnBorrow(true);
config.setTestOnReturn(true);
connectionPool = new GenericObjectPool<>(factory, config);
config.setTestOnReturn(false);
connectionPool = new GenericObjectPool<>(mqttConnectionFactory, config);
log.info("MQTT连接池配置完成: maxTotal={}, maxIdle={}, minIdle={}",
MqttConfig.getPoolMaxTotal(), MqttConfig.getPoolMaxIdle(), MqttConfig.getPoolMinIdle());
}
// 获取连接实例方法
public static Channel getConnection() throws Exception {
Channel channel = connectionPool.borrowObject();
// if (channel == null) {
// connectionPool.addObject();
// }
return channel;
/**
* 预创建连接
*/
private void preCreateConnections() {
int minIdle = MqttConfig.getPoolMinIdle();
if (minIdle <= 0) {
return;
}
log.info("开始预创建 {} 个MQTT连接", minIdle);
int successCount = 0;
for (int i = 0; i < minIdle; i++) {
try {
connectionPool.addObject();
successCount++;
log.debug("成功预创建MQTT连接 {}/{}", i + 1, minIdle);
// 避免短时间内创建大量连接给服务器造成压力
if ((i + 1) % 2 == 0 && i < minIdle - 1) {
Thread.sleep(50);
}
} catch (Exception e) {
log.warn("预创建MQTT连接 {}/{} 失败: {}", i + 1, minIdle, e.getMessage());
}
}
log.info("MQTT连接预创建完成成功创建: {}/{}", successCount, minIdle);
}
// 归还连接实例方法
public static void returnConnection(Channel channel) {
if (channel != null && channel.isActive()) {
connectionPool.returnObject(channel);
/**
* 启动连接维护任务
*/
private void startConnectionMaintenance() {
// 定期检查连接池状态确保最小空闲连接数
scheduler.scheduleAtFixedRate(() -> {
try {
maintainMinIdleConnections();
logPoolStatus();
} catch (Exception e) {
log.error("维护MQTT连接池异常", e);
}
}, 10, 60, TimeUnit.SECONDS);
}
/**
* 维护最小空闲连接数
*/
private void maintainMinIdleConnections() {
int currentIdle = connectionPool.getNumIdle();
int minIdle = MqttConfig.getPoolMinIdle();
int deficit = minIdle - currentIdle;
if (deficit > 0) {
log.debug("检测到空闲连接不足,当前: {},目标: {},需要补充: {}", currentIdle, minIdle, deficit);
for (int i = 0; i < deficit; i++) {
try {
connectionPool.addObject();
log.debug("补充创建空闲MQTT连接 {}/{}", i + 1, deficit);
} catch (Exception e) {
log.warn("补充创建空闲MQTT连接失败: {}", e.getMessage());
break;
}
}
}
}
// 应用关闭时销毁
/**
* 获取连接
*/
public Channel getConnection() throws Exception {
Channel channel = connectionPool.borrowObject();
if (log.isDebugEnabled()) {
log.debug("获取MQTT连接当前活跃: {},空闲: {}",
connectionPool.getNumActive(), connectionPool.getNumIdle());
}
return channel;
}
/**
* 归还连接
*/
public void returnConnection(Channel channel) {
if (channel != null) {
try {
if (channel.isActive() && channel.isOpen()) {
connectionPool.returnObject(channel);
if (log.isDebugEnabled()) {
log.debug("归还MQTT连接当前活跃: {},空闲: {}",
connectionPool.getNumActive(), connectionPool.getNumIdle());
}
} else {
log.warn("检测到不活跃的连接,直接销毁");
connectionPool.invalidateObject(channel);
}
} catch (Exception e) {
log.error("归还MQTT连接异常", e);
}
}
}
/**
* 获取连接池状态
*/
public void logPoolStatus() {
if (connectionPool != null && log.isInfoEnabled()) {
log.info("MQTT连接池状态 - 活跃: {}, 空闲: {}, 等待: {}",
connectionPool.getNumActive(),
connectionPool.getNumIdle(),
connectionPool.getNumWaiters());
}
}
/**
* 获取连接池统计信息
*/
public String getPoolStats() {
if (connectionPool == null) {
return "连接池未初始化";
}
return String.format("活跃: %d, 空闲: %d, 等待: %d",
connectionPool.getNumActive(),
connectionPool.getNumIdle(),
connectionPool.getNumWaiters());
}
@Override
public void destroy() {
connectionPool.close();
log.info("MQTT连接池已关闭");
log.info("开始关闭MQTT连接池...");
try {
scheduler.shutdown();
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
}
} catch (InterruptedException e) {
scheduler.shutdownNow();
Thread.currentThread().interrupt();
}
if (connectionPool != null) {
connectionPool.close();
log.info("MQTT连接池已关闭");
}
}
}

View File

@ -1,255 +1,142 @@
package org.cdzy.gather.mqtt;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.netty.channel.Channel;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.mqtt.*;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.cdzy.gather.config.MqttConfig;
import org.cdzy.gather.config.SpringContextHolder;
import org.cdzy.gather.kafka.KafkaProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author attiya
* MQTT 消息处理器
*/
public class MqttHandler extends SimpleChannelInboundHandler<MqttMessage> {
// 修改MqttHandler中的subscribeToTopic方法
private static final AtomicInteger messageIdCounter = new AtomicInteger(1);
private static final Logger log = LoggerFactory.getLogger(MqttHandler.class);
private static final AtomicBoolean isReconnecting = new AtomicBoolean(false);
private static final int MAX_RETRIES = 5;
private static final AtomicInteger retryCount = new AtomicInteger(0);
private Channel channel;
@Slf4j
public class MqttHandler extends ChannelInboundHandlerAdapter {
private volatile boolean connected = false;
@Override
public void channelActive(ChannelHandlerContext ctx) {
channel = ctx.channel();
public void channelRead(ChannelHandlerContext ctx, Object msg){
if (msg instanceof MqttMessage mqttMessage) {
handleMqttMessage(ctx, mqttMessage);
} else {
log.warn("收到未知类型的消息: {}", msg.getClass().getName());
ctx.fireChannelRead(msg);
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, MqttMessage msg) {
private void handleMqttMessage(ChannelHandlerContext ctx, MqttMessage mqttMessage) {
try {
switch (msg.fixedHeader().messageType()) {
case CONNACK -> handleConnAck((MqttConnAckMessage) msg, ctx);
case PUBLISH -> handlePublish((MqttPublishMessage) msg);
case SUBACK -> handleSubAck((MqttSubAckMessage) msg);
// case PUBACK -> handlePubAck(msg);
switch (mqttMessage.fixedHeader().messageType()) {
case CONNACK:
// CONNACK已经在专门的handler中处理这里不应该收到
log.warn("在MqttHandler中收到CONNACK可能配置有误");
break;
case PUBLISH:
handlePublish(ctx, (MqttPublishMessage) mqttMessage);
break;
case PUBACK:
handlePubAck((MqttPubAckMessage) mqttMessage);
break;
case PINGRESP:
log.debug("收到PING响应");
break;
case SUBACK:
handleSubAck((MqttSubAckMessage) mqttMessage);
break;
case UNSUBACK:
handleUnsubAck((MqttUnsubAckMessage) mqttMessage);
break;
default:
log.debug("收到MQTT消息: {}", mqttMessage.fixedHeader().messageType());
break;
}
} catch (Exception e) {
log.error("Error processing message: {}", e.getMessage(), e);
log.error("处理MQTT消息异常", e);
}
}
// private void handlePubAck(MqttMessage pubAck) {
//
// }
private void handleConnAck(ChannelHandlerContext ctx, MqttConnAckMessage connAckMessage) {
MqttConnectReturnCode returnCode = connAckMessage.variableHeader().connectReturnCode();
if (returnCode == MqttConnectReturnCode.CONNECTION_ACCEPTED) {
log.info("MQTT连接建立成功");
} else {
log.error("MQTT连接被拒绝: {}", returnCode);
ctx.close();
}
}
private void handlePublish(ChannelHandlerContext ctx, MqttPublishMessage publishMessage) {
String topic = publishMessage.variableHeader().topicName();
ByteBuf payload = publishMessage.payload();
String content = payload.toString(StandardCharsets.UTF_8);
log.info("收到MQTT消息 - 主题: {}, QoS: {}, 内容: {}",
topic, publishMessage.fixedHeader().qosLevel(), content);
// 根据QoS级别进行响应
MqttQoS qos = publishMessage.fixedHeader().qosLevel();
if (qos.value() >= MqttQoS.AT_LEAST_ONCE.value()) {
MqttPubAckMessage pubAckMessage = (MqttPubAckMessage) MqttMessageBuilders.pubAck()
.packetId(publishMessage.variableHeader().packetId())
.build();
ctx.writeAndFlush(pubAckMessage);
log.debug("已发送PUBACK响应, 消息ID: {}", publishMessage.variableHeader().packetId());
}
processReceivedMessage(topic, content);
}
private void handlePubAck(MqttPubAckMessage pubAckMessage) {
log.debug("收到PUBACK确认, 消息ID: {}", pubAckMessage.variableHeader().messageId());
}
private void handleSubAck(MqttSubAckMessage subAckMessage) {
log.info("订阅确认, 消息ID: {}", subAckMessage.variableHeader().messageId());
}
private void handleUnsubAck(MqttUnsubAckMessage unsubAckMessage) {
log.info("取消订阅确认, 消息ID: {}", unsubAckMessage.variableHeader().messageId());
}
/**
* 心跳
* 处理接收到的消息业务逻辑
*/
@Override
public void userEventTriggered(ChannelHandlerContext context, Object event) {
if (event instanceof IdleStateEvent e) {
// 写空闲时发送 PINGREQ需在 keepAlive 超时前触发
if (e.state() == IdleState.WRITER_IDLE) {
context.writeAndFlush(MqttMessage.PINGREQ);
private void processReceivedMessage(String topic, String content) {
try {
log.info("处理MQTT消息 - 主题: {}, 内容: {}", topic, content);
if (topic.startsWith("sensor/")) {
handleSensorData(topic, content);
} else if (topic.startsWith("control/")) {
handleControlCommand(topic, content);
}
} catch (Exception e) {
log.error("处理MQTT消息业务逻辑异常", e);
}
}
/**
* 订阅
*
* @param channel 通道
* @param topic 主题
*/
private void subscribeToTopic(Channel channel, String topic) {
MqttSubscribeMessage subscribeMessage = MqttMessageBuilders.subscribe()
.addSubscription(MqttQoS.AT_LEAST_ONCE, topic)
.messageId(messageIdCounter.getAndIncrement())
.build();
channel.writeAndFlush(subscribeMessage);
private void handleSensorData(String topic, String data) {
log.debug("处理传感器数据: {} -> {}", topic, data);
}
// public static void sendMessage(String topic, MqttQoS qos, String payload) {
// if (channel == null || !channel.isActive()) {
// log.error("通道未就绪,发送失败");
// return;
// }
// if (channel.isActive()) {
// MqttPublishMessage publishMsg = MqttMessageBuilders.publish()
// .messageId(messageIdCounter.getAndIncrement())
// .topicName(topic)
// .qos(qos)
// .payload(Unpooled.copiedBuffer(payload.getBytes()))
// .build();
// channel.writeAndFlush(publishMsg);
//// .addListener(future -> {
//// if (!future.isSuccess()) {
//// //TODO:重发逻辑
//// }
//// });
// log.info("[MQTT发送] 主题: {}, QoS: {}, 内容: {}", topic, qos, payload);
// }
// }
private void handleControlCommand(String topic, String command) {
log.debug("处理控制命令: {} -> {}", topic, command);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error(cause.getMessage());
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){
log.error("MQTT连接异常", cause);
ctx.close();
}
private void sendPubAck(int packetId) {
MqttPubAckMessage pubAck = new MqttPubAckMessage(
new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
MqttMessageIdVariableHeader.from(packetId)
);
channel.writeAndFlush(pubAck);
log.debug("发送pubAck确认: {}", packetId);
}
private void sendPubRec(int packetId) {
MqttMessage pubRec = new MqttMessage(
new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0),
MqttMessageIdVariableHeader.from(packetId)
);
channel.writeAndFlush(pubRec);
}
/**
* 连接反馈
*
* @param connAck 反馈信息
* @param ctx 管道
*/
private void handleConnAck(MqttConnAckMessage connAck, ChannelHandlerContext ctx) {
if (connAck.variableHeader().connectReturnCode() == MqttConnectReturnCode.CONNECTION_ACCEPTED) {
log.info("---------------------------------认证成功----------------------------------");
List<String> topic = MqttConfig.getTopic();
topic.forEach(t -> subscribeToTopic(ctx.channel(), t));
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (connected) {
log.warn("MQTT连接断开");
} else {
log.error("--------------------------------认证失败----------------------------------");
log.debug("MQTT连接未完成握手即断开");
}
connected = false;
super.channelInactive(ctx);
}
/**
* 处理上报信息
*
* @param publishMsg 信息
*/
private void handlePublish(MqttPublishMessage publishMsg) {
// 处理PUBLISH消息含Payload
String topic = publishMsg.variableHeader().topicName();
List<String> list = Arrays.stream(topic.split("/")).toList();
String payload = publishMsg.payload().toString(StandardCharsets.UTF_8); // 关键转为UTF-8字符串
// log.info("[MQTT接收] 主题: {}, 内容: {}", topic, payload);
// 根据 QoS 发送确认
if (publishMsg.fixedHeader().qosLevel() == MqttQoS.AT_LEAST_ONCE) {
sendPubAck(publishMsg.variableHeader().packetId());
} else if (publishMsg.fixedHeader().qosLevel() == MqttQoS.EXACTLY_ONCE) {
sendPubRec(publishMsg.variableHeader().packetId());
}
KafkaProducer kafkaProducer = SpringContextHolder.getBean(KafkaProducer.class);
ObjectMapper objectMapper = new ObjectMapper();
try {
ObjectNode objectNode = objectMapper.readValue(payload, ObjectNode.class);
objectNode.put("deviceId", list.get(list.size() - 1));
objectNode.put("r_topic", list.get(1));
if (topic.contains("rsp")){
kafkaProducer.send("msg_rsp", objectNode.toString());
}else if (topic.contains("rpt")){
kafkaProducer.send("msg_rpt", objectNode.toString());
}
}catch (JsonProcessingException e){
log.error(e.getMessage());
}
}
private void handleSubAck(MqttSubAckMessage subAck) {
List<Integer> codes = subAck.payload().grantedQoSLevels();
codes.forEach(code -> {
if (code >= 0x80) {
log.error("订阅主题失败 code: 0x{}", Integer.toHexString(code));
} else {
log.info("成功订阅主题 QoS: {}", code);
}
});
}
/**
* 断线重连
*
* @param ctx 管道
*/
public void channelInactive(ChannelHandlerContext ctx) {
//一下代码可能导致问题 Handler 实例导致 Channel 分散
// // 关闭旧连接
// if (channel != null) {
// channel.closeFuture().addListener(f -> log.info("旧连接已释放"));
// channel = null;
// }
//
// // 状态检查避免重复触发
// if (!isReconnecting.compareAndSet(false, true)) {
// log.warn("已有重连任务在进行中,忽略本次触发");
// return;
// }
//
// // 计算退避时间指数退避
// int currentRetry = retryCount.getAndIncrement();
// if (currentRetry >= MAX_RETRIES) {
// log.error("已达最大重试次数 {},放弃重连", MAX_RETRIES);
// isReconnecting.set(false);
// retryCount.set(0);
// return;
// }
//
// long delay = (long) Math.pow(2, currentRetry) * 5; // 5s, 10s, 20s...
// log.info("将在 {} 秒后尝试第 {} 次重连...", delay, currentRetry + 1);
//
// // 创建新的 Bootstrap 并重新连接
// // 调度重连任务
// ctx.channel().eventLoop().schedule(() -> {
// Bootstrap bootstrap = new Bootstrap()
// .group(ctx.channel().eventLoop())
// .channel(NioSocketChannel.class)
// .handler(new MqttHandler());
//
// bootstrap.connect(MqttConfig.getHost(), MqttConfig.getPort())
// .addListener((ChannelFuture future) -> {
// isReconnecting.set(false); // 无论成功与否释放标记
// if (future.isSuccess()) {
// retryCount.set(0); // 成功则重置重试计数器
// log.info("重连成功");
// channel = future.channel();
// } else {
// log.error("重连失败: {}", future.cause().getMessage());
// // 可递归触发 channelInactive() 继续重试需谨慎
// }
// });
// }, delay, TimeUnit.SECONDS);
}
}
}

View File

@ -2,39 +2,240 @@ package org.cdzy.gather.mqtt;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ThreadLocalRandom;
/**
* 支持连接池的 MQTT 客户端
*/
@Slf4j
@Component
public class MqttPoolClient {
private final MqttConnectionPool connectionPool;
private static final AtomicInteger messageIdCounter = new AtomicInteger(1);
public MqttPoolClient(MqttConnectionPool connectionPool) {
this.connectionPool = connectionPool;
}
// 发送消息自动借还连接
public static void sendMessage(String topic, MqttQoS qos, String payload) {
/**
* 发送消息自动借还连接
*/
public boolean sendMessage(String topic, MqttQoS qos, String payload) {
return sendMessage(topic, qos, payload, 0, 0);
}
/**
* 发送消息带重试
*/
public boolean sendMessage(String topic, MqttQoS qos, String payload, int maxRetries, long timeoutMs) {
Channel channel = null;
int attempt = 0;
long startTime = System.currentTimeMillis();
while (attempt <= maxRetries) {
try {
if (timeoutMs > 0 && (System.currentTimeMillis() - startTime) > timeoutMs) {
log.warn("MQTT消息发送超时主题: {}", topic);
return false;
}
channel = connectionPool.getConnection();
if (!isConnectionReady(channel)) {
log.warn("连接不可用,尝试重新获取,主题: {}", topic);
safelyInvalidateConnection(channel);
channel = null;
attempt++;
if (attempt <= maxRetries) {
sleepWithBackoff(attempt);
}
continue;
}
int messageId = generateMessageId();
MqttPublishMessage message = buildPublishMessage(topic, qos, payload, messageId);
boolean success = sendMessageSync(channel, message, topic, messageId);
if (success) {
log.debug("[MQTT发送成功] 主题: {}, QoS: {}, 消息ID: {}", topic, qos, messageId);
return true;
} else {
log.warn("[MQTT发送失败] 主题: {}, 尝试 {}/{}", topic, attempt + 1, maxRetries + 1);
safelyInvalidateConnection(channel);
channel = null;
attempt++;
if (attempt <= maxRetries) {
sleepWithBackoff(attempt);
}
}
} catch (Exception e) {
log.error("MQTT消息发送异常(尝试 {}/{}), 主题: {}, 错误: {}",
attempt + 1, maxRetries + 1, topic, e.getMessage());
safelyInvalidateConnection(channel);
channel = null;
attempt++;
if (attempt <= maxRetries) {
sleepWithBackoff(attempt);
}
} finally {
if (channel != null) {
connectionPool.returnConnection(channel);
}
}
}
log.error("MQTT消息发送失败已达到最大重试次数: {}, 主题: {}", maxRetries + 1, topic);
return false;
}
/**
* 订阅主题
*/
public boolean subscribe(String topic) {
return subscribe(new String[]{topic});
}
/**
* 订阅多个主题
*/
public boolean subscribe(String[] topics) {
Channel channel = null;
try {
channel = MqttConnectionPool.getConnection();
MqttPublishMessage msg = MqttMessageBuilders.publish()
.messageId(messageIdCounter.getAndIncrement())
.topicName(topic)
.qos(qos)
.payload(Unpooled.copiedBuffer(payload.getBytes()))
.build();
channel.writeAndFlush(msg).sync();
log.info("[MQTT发送] 主题: {}, QoS: {}, 内容: {}", topic, qos, payload);
channel = connectionPool.getConnection();
if (!isConnectionReady(channel)) {
return false;
}
for (String topic : topics) {
int messageId = generateMessageId();
MqttSubscribeMessage subscribeMessage = MqttMessageBuilders.subscribe()
.messageId(messageId)
.addSubscription(MqttQoS.AT_LEAST_ONCE,topic)
.build();
channel.writeAndFlush(subscribeMessage).sync();
log.info("MQTT订阅成功主题: {}, 消息ID: {}", String.join(",", topics), messageId);
}
return true;
} catch (Exception e) {
log.error("消息发送失败: {}", e.getMessage());
log.error("MQTT订阅失败主题: {}", String.join(",", topics), e);
return false;
} finally {
MqttConnectionPool.returnConnection(channel);
if (channel != null) {
connectionPool.returnConnection(channel);
}
}
}
/**
* 取消订阅
*/
public boolean unsubscribe(String topic) {
return unsubscribe(new String[]{topic});
}
public boolean unsubscribe(String[] topics) {
Channel channel = null;
try {
channel = connectionPool.getConnection();
if (!isConnectionReady(channel)) {
return false;
}
for (String topic : topics) {
int messageId = generateMessageId();
MqttUnsubscribeMessage unsubscribeMessage = MqttMessageBuilders.unsubscribe()
.messageId(messageId)
.addTopicFilter(topic)
.build();
channel.writeAndFlush(unsubscribeMessage).sync();
log.info("MQTT取消订阅成功主题: {}, 消息ID: {}", String.join(",", topics), messageId);
}
return true;
} catch (Exception e) {
log.error("MQTT取消订阅失败主题: {}", String.join(",", topics), e);
return false;
} finally {
if (channel != null) {
connectionPool.returnConnection(channel);
}
}
}
// ========== 私有方法 ==========
private boolean isConnectionReady(Channel channel) {
return channel != null &&
channel.isActive() &&
channel.isOpen() &&
channel.isWritable();
}
private void safelyInvalidateConnection(Channel channel) {
if (channel != null) {
try {
connectionPool.returnConnection(channel);
} catch (Exception e) {
log.warn("安全销毁连接异常", e);
}
}
}
private int generateMessageId() {
return ThreadLocalRandom.current().nextInt(1, 65535);
}
private MqttPublishMessage buildPublishMessage(String topic, MqttQoS qos, String payload, int messageId) {
return MqttMessageBuilders.publish()
.topicName(topic)
.qos(qos)
.retained(false)
.messageId(messageId)
.payload(Unpooled.copiedBuffer(payload.getBytes()))
.build();
}
private boolean sendMessageSync(Channel channel, MqttPublishMessage message, String topic, int messageId) {
try {
channel.writeAndFlush(message).sync();
log.info("[MQTT发送] 主题: {}, QoS: {}, 消息ID: {}, 内容: {}",
topic, message.fixedHeader().qosLevel(), messageId,
message.payload().toString(java.nio.charset.StandardCharsets.UTF_8));
return true;
} catch (Exception e) {
log.error("同步发送MQTT消息异常主题: {}, 消息ID: {}", topic, messageId, e);
return false;
}
}
private void sleepWithBackoff(int attempt) {
try {
long backoffMs = Math.min(1000 * (1L << (attempt - 1)), 10000);
Thread.sleep(backoffMs);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
/**
* 获取连接池状态
*/
public String getPoolStatus() {
return connectionPool.getPoolStats();
}
/**
* 打印连接池状态
*/
public void printPoolStatus() {
connectionPool.logPoolStatus();
}
}

View File

@ -1,36 +0,0 @@
package org.cdzy.gather.mqtt;
import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;
import org.cdzy.gather.config.MqttConfig;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
/**
* @author attiya
* @since 2025-03-31
*/
@Slf4j
@Component
public class MqttPoolPreloader implements CommandLineRunner {
@Override
public void run(String... args) {
// 预先创建最小空闲连接
int minIdle = MqttConfig.getPoolMinIdle();
List<Channel> list = new ArrayList<>();
for (int i = 0; i < minIdle; i++) {
try {
Channel channel = MqttConnectionPool.getConnection();
list.add(channel);
log.debug("预创建连接: {}", channel.id());
} catch (Exception e) {
log.error("预创建连接失败", e);
}
}
list.forEach(MqttConnectionPool::returnConnection);
}
}

View File

@ -29,13 +29,14 @@ mqtt:
port: 1883
username: admin
password: 970529Wss
topic: ecu/rpt/#,ecu/rsp/#
clientId: manager
# pool:
# max-total: 20
# max-idle: 10
# min-idle: 3
# test-on-borrow: true
auto-subscribe-topics: ecu/rpt/#,ecu/rsp/#
timeout: 10 # 减少超时时间以便更快失败
clean-session: true
keep-alive: 60
pool:
max-total: 10 # 暂时减少连接数
max-idle: 5
min-idle: 2 # 暂时减少最小空闲数
management:
endpoints:
web:

View File

@ -86,4 +86,10 @@ public class EbikeEcuInfo implements Serializable {
*/
private Long updateBy;
/**
* 是否被领取主要针对绑定电池的中控可能被领取进行换电
*/
@Column(onInsertValue = "f")
private Boolean isClaim;
}