Compare commits
2 Commits
a47d23722e
...
d462f96f8b
| Author | SHA256 | Date | |
|---|---|---|---|
| d462f96f8b | |||
| f802afd06d |
@ -23,7 +23,7 @@ public class MqttHandler extends ChannelInboundHandlerAdapter {
|
||||
private static final ObjectMapper objectMapper = new ObjectMapper();
|
||||
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg){
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) {
|
||||
if (msg instanceof MqttMessage mqttMessage) {
|
||||
handleMqttMessage(ctx, mqttMessage);
|
||||
} else {
|
||||
@ -38,7 +38,7 @@ public class MqttHandler extends ChannelInboundHandlerAdapter {
|
||||
case CONNACK:
|
||||
// CONNACK已经在专门的handler中处理,这里不应该收到
|
||||
log.warn("在MqttHandler中收到CONNACK,可能配置有误");
|
||||
handleConnAck(ctx,(MqttConnAckMessage)mqttMessage);
|
||||
handleConnAck(ctx, (MqttConnAckMessage) mqttMessage);
|
||||
break;
|
||||
case PUBLISH:
|
||||
handlePublish(ctx, (MqttPublishMessage) mqttMessage);
|
||||
@ -77,27 +77,26 @@ public class MqttHandler extends ChannelInboundHandlerAdapter {
|
||||
private void handlePublish(ChannelHandlerContext ctx, MqttPublishMessage publishMessage) {
|
||||
String topic = publishMessage.variableHeader().topicName();
|
||||
ByteBuf payload = publishMessage.payload();
|
||||
try {
|
||||
String content = payload.toString(StandardCharsets.UTF_8);
|
||||
|
||||
log.info("收到MQTT消息 - 主题: {}, QoS: {}, 内容: {}",
|
||||
topic, publishMessage.fixedHeader().qosLevel(), content);
|
||||
try {
|
||||
// 提取内容
|
||||
String content = payload.toString(StandardCharsets.UTF_8);
|
||||
|
||||
// 根据QoS级别进行响应
|
||||
MqttQoS qos = publishMessage.fixedHeader().qosLevel();
|
||||
if (qos.value() >= MqttQoS.AT_LEAST_ONCE.value()) {
|
||||
MqttPubAckMessage pubAckMessage = (MqttPubAckMessage) MqttMessageBuilders.pubAck()
|
||||
.packetId(publishMessage.variableHeader().packetId())
|
||||
.build();
|
||||
ctx.writeAndFlush(pubAckMessage);
|
||||
log.debug("已发送PUBACK响应, 消息ID: {}", publishMessage.variableHeader().packetId());
|
||||
}
|
||||
|
||||
processReceivedMessage(topic, content);
|
||||
}catch (Exception e){
|
||||
payload.release();
|
||||
log.error(e.getMessage());
|
||||
}
|
||||
// QoS响应
|
||||
MqttQoS qos = publishMessage.fixedHeader().qosLevel();
|
||||
if (qos.value() >= MqttQoS.AT_LEAST_ONCE.value()) {
|
||||
MqttPubAckMessage pubAckMessage = (MqttPubAckMessage) MqttMessageBuilders.pubAck()
|
||||
.packetId(publishMessage.variableHeader().packetId())
|
||||
.build();
|
||||
ctx.writeAndFlush(pubAckMessage);
|
||||
}
|
||||
// 处理消息内容
|
||||
processReceivedMessage(topic, content);
|
||||
} catch (Exception e) {
|
||||
log.error("处理MQTT消息异常,主题: {}", topic, e);
|
||||
}finally {
|
||||
payload.release();
|
||||
}
|
||||
}
|
||||
|
||||
private void handlePubAck(MqttPubAckMessage pubAckMessage) {
|
||||
@ -118,17 +117,18 @@ public class MqttHandler extends ChannelInboundHandlerAdapter {
|
||||
private void processReceivedMessage(String topic, String content) {
|
||||
try {
|
||||
KafkaProducer kafkaProducer = SpringContextHolder.getBean(KafkaProducer.class);
|
||||
if (topic.contains("cdzybms")){
|
||||
if (topic.contains("rsp")) {
|
||||
//乐摇摇响应
|
||||
kafkaProducer.send("msg_lyy_rsp",content);
|
||||
} else if (topic.contains("rpt")) {
|
||||
//乐摇摇上报
|
||||
String[] parts = topic.split("/");
|
||||
String result = insertSnField(content, parts[parts.length - 1]);
|
||||
kafkaProducer.send("msg_lyy_rpt",result);
|
||||
}
|
||||
}
|
||||
if (topic.contains("cdzybms")) {
|
||||
if (topic.contains("rsp")) {
|
||||
//乐摇摇响应
|
||||
kafkaProducer.send("msg_lyy_rsp", content);
|
||||
} else if (topic.contains("rpt")) {
|
||||
//乐摇摇上报
|
||||
int lastSlash = topic.lastIndexOf('/');
|
||||
String sn = lastSlash >= 0 ? topic.substring(lastSlash + 1) : topic;
|
||||
String result = insertSnField(content, sn);
|
||||
kafkaProducer.send("msg_lyy_rpt", result);
|
||||
}
|
||||
}
|
||||
log.info("处理MQTT消息 - 主题: {}, 内容: {}", topic, content);
|
||||
if (topic.startsWith("sensor/")) {
|
||||
handleSensorData(topic, content);
|
||||
@ -149,7 +149,7 @@ public class MqttHandler extends ChannelInboundHandlerAdapter {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
|
||||
log.error("MQTT连接异常", cause);
|
||||
ctx.close();
|
||||
}
|
||||
|
||||
@ -7,6 +7,7 @@ import io.netty.handler.codec.mqtt.*;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
/**
|
||||
@ -45,7 +46,7 @@ public class MqttPoolClient {
|
||||
|
||||
channel = connectionPool.getConnection();
|
||||
|
||||
if (!isConnectionReady(channel)) {
|
||||
if (isConnectionNotReady(channel)) {
|
||||
log.warn("连接不可用,尝试重新获取,主题: {}", topic);
|
||||
safelyInvalidateConnection(channel);
|
||||
channel = null;
|
||||
@ -93,91 +94,18 @@ public class MqttPoolClient {
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* 订阅主题
|
||||
*/
|
||||
public boolean subscribe(String topic) {
|
||||
return subscribe(new String[]{topic});
|
||||
}
|
||||
|
||||
/**
|
||||
* 订阅多个主题
|
||||
*/
|
||||
public boolean subscribe(String[] topics) {
|
||||
Channel channel = null;
|
||||
try {
|
||||
channel = connectionPool.getConnection();
|
||||
if (!isConnectionReady(channel)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
for (String topic : topics) {
|
||||
int messageId = generateMessageId();
|
||||
MqttSubscribeMessage subscribeMessage = MqttMessageBuilders.subscribe()
|
||||
.messageId(messageId)
|
||||
.addSubscription(MqttQoS.AT_LEAST_ONCE,topic)
|
||||
.build();
|
||||
|
||||
channel.writeAndFlush(subscribeMessage).sync();
|
||||
log.info("MQTT订阅成功,主题: {}, 消息ID: {}", String.join(",", topics), messageId);
|
||||
}
|
||||
return true;
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("MQTT订阅失败,主题: {}", String.join(",", topics), e);
|
||||
return false;
|
||||
} finally {
|
||||
if (channel != null) {
|
||||
connectionPool.returnConnection(channel);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 取消订阅
|
||||
*/
|
||||
public boolean unsubscribe(String topic) {
|
||||
return unsubscribe(new String[]{topic});
|
||||
}
|
||||
|
||||
public boolean unsubscribe(String[] topics) {
|
||||
Channel channel = null;
|
||||
try {
|
||||
channel = connectionPool.getConnection();
|
||||
if (!isConnectionReady(channel)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
for (String topic : topics) {
|
||||
int messageId = generateMessageId();
|
||||
|
||||
MqttUnsubscribeMessage unsubscribeMessage = MqttMessageBuilders.unsubscribe()
|
||||
.messageId(messageId)
|
||||
.addTopicFilter(topic)
|
||||
.build();
|
||||
|
||||
channel.writeAndFlush(unsubscribeMessage).sync();
|
||||
log.info("MQTT取消订阅成功,主题: {}, 消息ID: {}", String.join(",", topics), messageId);
|
||||
}
|
||||
return true;
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("MQTT取消订阅失败,主题: {}", String.join(",", topics), e);
|
||||
return false;
|
||||
} finally {
|
||||
if (channel != null) {
|
||||
connectionPool.returnConnection(channel);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ========== 私有方法 ==========
|
||||
|
||||
private boolean isConnectionReady(Channel channel) {
|
||||
return channel != null &&
|
||||
channel.isActive() &&
|
||||
channel.isOpen() &&
|
||||
channel.isWritable();
|
||||
/**
|
||||
* 通道是否不活跃
|
||||
* @param channel 通信
|
||||
* @return true 不活跃 false 活跃
|
||||
*/
|
||||
private boolean isConnectionNotReady(Channel channel) {
|
||||
return channel == null ||
|
||||
!channel.isActive() ||
|
||||
!channel.isOpen() ||
|
||||
!channel.isWritable();
|
||||
}
|
||||
|
||||
private void safelyInvalidateConnection(Channel channel) {
|
||||
@ -196,9 +124,9 @@ public class MqttPoolClient {
|
||||
|
||||
private MqttPublishMessage buildPublishMessage(String topic, MqttQoS qos, String payload, int messageId) {
|
||||
// 创建独立的 ByteBuf
|
||||
ByteBuf payloadBuf = Unpooled.copiedBuffer(payload.getBytes(java.nio.charset.StandardCharsets.UTF_8));
|
||||
|
||||
ByteBuf payloadBuf = null;
|
||||
try {
|
||||
payloadBuf = Unpooled.copiedBuffer(payload.getBytes(java.nio.charset.StandardCharsets.UTF_8));
|
||||
return MqttMessageBuilders.publish()
|
||||
.topicName(topic)
|
||||
.qos(qos)
|
||||
@ -207,43 +135,48 @@ public class MqttPoolClient {
|
||||
.payload(payloadBuf)
|
||||
.build();
|
||||
} catch (Exception e) {
|
||||
payloadBuf.release();
|
||||
if (payloadBuf != null && payloadBuf.refCnt() > 0) {
|
||||
payloadBuf.release();
|
||||
}
|
||||
log.error("构建MQTT发布消息失败, 主题: {}", topic, e);
|
||||
throw new RuntimeException("构建MQTT消息失败", e);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean sendMessageSync(Channel channel, MqttPublishMessage message, String topic, int messageId) {
|
||||
String content = null;
|
||||
try {
|
||||
ByteBuf payload = message.payload();
|
||||
// 在发送前提取内容用于日志
|
||||
try {
|
||||
// 检查引用计数
|
||||
if (payload.refCnt() <= 0) {
|
||||
log.warn("消息payload引用计数为0,主题: {}, 消息ID: {}", topic, messageId);
|
||||
return false;
|
||||
}
|
||||
content = payload.toString(java.nio.charset.StandardCharsets.UTF_8);
|
||||
} catch (Exception e) {
|
||||
payload.release();
|
||||
log.warn("提取MQTT消息内容失败,主题: {}, 消息ID: {}", topic, messageId, e);
|
||||
content = "[内容提取失败]";
|
||||
}
|
||||
// 使用标志位跟踪是否需要释放
|
||||
boolean needRelease = true;
|
||||
|
||||
// 发送消息
|
||||
try {
|
||||
// 1. 提取日志内容(安全方式)
|
||||
String logContent = extractMessageContentForLog(message, topic, messageId);
|
||||
|
||||
// 2. 发送消息
|
||||
channel.writeAndFlush(message).sync();
|
||||
|
||||
log.info("[MQTT发送] 主题: {}, QoS: {}, 消息ID: {}, 内容: {}",
|
||||
topic, message.fixedHeader().qosLevel(), messageId, content);
|
||||
// 3. 标记不需要释放(Netty会处理)
|
||||
needRelease = false;
|
||||
|
||||
// 4. 记录成功日志
|
||||
log.info("[MQTT发送成功] 主题: {}, QoS: {}, 消息ID: {}, 内容长度: {}",
|
||||
topic, message.fixedHeader().qosLevel(), messageId,
|
||||
logContent != null ? logContent.length() : 0);
|
||||
|
||||
if (log.isDebugEnabled() && logContent != null) {
|
||||
log.debug("[MQTT发送详情] 主题: {}, 内容: {}", topic, logContent);
|
||||
}
|
||||
|
||||
return true;
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("同步发送MQTT消息异常,主题: {}, 消息ID: {}, 内容: {}", topic, messageId, content, e);
|
||||
|
||||
// 发生异常时安全释放消息
|
||||
safelyReleaseMessage(message);
|
||||
log.error("发送MQTT消息失败,主题: {}, 消息ID: {}", topic, messageId, e);
|
||||
return false;
|
||||
|
||||
} finally {
|
||||
// 只在需要时释放
|
||||
if (needRelease) {
|
||||
safelyReleaseMessage(message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -277,9 +210,42 @@ public class MqttPoolClient {
|
||||
}
|
||||
|
||||
/**
|
||||
* 打印连接池状态
|
||||
* 安全提取消息内容用于日志,不影响原消息
|
||||
*/
|
||||
public void printPoolStatus() {
|
||||
connectionPool.logPoolStatus();
|
||||
private String extractMessageContentForLog(MqttPublishMessage message, String topic, int messageId) {
|
||||
if (message == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
ByteBuf payload = message.payload();
|
||||
if (payload == null) {
|
||||
return "[空消息]";
|
||||
}
|
||||
|
||||
try {
|
||||
// 创建视图,不影响引用计数
|
||||
ByteBuf slice = payload.slice();
|
||||
int maxSize = 200; // 限制日志大小
|
||||
int bytesToRead = Math.min(slice.readableBytes(), maxSize);
|
||||
|
||||
if (bytesToRead <= 0) {
|
||||
return "[空内容]";
|
||||
}
|
||||
|
||||
byte[] bytes = new byte[bytesToRead];
|
||||
slice.readBytes(bytes);
|
||||
String content = new String(bytes, StandardCharsets.UTF_8);
|
||||
|
||||
if (slice.readableBytes() > 0) {
|
||||
content += String.format("...[已截断,总长度:%d]", slice.readableBytes() + bytesToRead);
|
||||
}
|
||||
|
||||
return content;
|
||||
|
||||
} catch (Exception e) {
|
||||
log.warn("提取消息内容用于日志失败,主题: {}, 消息ID: {}", topic, messageId, e);
|
||||
return "[内容提取失败]";
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user