diff --git a/ebike-gather/src/main/java/org/cdzy/gather/kafka/KafkaConsumer.java b/ebike-gather/src/main/java/org/cdzy/gather/kafka/KafkaConsumer.java index 074a47b..1aeafc0 100644 --- a/ebike-gather/src/main/java/org/cdzy/gather/kafka/KafkaConsumer.java +++ b/ebike-gather/src/main/java/org/cdzy/gather/kafka/KafkaConsumer.java @@ -1,7 +1,11 @@ package org.cdzy.gather.kafka; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.netty.handler.codec.mqtt.MqttQoS; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.cdzy.gather.mqtt.MqttPoolClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; @@ -17,6 +21,8 @@ public class KafkaConsumer { private static final Logger log= LoggerFactory.getLogger(KafkaConsumer.class); + private static final ObjectMapper objectMapper = new ObjectMapper(); + /** * 消费者监听消息 * @@ -25,8 +31,10 @@ public class KafkaConsumer { @KafkaListener(topics = {"command"}) public void onMessage(ConsumerRecord record) throws JsonProcessingException { log.info("[KAFKA接收] 主题: {}, 内容: {}", record.topic(), record.value()); -// ObjectMapper objectMapper = new ObjectMapper(); -// CMDMsg cmdMsg = objectMapper.readValue(String.valueOf(record.value()), CMDMsg.class); -// MqttPoolClient.sendMessage(cmdMsg.getPrefix()+"/"+cmdMsg.getGroup()+"/"+cmdMsg.getDeviceId(), MqttQoS.valueOf(cmdMsg.getQos()), cmdMsg.getCommand()); + String jsonStr = String.valueOf(record.value()); + ObjectNode objectNode = objectMapper.readValue(jsonStr, ObjectNode.class); + String topic = objectNode.get("topic").asText(); + String command = objectNode.get("command").asText(); + MqttPoolClient.sendMessage(topic, MqttQoS.AT_LEAST_ONCE, command); } } diff --git a/ebike-gather/src/main/java/org/cdzy/gather/mqtt/MqttHandler.java b/ebike-gather/src/main/java/org/cdzy/gather/mqtt/MqttHandler.java index 0ebdbf9..ed15c23 100644 --- a/ebike-gather/src/main/java/org/cdzy/gather/mqtt/MqttHandler.java +++ b/ebike-gather/src/main/java/org/cdzy/gather/mqtt/MqttHandler.java @@ -1,7 +1,6 @@ package org.cdzy.gather.mqtt; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import io.netty.channel.Channel; @@ -157,7 +156,7 @@ public class MqttHandler extends SimpleChannelInboundHandler { * * @param publishMsg 信息 */ - private void handlePublish(MqttPublishMessage publishMsg) throws JsonProcessingException { + private void handlePublish(MqttPublishMessage publishMsg) { // 处理PUBLISH消息(含Payload) String topic = publishMsg.variableHeader().topicName(); List list = Arrays.stream(topic.split("/")).toList(); @@ -171,14 +170,18 @@ public class MqttHandler extends SimpleChannelInboundHandler { } KafkaProducer kafkaProducer = SpringContextHolder.getBean(KafkaProducer.class); ObjectMapper objectMapper = new ObjectMapper(); - ObjectNode objectNode = objectMapper.readValue(payload, ObjectNode.class); - objectNode.put("deviceId", list.get(list.size() - 1)); - objectNode.put("r_topic", list.get(1)); - if (topic.contains("rsp")){ - kafkaProducer.send("msg_rsp", objectNode.textValue()); - }else if (topic.contains("rpt")){ - kafkaProducer.send("msg_rpt", objectNode.textValue()); - } + try { + ObjectNode objectNode = objectMapper.readValue(payload, ObjectNode.class); + objectNode.put("deviceId", list.get(list.size() - 1)); + objectNode.put("r_topic", list.get(1)); + if (topic.contains("rsp")){ + kafkaProducer.send("msg_rsp", objectNode.toString()); + }else if (topic.contains("rpt")){ + kafkaProducer.send("msg_rpt", objectNode.toString()); + } + }catch (JsonProcessingException e){ + log.error(e.getMessage()); + } } private void handleSubAck(MqttSubAckMessage subAck) { diff --git a/ebike-operations/src/main/java/com/cdzy/operations/controller/EbikeEcuInfoController.java b/ebike-operations/src/main/java/com/cdzy/operations/controller/EbikeEcuInfoController.java index d1ccfc3..7827d97 100644 --- a/ebike-operations/src/main/java/com/cdzy/operations/controller/EbikeEcuInfoController.java +++ b/ebike-operations/src/main/java/com/cdzy/operations/controller/EbikeEcuInfoController.java @@ -1,6 +1,5 @@ package com.cdzy.operations.controller; -import com.cdzy.common.ex.EbikeException; import com.cdzy.common.model.request.PageParam; import com.cdzy.common.model.response.JsonResult; import com.cdzy.operations.model.dto.EbikeEcuInOverview; @@ -8,19 +7,14 @@ import com.cdzy.operations.model.entity.EbikeEcuInfo; import com.cdzy.operations.model.vo.EbikeEcuInfoBatchVo; import com.cdzy.operations.model.vo.EbikeEcuInfoVo; import com.cdzy.operations.service.EbikeEcuInfoService; -import com.cdzy.operations.utils.EmqxApiClient; import com.mybatisflex.core.paginate.Page; -import com.mybatisflex.core.query.QueryWrapper; import jakarta.annotation.Resource; import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.*; -import org.springframework.web.context.request.async.DeferredResult; import java.io.IOException; import java.util.List; -import static com.cdzy.operations.model.entity.table.EbikeEcuInfoTableDef.EBIKE_ECU_INFO; - /** * 中控基本信息 控制层。 * @@ -132,6 +126,18 @@ public class EbikeEcuInfoController { } + /** + * 校验SN或BikeCode + * + * @return 执行结构 + */ + @GetMapping("checkSnOrBikeCode") + public JsonResult checkSnOrBikeCode(String ecuSn,String bikeCode) throws IOException { + boolean result = ebikeEcuInfoService.checkSnOrBikeCode(ecuSn,bikeCode); + return JsonResult.success(result); + } + + /** * 设备是否在线 * diff --git a/ebike-operations/src/main/java/com/cdzy/operations/enums/CommandType.java b/ebike-operations/src/main/java/com/cdzy/operations/enums/CommandType.java new file mode 100644 index 0000000..e047f57 --- /dev/null +++ b/ebike-operations/src/main/java/com/cdzy/operations/enums/CommandType.java @@ -0,0 +1,9 @@ +package com.cdzy.operations.enums; + +/** + * @author attiya + * @since 2025-10-30 + */ +public interface CommandType { + String FIND_BIKE = "FIND_BIKE"; +} diff --git a/ebike-operations/src/main/java/com/cdzy/operations/service/EbikeEcuInfoService.java b/ebike-operations/src/main/java/com/cdzy/operations/service/EbikeEcuInfoService.java index e5cc1e3..3c1559c 100644 --- a/ebike-operations/src/main/java/com/cdzy/operations/service/EbikeEcuInfoService.java +++ b/ebike-operations/src/main/java/com/cdzy/operations/service/EbikeEcuInfoService.java @@ -50,4 +50,12 @@ public interface EbikeEcuInfoService extends IService { * @return 结果 */ boolean findBike(String ecuSn, String bikeCode); + + /** + * 校验参数 + * @param ecuSn 中控SN码 + * @param bikeCode 车辆编号 + * @return 结果 + */ + boolean checkSnOrBikeCode(String ecuSn, String bikeCode); } diff --git a/ebike-operations/src/main/java/com/cdzy/operations/service/impl/CommandServiceImpl.java b/ebike-operations/src/main/java/com/cdzy/operations/service/impl/CommandServiceImpl.java index cef60b1..eafbe77 100644 --- a/ebike-operations/src/main/java/com/cdzy/operations/service/impl/CommandServiceImpl.java +++ b/ebike-operations/src/main/java/com/cdzy/operations/service/impl/CommandServiceImpl.java @@ -2,9 +2,12 @@ package com.cdzy.operations.service.impl; import com.cdzy.common.ex.EbikeException; import com.cdzy.operations.component.KafkaProducer; +import com.cdzy.operations.enums.CommandType; import com.cdzy.operations.enums.EcuBrand; import com.cdzy.operations.model.entity.EbikeEcuInfo; import com.cdzy.operations.service.CommandService; +import com.cdzy.operations.utils.CommandUtil; +import com.fasterxml.jackson.core.JsonProcessingException; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; @@ -22,6 +25,8 @@ import java.util.concurrent.*; @Service public class CommandServiceImpl implements CommandService { + private final String topic = "command"; + private final ExecutorService executor = Executors.newFixedThreadPool(10); private final ConcurrentHashMap> pendingRequests = new ConcurrentHashMap<>(); @@ -32,9 +37,19 @@ public class CommandServiceImpl implements CommandService { @Override public boolean findBike(EbikeEcuInfo ebikeEcuInfo) { boolean result = false; + String taskId = createTaskId(); switch (ebikeEcuInfo.getEcuBrand()){ - case EcuBrand.GUANG_HE_TONG -> result = submitTaskAndWait("command",createTaskId(),"测试"); - default -> throw new EbikeException("该品牌中控暂未接入"); + case EcuBrand.GUANG_HE_TONG: + String command = null; + try { + command = CommandUtil.guang_he_tong(ebikeEcuInfo.getEcuSn(),taskId, CommandType.FIND_BIKE); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + result = submitTaskAndWait(topic, taskId,command); + break; + default : + throw new EbikeException("该品牌中控暂未接入"); } return result; } diff --git a/ebike-operations/src/main/java/com/cdzy/operations/service/impl/EbikeEcuInfoServiceImpl.java b/ebike-operations/src/main/java/com/cdzy/operations/service/impl/EbikeEcuInfoServiceImpl.java index c99bdce..b9a34b1 100644 --- a/ebike-operations/src/main/java/com/cdzy/operations/service/impl/EbikeEcuInfoServiceImpl.java +++ b/ebike-operations/src/main/java/com/cdzy/operations/service/impl/EbikeEcuInfoServiceImpl.java @@ -132,6 +132,22 @@ public class EbikeEcuInfoServiceImpl extends ServiceImpl