kafka基础调试

This commit is contained in:
attiya 2025-10-30 14:54:39 +08:00
parent 357c09c1ea
commit 7a87016761
8 changed files with 190 additions and 56 deletions

View File

@ -1,11 +1,7 @@
package org.cdzy.gather.kafka; package org.cdzy.gather.kafka;
import com.cdzy.common.model.CMDMsg;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.cdzy.gather.mqtt.MqttPoolClient;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.annotation.KafkaListener;
@ -28,9 +24,9 @@ public class KafkaConsumer {
*/ */
@KafkaListener(topics = {"command"}) @KafkaListener(topics = {"command"})
public void onMessage(ConsumerRecord<?, ?> record) throws JsonProcessingException { public void onMessage(ConsumerRecord<?, ?> record) throws JsonProcessingException {
// log.info("[KAFKA接收] 主题: {}, 内容: {}", record.topic(), record.value()); log.info("[KAFKA接收] 主题: {}, 内容: {}", record.topic(), record.value());
ObjectMapper objectMapper = new ObjectMapper(); // ObjectMapper objectMapper = new ObjectMapper();
CMDMsg cmdMsg = objectMapper.readValue(String.valueOf(record.value()), CMDMsg.class); // CMDMsg cmdMsg = objectMapper.readValue(String.valueOf(record.value()), CMDMsg.class);
MqttPoolClient.sendMessage(cmdMsg.getPrefix()+"/"+cmdMsg.getGroup()+"/"+cmdMsg.getDeviceId(), MqttQoS.valueOf(cmdMsg.getQos()), cmdMsg.getCommand()); // MqttPoolClient.sendMessage(cmdMsg.getPrefix()+"/"+cmdMsg.getGroup()+"/"+cmdMsg.getDeviceId(), MqttQoS.valueOf(cmdMsg.getQos()), cmdMsg.getCommand());
} }
} }

View File

@ -13,7 +13,7 @@ spring:
username: nacos username: nacos
password: nacos password: nacos
kafka: kafka:
bootstrap-servers: 192.168.2.226:9092 bootstrap-servers: 192.168.101.40:9092
producer: producer:
retries: 0 retries: 0
key-serializer: org.apache.kafka.common.serialization.StringSerializer key-serializer: org.apache.kafka.common.serialization.StringSerializer

View File

