From f34de7e5f4801148db38988c573ebb2ce6ad671b Mon Sep 17 00:00:00 2001 From: zzp <472876707@qq.com> Date: Wed, 11 Sep 2024 13:56:32 +0800 Subject: [PATCH] =?UTF-8?q?Added:=E4=BF=AE=E6=94=B9=E8=A7=A3=E6=9E=90?= =?UTF-8?q?=E5=AD=97=E6=AE=B5=E9=95=BF=E5=BA=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cisdi/data/caliper/CaliperGateway.java | 2 +- .../caliper/gateway/CaliperClientHandler.java | 42 ++++++------ .../gateway/CaliperClientInitializer.java | 2 +- .../caliper/gateway/CaliperCustomParam.java | 65 ++++--------------- .../caliper/gateway/CaliperDaemonTask.java | 40 ++---------- .../caliper/gateway/CaliperFrameDecoder.java | 7 +- .../data/caliper/gateway/CaliperItemVo.java | 29 ++------- .../caliper/gateway/CaliperTaskStartVo.java | 50 ++------------ .../caliper/gateway/CaliperVoDecoder.java | 39 +++++++---- 9 files changed, 80 insertions(+), 196 deletions(-) diff --git a/src/main/java/com/cisdi/data/caliper/CaliperGateway.java b/src/main/java/com/cisdi/data/caliper/CaliperGateway.java index 3da40fb..d0fc768 100644 --- a/src/main/java/com/cisdi/data/caliper/CaliperGateway.java +++ b/src/main/java/com/cisdi/data/caliper/CaliperGateway.java @@ -140,7 +140,7 @@ public class CaliperGateway extends PulledSocketGatewayBase { CaliperClient fireClient = new CaliperClient(startVo); CaliperDaemonTask daemonTask = new CaliperDaemonTask(fireClient, startVo); - startVo.setRfidDaemonTask(daemonTask); + startVo.setCaliperDaemonTask(daemonTask); Thread thread = new Thread(daemonTask, "Fire-Daemon-Task-runid-" + instanceVo.getRunId() + "-s" + index++); thread.start(); } diff --git a/src/main/java/com/cisdi/data/caliper/gateway/CaliperClientHandler.java b/src/main/java/com/cisdi/data/caliper/gateway/CaliperClientHandler.java index 5d4a60d..01d552e 100644 --- a/src/main/java/com/cisdi/data/caliper/gateway/CaliperClientHandler.java +++ b/src/main/java/com/cisdi/data/caliper/gateway/CaliperClientHandler.java @@ -8,23 +8,27 @@ import com.cisdi.data.sdk.service.SendService; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import lombok.extern.slf4j.Slf4j; import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.List; import java.util.Map; - +/** +* @className: CaliperClientHandler +* @description: caliper客户端处理器 +* @author: zhang.zhipeng +* @date: 2024/9/11 8:50 +*/ +@Slf4j public class CaliperClientHandler extends SimpleChannelInboundHandler { - - private static final Logger logger = LoggerFactory.getLogger(CaliperClientHandler.class); - private static final Charset Encode_Charset = Charset.forName("UTF-8"); + private static final Charset Encode_Charset = StandardCharsets.UTF_8; private static final String HEARTBEAT_CODE = "HeartBea"; private CaliperClient caliperClient; - private CaliperTaskStartVo startVo; + private final CaliperTaskStartVo startVo; public CaliperClientHandler(CaliperTaskStartVo startVo, CaliperClient caliperClient) { this.startVo = startVo; @@ -37,8 +41,8 @@ public class CaliperClientHandler extends SimpleChannelInboundHandler { try { List decode = CaliperVoDecoder.Decode(msg); if(decode.size() == 1 && HEARTBEAT_CODE.equalsIgnoreCase(decode.get(0).getCode())) { - logger.info("Caliper {},启用心跳:{} , 心跳 vo:{}", caliperClient.getChannel(), - startVo.getCustomParam() == null ? false : startVo.getCustomParam().getEnableHeartbeat(), + log.info("Caliper {},启用心跳:{} , 心跳 vo:{}", caliperClient.getChannel(), + startVo.getCustomParam() != null && startVo.getCustomParam().getEnableHeartbeat(), decode); caliperClient.setLastHeartbeatTime(System.currentTimeMillis()); return; @@ -51,7 +55,7 @@ public class CaliperClientHandler extends SimpleChannelInboundHandler { if (startVo.getGatewayBase() != null && Boolean.TRUE.equals(startVo.getGatewayBase().getInstanceVo().getLogOpen())) { - logger.info("Caliper {} vo:{} ", caliperClient.getChannel(), decode); + log.info("Caliper {} vo:{} ", caliperClient.getChannel(), decode); } if (startVo.getGatewayBase() != null && startVo.getServiceProvider() != null) { @@ -65,28 +69,28 @@ public class CaliperClientHandler extends SimpleChannelInboundHandler { String jsonString = JSON.toJSONString(data, SerializerFeature.WriteMapNullValue); byte[] body = jsonString.getBytes(Encode_Charset); socketMessage.setData(body); - logger.info("socketMess" + socketMessage.toString()); + log.info("socketMess{}", socketMessage); service.sendMessage(socketMessage); } else { - logger.info("Caliper收到消息{}", decode); + log.info("Caliper收到消息{}", decode); } } catch (Exception e) { - logger.error(e.getLocalizedMessage(), e); + log.error(e.getLocalizedMessage(), e); } } @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - logger.info("{} channelActive active", ctx.channel()); + public void channelActive(ChannelHandlerContext ctx) { + log.info("{} channelActive active", ctx.channel()); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { - logger.info("{} channelInactive inactive", ctx.channel()); + log.info("{} channelInactive inactive", ctx.channel()); try { caliperClient.setChannel(null); - logger.info("set channel to null raise reconnect"); + log.info("set channel to null raise reconnect"); } finally { // 释放引用 caliperClient = null; @@ -97,13 +101,13 @@ public class CaliperClientHandler extends SimpleChannelInboundHandler { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception { - logger.error("exceptionCaught:{}", t); + log.error("exceptionCaught:", t); super.exceptionCaught(ctx, t); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - logger.error("userEventTriggered:{}", evt); + log.error("userEventTriggered:{}", evt); super.userEventTriggered(ctx, evt); } } diff --git a/src/main/java/com/cisdi/data/caliper/gateway/CaliperClientInitializer.java b/src/main/java/com/cisdi/data/caliper/gateway/CaliperClientInitializer.java index 4d4b16d..d528f4b 100644 --- a/src/main/java/com/cisdi/data/caliper/gateway/CaliperClientInitializer.java +++ b/src/main/java/com/cisdi/data/caliper/gateway/CaliperClientInitializer.java @@ -15,7 +15,7 @@ public class CaliperClientInitializer extends ChannelInitializer } @Override - protected void initChannel(SocketChannel socketChannel) throws Exception { + protected void initChannel(SocketChannel socketChannel) { //注册管道 ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new CaliperFrameDecoder(startVo)); diff --git a/src/main/java/com/cisdi/data/caliper/gateway/CaliperCustomParam.java b/src/main/java/com/cisdi/data/caliper/gateway/CaliperCustomParam.java index d6e0811..2aad9d0 100644 --- a/src/main/java/com/cisdi/data/caliper/gateway/CaliperCustomParam.java +++ b/src/main/java/com/cisdi/data/caliper/gateway/CaliperCustomParam.java @@ -1,6 +1,8 @@ package com.cisdi.data.caliper.gateway; import com.alibaba.fastjson.JSON; +import lombok.Getter; +import lombok.Setter; import java.io.Serializable; import java.util.ArrayList; @@ -8,74 +10,29 @@ import java.util.List; import java.util.stream.Collectors; /** - * Rfid server有多个,所以需要支持配置多个服务端地址的 - * @author cup - * @date 2021/03/03 - */ +* @className: CaliperCustomParam +* @description: caliper参数 +* @author: zhang.zhipeng +* @date: 2024/9/10 17:35 +*/ +@Setter +@Getter public class CaliperCustomParam implements Serializable { private static final long serialVersionUID = -5017472517219960138L; /** * 每个String 如 192.168.1.5:8823 形式 */ - private List serverList = new ArrayList(); + private List serverList = new ArrayList<>(); private Boolean enableHeartbeat = Boolean.FALSE; private Integer heartbeatTimeInMs = 5000; private Integer heartbeatMaxLostCount = 3; - public List getServerList() { - return serverList; - } - - public void setServerList(List serverList) { - this.serverList = serverList; - } - - public Boolean getEnableHeartbeat() { - return enableHeartbeat; - } - - public void setEnableHeartbeat(Boolean enableHeartbeat) { - this.enableHeartbeat = enableHeartbeat; - } - - public Integer getHeartbeatTimeInMs() { - return heartbeatTimeInMs; - } - - public void setHeartbeatTimeInMs(Integer heartbeatTimeInMs) { - this.heartbeatTimeInMs = heartbeatTimeInMs; - } - - public Integer getHeartbeatMaxLostCount() { - return heartbeatMaxLostCount; - } - - public void setHeartbeatMaxLostCount(Integer heartbeatMaxLostCount) { - this.heartbeatMaxLostCount = heartbeatMaxLostCount; - } - @Override public String toString() { return JSON.toJSONString(this); } - - public static void main(String[] args){ - CaliperCustomParam dto = new CaliperCustomParam(); - dto.getServerList().add("192.168.6.55:10000"); - dto.getServerList().add("192.168.6.56:10000"); - dto.getServerList().add("192.165.20.111:10000"); - dto.getServerList().add("192.165.20.112:10000"); - dto.getServerList().add("192.165.20.113:10000"); - dto.getServerList().add("192.165.20.119:10000"); - dto.getServerList().add("192.165.20.118:10000"); - dto.getServerList().add("192.165.20.120:10000"); - dto.getServerList().add("192.165.20.121:10000"); - dto.getServerList().add("192.165.20.121:10000"); - List customParams=dto.getServerList().stream().distinct().collect(Collectors.toList()); - System.out.println(customParams); - System.out.println(dto); - } + } diff --git a/src/main/java/com/cisdi/data/caliper/gateway/CaliperDaemonTask.java b/src/main/java/com/cisdi/data/caliper/gateway/CaliperDaemonTask.java index 59c4f29..df3dc2b 100644 --- a/src/main/java/com/cisdi/data/caliper/gateway/CaliperDaemonTask.java +++ b/src/main/java/com/cisdi/data/caliper/gateway/CaliperDaemonTask.java @@ -5,10 +5,7 @@ import com.cisdi.data.sdk.consts.ServiceName; import com.cisdi.data.sdk.service.AlarmService; import lombok.extern.slf4j.Slf4j; -import java.util.Arrays; -import java.util.Date; -import java.util.Timer; -import java.util.TimerTask; +import java.util.*; @Slf4j public class CaliperDaemonTask implements Runnable { @@ -62,7 +59,7 @@ public class CaliperDaemonTask implements Runnable { log.warn("当前Caliper拉取连接通道心跳启用,且心跳超时,触发重连,连接id:{},连接信息:{}", startVo.getGatewayBase().getInstanceVo().getRunId(), channelInfo); //启用了心跳,且心跳时间超时 - reconnectToRfid(channelInfo, true); + reconnect(channelInfo, true); } } else { assert caliperClient != null; @@ -71,7 +68,7 @@ public class CaliperDaemonTask implements Runnable { caliperClient.getPort(), channelInfo); log.warn(errorMsg); sendAlarmToPlatform(errorMsg); - reconnectToRfid(channelInfo, false); + reconnect(channelInfo, false); } else { String errorMsg = String.format("Caliper到%s:%s的连接未建立或已关闭, %s", caliperClient.getIp(), caliperClient.getPort(), channelInfo); @@ -86,7 +83,7 @@ public class CaliperDaemonTask implements Runnable { log.warn("当前Caliper拉取连接通道心跳启用,且心跳超时,触发重连,连接id:{},连接信息:{}", startVo.getGatewayBase().getInstanceVo().getRunId(), channelInfo); //启用了心跳,且心跳时间超时 - reconnectToRfid(channelInfo, true); + reconnect(channelInfo, true); } } } @@ -96,7 +93,7 @@ public class CaliperDaemonTask implements Runnable { return channelInfo; } - private void reconnectToRfid(final String copyChannelInfo, boolean forceConnect) throws InterruptedException { + private void reconnect(final String copyChannelInfo, boolean forceConnect) throws InterruptedException { Thread.sleep(5000); Thread thread = new Thread(() -> { try { @@ -112,31 +109,6 @@ public class CaliperDaemonTask implements Runnable { thread.start(); } - public void forceReconnectToRfid() { - String channelInfo = String.format("channel:%s, active:%s,open:%s,writable:%s", caliperClient.getChannel(), - caliperClient.getChannel() == null ? "" : caliperClient.getChannel().isActive(), - caliperClient.getChannel() == null ? "" : caliperClient.getChannel().isOpen(), - caliperClient.getChannel() == null ? "" : caliperClient.getChannel().isWritable()); - try { - Thread.sleep(500); - Thread thread = new Thread(new Runnable() { - @Override - public void run() { - try { - log.info("Caliper触发重连强制:{}....{}:{} {}", true, caliperClient.getIp(), caliperClient.getPort(), - channelInfo); - caliperClient.reconnect(); - } catch (Exception e) { - log.info(e.getMessage(), e); - } - } - }); - thread.start(); - } catch (Exception e) { - log.error(e.getMessage(), e); - } - } - private void sendAlarmToPlatform(String detail) { if (startVo.getServiceProvider() != null && startVo.getGatewayBase() != null) { AlarmService alarmService = (AlarmService) startVo.getServiceProvider().getByName(ServiceName.Alarm); @@ -152,7 +124,7 @@ public class CaliperDaemonTask implements Runnable { alarmDto.setUpper(Boolean.TRUE); alarmDto.setTime(new Date()); - alarmService.raiseAlarm(Arrays.asList(alarmDto)); + alarmService.raiseAlarm(Collections.singletonList(alarmDto)); } } } diff --git a/src/main/java/com/cisdi/data/caliper/gateway/CaliperFrameDecoder.java b/src/main/java/com/cisdi/data/caliper/gateway/CaliperFrameDecoder.java index d9c9534..79c8840 100644 --- a/src/main/java/com/cisdi/data/caliper/gateway/CaliperFrameDecoder.java +++ b/src/main/java/com/cisdi/data/caliper/gateway/CaliperFrameDecoder.java @@ -8,10 +8,7 @@ import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.util.*; /** * @Description: 获取完整的报文 @@ -20,14 +17,12 @@ import java.util.Set; */ @Slf4j public class CaliperFrameDecoder extends ByteToMessageDecoder { - private static final Logger logger = LoggerFactory.getLogger(CaliperFrameDecoder.class); private CaliperTaskStartVo startVo; private final static Integer MIN_LENGTH = 10; private final byte START_SPLIT = 0x02; private final byte END_SPLIT = 0x03; - private final Set validDataSizeInByte = new HashSet(Arrays.asList(10)); public CaliperFrameDecoder(CaliperTaskStartVo startVo) { this.startVo = startVo; diff --git a/src/main/java/com/cisdi/data/caliper/gateway/CaliperItemVo.java b/src/main/java/com/cisdi/data/caliper/gateway/CaliperItemVo.java index 9f29ab0..48afe3b 100644 --- a/src/main/java/com/cisdi/data/caliper/gateway/CaliperItemVo.java +++ b/src/main/java/com/cisdi/data/caliper/gateway/CaliperItemVo.java @@ -1,6 +1,9 @@ package com.cisdi.data.caliper.gateway; import com.alibaba.fastjson.JSON; +import lombok.Data; +import lombok.Getter; +import lombok.Setter; import java.io.Serializable; @@ -10,6 +13,8 @@ import java.io.Serializable; * @author cup * @date 2021/03/03 */ +@Setter +@Getter public class CaliperItemVo implements Serializable { private static final long serialVersionUID = -4290553035349526464L; /** @@ -27,30 +32,6 @@ public class CaliperItemVo implements Serializable { */ private String code; - public String getServerIp() { - return serverIp; - } - - public void setServerIp(String serverIp) { - this.serverIp = serverIp; - } - - public Integer getServerPort() { - return serverPort; - } - - public void setServerPort(Integer serverPort) { - this.serverPort = serverPort; - } - - public String getCode() { - return code; - } - - public void setCode(String code) { - this.code = code; - } - @Override public String toString() { return JSON.toJSONString(this); diff --git a/src/main/java/com/cisdi/data/caliper/gateway/CaliperTaskStartVo.java b/src/main/java/com/cisdi/data/caliper/gateway/CaliperTaskStartVo.java index c9e8537..f2e4fe3 100644 --- a/src/main/java/com/cisdi/data/caliper/gateway/CaliperTaskStartVo.java +++ b/src/main/java/com/cisdi/data/caliper/gateway/CaliperTaskStartVo.java @@ -3,10 +3,15 @@ package com.cisdi.data.caliper.gateway; import com.cisdi.data.sdk.gateway.base.PulledSocketGatewayBase; import com.cisdi.data.sdk.param.PullSocketParam; import com.cisdi.data.sdk.service.ServiceProvider; +import lombok.Getter; +import lombok.Setter; import java.util.concurrent.atomic.AtomicBoolean; +@Setter +@Getter public class CaliperTaskStartVo { + private AtomicBoolean shouldRun; private ServiceProvider serviceProvider; private String deviceId; @@ -14,49 +19,4 @@ public class CaliperTaskStartVo { private PullSocketParam socketParam; private CaliperCustomParam customParam; private CaliperDaemonTask caliperDaemonTask; - - public AtomicBoolean getShouldRun() { - return shouldRun; - } - public void setShouldRun(AtomicBoolean shouldRun) { - this.shouldRun = shouldRun; - } - public ServiceProvider getServiceProvider() { - return serviceProvider; - } - public void setServiceProvider(ServiceProvider serviceProvider) { - this.serviceProvider = serviceProvider; - } - public String getDeviceId() { - return deviceId; - } - public void setDeviceId(String deviceId) { - this.deviceId = deviceId; - } - public PulledSocketGatewayBase getGatewayBase() { - return gatewayBase; - } - public void setGatewayBase(PulledSocketGatewayBase gatewayBase) { - this.gatewayBase = gatewayBase; - } - public PullSocketParam getSocketParam() { - return socketParam; - } - public void setSocketParam(PullSocketParam socketParam) { - this.socketParam = socketParam; - } - public CaliperCustomParam getCustomParam() { - return customParam; - } - public void setCustomParam(CaliperCustomParam customParam) { - this.customParam = customParam; - } - - public CaliperDaemonTask getRfidDaemonTask() { - return caliperDaemonTask; - } - - public void setRfidDaemonTask(CaliperDaemonTask caliperDaemonTask) { - this.caliperDaemonTask = caliperDaemonTask; - } } diff --git a/src/main/java/com/cisdi/data/caliper/gateway/CaliperVoDecoder.java b/src/main/java/com/cisdi/data/caliper/gateway/CaliperVoDecoder.java index d17cf90..edd9026 100644 --- a/src/main/java/com/cisdi/data/caliper/gateway/CaliperVoDecoder.java +++ b/src/main/java/com/cisdi/data/caliper/gateway/CaliperVoDecoder.java @@ -10,23 +10,39 @@ import io.netty.util.ReferenceCountUtil; import java.util.ArrayList; import java.util.List; +/** + * 采用TCP/IP通信,本机做客户端,连上服务器后主动上报采集数据,数据采用大端单精度浮点数上传。具体如下: + * 上传数据: f5 f5 f5 02 3f 9e 04 19 40 0f 02 0c f6 f6 f6 + *

+ * 起始帧 0xF5 0xF5 0xF5 + * 数据长度(字节) 0x08 + * 宽度(直径1.2345mm) 0x3F/63 0x9E/158 0x04/4 0x19/25 + * 厚度(直径2.2345mm) 0x40 0x0F 0x02 0x0C + * 校验码0x57 数据之和取低八位数据 0x3f+0x9e+0x04+0x19+0x40+0x0f+0x02+0x0c=0x157 + * 结束帧 0xF6 0xF6 0xF6 + * + * 3F 9E 04 19 + * + * @className: CaliperVoDecoder + * @description: CaliperVo解码器 + * @author: zhang.zhipeng + * @date: 2024/9/11 8:53 + */ public class CaliperVoDecoder { public static List Decode(ByteBuf in) { - CaliperItemVo vo = new CaliperItemVo(); List result = new ArrayList<>(); //消息体部分 int length; if (in == null) { throw new BusinessException("ByteBuf 入参为null"); - } else if (in.readableBytes() == 10) { + } else if (in.readableBytes() == 14) {//起始帧3位,结束帧3位,中间8位 length = 8; - } - else { - throw new BusinessException("报文长度不匹配 ,期望:10,实际:" + in.readableBytes()); + } else { + throw new BusinessException("报文长度不匹配 ,期望:14,实际:" + in.readableBytes()); } //去掉报文头 和 结束符 - ByteBuf proceedIn = in.retainedSlice(1, length); + ByteBuf proceedIn = in.retainedSlice(3, length); result.add(split(proceedIn)); ReferenceCountUtil.release(proceedIn); return result; @@ -36,8 +52,8 @@ public class CaliperVoDecoder { /** * @param in 去掉头尾的报文 * @Description: 拆分报文 - * @author: Mo - * @Date: 2021/6/2 10:51 + * @author: zhang.zhipeng + * @Date: 2024/9/11 8:53 * @return: com.cisdi.data.RFID.gateway.RfidItemVo */ public static CaliperItemVo split(ByteBuf in) { @@ -64,9 +80,8 @@ public class CaliperVoDecoder { // logger.info("vo1={}", decoderVo); //标签读取 - - - + + byte[] readBytes1 = HexUtil.hexStringToBytes("02524630313030303103"); ByteBuf copiedBuff = Unpooled.copiedBuffer(readBytes1); List caliperItemVoList = CaliperVoDecoder.Decode(copiedBuff); @@ -82,6 +97,6 @@ public class CaliperVoDecoder { readBytes1 = HexUtil.hexStringToBytes("0248656172744265617403"); copiedBuff = Unpooled.copiedBuffer(readBytes1); decoder.decode(null, copiedBuff, new ArrayList()); - + } }