车辆轨迹存储

This commit is contained in:
attiya 2025-12-18 15:29:54 +08:00
parent 7a6c7c7aae
commit 7a4c4be90a
12 changed files with 433 additions and 4 deletions

View File

@ -0,0 +1,55 @@
package com.cdzy.common.model.dto;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.time.LocalDateTime;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class EbikeTrackingDto implements Serializable {
/**
* 设备id (ecuSn)
*/
private String deviceId;
/**
* 纬度
*/
private Double latitude;
/**
* 经度
*/
private Double longitude;
/**
* 纬度(GCJ02)
*/
private Double latGCJ02;
/**
* 经度(GCJ02)
*/
private Double lngGCJ02;
/**
* 时间戳
*/
private LocalDateTime time;
@Override
public String toString() {
return "EbikeTracking{" +
"deviceId='" + deviceId + '\'' +
", latitude=" + latitude +
", longitude=" + longitude +
", timestamp=" + time +
'}';
}
}

View File

@ -0,0 +1,40 @@
package com.cdzy.common.model.dto;
import jakarta.validation.constraints.NotBlank;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
/**
* 车辆轨迹查询 请求参数
*
* @author dingchao
* @date 2025/4/2
* @modified by:
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ReqEbikeTrackingDto {
/**
* 车辆编号
*/
@NotBlank(message = "车辆编号不能为空")
private String ebikeCode;
/**
* 时间间隔1s表示1秒1m表示1分钟1h表示1小时1d表示1天
* 默认为 10s
*/
private String interval;
/**
* 开始时间 yyyy-MM-dd HH:mm:ss
*/
@NotBlank(message = "开始时间不能为空")
private LocalDateTime startTime;
/**
* 结束时间 yyyy-MM-dd HH:mm:ss
*/
private LocalDateTime endTime;
}

View File

@ -1,5 +1,6 @@
package com.ebike.feign.clients;
import com.cdzy.common.model.dto.EbikeTracking;
import com.cdzy.common.model.response.JsonResult;
import com.ebike.feign.component.FeignTokenInterceptor;
import com.ebike.feign.config.ExampleFeignConfiguration;
@ -73,4 +74,13 @@ public interface OperationsFeignClient {
@PostMapping("/ebikeBikeOrder/inspectionSwapOrder")
JsonResult<FeignEbikeUserBikeInfo> inspectionSwapOrder(@RequestBody FeignInspectionSwapOrderVo inspectionSwapOrderVo);
/**
* 保存车辆轨迹
*
* @param ebikeTracking 车辆轨迹
* @return 结果数据 返回结果
*/
@PostMapping("ebikeTracking/save")
JsonResult<?> saveEbikeTracking(@RequestBody EbikeTracking ebikeTracking);
}

View File

@ -1,5 +1,8 @@
package org.cdzy.gather.mqtt;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
@ -17,6 +20,8 @@ import java.nio.charset.StandardCharsets;
@Slf4j
public class MqttHandler extends ChannelInboundHandlerAdapter {
private static final ObjectMapper objectMapper = new ObjectMapper();
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg){
if (msg instanceof MqttMessage mqttMessage) {
@ -118,7 +123,9 @@ public class MqttHandler extends ChannelInboundHandlerAdapter {
kafkaProducer.send("msg_lyy_rsp",content);
} else if (topic.contains("rpt")) {
//乐摇摇上报
kafkaProducer.send("msg_lyy_rpt",content);
String[] parts = topic.split("/");
String result = insertSnField(content, parts[parts.length - 1]);
kafkaProducer.send("msg_lyy_rpt",result);
}
}
log.info("处理MQTT消息 - 主题: {}, 内容: {}", topic, content);
@ -164,4 +171,29 @@ public class MqttHandler extends ChannelInboundHandlerAdapter {
}
super.userEventTriggered(ctx, evt);
}
/**
* 处理JSON字符串插入SN字段
*/
public static String insertSnField(String jsonString, String snValue) {
try {
// 1. 将字符串解析为JsonNode
JsonNode jsonNode = objectMapper.readTree(jsonString);
// 2. 确保是ObjectNode才能添加字段
if (jsonNode.isObject()) {
ObjectNode objectNode = (ObjectNode) jsonNode;
// 3. 添加SN字段
objectNode.put("SN", snValue);
// 4. 将修改后的JSON写回字符串
return objectMapper.writeValueAsString(objectNode);
} else {
throw new IllegalArgumentException("JSON must be an object");
}
} catch (Exception e) {
throw new RuntimeException("Failed to process JSON", e);
}
}
}

View File

@ -188,6 +188,13 @@
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.influxdb</groupId>
<artifactId>influxdb-client-java</artifactId>
<version>6.10.0</version> <!-- 匹配InfluxDB 2.x版本 -->
</dependency>
</dependencies>
<profiles>

View File

