Compare commits

...

2 Commits

Author SHA256 Message Date
d462f96f8b Merge remote-tracking branch 'origin/main' 2025-12-24 18:15:55 +08:00
f802afd06d 修复可能潜在的内存泄露风险 2025-12-24 18:15:47 +08:00
2 changed files with 112 additions and 146 deletions

View File

@ -23,7 +23,7 @@ public class MqttHandler extends ChannelInboundHandlerAdapter {
private static final ObjectMapper objectMapper = new ObjectMapper(); private static final ObjectMapper objectMapper = new ObjectMapper();
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg){ public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof MqttMessage mqttMessage) { if (msg instanceof MqttMessage mqttMessage) {
handleMqttMessage(ctx, mqttMessage); handleMqttMessage(ctx, mqttMessage);
} else { } else {
@ -38,7 +38,7 @@ public class MqttHandler extends ChannelInboundHandlerAdapter {
case CONNACK: case CONNACK:
// CONNACK已经在专门的handler中处理这里不应该收到 // CONNACK已经在专门的handler中处理这里不应该收到
log.warn("在MqttHandler中收到CONNACK可能配置有误"); log.warn("在MqttHandler中收到CONNACK可能配置有误");
handleConnAck(ctx,(MqttConnAckMessage)mqttMessage); handleConnAck(ctx, (MqttConnAckMessage) mqttMessage);
break; break;
case PUBLISH: case PUBLISH:
handlePublish(ctx, (MqttPublishMessage) mqttMessage); handlePublish(ctx, (MqttPublishMessage) mqttMessage);
@ -77,26 +77,25 @@ public class MqttHandler extends ChannelInboundHandlerAdapter {
private void handlePublish(ChannelHandlerContext ctx, MqttPublishMessage publishMessage) { private void handlePublish(ChannelHandlerContext ctx, MqttPublishMessage publishMessage) {
String topic = publishMessage.variableHeader().topicName(); String topic = publishMessage.variableHeader().topicName();
ByteBuf payload = publishMessage.payload(); ByteBuf payload = publishMessage.payload();
try { try {
// 提取内容
String content = payload.toString(StandardCharsets.UTF_8); String content = payload.toString(StandardCharsets.UTF_8);
log.info("收到MQTT消息 - 主题: {}, QoS: {}, 内容: {}", // QoS响应
topic, publishMessage.fixedHeader().qosLevel(), content);
// 根据QoS级别进行响应
MqttQoS qos = publishMessage.fixedHeader().qosLevel(); MqttQoS qos = publishMessage.fixedHeader().qosLevel();
if (qos.value() >= MqttQoS.AT_LEAST_ONCE.value()) { if (qos.value() >= MqttQoS.AT_LEAST_ONCE.value()) {
MqttPubAckMessage pubAckMessage = (MqttPubAckMessage) MqttMessageBuilders.pubAck() MqttPubAckMessage pubAckMessage = (MqttPubAckMessage) MqttMessageBuilders.pubAck()
.packetId(publishMessage.variableHeader().packetId()) .packetId(publishMessage.variableHeader().packetId())
.build(); .build();
ctx.writeAndFlush(pubAckMessage); ctx.writeAndFlush(pubAckMessage);
log.debug("已发送PUBACK响应, 消息ID: {}", publishMessage.variableHeader().packetId());
} }
// 处理消息内容
processReceivedMessage(topic, content); processReceivedMessage(topic, content);
}catch (Exception e){ } catch (Exception e) {
log.error("处理MQTT消息异常主题: {}", topic, e);
}finally {
payload.release(); payload.release();
log.error(e.getMessage());
} }
} }
@ -118,15 +117,16 @@ public class MqttHandler extends ChannelInboundHandlerAdapter {
private void processReceivedMessage(String topic, String content) { private void processReceivedMessage(String topic, String content) {
try { try {
KafkaProducer kafkaProducer = SpringContextHolder.getBean(KafkaProducer.class); KafkaProducer kafkaProducer = SpringContextHolder.getBean(KafkaProducer.class);
if (topic.contains("cdzybms")){ if (topic.contains("cdzybms")) {
if (topic.contains("rsp")) { if (topic.contains("rsp")) {
//乐摇摇响应 //乐摇摇响应
kafkaProducer.send("msg_lyy_rsp",content); kafkaProducer.send("msg_lyy_rsp", content);
} else if (topic.contains("rpt")) { } else if (topic.contains("rpt")) {
//乐摇摇上报 //乐摇摇上报
String[] parts = topic.split("/"); int lastSlash = topic.lastIndexOf('/');
String result = insertSnField(content, parts[parts.length - 1]); String sn = lastSlash >= 0 ? topic.substring(lastSlash + 1) : topic;
kafkaProducer.send("msg_lyy_rpt",result); String result = insertSnField(content, sn);
kafkaProducer.send("msg_lyy_rpt", result);
} }
} }
log.info("处理MQTT消息 - 主题: {}, 内容: {}", topic, content); log.info("处理MQTT消息 - 主题: {}, 内容: {}", topic, content);
@ -149,7 +149,7 @@ public class MqttHandler extends ChannelInboundHandlerAdapter {
} }
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("MQTT连接异常", cause); log.error("MQTT连接异常", cause);
ctx.close(); ctx.close();
} }

View File

@ -7,6 +7,7 @@ import io.netty.handler.codec.mqtt.*;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
/** /**
@ -45,7 +46,7 @@ public class MqttPoolClient {
channel = connectionPool.getConnection(); channel = connectionPool.getConnection();
if (!isConnectionReady(channel)) { if (isConnectionNotReady(channel)) {
log.warn("连接不可用,尝试重新获取,主题: {}", topic); log.warn("连接不可用,尝试重新获取,主题: {}", topic);
safelyInvalidateConnection(channel); safelyInvalidateConnection(channel);
channel = null; channel = null;
@ -93,91 +94,18 @@ public class MqttPoolClient {
return false; return false;
} }
/**
* 订阅主题
*/
public boolean subscribe(String topic) {
return subscribe(new String[]{topic});
}
/**
* 订阅多个主题
*/
public boolean subscribe(String[] topics) {
Channel channel = null;
try {
channel = connectionPool.getConnection();
if (!isConnectionReady(channel)) {
return false;
}
for (String topic : topics) {
int messageId = generateMessageId();
MqttSubscribeMessage subscribeMessage = MqttMessageBuilders.subscribe()
.messageId(messageId)
.addSubscription(MqttQoS.AT_LEAST_ONCE,topic)
.build();
channel.writeAndFlush(subscribeMessage).sync();
log.info("MQTT订阅成功主题: {}, 消息ID: {}", String.join(",", topics), messageId);
}
return true;
} catch (Exception e) {
log.error("MQTT订阅失败主题: {}", String.join(",", topics), e);
return false;
} finally {
if (channel != null) {
connectionPool.returnConnection(channel);
}
}
}
/**
* 取消订阅
*/
public boolean unsubscribe(String topic) {
return unsubscribe(new String[]{topic});
}
public boolean unsubscribe(String[] topics) {
Channel channel = null;
try {
channel = connectionPool.getConnection();
if (!isConnectionReady(channel)) {
return false;
}
for (String topic : topics) {
int messageId = generateMessageId();
MqttUnsubscribeMessage unsubscribeMessage = MqttMessageBuilders.unsubscribe()
.messageId(messageId)
.addTopicFilter(topic)
.build();
channel.writeAndFlush(unsubscribeMessage).sync();
log.info("MQTT取消订阅成功主题: {}, 消息ID: {}", String.join(",", topics), messageId);
}
return true;
} catch (Exception e) {
log.error("MQTT取消订阅失败主题: {}", String.join(",", topics), e);
return false;
} finally {
if (channel != null) {
connectionPool.returnConnection(channel);
}
}
}
// ========== 私有方法 ========== // ========== 私有方法 ==========
private boolean isConnectionReady(Channel channel) { /**
return channel != null && * 通道是否不活跃
channel.isActive() && * @param channel 通信
channel.isOpen() && * @return true 不活跃 false 活跃
channel.isWritable(); */
private boolean isConnectionNotReady(Channel channel) {
return channel == null ||
!channel.isActive() ||
!channel.isOpen() ||
!channel.isWritable();
} }
private void safelyInvalidateConnection(Channel channel) { private void safelyInvalidateConnection(Channel channel) {
@ -196,9 +124,9 @@ public class MqttPoolClient {
private MqttPublishMessage buildPublishMessage(String topic, MqttQoS qos, String payload, int messageId) { private MqttPublishMessage buildPublishMessage(String topic, MqttQoS qos, String payload, int messageId) {
// 创建独立的 ByteBuf // 创建独立的 ByteBuf
ByteBuf payloadBuf = Unpooled.copiedBuffer(payload.getBytes(java.nio.charset.StandardCharsets.UTF_8)); ByteBuf payloadBuf = null;
try { try {
payloadBuf = Unpooled.copiedBuffer(payload.getBytes(java.nio.charset.StandardCharsets.UTF_8));
return MqttMessageBuilders.publish() return MqttMessageBuilders.publish()
.topicName(topic) .topicName(topic)
.qos(qos) .qos(qos)
@ -207,43 +135,48 @@ public class MqttPoolClient {
.payload(payloadBuf) .payload(payloadBuf)
.build(); .build();
} catch (Exception e) { } catch (Exception e) {
if (payloadBuf != null && payloadBuf.refCnt() > 0) {
payloadBuf.release(); payloadBuf.release();
}
log.error("构建MQTT发布消息失败, 主题: {}", topic, e); log.error("构建MQTT发布消息失败, 主题: {}", topic, e);
throw new RuntimeException("构建MQTT消息失败", e); throw new RuntimeException("构建MQTT消息失败", e);
} }
} }
private boolean sendMessageSync(Channel channel, MqttPublishMessage message, String topic, int messageId) { private boolean sendMessageSync(Channel channel, MqttPublishMessage message, String topic, int messageId) {
String content = null; // 使用标志位跟踪是否需要释放
try { boolean needRelease = true;
ByteBuf payload = message.payload();
// 在发送前提取内容用于日志
try {
// 检查引用计数
if (payload.refCnt() <= 0) {
log.warn("消息payload引用计数为0主题: {}, 消息ID: {}", topic, messageId);
return false;
}
content = payload.toString(java.nio.charset.StandardCharsets.UTF_8);
} catch (Exception e) {
payload.release();
log.warn("提取MQTT消息内容失败主题: {}, 消息ID: {}", topic, messageId, e);
content = "[内容提取失败]";
}
// 发送消息 try {
// 1. 提取日志内容安全方式
String logContent = extractMessageContentForLog(message, topic, messageId);
// 2. 发送消息
channel.writeAndFlush(message).sync(); channel.writeAndFlush(message).sync();
log.info("[MQTT发送] 主题: {}, QoS: {}, 消息ID: {}, 内容: {}", // 3. 标记不需要释放Netty会处理
topic, message.fixedHeader().qosLevel(), messageId, content); needRelease = false;
// 4. 记录成功日志
log.info("[MQTT发送成功] 主题: {}, QoS: {}, 消息ID: {}, 内容长度: {}",
topic, message.fixedHeader().qosLevel(), messageId,
logContent != null ? logContent.length() : 0);
if (log.isDebugEnabled() && logContent != null) {
log.debug("[MQTT发送详情] 主题: {}, 内容: {}", topic, logContent);
}
return true; return true;
} catch (Exception e) { } catch (Exception e) {
log.error("同步发送MQTT消息异常主题: {}, 消息ID: {}, 内容: {}", topic, messageId, content, e); log.error("发送MQTT消息失败主题: {}, 消息ID: {}", topic, messageId, e);
// 发生异常时安全释放消息
safelyReleaseMessage(message);
return false; return false;
} finally {
// 只在需要时释放
if (needRelease) {
safelyReleaseMessage(message);
}
} }
} }
@ -277,9 +210,42 @@ public class MqttPoolClient {
} }
/** /**
* 打印连接池状态 * 安全提取消息内容用于日志不影响原消息
*/ */
public void printPoolStatus() { private String extractMessageContentForLog(MqttPublishMessage message, String topic, int messageId) {
connectionPool.logPoolStatus(); if (message == null) {
return null;
} }
ByteBuf payload = message.payload();
if (payload == null) {
return "[空消息]";
}
try {
// 创建视图不影响引用计数
ByteBuf slice = payload.slice();
int maxSize = 200; // 限制日志大小
int bytesToRead = Math.min(slice.readableBytes(), maxSize);
if (bytesToRead <= 0) {
return "[空内容]";
}
byte[] bytes = new byte[bytesToRead];
slice.readBytes(bytes);
String content = new String(bytes, StandardCharsets.UTF_8);
if (slice.readableBytes() > 0) {
content += String.format("...[已截断,总长度:%d]", slice.readableBytes() + bytesToRead);
}
return content;
} catch (Exception e) {
log.warn("提取消息内容用于日志失败,主题: {}, 消息ID: {}", topic, messageId, e);
return "[内容提取失败]";
}
}
} }