kafka优化
This commit is contained in:
parent
e9198ae852
commit
492422acf1
@ -39,6 +39,6 @@ public class KafkaConsumer {
|
|||||||
public void onMessage(ConsumerRecord<?, ?> record) throws JsonProcessingException {
|
public void onMessage(ConsumerRecord<?, ?> record) throws JsonProcessingException {
|
||||||
log.info("[KAFKA接收] 主题: {}, 内容: {}", record.topic(), record.value());
|
log.info("[KAFKA接收] 主题: {}, 内容: {}", record.topic(), record.value());
|
||||||
JsonNode jsonNode = mapper.readValue(record.value().toString(), JsonNode.class);
|
JsonNode jsonNode = mapper.readValue(record.value().toString(), JsonNode.class);
|
||||||
reoprtHandler.reportHandler(jsonNode,record.topic());
|
reoprtHandler.reportHandler(jsonNode);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -36,11 +36,10 @@ public class ReoprtHandler {
|
|||||||
/**
|
/**
|
||||||
* 报文处理
|
* 报文处理
|
||||||
*/
|
*/
|
||||||
public void reportHandler(JsonNode response,String topic) {
|
public void reportHandler(JsonNode response) {
|
||||||
int c = response.get("c").asInt();
|
int c = response.get("c").asInt();
|
||||||
JsonNode param = response.get("param");
|
JsonNode param = response.get("param");
|
||||||
String[] parts = topic.split("/");
|
String deviceId = param.get("SN").asText();
|
||||||
String deviceId = parts[parts.length - 1];
|
|
||||||
switch (c) {
|
switch (c) {
|
||||||
case 56:
|
case 56:
|
||||||
gpsMsgHandler(param, deviceId);
|
gpsMsgHandler(param, deviceId);
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user