From f802afd06dc29f33530b637cee0d97f7654002ab7830ab18ff2fd4877aa6967e Mon Sep 17 00:00:00 2001 From: attiya <2413103649@qq.com> Date: Wed, 24 Dec 2025 18:15:47 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=8F=AF=E8=83=BD=E6=BD=9C?= =?UTF-8?q?=E5=9C=A8=E7=9A=84=E5=86=85=E5=AD=98=E6=B3=84=E9=9C=B2=E9=A3=8E?= =?UTF-8?q?=E9=99=A9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/cdzy/gather/mqtt/MqttHandler.java | 66 +++--- .../org/cdzy/gather/mqtt/MqttPoolClient.java | 192 +++++++----------- 2 files changed, 112 insertions(+), 146 deletions(-) diff --git a/ebike-gather/src/main/java/org/cdzy/gather/mqtt/MqttHandler.java b/ebike-gather/src/main/java/org/cdzy/gather/mqtt/MqttHandler.java index ca0d677..5ceb785 100644 --- a/ebike-gather/src/main/java/org/cdzy/gather/mqtt/MqttHandler.java +++ b/ebike-gather/src/main/java/org/cdzy/gather/mqtt/MqttHandler.java @@ -23,7 +23,7 @@ public class MqttHandler extends ChannelInboundHandlerAdapter { private static final ObjectMapper objectMapper = new ObjectMapper(); @Override - public void channelRead(ChannelHandlerContext ctx, Object msg){ + public void channelRead(ChannelHandlerContext ctx, Object msg) { if (msg instanceof MqttMessage mqttMessage) { handleMqttMessage(ctx, mqttMessage); } else { @@ -38,7 +38,7 @@ public class MqttHandler extends ChannelInboundHandlerAdapter { case CONNACK: // CONNACK已经在专门的handler中处理,这里不应该收到 log.warn("在MqttHandler中收到CONNACK,可能配置有误"); - handleConnAck(ctx,(MqttConnAckMessage)mqttMessage); + handleConnAck(ctx, (MqttConnAckMessage) mqttMessage); break; case PUBLISH: handlePublish(ctx, (MqttPublishMessage) mqttMessage); @@ -77,27 +77,26 @@ public class MqttHandler extends ChannelInboundHandlerAdapter { private void handlePublish(ChannelHandlerContext ctx, MqttPublishMessage publishMessage) { String topic = publishMessage.variableHeader().topicName(); ByteBuf payload = publishMessage.payload(); - try { - String content = payload.toString(StandardCharsets.UTF_8); - log.info("收到MQTT消息 - 主题: {}, QoS: {}, 内容: {}", - topic, publishMessage.fixedHeader().qosLevel(), content); + try { + // 提取内容 + String content = payload.toString(StandardCharsets.UTF_8); - // 根据QoS级别进行响应 - MqttQoS qos = publishMessage.fixedHeader().qosLevel(); - if (qos.value() >= MqttQoS.AT_LEAST_ONCE.value()) { - MqttPubAckMessage pubAckMessage = (MqttPubAckMessage) MqttMessageBuilders.pubAck() - .packetId(publishMessage.variableHeader().packetId()) - .build(); - ctx.writeAndFlush(pubAckMessage); - log.debug("已发送PUBACK响应, 消息ID: {}", publishMessage.variableHeader().packetId()); - } - - processReceivedMessage(topic, content); - }catch (Exception e){ - payload.release(); - log.error(e.getMessage()); - } + // QoS响应 + MqttQoS qos = publishMessage.fixedHeader().qosLevel(); + if (qos.value() >= MqttQoS.AT_LEAST_ONCE.value()) { + MqttPubAckMessage pubAckMessage = (MqttPubAckMessage) MqttMessageBuilders.pubAck() + .packetId(publishMessage.variableHeader().packetId()) + .build(); + ctx.writeAndFlush(pubAckMessage); + } + // 处理消息内容 + processReceivedMessage(topic, content); + } catch (Exception e) { + log.error("处理MQTT消息异常,主题: {}", topic, e); + }finally { + payload.release(); + } } private void handlePubAck(MqttPubAckMessage pubAckMessage) { @@ -118,17 +117,18 @@ public class MqttHandler extends ChannelInboundHandlerAdapter { private void processReceivedMessage(String topic, String content) { try { KafkaProducer kafkaProducer = SpringContextHolder.getBean(KafkaProducer.class); - if (topic.contains("cdzybms")){ - if (topic.contains("rsp")) { - //乐摇摇响应 - kafkaProducer.send("msg_lyy_rsp",content); - } else if (topic.contains("rpt")) { - //乐摇摇上报 - String[] parts = topic.split("/"); - String result = insertSnField(content, parts[parts.length - 1]); - kafkaProducer.send("msg_lyy_rpt",result); - } - } + if (topic.contains("cdzybms")) { + if (topic.contains("rsp")) { + //乐摇摇响应 + kafkaProducer.send("msg_lyy_rsp", content); + } else if (topic.contains("rpt")) { + //乐摇摇上报 + int lastSlash = topic.lastIndexOf('/'); + String sn = lastSlash >= 0 ? topic.substring(lastSlash + 1) : topic; + String result = insertSnField(content, sn); + kafkaProducer.send("msg_lyy_rpt", result); + } + } log.info("处理MQTT消息 - 主题: {}, 内容: {}", topic, content); if (topic.startsWith("sensor/")) { handleSensorData(topic, content); @@ -149,7 +149,7 @@ public class MqttHandler extends ChannelInboundHandlerAdapter { } @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){ + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { log.error("MQTT连接异常", cause); ctx.close(); } diff --git a/ebike-gather/src/main/java/org/cdzy/gather/mqtt/MqttPoolClient.java b/ebike-gather/src/main/java/org/cdzy/gather/mqtt/MqttPoolClient.java index a39c344..d3f3a0a 100644 --- a/ebike-gather/src/main/java/org/cdzy/gather/mqtt/MqttPoolClient.java +++ b/ebike-gather/src/main/java/org/cdzy/gather/mqtt/MqttPoolClient.java @@ -7,6 +7,7 @@ import io.netty.handler.codec.mqtt.*; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import java.nio.charset.StandardCharsets; import java.util.concurrent.ThreadLocalRandom; /** @@ -45,7 +46,7 @@ public class MqttPoolClient { channel = connectionPool.getConnection(); - if (!isConnectionReady(channel)) { + if (isConnectionNotReady(channel)) { log.warn("连接不可用,尝试重新获取,主题: {}", topic); safelyInvalidateConnection(channel); channel = null; @@ -93,91 +94,18 @@ public class MqttPoolClient { return false; } - /** - * 订阅主题 - */ - public boolean subscribe(String topic) { - return subscribe(new String[]{topic}); - } - - /** - * 订阅多个主题 - */ - public boolean subscribe(String[] topics) { - Channel channel = null; - try { - channel = connectionPool.getConnection(); - if (!isConnectionReady(channel)) { - return false; - } - - for (String topic : topics) { - int messageId = generateMessageId(); - MqttSubscribeMessage subscribeMessage = MqttMessageBuilders.subscribe() - .messageId(messageId) - .addSubscription(MqttQoS.AT_LEAST_ONCE,topic) - .build(); - - channel.writeAndFlush(subscribeMessage).sync(); - log.info("MQTT订阅成功,主题: {}, 消息ID: {}", String.join(",", topics), messageId); - } - return true; - - } catch (Exception e) { - log.error("MQTT订阅失败,主题: {}", String.join(",", topics), e); - return false; - } finally { - if (channel != null) { - connectionPool.returnConnection(channel); - } - } - } - - /** - * 取消订阅 - */ - public boolean unsubscribe(String topic) { - return unsubscribe(new String[]{topic}); - } - - public boolean unsubscribe(String[] topics) { - Channel channel = null; - try { - channel = connectionPool.getConnection(); - if (!isConnectionReady(channel)) { - return false; - } - - for (String topic : topics) { - int messageId = generateMessageId(); - - MqttUnsubscribeMessage unsubscribeMessage = MqttMessageBuilders.unsubscribe() - .messageId(messageId) - .addTopicFilter(topic) - .build(); - - channel.writeAndFlush(unsubscribeMessage).sync(); - log.info("MQTT取消订阅成功,主题: {}, 消息ID: {}", String.join(",", topics), messageId); - } - return true; - - } catch (Exception e) { - log.error("MQTT取消订阅失败,主题: {}", String.join(",", topics), e); - return false; - } finally { - if (channel != null) { - connectionPool.returnConnection(channel); - } - } - } - // ========== 私有方法 ========== - private boolean isConnectionReady(Channel channel) { - return channel != null && - channel.isActive() && - channel.isOpen() && - channel.isWritable(); + /** + * 通道是否不活跃 + * @param channel 通信 + * @return true 不活跃 false 活跃 + */ + private boolean isConnectionNotReady(Channel channel) { + return channel == null || + !channel.isActive() || + !channel.isOpen() || + !channel.isWritable(); } private void safelyInvalidateConnection(Channel channel) { @@ -196,9 +124,9 @@ public class MqttPoolClient { private MqttPublishMessage buildPublishMessage(String topic, MqttQoS qos, String payload, int messageId) { // 创建独立的 ByteBuf - ByteBuf payloadBuf = Unpooled.copiedBuffer(payload.getBytes(java.nio.charset.StandardCharsets.UTF_8)); - + ByteBuf payloadBuf = null; try { + payloadBuf = Unpooled.copiedBuffer(payload.getBytes(java.nio.charset.StandardCharsets.UTF_8)); return MqttMessageBuilders.publish() .topicName(topic) .qos(qos) @@ -207,43 +135,48 @@ public class MqttPoolClient { .payload(payloadBuf) .build(); } catch (Exception e) { - payloadBuf.release(); + if (payloadBuf != null && payloadBuf.refCnt() > 0) { + payloadBuf.release(); + } log.error("构建MQTT发布消息失败, 主题: {}", topic, e); throw new RuntimeException("构建MQTT消息失败", e); } } private boolean sendMessageSync(Channel channel, MqttPublishMessage message, String topic, int messageId) { - String content = null; - try { - ByteBuf payload = message.payload(); - // 在发送前提取内容用于日志 - try { - // 检查引用计数 - if (payload.refCnt() <= 0) { - log.warn("消息payload引用计数为0,主题: {}, 消息ID: {}", topic, messageId); - return false; - } - content = payload.toString(java.nio.charset.StandardCharsets.UTF_8); - } catch (Exception e) { - payload.release(); - log.warn("提取MQTT消息内容失败,主题: {}, 消息ID: {}", topic, messageId, e); - content = "[内容提取失败]"; - } + // 使用标志位跟踪是否需要释放 + boolean needRelease = true; - // 发送消息 + try { + // 1. 提取日志内容(安全方式) + String logContent = extractMessageContentForLog(message, topic, messageId); + + // 2. 发送消息 channel.writeAndFlush(message).sync(); - log.info("[MQTT发送] 主题: {}, QoS: {}, 消息ID: {}, 内容: {}", - topic, message.fixedHeader().qosLevel(), messageId, content); + // 3. 标记不需要释放(Netty会处理) + needRelease = false; + + // 4. 记录成功日志 + log.info("[MQTT发送成功] 主题: {}, QoS: {}, 消息ID: {}, 内容长度: {}", + topic, message.fixedHeader().qosLevel(), messageId, + logContent != null ? logContent.length() : 0); + + if (log.isDebugEnabled() && logContent != null) { + log.debug("[MQTT发送详情] 主题: {}, 内容: {}", topic, logContent); + } + return true; } catch (Exception e) { - log.error("同步发送MQTT消息异常,主题: {}, 消息ID: {}, 内容: {}", topic, messageId, content, e); - - // 发生异常时安全释放消息 - safelyReleaseMessage(message); + log.error("发送MQTT消息失败,主题: {}, 消息ID: {}", topic, messageId, e); return false; + + } finally { + // 只在需要时释放 + if (needRelease) { + safelyReleaseMessage(message); + } } } @@ -277,9 +210,42 @@ public class MqttPoolClient { } /** - * 打印连接池状态 + * 安全提取消息内容用于日志,不影响原消息 */ - public void printPoolStatus() { - connectionPool.logPoolStatus(); + private String extractMessageContentForLog(MqttPublishMessage message, String topic, int messageId) { + if (message == null) { + return null; + } + + ByteBuf payload = message.payload(); + if (payload == null) { + return "[空消息]"; + } + + try { + // 创建视图,不影响引用计数 + ByteBuf slice = payload.slice(); + int maxSize = 200; // 限制日志大小 + int bytesToRead = Math.min(slice.readableBytes(), maxSize); + + if (bytesToRead <= 0) { + return "[空内容]"; + } + + byte[] bytes = new byte[bytesToRead]; + slice.readBytes(bytes); + String content = new String(bytes, StandardCharsets.UTF_8); + + if (slice.readableBytes() > 0) { + content += String.format("...[已截断,总长度:%d]", slice.readableBytes() + bytesToRead); + } + + return content; + + } catch (Exception e) { + log.warn("提取消息内容用于日志失败,主题: {}, 消息ID: {}", topic, messageId, e); + return "[内容提取失败]"; + } } + } \ No newline at end of file