Added:修改解析字段长度

This commit is contained in:
zzp 2024-09-11 13:56:32 +08:00
parent 94b164062b
commit f34de7e5f4
9 changed files with 80 additions and 196 deletions

View File

@ -140,7 +140,7 @@ public class CaliperGateway extends PulledSocketGatewayBase {
CaliperClient fireClient = new CaliperClient(startVo);
CaliperDaemonTask daemonTask = new CaliperDaemonTask(fireClient, startVo);
startVo.setRfidDaemonTask(daemonTask);
startVo.setCaliperDaemonTask(daemonTask);
Thread thread = new Thread(daemonTask, "Fire-Daemon-Task-runid-" + instanceVo.getRunId() + "-s" + index++);
thread.start();
}

View File

@ -8,23 +8,27 @@ import com.cisdi.data.sdk.service.SendService;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @className: CaliperClientHandler
* @description: caliper客户端处理器
* @author: zhang.zhipeng
* @date: 2024/9/11 8:50
*/
@Slf4j
public class CaliperClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
private static final Logger logger = LoggerFactory.getLogger(CaliperClientHandler.class);
private static final Charset Encode_Charset = Charset.forName("UTF-8");
private static final Charset Encode_Charset = StandardCharsets.UTF_8;
private static final String HEARTBEAT_CODE = "HeartBea";
private CaliperClient caliperClient;
private CaliperTaskStartVo startVo;
private final CaliperTaskStartVo startVo;
public CaliperClientHandler(CaliperTaskStartVo startVo, CaliperClient caliperClient) {
this.startVo = startVo;
@ -37,8 +41,8 @@ public class CaliperClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
try {
List<CaliperItemVo> decode = CaliperVoDecoder.Decode(msg);
if(decode.size() == 1 && HEARTBEAT_CODE.equalsIgnoreCase(decode.get(0).getCode())) {
logger.info("Caliper {},启用心跳:{} 心跳 vo:{}", caliperClient.getChannel(),
startVo.getCustomParam() == null ? false : startVo.getCustomParam().getEnableHeartbeat(),
log.info("Caliper {},启用心跳:{} 心跳 vo:{}", caliperClient.getChannel(),
startVo.getCustomParam() != null && startVo.getCustomParam().getEnableHeartbeat(),
decode);
caliperClient.setLastHeartbeatTime(System.currentTimeMillis());
return;
@ -51,7 +55,7 @@ public class CaliperClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
if (startVo.getGatewayBase() != null && Boolean.TRUE.equals(startVo.getGatewayBase().getInstanceVo().getLogOpen())) {
logger.info("Caliper {} vo:{} ", caliperClient.getChannel(), decode);
log.info("Caliper {} vo:{} ", caliperClient.getChannel(), decode);
}
if (startVo.getGatewayBase() != null && startVo.getServiceProvider() != null) {
@ -65,28 +69,28 @@ public class CaliperClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
String jsonString = JSON.toJSONString(data, SerializerFeature.WriteMapNullValue);
byte[] body = jsonString.getBytes(Encode_Charset);
socketMessage.setData(body);
logger.info("socketMess" + socketMessage.toString());
log.info("socketMess{}", socketMessage);
service.sendMessage(socketMessage);
} else {
logger.info("Caliper收到消息{}", decode);
log.info("Caliper收到消息{}", decode);
}
} catch (Exception e) {
logger.error(e.getLocalizedMessage(), e);
log.error(e.getLocalizedMessage(), e);
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.info("{} channelActive active", ctx.channel());
public void channelActive(ChannelHandlerContext ctx) {
log.info("{} channelActive active", ctx.channel());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
logger.info("{} channelInactive inactive", ctx.channel());
log.info("{} channelInactive inactive", ctx.channel());
try {
caliperClient.setChannel(null);
logger.info("set channel to null raise reconnect");
log.info("set channel to null raise reconnect");
} finally {
// 释放引用
caliperClient = null;
@ -97,13 +101,13 @@ public class CaliperClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception {
logger.error("exceptionCaught:{}", t);
log.error("exceptionCaught:", t);
super.exceptionCaught(ctx, t);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
logger.error("userEventTriggered:{}", evt);
log.error("userEventTriggered:{}", evt);
super.userEventTriggered(ctx, evt);
}
}

View File

@ -15,7 +15,7 @@ public class CaliperClientInitializer extends ChannelInitializer<SocketChannel>
}
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
protected void initChannel(SocketChannel socketChannel) {
//注册管道
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new CaliperFrameDecoder(startVo));

View File

@ -1,6 +1,8 @@
package com.cisdi.data.caliper.gateway;
import com.alibaba.fastjson.JSON;
import lombok.Getter;
import lombok.Setter;
import java.io.Serializable;
import java.util.ArrayList;
@ -8,74 +10,29 @@ import java.util.List;
import java.util.stream.Collectors;
/**
* Rfid server有多个所以需要支持配置多个服务端地址的
* @author cup
* @date 2021/03/03
*/
* @className: CaliperCustomParam
* @description: caliper参数
* @author: zhang.zhipeng
* @date: 2024/9/10 17:35
*/
@Setter
@Getter
public class CaliperCustomParam implements Serializable {
private static final long serialVersionUID = -5017472517219960138L;
/**
* 每个String 192.168.1.5:8823 形式
*/
private List<String> serverList = new ArrayList<String>();
private List<String> serverList = new ArrayList<>();
private Boolean enableHeartbeat = Boolean.FALSE;
private Integer heartbeatTimeInMs = 5000;
private Integer heartbeatMaxLostCount = 3;
public List<String> getServerList() {
return serverList;
}
public void setServerList(List<String> serverList) {
this.serverList = serverList;
}
public Boolean getEnableHeartbeat() {
return enableHeartbeat;
}
public void setEnableHeartbeat(Boolean enableHeartbeat) {
this.enableHeartbeat = enableHeartbeat;
}
public Integer getHeartbeatTimeInMs() {
return heartbeatTimeInMs;
}
public void setHeartbeatTimeInMs(Integer heartbeatTimeInMs) {
this.heartbeatTimeInMs = heartbeatTimeInMs;
}
public Integer getHeartbeatMaxLostCount() {
return heartbeatMaxLostCount;
}
public void setHeartbeatMaxLostCount(Integer heartbeatMaxLostCount) {
this.heartbeatMaxLostCount = heartbeatMaxLostCount;
}
@Override
public String toString() {
return JSON.toJSONString(this);
}
public static void main(String[] args){
CaliperCustomParam dto = new CaliperCustomParam();
dto.getServerList().add("192.168.6.55:10000");
dto.getServerList().add("192.168.6.56:10000");
dto.getServerList().add("192.165.20.111:10000");
dto.getServerList().add("192.165.20.112:10000");
dto.getServerList().add("192.165.20.113:10000");
dto.getServerList().add("192.165.20.119:10000");
dto.getServerList().add("192.165.20.118:10000");
dto.getServerList().add("192.165.20.120:10000");
dto.getServerList().add("192.165.20.121:10000");
dto.getServerList().add("192.165.20.121:10000");
List<String> customParams=dto.getServerList().stream().distinct().collect(Collectors.toList());
System.out.println(customParams);
System.out.println(dto);
}
}

View File

@ -5,10 +5,7 @@ import com.cisdi.data.sdk.consts.ServiceName;
import com.cisdi.data.sdk.service.AlarmService;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;
import java.util.*;
@Slf4j
public class CaliperDaemonTask implements Runnable {
@ -62,7 +59,7 @@ public class CaliperDaemonTask implements Runnable {
log.warn("当前Caliper拉取连接通道心跳启用且心跳超时触发重连连接id:{},连接信息:{}",
startVo.getGatewayBase().getInstanceVo().getRunId(), channelInfo);
//启用了心跳且心跳时间超时
reconnectToRfid(channelInfo, true);
reconnect(channelInfo, true);
}
} else {
assert caliperClient != null;
@ -71,7 +68,7 @@ public class CaliperDaemonTask implements Runnable {
caliperClient.getPort(), channelInfo);
log.warn(errorMsg);
sendAlarmToPlatform(errorMsg);
reconnectToRfid(channelInfo, false);
reconnect(channelInfo, false);
} else {
String errorMsg = String.format("Caliper到%s:%s的连接未建立或已关闭, %s", caliperClient.getIp(),
caliperClient.getPort(), channelInfo);
@ -86,7 +83,7 @@ public class CaliperDaemonTask implements Runnable {
log.warn("当前Caliper拉取连接通道心跳启用且心跳超时触发重连连接id:{},连接信息:{}",
startVo.getGatewayBase().getInstanceVo().getRunId(), channelInfo);
//启用了心跳且心跳时间超时
reconnectToRfid(channelInfo, true);
reconnect(channelInfo, true);
}
}
}
@ -96,7 +93,7 @@ public class CaliperDaemonTask implements Runnable {
return channelInfo;
}
private void reconnectToRfid(final String copyChannelInfo, boolean forceConnect) throws InterruptedException {
private void reconnect(final String copyChannelInfo, boolean forceConnect) throws InterruptedException {
Thread.sleep(5000);
Thread thread = new Thread(() -> {
try {
@ -112,31 +109,6 @@ public class CaliperDaemonTask implements Runnable {
thread.start();
}
public void forceReconnectToRfid() {
String channelInfo = String.format("channel:%s, active:%s,open:%s,writable:%s", caliperClient.getChannel(),
caliperClient.getChannel() == null ? "" : caliperClient.getChannel().isActive(),
caliperClient.getChannel() == null ? "" : caliperClient.getChannel().isOpen(),
caliperClient.getChannel() == null ? "" : caliperClient.getChannel().isWritable());
try {
Thread.sleep(500);
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
log.info("Caliper触发重连强制:{}....{}:{} {}", true, caliperClient.getIp(), caliperClient.getPort(),
channelInfo);
caliperClient.reconnect();
} catch (Exception e) {
log.info(e.getMessage(), e);
}
}
});
thread.start();
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
private void sendAlarmToPlatform(String detail) {
if (startVo.getServiceProvider() != null && startVo.getGatewayBase() != null) {
AlarmService alarmService = (AlarmService) startVo.getServiceProvider().getByName(ServiceName.Alarm);
@ -152,7 +124,7 @@ public class CaliperDaemonTask implements Runnable {
alarmDto.setUpper(Boolean.TRUE);
alarmDto.setTime(new Date());
alarmService.raiseAlarm(Arrays.asList(alarmDto));
alarmService.raiseAlarm(Collections.singletonList(alarmDto));
}
}
}

View File

@ -8,10 +8,7 @@ import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.*;
/**
* @Description: 获取完整的报文
@ -20,14 +17,12 @@ import java.util.Set;
*/
@Slf4j
public class CaliperFrameDecoder extends ByteToMessageDecoder {
private static final Logger logger = LoggerFactory.getLogger(CaliperFrameDecoder.class);
private CaliperTaskStartVo startVo;
private final static Integer MIN_LENGTH = 10;
private final byte START_SPLIT = 0x02;
private final byte END_SPLIT = 0x03;
private final Set<Integer> validDataSizeInByte = new HashSet<Integer>(Arrays.asList(10));
public CaliperFrameDecoder(CaliperTaskStartVo startVo) {
this.startVo = startVo;

View File

@ -1,6 +1,9 @@
package com.cisdi.data.caliper.gateway;
import com.alibaba.fastjson.JSON;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import java.io.Serializable;
@ -10,6 +13,8 @@ import java.io.Serializable;
* @author cup
* @date 2021/03/03
*/
@Setter
@Getter
public class CaliperItemVo implements Serializable {
private static final long serialVersionUID = -4290553035349526464L;
/**
@ -27,30 +32,6 @@ public class CaliperItemVo implements Serializable {
*/
private String code;
public String getServerIp() {
return serverIp;
}
public void setServerIp(String serverIp) {
this.serverIp = serverIp;
}
public Integer getServerPort() {
return serverPort;
}
public void setServerPort(Integer serverPort) {
this.serverPort = serverPort;
}
public String getCode() {
return code;
}
public void setCode(String code) {
this.code = code;
}
@Override
public String toString() {
return JSON.toJSONString(this);

View File

@ -3,10 +3,15 @@ package com.cisdi.data.caliper.gateway;
import com.cisdi.data.sdk.gateway.base.PulledSocketGatewayBase;
import com.cisdi.data.sdk.param.PullSocketParam;
import com.cisdi.data.sdk.service.ServiceProvider;
import lombok.Getter;
import lombok.Setter;
import java.util.concurrent.atomic.AtomicBoolean;
@Setter
@Getter
public class CaliperTaskStartVo {
private AtomicBoolean shouldRun;
private ServiceProvider serviceProvider;
private String deviceId;
@ -14,49 +19,4 @@ public class CaliperTaskStartVo {
private PullSocketParam socketParam;
private CaliperCustomParam customParam;
private CaliperDaemonTask caliperDaemonTask;
public AtomicBoolean getShouldRun() {
return shouldRun;
}
public void setShouldRun(AtomicBoolean shouldRun) {
this.shouldRun = shouldRun;
}
public ServiceProvider getServiceProvider() {
return serviceProvider;
}
public void setServiceProvider(ServiceProvider serviceProvider) {
this.serviceProvider = serviceProvider;
}
public String getDeviceId() {
return deviceId;
}
public void setDeviceId(String deviceId) {
this.deviceId = deviceId;
}
public PulledSocketGatewayBase getGatewayBase() {
return gatewayBase;
}
public void setGatewayBase(PulledSocketGatewayBase gatewayBase) {
this.gatewayBase = gatewayBase;
}
public PullSocketParam getSocketParam() {
return socketParam;
}
public void setSocketParam(PullSocketParam socketParam) {
this.socketParam = socketParam;
}
public CaliperCustomParam getCustomParam() {
return customParam;
}
public void setCustomParam(CaliperCustomParam customParam) {
this.customParam = customParam;
}
public CaliperDaemonTask getRfidDaemonTask() {
return caliperDaemonTask;
}
public void setRfidDaemonTask(CaliperDaemonTask caliperDaemonTask) {
this.caliperDaemonTask = caliperDaemonTask;
}
}

View File

@ -10,23 +10,39 @@ import io.netty.util.ReferenceCountUtil;
import java.util.ArrayList;
import java.util.List;
/**
* 采用TCP/IP通信本机做客户端连上服务器后主动上报采集数据数据采用大端单精度浮点数上传具体如下
* 上传数据: f5 f5 f5 02 3f 9e 04 19 40 0f 02 0c f6 f6 f6
* <p>
* 起始帧 0xF5 0xF5 0xF5
* 数据长度(字节) 0x08
* 宽度(直径1.2345mm) 0x3F/63 0x9E/158 0x04/4 0x19/25
* 厚度(直径2.2345mm) 0x40 0x0F 0x02 0x0C
* 校验码0x57 数据之和取低八位数据 0x3f+0x9e+0x04+0x19+0x40+0x0f+0x02+0x0c=0x157
* 结束帧 0xF6 0xF6 0xF6
*
* 3F 9E 04 19
*
* @className: CaliperVoDecoder
* @description: CaliperVo解码器
* @author: zhang.zhipeng
* @date: 2024/9/11 8:53
*/
public class CaliperVoDecoder {
public static List<CaliperItemVo> Decode(ByteBuf in) {
CaliperItemVo vo = new CaliperItemVo();
List<CaliperItemVo> result = new ArrayList<>();
//消息体部分
int length;
if (in == null) {
throw new BusinessException("ByteBuf 入参为null");
} else if (in.readableBytes() == 10) {
} else if (in.readableBytes() == 14) {//起始帧3位结束帧3位中间8位
length = 8;
}
else {
throw new BusinessException("报文长度不匹配 ,期望10实际" + in.readableBytes());
} else {
throw new BusinessException("报文长度不匹配 ,期望14实际" + in.readableBytes());
}
//去掉报文头 结束符
ByteBuf proceedIn = in.retainedSlice(1, length);
ByteBuf proceedIn = in.retainedSlice(3, length);
result.add(split(proceedIn));
ReferenceCountUtil.release(proceedIn);
return result;
@ -36,8 +52,8 @@ public class CaliperVoDecoder {
/**
* @param in 去掉头尾的报文
* @Description: 拆分报文
* @author: Mo
* @Date: 2021/6/2 10:51
* @author: zhang.zhipeng
* @Date: 2024/9/11 8:53
* @return: com.cisdi.data.RFID.gateway.RfidItemVo
*/
public static CaliperItemVo split(ByteBuf in) {
@ -64,9 +80,8 @@ public class CaliperVoDecoder {
// logger.info("vo1={}", decoderVo);
//标签读取
byte[] readBytes1 = HexUtil.hexStringToBytes("02524630313030303103");
ByteBuf copiedBuff = Unpooled.copiedBuffer(readBytes1);
List<CaliperItemVo> caliperItemVoList = CaliperVoDecoder.Decode(copiedBuff);
@ -82,6 +97,6 @@ public class CaliperVoDecoder {
readBytes1 = HexUtil.hexStringToBytes("0248656172744265617403");
copiedBuff = Unpooled.copiedBuffer(readBytes1);
decoder.decode(null, copiedBuff, new ArrayList<Object>());
}
}