Compare commits

..

No commits in common. "d462f96f8ba10160e5df0613414d0dd14ac3f0db878ed3914d4ba7e67c371833" and "a47d23722eaf4370a138a1a401f8cbde243af4a6d788a947b3d6fc710e3fcc82" have entirely different histories.

2 changed files with 145 additions and 111 deletions

View File

@ -77,25 +77,26 @@ public class MqttHandler extends ChannelInboundHandlerAdapter {
private void handlePublish(ChannelHandlerContext ctx, MqttPublishMessage publishMessage) { private void handlePublish(ChannelHandlerContext ctx, MqttPublishMessage publishMessage) {
String topic = publishMessage.variableHeader().topicName(); String topic = publishMessage.variableHeader().topicName();
ByteBuf payload = publishMessage.payload(); ByteBuf payload = publishMessage.payload();
try { try {
// 提取内容
String content = payload.toString(StandardCharsets.UTF_8); String content = payload.toString(StandardCharsets.UTF_8);
// QoS响应 log.info("收到MQTT消息 - 主题: {}, QoS: {}, 内容: {}",
topic, publishMessage.fixedHeader().qosLevel(), content);
// 根据QoS级别进行响应
MqttQoS qos = publishMessage.fixedHeader().qosLevel(); MqttQoS qos = publishMessage.fixedHeader().qosLevel();
if (qos.value() >= MqttQoS.AT_LEAST_ONCE.value()) { if (qos.value() >= MqttQoS.AT_LEAST_ONCE.value()) {
MqttPubAckMessage pubAckMessage = (MqttPubAckMessage) MqttMessageBuilders.pubAck() MqttPubAckMessage pubAckMessage = (MqttPubAckMessage) MqttMessageBuilders.pubAck()
.packetId(publishMessage.variableHeader().packetId()) .packetId(publishMessage.variableHeader().packetId())
.build(); .build();
ctx.writeAndFlush(pubAckMessage); ctx.writeAndFlush(pubAckMessage);
log.debug("已发送PUBACK响应, 消息ID: {}", publishMessage.variableHeader().packetId());
} }
// 处理消息内容
processReceivedMessage(topic, content); processReceivedMessage(topic, content);
}catch (Exception e){ }catch (Exception e){
log.error("处理MQTT消息异常主题: {}", topic, e);
}finally {
payload.release(); payload.release();
log.error(e.getMessage());
} }
} }
@ -123,9 +124,8 @@ public class MqttHandler extends ChannelInboundHandlerAdapter {
kafkaProducer.send("msg_lyy_rsp",content); kafkaProducer.send("msg_lyy_rsp",content);
} else if (topic.contains("rpt")) { } else if (topic.contains("rpt")) {
//乐摇摇上报 //乐摇摇上报
int lastSlash = topic.lastIndexOf('/'); String[] parts = topic.split("/");
String sn = lastSlash >= 0 ? topic.substring(lastSlash + 1) : topic; String result = insertSnField(content, parts[parts.length - 1]);
String result = insertSnField(content, sn);
kafkaProducer.send("msg_lyy_rpt",result); kafkaProducer.send("msg_lyy_rpt",result);
} }
} }

View File

