diff --git a/ebike-gather/src/main/java/org/cdzy/gather/kafka/KafkaConsumer.java b/ebike-gather/src/main/java/org/cdzy/gather/kafka/KafkaConsumer.java index 0c0ac52..c088852 100644 --- a/ebike-gather/src/main/java/org/cdzy/gather/kafka/KafkaConsumer.java +++ b/ebike-gather/src/main/java/org/cdzy/gather/kafka/KafkaConsumer.java @@ -64,7 +64,6 @@ public class KafkaConsumer { } mqttPoolClient.getPoolStatus(); // 使用注入的 MqttPoolClient 发送消息 - Thread.sleep(10000); boolean sent = mqttPoolClient.sendMessage(topic, MqttQoS.AT_LEAST_ONCE, command); log.info("[MQTT发送结果] 主题: {}, 成功: {}", topic, sent); diff --git a/ebike-gather/src/main/java/org/cdzy/gather/mqtt/MqttHandler.java b/ebike-gather/src/main/java/org/cdzy/gather/mqtt/MqttHandler.java index 4d8920a..1eeeff4 100644 --- a/ebike-gather/src/main/java/org/cdzy/gather/mqtt/MqttHandler.java +++ b/ebike-gather/src/main/java/org/cdzy/gather/mqtt/MqttHandler.java @@ -6,6 +6,8 @@ import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.codec.mqtt.*; import io.netty.handler.timeout.IdleStateEvent; import lombok.extern.slf4j.Slf4j; +import org.cdzy.gather.config.SpringContextHolder; +import org.cdzy.gather.kafka.KafkaProducer; import java.nio.charset.StandardCharsets; @@ -104,6 +106,12 @@ public class MqttHandler extends ChannelInboundHandlerAdapter { */ private void processReceivedMessage(String topic, String content) { try { + KafkaProducer kafkaProducer = SpringContextHolder.getBean(KafkaProducer.class); + if (topic.contains("rsp")) { + kafkaProducer.send("msg_rsp",content); + } else if (topic.contains("rpt")) { + kafkaProducer.send("msg_rpt",content); + } log.info("处理MQTT消息 - 主题: {}, 内容: {}", topic, content); if (topic.startsWith("sensor/")) { handleSensorData(topic, content);