@ -0,0 +1,35 @@
package com.cdzy.operations.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.io.Serializable;
/**
* 车辆轨迹时序数据库 配置类
*
* @author attiya
* @since 2025/4/2
*/
@Data
@Component
@ConfigurationProperties(prefix = "influxdb")
public class EbikeTrackingConfg implements Serializable {
/**
* 时序数据库url
*/
private String url;
/**
* 时序数据库访问令牌
*/
private String token;
/**
* 时序数据库组织
*/
private String org;
/**
* 时序数据库数据桶
*/
private String bucket;
}

View File

@ -0,0 +1,28 @@
package com.cdzy.operations.config;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class InfluxDbConfig {
@Value("${influxdb.url}")
private String url;
@Value("${influxdb.token}")
private String token;
@Value("${influxdb.org}")
private String org;
@Value("${influxdb.bucket}")
private String bucket;
@Bean
public InfluxDBClient influxDBClient() {
return InfluxDBClientFactory.create(url, token.toCharArray(), org, bucket);
}
}

View File

@ -0,0 +1,59 @@
package com.cdzy.operations.controller;
import com.cdzy.common.model.dto.EbikeTracking;
import com.cdzy.common.model.response.JsonResult;
import com.cdzy.common.model.dto.EbikeTrackingDto;
import com.cdzy.common.model.dto.ReqEbikeTrackingDto;
import com.cdzy.operations.service.EbikeTrackingService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
/**
* 车辆轨迹查询 控制层
*
* @author attiya
* @since 2025/4/2
*/
@Slf4j
@RestController
@RequestMapping("/ebikeTracking")
public class EbikeTrackingController {
@Resource
private EbikeTrackingService ebikeTrackingService;
/**
* 保存车辆轨迹
*
* @param ebikeTracking 车辆轨迹
*/
@PostMapping("save")
public JsonResult<?> save(@RequestBody EbikeTracking ebikeTracking) {
boolean result = ebikeTrackingService.save(ebikeTracking);
return result ? JsonResult.success("保存车辆轨迹成功") : JsonResult.failed("保存车辆轨迹失败");
}
/**
* 车辆轨迹查询
*
* @param reqEbikeTrackingDto 查询参数
*/
@PostMapping("query")
public JsonResult<?> query(@RequestBody ReqEbikeTrackingDto reqEbikeTrackingDto) {
if (reqEbikeTrackingDto.getInterval() == null|| reqEbikeTrackingDto.getInterval().isEmpty()) {
reqEbikeTrackingDto.setInterval("10s");
}
List<EbikeTrackingDto> result = ebikeTrackingService.query(reqEbikeTrackingDto);
return JsonResult.success(result);
}
}

View File

@ -0,0 +1,32 @@
package com.cdzy.operations.service;
import com.cdzy.common.model.dto.EbikeTracking;
import com.cdzy.common.model.dto.EbikeTrackingDto;
import com.cdzy.common.model.dto.ReqEbikeTrackingDto;
import java.util.List;
/**
* 车辆轨迹 服务层
*
* @author dingchao
* @date 2025/4/2
* @modified by:
*/
public interface EbikeTrackingService {
/**
* 保存车辆轨迹
*
* @param ebikeTracking 车辆轨迹
* @return
*/
Boolean save(EbikeTracking ebikeTracking);
/**
* 车辆轨迹查询
*
* @param reqEbikeTrackingDto 查询参数
* @return
*/
List<EbikeTrackingDto> query(ReqEbikeTrackingDto reqEbikeTrackingDto);
}

View File