@ -1,8 +1,10 @@
package com.cdzy.operations.component; package com.cdzy.operations.component;
import com.cdzy.operations.service.CommandService;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.annotation.KafkaListener;
@ -17,6 +19,10 @@ import org.springframework.stereotype.Component;
@Component @Component
public class KafkaConsumer { public class KafkaConsumer {
@Resource
CommandService commandService;
/** /**
* 消费者监听消息 * 消费者监听消息
* *
@ -27,6 +33,7 @@ public class KafkaConsumer {
log.info("[KAFKA接收] 主题: {}, 内容: {}", record.topic(), record.value()); log.info("[KAFKA接收] 主题: {}, 内容: {}", record.topic(), record.value());
ObjectMapper objectMapper = new ObjectMapper(); ObjectMapper objectMapper = new ObjectMapper();
JsonNode jsonNode = objectMapper.readValue(record.value().toString(), JsonNode.class); JsonNode jsonNode = objectMapper.readValue(record.value().toString(), JsonNode.class);
//TODO处理结果 String tid = jsonNode.get("tid").asText();
commandService.onComplete(tid,true);
} }
} }

View File

@ -0,0 +1,9 @@
package com.cdzy.operations.enums;
/**
* @author attiya
* @since 2025-10-30
*/
public interface EcuBrand {
int GUANG_HE_TONG = 1;
}

View File

@ -0,0 +1,26 @@
package com.cdzy.operations.service;
import com.cdzy.operations.model.entity.EbikeEcuInfo;
/**
* MQTT命令 服务层
*
* @author attiya
* @since 2025-09-15
*/
public interface CommandService{
/**
* 寻车铃
* @param ebikeEcuInfo 中控信息
* @return 执行结果
*/
boolean findBike(EbikeEcuInfo ebikeEcuInfo);
/**
* 回调事件
* @param taskId 任务ID
* @param success 结果
*/
void onComplete(String taskId, boolean success);
}

View File

@ -0,0 +1,100 @@
package com.cdzy.operations.service.impl;
import com.cdzy.common.ex.EbikeException;
import com.cdzy.operations.component.KafkaProducer;
import com.cdzy.operations.enums.EcuBrand;
import com.cdzy.operations.model.entity.EbikeEcuInfo;
import com.cdzy.operations.service.CommandService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.UUID;
import java.util.concurrent.*;
/**
* MQTT命令 服务层实现
*
* @author attiya
* @since 2025-09-15
*/
@Slf4j
@Service
public class CommandServiceImpl implements CommandService {
private final ExecutorService executor = Executors.newFixedThreadPool(10);
private final ConcurrentHashMap<String, CompletableFuture<Boolean>> pendingRequests = new ConcurrentHashMap<>();
@Resource
KafkaProducer kafkaProducer;
@Override
public boolean findBike(EbikeEcuInfo ebikeEcuInfo) {
boolean result = false;
switch (ebikeEcuInfo.getEcuBrand()){
case EcuBrand.GUANG_HE_TONG -> result = submitTaskAndWait("command",createTaskId(),"测试");
default -> throw new EbikeException("该品牌中控暂未接入");
}
return result;
}
@Override
public void onComplete(String taskId, boolean success) {
completeRequest(taskId,success);
}
/**
* 提交异步任务并等待布尔结果
*
* @return true-成功, false-失败或超时
*/
public boolean submitTaskAndWait(String topic, String taskId, String taskData) {
// 创建异步任务
CompletableFuture<Boolean> future = new CompletableFuture<>();
pendingRequests.put(taskId, future);
try {
// 发送消息到消息队列
sendMessageToQueue(topic, taskData);
// 等待结果5秒超时
return future.orTimeout(5, TimeUnit.SECONDS).join();
} catch (Exception e) {
pendingRequests.remove(taskId);
log.info("任务提交失败: {}", e.getMessage());
return false;
}
}
/**
* 发送消息到消息队列
*/
private void sendMessageToQueue(String topic, String taskData) {
executor.submit(() -> {
kafkaProducer.send(topic, taskData);
});
}
/**
* 完成请求从消息回调
*/
public void completeRequest(String taskId, boolean success) {
CompletableFuture<Boolean> future = pendingRequests.remove(taskId);
if (future != null) {
future.complete(success);
log.info("请求 {} 完成,结果: {}", taskId, success);
} else {
log.info("请求 {} 已超时", taskId);
}
}
String createTaskId(){
return UUID.randomUUID().toString().replaceAll("-", "");
}
}

View File

@ -7,6 +7,7 @@ import com.cdzy.operations.model.dto.EbikeEcuInOverview;
import com.cdzy.operations.model.entity.EbikeBikeInfo; import com.cdzy.operations.model.entity.EbikeBikeInfo;
import com.cdzy.operations.model.vo.EbikeEcuInfoBatchVo; import com.cdzy.operations.model.vo.EbikeEcuInfoBatchVo;
import com.cdzy.operations.model.vo.EbikeEcuInfoVo; import com.cdzy.operations.model.vo.EbikeEcuInfoVo;
import com.cdzy.operations.service.CommandService;
import com.cdzy.operations.utils.EmqxApiClient; import com.cdzy.operations.utils.EmqxApiClient;
import com.mybatisflex.core.query.QueryMethods; import com.mybatisflex.core.query.QueryMethods;
import com.mybatisflex.core.query.QueryWrapper; import com.mybatisflex.core.query.QueryWrapper;
@ -40,6 +41,9 @@ public class EbikeEcuInfoServiceImpl extends ServiceImpl<EbikeEcuInfoMapper, Ebi
@Resource @Resource
private EbikeBikeInfoMapper bikeInfoMapper; private EbikeBikeInfoMapper bikeInfoMapper;
@Resource
private CommandService commandService;
@Override @Override
public void saveEcu(EbikeEcuInfoVo ebikeEcuInfo) { public void saveEcu(EbikeEcuInfoVo ebikeEcuInfo) {
EbikeEcuInfo ebikeEcuInfoEntity = EbikeEcuInfo.builder() EbikeEcuInfo ebikeEcuInfoEntity = EbikeEcuInfo.builder()
@ -108,64 +112,56 @@ public class EbikeEcuInfoServiceImpl extends ServiceImpl<EbikeEcuInfoMapper, Ebi
public boolean online(String ecuSn, String bikeCode) throws IOException { public boolean online(String ecuSn, String bikeCode) throws IOException {
check(ecuSn, bikeCode); check(ecuSn, bikeCode);
if (StringUtil.hasText(ecuSn)) { if (StringUtil.hasText(ecuSn)) {
QueryWrapper queryWrapper = QueryWrapper.create() checkEcuSn(ecuSn);
.where(EBIKE_ECU_INFO.ECU_SN.eq(ecuSn));
EbikeEcuInfo ebikeEcuInfo = ebikeEcuInfoMapper.selectOneByQuery(queryWrapper);
if (ebikeEcuInfo == null) {
throw new EbikeException("中控编号错误");
}
return EmqxApiClient.isClientOnline(ebikeEcuInfo.getEcuSn());
} else { } else {
QueryWrapper queryWrapper = QueryWrapper.create() EbikeEcuInfo ebikeEcuInfo = getEcu(bikeCode);
.where(EBIKE_BIKE_INFO.BIKE_CODE.eq(bikeCode)); ecuSn = ebikeEcuInfo.getEcuSn();
EbikeBikeInfo ebikeBikeInfo = bikeInfoMapper.selectOneByQuery(queryWrapper);
if (ebikeBikeInfo == null) {
throw new EbikeException("车辆编号错误");
}
queryWrapper.clear();
queryWrapper.where(EBIKE_ECU_INFO.ECU_ID.eq(ebikeBikeInfo.getEcuId()));
EbikeEcuInfo ebikeEcuInfo = ebikeEcuInfoMapper.selectOneByQuery(queryWrapper);
if (ebikeEcuInfo == null) {
throw new EbikeException("车辆所绑定中控不存在");
}
return EmqxApiClient.isClientOnline(ebikeEcuInfo.getEcuSn());
} }
return EmqxApiClient.isClientOnline(ecuSn);
} }
@Override @Override
public boolean findBike(String ecuSn, String bikeCode) { public boolean findBike(String ecuSn, String bikeCode) {
check(ecuSn, bikeCode); check(ecuSn, bikeCode);
if (StringUtil.hasText(ecuSn)) { if (StringUtil.hasText(ecuSn)) {
QueryWrapper queryWrapper = QueryWrapper.create() EbikeEcuInfo ebikeEcuInfo = checkEcuSn(ecuSn);
.where(EBIKE_ECU_INFO.ECU_SN.eq(ecuSn)); return commandService.findBike(ebikeEcuInfo);
EbikeEcuInfo ebikeEcuInfo = ebikeEcuInfoMapper.selectOneByQuery(queryWrapper);
if (ebikeEcuInfo == null) {
throw new EbikeException("中控编号错误");
}
} else { } else {
QueryWrapper queryWrapper = QueryWrapper.create() EbikeEcuInfo ebikeEcuInfo = getEcu(bikeCode);
.where(EBIKE_BIKE_INFO.BIKE_CODE.eq(bikeCode)); return commandService.findBike(ebikeEcuInfo);
EbikeBikeInfo ebikeBikeInfo = bikeInfoMapper.selectOneByQuery(queryWrapper);
if (ebikeBikeInfo == null) {
throw new EbikeException("车辆编号错误");
}
queryWrapper.clear();
queryWrapper.where(EBIKE_ECU_INFO.ECU_ID.eq(ebikeBikeInfo.getEcuId()));
EbikeEcuInfo ebikeEcuInfo = ebikeEcuInfoMapper.selectOneByQuery(queryWrapper);
if (ebikeEcuInfo == null) {
throw new EbikeException("车辆所绑定中控不存在");
}
} }
return false;
} }
void check(String ecuSn, String bikeCode) { private EbikeEcuInfo checkEcuSn(String ecuSn) {
if (ecuSn == null || bikeCode == null) { QueryWrapper queryWrapper = QueryWrapper.create()
.where(EBIKE_ECU_INFO.ECU_SN.eq(ecuSn));
EbikeEcuInfo ebikeEcuInfo = ebikeEcuInfoMapper.selectOneByQuery(queryWrapper);
if (ebikeEcuInfo == null) {
throw new EbikeException("中控编号错误");
}
return ebikeEcuInfo;
}
private void check(String ecuSn, String bikeCode) {
if (ecuSn == null && bikeCode == null) {
throw new EbikeException("中控编号、车辆编号均为空"); throw new EbikeException("中控编号、车辆编号均为空");
} }
} }
private EbikeEcuInfo getEcu(String bikeCode) {
QueryWrapper queryWrapper = QueryWrapper.create()
.where(EBIKE_BIKE_INFO.BIKE_CODE.eq(bikeCode));
EbikeBikeInfo ebikeBikeInfo = bikeInfoMapper.selectOneByQuery(queryWrapper);
if (ebikeBikeInfo == null) {
throw new EbikeException("车辆编号错误");
}
queryWrapper.clear();
queryWrapper.where(EBIKE_ECU_INFO.ECU_ID.eq(ebikeBikeInfo.getEcuId()));
EbikeEcuInfo ebikeEcuInfo = ebikeEcuInfoMapper.selectOneByQuery(queryWrapper);
if (ebikeEcuInfo == null) {
throw new EbikeException("车辆所绑定中控不存在");
}
return ebikeEcuInfo;
}
} }

View File

@ -9,7 +9,7 @@ spring:
username: nacos username: nacos
password: nacos password: nacos
kafka: kafka:
bootstrap-servers: 192.168.2.226:9092 bootstrap-servers: 192.168.101.40:9092
producer: producer:
retries: 0 retries: 0
key-serializer: org.apache.kafka.common.serialization.StringSerializer key-serializer: org.apache.kafka.common.serialization.StringSerializer