@ -7,7 +7,6 @@ import io.netty.handler.codec.mqtt.*;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
/** /**
@ -46,7 +45,7 @@ public class MqttPoolClient {
channel = connectionPool.getConnection(); channel = connectionPool.getConnection();
if (isConnectionNotReady(channel)) { if (!isConnectionReady(channel)) {
log.warn("连接不可用,尝试重新获取,主题: {}", topic); log.warn("连接不可用,尝试重新获取,主题: {}", topic);
safelyInvalidateConnection(channel); safelyInvalidateConnection(channel);
channel = null; channel = null;
@ -94,18 +93,91 @@ public class MqttPoolClient {
return false; return false;
} }
// ========== 私有方法 ========== /**
* 订阅主题
*/
public boolean subscribe(String topic) {
return subscribe(new String[]{topic});
}
/** /**
* 通道是否不活跃 * 订阅多个主题
* @param channel 通信
* @return true 不活跃 false 活跃
*/ */
private boolean isConnectionNotReady(Channel channel) { public boolean subscribe(String[] topics) {
return channel == null || Channel channel = null;
!channel.isActive() || try {
!channel.isOpen() || channel = connectionPool.getConnection();
!channel.isWritable(); if (!isConnectionReady(channel)) {
return false;
}
for (String topic : topics) {
int messageId = generateMessageId();
MqttSubscribeMessage subscribeMessage = MqttMessageBuilders.subscribe()
.messageId(messageId)
.addSubscription(MqttQoS.AT_LEAST_ONCE,topic)
.build();
channel.writeAndFlush(subscribeMessage).sync();
log.info("MQTT订阅成功主题: {}, 消息ID: {}", String.join(",", topics), messageId);
}
return true;
} catch (Exception e) {
log.error("MQTT订阅失败主题: {}", String.join(",", topics), e);
return false;
} finally {
if (channel != null) {
connectionPool.returnConnection(channel);
}
}
}
/**
* 取消订阅
*/
public boolean unsubscribe(String topic) {
return unsubscribe(new String[]{topic});
}
public boolean unsubscribe(String[] topics) {
Channel channel = null;
try {
channel = connectionPool.getConnection();
if (!isConnectionReady(channel)) {
return false;
}
for (String topic : topics) {
int messageId = generateMessageId();
MqttUnsubscribeMessage unsubscribeMessage = MqttMessageBuilders.unsubscribe()
.messageId(messageId)
.addTopicFilter(topic)
.build();
channel.writeAndFlush(unsubscribeMessage).sync();
log.info("MQTT取消订阅成功主题: {}, 消息ID: {}", String.join(",", topics), messageId);
}
return true;
} catch (Exception e) {
log.error("MQTT取消订阅失败主题: {}", String.join(",", topics), e);
return false;
} finally {
if (channel != null) {
connectionPool.returnConnection(channel);
}
}
}
// ========== 私有方法 ==========
private boolean isConnectionReady(Channel channel) {
return channel != null &&
channel.isActive() &&
channel.isOpen() &&
channel.isWritable();
} }
private void safelyInvalidateConnection(Channel channel) { private void safelyInvalidateConnection(Channel channel) {
@ -124,9 +196,9 @@ public class MqttPoolClient {
private MqttPublishMessage buildPublishMessage(String topic, MqttQoS qos, String payload, int messageId) { private MqttPublishMessage buildPublishMessage(String topic, MqttQoS qos, String payload, int messageId) {
// 创建独立的 ByteBuf // 创建独立的 ByteBuf
ByteBuf payloadBuf = null; ByteBuf payloadBuf = Unpooled.copiedBuffer(payload.getBytes(java.nio.charset.StandardCharsets.UTF_8));
try { try {
payloadBuf = Unpooled.copiedBuffer(payload.getBytes(java.nio.charset.StandardCharsets.UTF_8));
return MqttMessageBuilders.publish() return MqttMessageBuilders.publish()
.topicName(topic) .topicName(topic)
.qos(qos) .qos(qos)
@ -135,48 +207,43 @@ public class MqttPoolClient {
.payload(payloadBuf) .payload(payloadBuf)
.build(); .build();
} catch (Exception e) { } catch (Exception e) {
if (payloadBuf != null && payloadBuf.refCnt() > 0) {
payloadBuf.release(); payloadBuf.release();
}
log.error("构建MQTT发布消息失败, 主题: {}", topic, e); log.error("构建MQTT发布消息失败, 主题: {}", topic, e);
throw new RuntimeException("构建MQTT消息失败", e); throw new RuntimeException("构建MQTT消息失败", e);
} }
} }
private boolean sendMessageSync(Channel channel, MqttPublishMessage message, String topic, int messageId) { private boolean sendMessageSync(Channel channel, MqttPublishMessage message, String topic, int messageId) {
// 使用标志位跟踪是否需要释放 String content = null;
boolean needRelease = true;
try { try {
// 1. 提取日志内容安全方式 ByteBuf payload = message.payload();
String logContent = extractMessageContentForLog(message, topic, messageId); // 在发送前提取内容用于日志
try {
// 2. 发送消息 // 检查引用计数
channel.writeAndFlush(message).sync(); if (payload.refCnt() <= 0) {
log.warn("消息payload引用计数为0主题: {}, 消息ID: {}", topic, messageId);
// 3. 标记不需要释放Netty会处理 return false;
needRelease = false; }
content = payload.toString(java.nio.charset.StandardCharsets.UTF_8);
// 4. 记录成功日志 } catch (Exception e) {
log.info("[MQTT发送成功] 主题: {}, QoS: {}, 消息ID: {}, 内容长度: {}", payload.release();
topic, message.fixedHeader().qosLevel(), messageId, log.warn("提取MQTT消息内容失败主题: {}, 消息ID: {}", topic, messageId, e);
logContent != null ? logContent.length() : 0); content = "[内容提取失败]";
if (log.isDebugEnabled() && logContent != null) {
log.debug("[MQTT发送详情] 主题: {}, 内容: {}", topic, logContent);
} }
// 发送消息
channel.writeAndFlush(message).sync();
log.info("[MQTT发送] 主题: {}, QoS: {}, 消息ID: {}, 内容: {}",
topic, message.fixedHeader().qosLevel(), messageId, content);
return true; return true;
} catch (Exception e) { } catch (Exception e) {
log.error("发送MQTT消息失败主题: {}, 消息ID: {}", topic, messageId, e); log.error("同步发送MQTT消息异常主题: {}, 消息ID: {}, 内容: {}", topic, messageId, content, e);
return false;
} finally { // 发生异常时安全释放消息
// 只在需要时释放
if (needRelease) {
safelyReleaseMessage(message); safelyReleaseMessage(message);
} return false;
} }
} }
@ -210,42 +277,9 @@ public class MqttPoolClient {
} }
/** /**
* 安全提取消息内容用于日志不影响原消息 * 打印连接池状态
*/ */
private String extractMessageContentForLog(MqttPublishMessage message, String topic, int messageId) { public void printPoolStatus() {
if (message == null) { connectionPool.logPoolStatus();
return null;
}
ByteBuf payload = message.payload();
if (payload == null) {
return "[空消息]";
}
try {
// 创建视图不影响引用计数
ByteBuf slice = payload.slice();
int maxSize = 200; // 限制日志大小
int bytesToRead = Math.min(slice.readableBytes(), maxSize);
if (bytesToRead <= 0) {
return "[空内容]";
}
byte[] bytes = new byte[bytesToRead];
slice.readBytes(bytes);
String content = new String(bytes, StandardCharsets.UTF_8);
if (slice.readableBytes() > 0) {
content += String.format("...[已截断,总长度:%d]", slice.readableBytes() + bytesToRead);
}
return content;
} catch (Exception e) {
log.warn("提取消息内容用于日志失败,主题: {}, 消息ID: {}", topic, messageId, e);
return "[内容提取失败]";
} }
} }
}