From 4a7e9c10c47bc3f9047c386081b54f021a6b410a5bfeaf65b87494abd59da364 Mon Sep 17 00:00:00 2001 From: attiya <2413103649@qq.com> Date: Thu, 13 Nov 2025 15:37:47 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8A=A5=E5=91=8A=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/cdzy/common/model/CMDMsg.java | 35 --------- .../cdzy/common/model/dto/EbikeTracking.java | 61 ++++++++++++++++ .../ebikereport/component/KafkaConsumer.java | 14 ++-- .../ebikereport/component/KafkaProducer.java | 31 -------- .../ebikereport/component/ReoprtHandler.java | 72 +++++++++---------- .../src/main/resources/application-dev.yml | 6 +- 6 files changed, 109 insertions(+), 110 deletions(-) delete mode 100644 ebike-common/src/main/java/com/cdzy/common/model/CMDMsg.java create mode 100644 ebike-common/src/main/java/com/cdzy/common/model/dto/EbikeTracking.java delete mode 100644 ebike-report/src/main/java/com/cdzy/ebikereport/component/KafkaProducer.java diff --git a/ebike-common/src/main/java/com/cdzy/common/model/CMDMsg.java b/ebike-common/src/main/java/com/cdzy/common/model/CMDMsg.java deleted file mode 100644 index f2f9fb8..0000000 --- a/ebike-common/src/main/java/com/cdzy/common/model/CMDMsg.java +++ /dev/null @@ -1,35 +0,0 @@ -package com.cdzy.common.model; - -import lombok.Data; - -/** - * @author attiya - * @since 2025-03-12 - */ -@Data -public class CMDMsg { - - /** - * 分组命令主题前缀 - */ - private String prefix = "ecu/cmd"; - - /** - * 分组code(运营商code - */ - private String group; - - /** - * 中控设备id - */ - private String deviceId; - - /** - * 具体命令内容(json字符串 - */ - private String command; - /** - * QoS 级别:0、1、2 - */ - private int qos = 1; -} diff --git a/ebike-common/src/main/java/com/cdzy/common/model/dto/EbikeTracking.java b/ebike-common/src/main/java/com/cdzy/common/model/dto/EbikeTracking.java new file mode 100644 index 0000000..15d57f5 --- /dev/null +++ b/ebike-common/src/main/java/com/cdzy/common/model/dto/EbikeTracking.java @@ -0,0 +1,61 @@ +package com.cdzy.common.model.dto; + +import com.influxdb.annotations.Column; +import com.influxdb.annotations.Measurement; +import jakarta.validation.constraints.NotBlank; +import jakarta.validation.constraints.NotNull; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; +import java.time.Instant; + +@Data +@NoArgsConstructor +@Measurement(name = "ebike_tracking") +public class EbikeTracking implements Serializable { + + /** + * 设备id (ecuSn) + */ + @NotBlank(message = "设备id不能为空") + @Column(tag = true) + private String deviceId; + + /** + * 纬度 + */ + @NotNull(message = "纬度不能为空") + @Column + private Double latitude; + + /** + * 经度 + */ + @NotNull(message = "经度不能为空") + @Column + private Double longitude; + + /** + * 时间戳 + */ + @Column(timestamp = true) + private Instant _time = Instant.now(); + + @Override + public String toString() { + return "EbikeTracking{" + + "deviceId='" + deviceId + '\'' + + ", latitude=" + latitude + + ", longitude=" + longitude + + ", timestamp=" + _time + + '}'; + } + + public EbikeTracking(String deviceId, Double latitude, Double longitude) { + this.deviceId = deviceId; + this.latitude = latitude; + this.longitude = longitude; + } + +} \ No newline at end of file diff --git a/ebike-report/src/main/java/com/cdzy/ebikereport/component/KafkaConsumer.java b/ebike-report/src/main/java/com/cdzy/ebikereport/component/KafkaConsumer.java index a63592b..a0309b2 100644 --- a/ebike-report/src/main/java/com/cdzy/ebikereport/component/KafkaConsumer.java +++ b/ebike-report/src/main/java/com/cdzy/ebikereport/component/KafkaConsumer.java @@ -1,7 +1,9 @@ package com.cdzy.ebikereport.component; -import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.JSONObject; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -21,6 +23,8 @@ public class KafkaConsumer { private ReoprtHandler reoprtHandler; + private final ObjectMapper mapper = new ObjectMapper(); + @Autowired public void ResponseConsumerService(ReoprtHandler reoprtHandler) { this.reoprtHandler = reoprtHandler; @@ -32,9 +36,9 @@ public class KafkaConsumer { * @param record 消息 */ @KafkaListener(topics = {"msg_rpt"}) - public void onMessage(ConsumerRecord record){ + public void onMessage(ConsumerRecord record) throws JsonProcessingException { log.info("[KAFKA接收] 主题: {}, 内容: {}", record.topic(), record.value()); - JSONObject json = JSON.parseObject(record.value().toString(), JSONObject.class); - reoprtHandler.reportHandler(json); + JsonNode jsonNode = mapper.readValue(record.value().toString(), JsonNode.class); + reoprtHandler.reportHandler(jsonNode); } } diff --git a/ebike-report/src/main/java/com/cdzy/ebikereport/component/KafkaProducer.java b/ebike-report/src/main/java/com/cdzy/ebikereport/component/KafkaProducer.java deleted file mode 100644 index 2b21cd0..0000000 --- a/ebike-report/src/main/java/com/cdzy/ebikereport/component/KafkaProducer.java +++ /dev/null @@ -1,31 +0,0 @@ -//package com.cdzy.ebikereport.component; -// -//import jakarta.annotation.Resource; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; -//import org.springframework.kafka.core.KafkaTemplate; -//import org.springframework.stereotype.Component; -// -///** -// * Kafka生产者 -// * -// * @author xiafan -// */ -//@Component -//public class KafkaProducer { -// -// private static final Logger log= LoggerFactory.getLogger(KafkaProducer.class); -// -// @Resource -// private KafkaTemplate kafkaTemplate; -// -// /** -// * 生产者简单发送消息 -// * @param topic 主题 -// * @param msg 信息 -// */ -// public void send(String topic,String msg){ -// log.info("[KAFKA发送] 主题: {}, 内容: {}", topic, msg); -// kafkaTemplate.send(topic,msg); -// } -//} diff --git a/ebike-report/src/main/java/com/cdzy/ebikereport/component/ReoprtHandler.java b/ebike-report/src/main/java/com/cdzy/ebikereport/component/ReoprtHandler.java index ecb11a8..c518aa5 100644 --- a/ebike-report/src/main/java/com/cdzy/ebikereport/component/ReoprtHandler.java +++ b/ebike-report/src/main/java/com/cdzy/ebikereport/component/ReoprtHandler.java @@ -1,16 +1,17 @@ package com.cdzy.ebikereport.component; -import com.alibaba.fastjson2.JSONObject; -import com.cdzy.common.model.EbikeTracking; -import com.cdzy.common.model.ResGPSDto; + +import com.cdzy.common.model.dto.EbikeTracking; +import com.cdzy.common.model.dto.ResGPSDto; import com.cdzy.common.utils.CoordinateUtil; import com.cdzy.ebikereport.enums.BitSwitch; import com.cdzy.ebikereport.utils.BinaryUtil; import com.cdzy.ebikereport.utils.RedisUtil; -import com.ebike.feign.clients.MaintenanceFeignClient; -import com.ebike.feign.clients.OperateFeignClient; +import com.ebike.feign.clients.OperationsFeignClient; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import jakarta.annotation.Resource; -import org.springframework.data.geo.Point; import org.springframework.stereotype.Component; @@ -21,38 +22,35 @@ import org.springframework.stereotype.Component; @Component public class ReoprtHandler { + + private final ObjectMapper objectMapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + @Resource private RedisUtil redisUtil; @Resource - private OperateFeignClient operateFeignClient; - - @Resource - private MaintenanceFeignClient maintenanceFeignClient; + private OperationsFeignClient operateFeignClient; /** * 报文处理 */ - public void reportHandler(JSONObject response) { - Integer c = response.getInteger("c"); - if (c != null) { - String deviceId = response.getString("deviceId"); - JSONObject param = response.getJSONObject("param"); - switch (c){ - case 56: - gpsMsgHandler(param,deviceId); - break; - case 66: - bmsMsgHandler(param,deviceId); - break; - } + public void reportHandler(JsonNode response) { + int c = response.get("c").asInt(); + JsonNode param = response.get("param"); + String deviceId = param.get("sn").asText(); + switch (c) { + case 56: + gpsMsgHandler(param, deviceId); + break; + case 66: + bmsMsgHandler(param, deviceId); + break; } } - public void gpsMsgHandler(JSONObject param, String deviceId) { - Integer number = param.getInteger("sw"); + public void gpsMsgHandler(JsonNode param, String deviceId) { + int number = param.get("sw").asInt(); String binary = BinaryUtil.to32BitBinary(number); - //修复高低bug binary = new StringBuilder(binary).reverse().toString(); char helmet = binary.charAt(BitSwitch.IS_HELMET_EXIT); char acc = binary.charAt(BitSwitch.ACC_ON); @@ -61,9 +59,9 @@ public class ReoprtHandler { char isHelmetLocked = binary.charAt(BitSwitch.IS_HELMET_LOCKED); char isWheelSpin = binary.charAt(BitSwitch.IS_WHEEL_SPIN); char isMoving = binary.charAt(BitSwitch.IS_MOVING); - ResGPSDto resGpsDto = param.toJavaObject(ResGPSDto.class); + ResGPSDto resGpsDto = objectMapper.convertValue(param, ResGPSDto.class); if (resGpsDto.getSoc() > 0 && resGpsDto.getSoc() < 20) { - maintenanceFeignClient.changeBattery(deviceId); + //TODO:生成换电工单 } resGpsDto.setEcuSn(deviceId); resGpsDto.setHelmetExit(helmet); @@ -75,20 +73,22 @@ public class ReoprtHandler { resGpsDto.setIsMoving(isMoving); redisUtil.set(deviceId, resGpsDto); double[] doubles = CoordinateUtil.WGS84ToGCJ02(resGpsDto.getLongitude(), resGpsDto.getLatitude()); - redisUtil.addLocation(new Point(doubles[0], doubles[1]), deviceId); boolean outOfChina = CoordinateUtil.outOfChina(doubles[0], doubles[1]); if (!outOfChina) { EbikeTracking ebikeTracking = new EbikeTracking(deviceId, doubles[1], doubles[0]); - operateFeignClient.saveEbikeTracking(ebikeTracking); +// operateFeignClient.saveEbikeTracking(ebikeTracking); + //TODO:保存轨迹 } } - public void bmsMsgHandler(JSONObject param, String deviceId) { - ResGPSDto resGpsDto = (ResGPSDto)redisUtil.get(deviceId); - Integer mosState = param.getInteger("MOSState"); - resGpsDto.setMosState(mosState); - redisUtil.delete(deviceId); - redisUtil.set(deviceId,resGpsDto); + public void bmsMsgHandler(JsonNode param, String deviceId) { + ResGPSDto resGpsDto = (ResGPSDto) redisUtil.get(deviceId); + if (resGpsDto != null) { + Integer mosState = param.get("MOSState").asInt(); + resGpsDto.setMosState(mosState); + redisUtil.delete(deviceId); + redisUtil.set(deviceId, resGpsDto); + } } } diff --git a/ebike-report/src/main/resources/application-dev.yml b/ebike-report/src/main/resources/application-dev.yml index 9402a94..1e41de2 100644 --- a/ebike-report/src/main/resources/application-dev.yml +++ b/ebike-report/src/main/resources/application-dev.yml @@ -14,7 +14,7 @@ spring: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 kafka: - bootstrap-servers: 192.168.2.226:9092 + bootstrap-servers: 192.168.101.40:9092 producer: retries: 0 key-serializer: org.apache.kafka.common.serialization.StringSerializer @@ -31,11 +31,11 @@ spring: # Redis数据库索引(默认为0) database: 2 # Redis服务器地址 - host: 192.168.2.226 + host: 47.109.71.130 # Redis服务器连接端口 port: 6379 # Redis服务器连接密码(默认为空) - # password: + password: 970529 # 连接超时时间 lettuce: pool: