diff --git a/ebike-gather/src/main/java/org/cdzy/gather/kafka/KafkaConsumer.java b/ebike-gather/src/main/java/org/cdzy/gather/kafka/KafkaConsumer.java index dd4a362..0c0ac52 100644 --- a/ebike-gather/src/main/java/org/cdzy/gather/kafka/KafkaConsumer.java +++ b/ebike-gather/src/main/java/org/cdzy/gather/kafka/KafkaConsumer.java @@ -37,9 +37,8 @@ public class KafkaConsumer { } /** - * 消费者监听消息 - * - * @param record 消息 + * 消费者监听信息 + * @param record 记录 */ @KafkaListener(topics = {"command"}) public void onMessage(ConsumerRecord record) { @@ -53,21 +52,30 @@ public class KafkaConsumer { String topic = objectNode.get("topic").asText(); String command = objectNode.get("command").asText(); + // 验证消息内容 + if (topic == null || topic.trim().isEmpty()) { + log.error("[KAFKA处理异常] MQTT主题为空: {}", record.value()); + return; + } + + if (command == null || command.trim().isEmpty()) { + log.error("[KAFKA处理异常] MQTT命令内容为空,主题: {}", topic); + return; + } + mqttPoolClient.getPoolStatus(); // 使用注入的 MqttPoolClient 发送消息 + Thread.sleep(10000); boolean sent = mqttPoolClient.sendMessage(topic, MqttQoS.AT_LEAST_ONCE, command); log.info("[MQTT发送结果] 主题: {}, 成功: {}", topic, sent); - // 如果发送失败,可以添加重试逻辑或告警 if (!sent) { log.error("[MQTT发送失败] 主题: {}, 内容: {}", topic, command); - // 这里可以添加重试逻辑或发送到死信队列 handleSendFailure(topic, command); } } catch (Exception e) { log.error("[KAFKA处理异常] 消息处理失败: {}", record.value(), e); - // 处理解析或发送异常 handleProcessingError(record.value(), e); } } diff --git a/ebike-gather/src/main/java/org/cdzy/gather/mqtt/MqttPoolClient.java b/ebike-gather/src/main/java/org/cdzy/gather/mqtt/MqttPoolClient.java index c5351fe..0e6b498 100644 --- a/ebike-gather/src/main/java/org/cdzy/gather/mqtt/MqttPoolClient.java +++ b/ebike-gather/src/main/java/org/cdzy/gather/mqtt/MqttPoolClient.java @@ -1,5 +1,6 @@ package org.cdzy.gather.mqtt; +import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.handler.codec.mqtt.*; @@ -194,28 +195,69 @@ public class MqttPoolClient { } private MqttPublishMessage buildPublishMessage(String topic, MqttQoS qos, String payload, int messageId) { - return MqttMessageBuilders.publish() - .topicName(topic) - .qos(qos) - .retained(false) - .messageId(messageId) - .payload(Unpooled.copiedBuffer(payload.getBytes())) - .build(); + try { + // 创建独立的 ByteBuf + ByteBuf payloadBuf = Unpooled.copiedBuffer(payload.getBytes(java.nio.charset.StandardCharsets.UTF_8)); + + return MqttMessageBuilders.publish() + .topicName(topic) + .qos(qos) + .retained(false) + .messageId(messageId) + .payload(payloadBuf) + .build(); + } catch (Exception e) { + log.error("构建MQTT发布消息失败, 主题: {}", topic, e); + throw new RuntimeException("构建MQTT消息失败", e); + } } private boolean sendMessageSync(Channel channel, MqttPublishMessage message, String topic, int messageId) { + String content = null; try { + // 在发送前提取内容用于日志 + try { + ByteBuf payload = message.payload(); + // 检查引用计数 + if (payload.refCnt() <= 0) { + log.warn("消息payload引用计数为0,主题: {}, 消息ID: {}", topic, messageId); + return false; + } + content = payload.toString(java.nio.charset.StandardCharsets.UTF_8); + } catch (Exception e) { + log.warn("提取MQTT消息内容失败,主题: {}, 消息ID: {}", topic, messageId, e); + content = "[内容提取失败]"; + } + + // 发送消息 channel.writeAndFlush(message).sync(); + log.info("[MQTT发送] 主题: {}, QoS: {}, 消息ID: {}, 内容: {}", - topic, message.fixedHeader().qosLevel(), messageId, - message.payload().toString(java.nio.charset.StandardCharsets.UTF_8)); + topic, message.fixedHeader().qosLevel(), messageId, content); return true; + } catch (Exception e) { - log.error("同步发送MQTT消息异常,主题: {}, 消息ID: {}", topic, messageId, e); + log.error("同步发送MQTT消息异常,主题: {}, 消息ID: {}, 内容: {}", topic, messageId, content, e); + + // 发生异常时安全释放消息 + safelyReleaseMessage(message); return false; } } + /** + * 安全释放MQTT消息 + */ + private void safelyReleaseMessage(MqttPublishMessage message) { + try { + if (message != null && message.payload() != null && message.payload().refCnt() > 0) { + message.payload().release(); + } + } catch (Exception e) { + log.debug("释放MQTT消息资源时发生异常", e); + } + } + private void sleepWithBackoff(int attempt) { try { long backoffMs = Math.min(1000 * (1L << (attempt - 1)), 10000);