@ -0,0 +1,121 @@
package com.cdzy.operations.service.impl;
import com.cdzy.common.ex.EbikeException;
import com.cdzy.common.model.dto.EbikeTracking;
import com.cdzy.operations.config.EbikeTrackingConfg;
import com.cdzy.operations.mapper.EbikeBikeInfoMapper;
import com.cdzy.common.model.dto.EbikeTrackingDto;
import com.cdzy.common.model.dto.ReqEbikeTrackingDto;
import com.cdzy.common.utils.ConvertUtil;
import com.cdzy.operations.service.EbikeTrackingService;
import com.influxdb.annotations.Measurement;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.QueryApi;
import com.influxdb.client.WriteApi;
import com.influxdb.client.domain.WritePrecision;
import com.mybatisflex.core.query.QueryWrapper;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import java.time.*;
import java.util.List;
import java.util.StringJoiner;
import static com.cdzy.operations.model.entity.table.EbikeBikeInfoTableDef.EBIKE_BIKE_INFO;
import static com.cdzy.operations.model.entity.table.EbikeEcuInfoTableDef.EBIKE_ECU_INFO;
/**
* 车辆轨迹查询 服务层
*
* @author attiya
* @since 2025/4/2
*/
@Slf4j
@Service
public class EbikeTrackingServiceImpl implements EbikeTrackingService {
@Resource
private InfluxDBClient influxDBClient;
@Resource
private EbikeTrackingConfg ebikeTrackingConfg;
@Resource
private EbikeBikeInfoMapper bikeInfoMapper;
@Override
public Boolean save(EbikeTracking ebikeTracking) {
try (WriteApi writeApi = influxDBClient.makeWriteApi()) {
writeApi.writeMeasurement(WritePrecision.NS, ebikeTracking);
log.info("保存车辆轨迹成功, {}", ebikeTracking);
return true;
} catch (Exception e) {
log.error("保存车辆轨迹失败, {}", ebikeTracking, e);
return false;
}
}
@Override
public List<EbikeTrackingDto> query(ReqEbikeTrackingDto reqEbikeTrackingDto) {
try {
QueryWrapper queryWrapper = QueryWrapper.create()
.select(EBIKE_ECU_INFO.ECU_SN)
.where(EBIKE_BIKE_INFO.BIKE_CODE.eq(reqEbikeTrackingDto.getEbikeCode()))
.leftJoin(EBIKE_ECU_INFO).on(EBIKE_ECU_INFO.ECU_ID.eq(EBIKE_BIKE_INFO.ECU_ID));
String ecuSn = bikeInfoMapper.selectOneByQueryAs(queryWrapper, String.class);
if (ecuSn == null) {
throw new EbikeException("车辆编号错误,中控信息查询异常");
}
StringJoiner fluxQuery = new StringJoiner("\n");
fluxQuery.add(String.format("from(bucket: \"%s\")", ebikeTrackingConfg.getBucket()));
String startTime = ConvertUtil.localDateTimeToUTC(reqEbikeTrackingDto.getStartTime());
if (reqEbikeTrackingDto.getEndTime() != null) {
String endTime = ConvertUtil.localDateTimeToUTC(reqEbikeTrackingDto.getEndTime());
fluxQuery.add(String.format(" |> range(start: %s, stop: %s)", startTime, endTime));
} else {
fluxQuery.add(String.format(" |> range(start: %s)", startTime));
}
fluxQuery.add(String.format(" |> filter(fn: (r) => r._measurement == \"%s\")", getMeasurementName()));
fluxQuery.add(String.format(" |> filter(fn: (r) => r.deviceId == \"%s\")", ecuSn));
// 按照时间间隔窗口分组但保留原始时间戳
fluxQuery.add(String.format(" |> window(every: %s, timeColumn: \"_time\", stopColumn: \"_stop\", createEmpty: false)", reqEbikeTrackingDto.getInterval()));
// 在窗口内取第一条数据 last() 取最后一条
fluxQuery.add(" |> first()");
// 恢复原始时间范围避免窗口裁剪时间戳
fluxQuery.add(" |> timeShift(duration: 0s)");
// 解除窗口分组
fluxQuery.add(" |> window(every: inf)");
fluxQuery.add(" |> pivot(rowKey: [\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")");
QueryApi queryApi = influxDBClient.getQueryApi();
List<EbikeTracking> points = queryApi.query(fluxQuery.toString(), EbikeTracking.class);
return points.stream().map(point -> {
EbikeTrackingDto ebikeTrackingDto = new EbikeTrackingDto();
BeanUtils.copyProperties(point, ebikeTrackingDto);
Instant _time = point.get_time();
ebikeTrackingDto.setTime(ConvertUtil.instantToDatetime(_time));
// GPS入库的时候已经转为GCJ02坐标系
ebikeTrackingDto.setLngGCJ02(point.getLongitude());
ebikeTrackingDto.setLatGCJ02(point.getLatitude());
//double[] gcj02 = CoordinateUtil.WGS84ToGCJ02(point.getLongitude(), point.getLatitude());
//ebikeTrackingDto.setLngGCJ02(gcj02[0]);
//ebikeTrackingDto.setLatGCJ02(gcj02[1]);
return ebikeTrackingDto;
}).toList();
} catch (Exception e) {
log.error("车辆轨迹查询失败, {}", reqEbikeTrackingDto, e);
}
return null;
}
private String getMeasurementName() {
// 获取注解实例
Measurement annotation = EbikeTracking.class.getAnnotation(Measurement.class);
// 返回 name 属性值空字符串处理逻辑可自定义
return annotation.name().isEmpty() ? null : annotation.name();
}
}

View File

@ -121,3 +121,9 @@ geo-coding:
wechat:
qr-url: https://www.cdzhuojing.cn/ebike?bikeCode=
influxdb:
url: http://47.109.71.130:8086
token: -bbo5zEJ7NIZyZk4uwOg1Y9XVS5pXHhyBWvH7Y0VNp0lMpGizWnFPiGLzgmZeDm_kNj1pnJRgjXFbs1FKszk7w==
org: admin
bucket: gps

View File

@ -87,11 +87,15 @@ public class ReoprtHandler {
boolean outOfChina = CoordinateUtil.outOfChina(doubles[0], doubles[1]);
if (!outOfChina) {
EbikeTracking ebikeTracking = new EbikeTracking(deviceId, doubles[1], doubles[0]);
// operateFeignClient.saveEbikeTracking(ebikeTracking);
//TODO:保存轨迹
operateFeignClient.saveEbikeTracking(ebikeTracking);
//车辆静止,直接落库
if (isMoving == '0'){
}
}
}
public void bmsMsgHandler(JsonNode param, String deviceId) {
ResGPSDto resGpsDto = (ResGPSDto) redisUtil.getEcu(deviceId);
if (resGpsDto != null) {