diff --git a/ebike-gather/src/main/java/org/cdzy/gather/mqtt/MqttHandler.java b/ebike-gather/src/main/java/org/cdzy/gather/mqtt/MqttHandler.java index 368e0ab..8f3fbf7 100644 --- a/ebike-gather/src/main/java/org/cdzy/gather/mqtt/MqttHandler.java +++ b/ebike-gather/src/main/java/org/cdzy/gather/mqtt/MqttHandler.java @@ -94,7 +94,7 @@ public class MqttHandler extends ChannelInboundHandlerAdapter { processReceivedMessage(topic, content); } catch (Exception e) { log.error("处理MQTT消息异常,主题: {}", topic, e); - }finally { + } finally { payload.release(); } } @@ -134,6 +134,19 @@ public class MqttHandler extends ChannelInboundHandlerAdapter { } else if (topic.startsWith("control/")) { handleControlCommand(topic, content); } + if (topic.startsWith("$SYS/brokers/")) { + boolean isOnline = isConnectedEvent(topic); + String clientId = objectMapper.readValue(content, ObjectNode.class).get("clientid").asText(); + if (isOnline) { + log.info("设备上线: {}", clientId); + } else { + log.info("设备下线: {}", clientId); + } + ObjectNode jsonNode = objectMapper.createObjectNode(); + jsonNode.put("deviceId", clientId); + jsonNode.put("isOnline", isOnline); + kafkaProducer.send("device_online", objectMapper.writeValueAsString(jsonNode)); + } } catch (Exception e) { log.error("处理MQTT消息业务逻辑异常", e); } @@ -196,4 +209,23 @@ public class MqttHandler extends ChannelInboundHandlerAdapter { throw new RuntimeException("Failed to process JSON", e); } } + + /** + * 判断MQTT主题是否为上线事件 + * + * @param topic 主题字符串 + * @return true: 上线事件, false: 下线事件(若主题无法识别则返回false) + */ + private boolean isConnectedEvent(String topic) { + if (topic == null) return false; + if (topic.endsWith("/connected")) { + return true; + } else if (topic.endsWith("/disconnected")) { + return false; + } else { + // 既不是connected也不是disconnected,可根据需要处理(例如抛异常或记录警告) + log.warn("未知的上下线主题格式: {}", topic); + return false; // 默认返回false(下线)或根据业务决定 + } + } } \ No newline at end of file diff --git a/ebike-gather/src/main/resources/application-prod.yml b/ebike-gather/src/main/resources/application-prod.yml index fb5774b..97dff05 100644 --- a/ebike-gather/src/main/resources/application-prod.yml +++ b/ebike-gather/src/main/resources/application-prod.yml @@ -29,7 +29,7 @@ mqtt: port: 1883 username: admin password: admin970529 - auto-subscribe-topics: ecu/rpt/#,ecu/rsp/# + auto-subscribe-topics: ecu/rpt/#,ecu/rsp/#,$SYS/brokers/+/clients/+/connected,$SYS/brokers/+/clients/+/disconnected timeout: 10 # 减少超时时间以便更快失败 clean-session: true keep-alive: 60