Compare commits
No commits in common. "fe5766db355488fbb32732fa8685a0a425989e80cb1f171c2fc8188af863fe55" and "981e3b9fe7fe9105032edf8521ab49aee091560ccbb51eec8f3df341b15bb282" have entirely different histories.
fe5766db35
...
981e3b9fe7
@ -64,6 +64,7 @@ public class KafkaConsumer {
|
|||||||
}
|
}
|
||||||
mqttPoolClient.getPoolStatus();
|
mqttPoolClient.getPoolStatus();
|
||||||
// 使用注入的 MqttPoolClient 发送消息
|
// 使用注入的 MqttPoolClient 发送消息
|
||||||
|
Thread.sleep(10000);
|
||||||
boolean sent = mqttPoolClient.sendMessage(topic, MqttQoS.AT_LEAST_ONCE, command);
|
boolean sent = mqttPoolClient.sendMessage(topic, MqttQoS.AT_LEAST_ONCE, command);
|
||||||
|
|
||||||
log.info("[MQTT发送结果] 主题: {}, 成功: {}", topic, sent);
|
log.info("[MQTT发送结果] 主题: {}, 成功: {}", topic, sent);
|
||||||
|
|||||||
@ -6,8 +6,6 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
|
|||||||
import io.netty.handler.codec.mqtt.*;
|
import io.netty.handler.codec.mqtt.*;
|
||||||
import io.netty.handler.timeout.IdleStateEvent;
|
import io.netty.handler.timeout.IdleStateEvent;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.cdzy.gather.config.SpringContextHolder;
|
|
||||||
import org.cdzy.gather.kafka.KafkaProducer;
|
|
||||||
|
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
|
|
||||||
@ -106,12 +104,6 @@ public class MqttHandler extends ChannelInboundHandlerAdapter {
|
|||||||
*/
|
*/
|
||||||
private void processReceivedMessage(String topic, String content) {
|
private void processReceivedMessage(String topic, String content) {
|
||||||
try {
|
try {
|
||||||
KafkaProducer kafkaProducer = SpringContextHolder.getBean(KafkaProducer.class);
|
|
||||||
if (topic.contains("rsp")) {
|
|
||||||
kafkaProducer.send("msg_rsp",content);
|
|
||||||
} else if (topic.contains("rpt")) {
|
|
||||||
kafkaProducer.send("msg_rpt",content);
|
|
||||||
}
|
|
||||||
log.info("处理MQTT消息 - 主题: {}, 内容: {}", topic, content);
|
log.info("处理MQTT消息 - 主题: {}, 内容: {}", topic, content);
|
||||||
if (topic.startsWith("sensor/")) {
|
if (topic.startsWith("sensor/")) {
|
||||||
handleSensorData(topic, content);
|
handleSensorData(topic, content);
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user