Compare commits
No commits in common. "d462f96f8ba10160e5df0613414d0dd14ac3f0db878ed3914d4ba7e67c371833" and "a47d23722eaf4370a138a1a401f8cbde243af4a6d788a947b3d6fc710e3fcc82" have entirely different histories.
d462f96f8b
...
a47d23722e
@ -77,25 +77,26 @@ public class MqttHandler extends ChannelInboundHandlerAdapter {
|
|||||||
private void handlePublish(ChannelHandlerContext ctx, MqttPublishMessage publishMessage) {
|
private void handlePublish(ChannelHandlerContext ctx, MqttPublishMessage publishMessage) {
|
||||||
String topic = publishMessage.variableHeader().topicName();
|
String topic = publishMessage.variableHeader().topicName();
|
||||||
ByteBuf payload = publishMessage.payload();
|
ByteBuf payload = publishMessage.payload();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// 提取内容
|
|
||||||
String content = payload.toString(StandardCharsets.UTF_8);
|
String content = payload.toString(StandardCharsets.UTF_8);
|
||||||
|
|
||||||
// QoS响应
|
log.info("收到MQTT消息 - 主题: {}, QoS: {}, 内容: {}",
|
||||||
|
topic, publishMessage.fixedHeader().qosLevel(), content);
|
||||||
|
|
||||||
|
// 根据QoS级别进行响应
|
||||||
MqttQoS qos = publishMessage.fixedHeader().qosLevel();
|
MqttQoS qos = publishMessage.fixedHeader().qosLevel();
|
||||||
if (qos.value() >= MqttQoS.AT_LEAST_ONCE.value()) {
|
if (qos.value() >= MqttQoS.AT_LEAST_ONCE.value()) {
|
||||||
MqttPubAckMessage pubAckMessage = (MqttPubAckMessage) MqttMessageBuilders.pubAck()
|
MqttPubAckMessage pubAckMessage = (MqttPubAckMessage) MqttMessageBuilders.pubAck()
|
||||||
.packetId(publishMessage.variableHeader().packetId())
|
.packetId(publishMessage.variableHeader().packetId())
|
||||||
.build();
|
.build();
|
||||||
ctx.writeAndFlush(pubAckMessage);
|
ctx.writeAndFlush(pubAckMessage);
|
||||||
|
log.debug("已发送PUBACK响应, 消息ID: {}", publishMessage.variableHeader().packetId());
|
||||||
}
|
}
|
||||||
// 处理消息内容
|
|
||||||
processReceivedMessage(topic, content);
|
processReceivedMessage(topic, content);
|
||||||
}catch (Exception e){
|
}catch (Exception e){
|
||||||
log.error("处理MQTT消息异常,主题: {}", topic, e);
|
|
||||||
}finally {
|
|
||||||
payload.release();
|
payload.release();
|
||||||
|
log.error(e.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -123,9 +124,8 @@ public class MqttHandler extends ChannelInboundHandlerAdapter {
|
|||||||
kafkaProducer.send("msg_lyy_rsp",content);
|
kafkaProducer.send("msg_lyy_rsp",content);
|
||||||
} else if (topic.contains("rpt")) {
|
} else if (topic.contains("rpt")) {
|
||||||
//乐摇摇上报
|
//乐摇摇上报
|
||||||
int lastSlash = topic.lastIndexOf('/');
|
String[] parts = topic.split("/");
|
||||||
String sn = lastSlash >= 0 ? topic.substring(lastSlash + 1) : topic;
|
String result = insertSnField(content, parts[parts.length - 1]);
|
||||||
String result = insertSnField(content, sn);
|
|
||||||
kafkaProducer.send("msg_lyy_rpt",result);
|
kafkaProducer.send("msg_lyy_rpt",result);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -7,7 +7,6 @@ import io.netty.handler.codec.mqtt.*;
|
|||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import java.nio.charset.StandardCharsets;
|
|
||||||
import java.util.concurrent.ThreadLocalRandom;
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -46,7 +45,7 @@ public class MqttPoolClient {
|
|||||||
|
|
||||||
channel = connectionPool.getConnection();
|
channel = connectionPool.getConnection();
|
||||||
|
|
||||||
if (isConnectionNotReady(channel)) {
|
if (!isConnectionReady(channel)) {
|
||||||
log.warn("连接不可用,尝试重新获取,主题: {}", topic);
|
log.warn("连接不可用,尝试重新获取,主题: {}", topic);
|
||||||
safelyInvalidateConnection(channel);
|
safelyInvalidateConnection(channel);
|
||||||
channel = null;
|
channel = null;
|
||||||
@ -94,18 +93,91 @@ public class MqttPoolClient {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// ========== 私有方法 ==========
|
/**
|
||||||
|
* 订阅主题
|
||||||
|
*/
|
||||||
|
public boolean subscribe(String topic) {
|
||||||
|
return subscribe(new String[]{topic});
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 通道是否不活跃
|
* 订阅多个主题
|
||||||
* @param channel 通信
|
|
||||||
* @return true 不活跃 false 活跃
|
|
||||||
*/
|
*/
|
||||||
private boolean isConnectionNotReady(Channel channel) {
|
public boolean subscribe(String[] topics) {
|
||||||
return channel == null ||
|
Channel channel = null;
|
||||||
!channel.isActive() ||
|
try {
|
||||||
!channel.isOpen() ||
|
channel = connectionPool.getConnection();
|
||||||
!channel.isWritable();
|
if (!isConnectionReady(channel)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (String topic : topics) {
|
||||||
|
int messageId = generateMessageId();
|
||||||
|
MqttSubscribeMessage subscribeMessage = MqttMessageBuilders.subscribe()
|
||||||
|
.messageId(messageId)
|
||||||
|
.addSubscription(MqttQoS.AT_LEAST_ONCE,topic)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
channel.writeAndFlush(subscribeMessage).sync();
|
||||||
|
log.info("MQTT订阅成功,主题: {}, 消息ID: {}", String.join(",", topics), messageId);
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("MQTT订阅失败,主题: {}", String.join(",", topics), e);
|
||||||
|
return false;
|
||||||
|
} finally {
|
||||||
|
if (channel != null) {
|
||||||
|
connectionPool.returnConnection(channel);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 取消订阅
|
||||||
|
*/
|
||||||
|
public boolean unsubscribe(String topic) {
|
||||||
|
return unsubscribe(new String[]{topic});
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean unsubscribe(String[] topics) {
|
||||||
|
Channel channel = null;
|
||||||
|
try {
|
||||||
|
channel = connectionPool.getConnection();
|
||||||
|
if (!isConnectionReady(channel)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (String topic : topics) {
|
||||||
|
int messageId = generateMessageId();
|
||||||
|
|
||||||
|
MqttUnsubscribeMessage unsubscribeMessage = MqttMessageBuilders.unsubscribe()
|
||||||
|
.messageId(messageId)
|
||||||
|
.addTopicFilter(topic)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
channel.writeAndFlush(unsubscribeMessage).sync();
|
||||||
|
log.info("MQTT取消订阅成功,主题: {}, 消息ID: {}", String.join(",", topics), messageId);
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("MQTT取消订阅失败,主题: {}", String.join(",", topics), e);
|
||||||
|
return false;
|
||||||
|
} finally {
|
||||||
|
if (channel != null) {
|
||||||
|
connectionPool.returnConnection(channel);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ========== 私有方法 ==========
|
||||||
|
|
||||||
|
private boolean isConnectionReady(Channel channel) {
|
||||||
|
return channel != null &&
|
||||||
|
channel.isActive() &&
|
||||||
|
channel.isOpen() &&
|
||||||
|
channel.isWritable();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void safelyInvalidateConnection(Channel channel) {
|
private void safelyInvalidateConnection(Channel channel) {
|
||||||
@ -124,9 +196,9 @@ public class MqttPoolClient {
|
|||||||
|
|
||||||
private MqttPublishMessage buildPublishMessage(String topic, MqttQoS qos, String payload, int messageId) {
|
private MqttPublishMessage buildPublishMessage(String topic, MqttQoS qos, String payload, int messageId) {
|
||||||
// 创建独立的 ByteBuf
|
// 创建独立的 ByteBuf
|
||||||
ByteBuf payloadBuf = null;
|
ByteBuf payloadBuf = Unpooled.copiedBuffer(payload.getBytes(java.nio.charset.StandardCharsets.UTF_8));
|
||||||
|
|
||||||
try {
|
try {
|
||||||
payloadBuf = Unpooled.copiedBuffer(payload.getBytes(java.nio.charset.StandardCharsets.UTF_8));
|
|
||||||
return MqttMessageBuilders.publish()
|
return MqttMessageBuilders.publish()
|
||||||
.topicName(topic)
|
.topicName(topic)
|
||||||
.qos(qos)
|
.qos(qos)
|
||||||
@ -135,48 +207,43 @@ public class MqttPoolClient {
|
|||||||
.payload(payloadBuf)
|
.payload(payloadBuf)
|
||||||
.build();
|
.build();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
if (payloadBuf != null && payloadBuf.refCnt() > 0) {
|
|
||||||
payloadBuf.release();
|
payloadBuf.release();
|
||||||
}
|
|
||||||
log.error("构建MQTT发布消息失败, 主题: {}", topic, e);
|
log.error("构建MQTT发布消息失败, 主题: {}", topic, e);
|
||||||
throw new RuntimeException("构建MQTT消息失败", e);
|
throw new RuntimeException("构建MQTT消息失败", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean sendMessageSync(Channel channel, MqttPublishMessage message, String topic, int messageId) {
|
private boolean sendMessageSync(Channel channel, MqttPublishMessage message, String topic, int messageId) {
|
||||||
// 使用标志位跟踪是否需要释放
|
String content = null;
|
||||||
boolean needRelease = true;
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// 1. 提取日志内容(安全方式)
|
ByteBuf payload = message.payload();
|
||||||
String logContent = extractMessageContentForLog(message, topic, messageId);
|
// 在发送前提取内容用于日志
|
||||||
|
try {
|
||||||
// 2. 发送消息
|
// 检查引用计数
|
||||||
channel.writeAndFlush(message).sync();
|
if (payload.refCnt() <= 0) {
|
||||||
|
log.warn("消息payload引用计数为0,主题: {}, 消息ID: {}", topic, messageId);
|
||||||
// 3. 标记不需要释放(Netty会处理)
|
return false;
|
||||||
needRelease = false;
|
}
|
||||||
|
content = payload.toString(java.nio.charset.StandardCharsets.UTF_8);
|
||||||
// 4. 记录成功日志
|
} catch (Exception e) {
|
||||||
log.info("[MQTT发送成功] 主题: {}, QoS: {}, 消息ID: {}, 内容长度: {}",
|
payload.release();
|
||||||
topic, message.fixedHeader().qosLevel(), messageId,
|
log.warn("提取MQTT消息内容失败,主题: {}, 消息ID: {}", topic, messageId, e);
|
||||||
logContent != null ? logContent.length() : 0);
|
content = "[内容提取失败]";
|
||||||
|
|
||||||
if (log.isDebugEnabled() && logContent != null) {
|
|
||||||
log.debug("[MQTT发送详情] 主题: {}, 内容: {}", topic, logContent);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 发送消息
|
||||||
|
channel.writeAndFlush(message).sync();
|
||||||
|
|
||||||
|
log.info("[MQTT发送] 主题: {}, QoS: {}, 消息ID: {}, 内容: {}",
|
||||||
|
topic, message.fixedHeader().qosLevel(), messageId, content);
|
||||||
return true;
|
return true;
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("发送MQTT消息失败,主题: {}, 消息ID: {}", topic, messageId, e);
|
log.error("同步发送MQTT消息异常,主题: {}, 消息ID: {}, 内容: {}", topic, messageId, content, e);
|
||||||
return false;
|
|
||||||
|
|
||||||
} finally {
|
// 发生异常时安全释放消息
|
||||||
// 只在需要时释放
|
|
||||||
if (needRelease) {
|
|
||||||
safelyReleaseMessage(message);
|
safelyReleaseMessage(message);
|
||||||
}
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -210,42 +277,9 @@ public class MqttPoolClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 安全提取消息内容用于日志,不影响原消息
|
* 打印连接池状态
|
||||||
*/
|
*/
|
||||||
private String extractMessageContentForLog(MqttPublishMessage message, String topic, int messageId) {
|
public void printPoolStatus() {
|
||||||
if (message == null) {
|
connectionPool.logPoolStatus();
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
ByteBuf payload = message.payload();
|
|
||||||
if (payload == null) {
|
|
||||||
return "[空消息]";
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
// 创建视图,不影响引用计数
|
|
||||||
ByteBuf slice = payload.slice();
|
|
||||||
int maxSize = 200; // 限制日志大小
|
|
||||||
int bytesToRead = Math.min(slice.readableBytes(), maxSize);
|
|
||||||
|
|
||||||
if (bytesToRead <= 0) {
|
|
||||||
return "[空内容]";
|
|
||||||
}
|
|
||||||
|
|
||||||
byte[] bytes = new byte[bytesToRead];
|
|
||||||
slice.readBytes(bytes);
|
|
||||||
String content = new String(bytes, StandardCharsets.UTF_8);
|
|
||||||
|
|
||||||
if (slice.readableBytes() > 0) {
|
|
||||||
content += String.format("...[已截断,总长度:%d]", slice.readableBytes() + bytesToRead);
|
|
||||||
}
|
|
||||||
|
|
||||||
return content;
|
|
||||||
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.warn("提取消息内容用于日志失败,主题: {}, 消息ID: {}", topic, messageId, e);
|
|
||||||
return "[内容提取失败]";
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
|
||||||
Loading…
x
Reference in New Issue
Block a user