diff --git a/src/main/java/com/cisdi/data/caliper/CaliperGateway.java b/src/main/java/com/cisdi/data/caliper/CaliperGateway.java new file mode 100644 index 0000000..3da40fb --- /dev/null +++ b/src/main/java/com/cisdi/data/caliper/CaliperGateway.java @@ -0,0 +1,182 @@ +package com.cisdi.data.caliper; + +import com.alibaba.fastjson.JSON; +import com.cisdi.data.caliper.gateway.CaliperClient; +import com.cisdi.data.caliper.gateway.CaliperCustomParam; +import com.cisdi.data.caliper.gateway.CaliperDaemonTask; +import com.cisdi.data.caliper.gateway.CaliperTaskStartVo; +import com.cisdi.data.common.exception.BusinessException; +import com.cisdi.data.common.utils.Strings; +import com.cisdi.data.sdk.consts.ServiceName; +import com.cisdi.data.sdk.enums.GatewayState; +import com.cisdi.data.sdk.gateway.base.PulledSocketGatewayBase; +import com.cisdi.data.sdk.param.PullSocketParam; +import com.cisdi.data.sdk.procotol.message.SocketReturnMessage; +import com.cisdi.data.sdk.service.RouteService; +import com.cisdi.data.sdk.vo.DeviceVo; +import com.cisdi.data.sdk.vo.ExeResultVo; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +/** + * 采用TCP/IP通信,本机做客户端,连上服务器后主动上报采集数据,数据采用大端单精度浮点数上传。具体如下: + * 上传数据: f5 f5 f5 02 3f 9e 04 19 40 0f 02 0c f6 f6 f6 + *

+ * 起始帧 0xF5 0xF5 0xF5 + * 数据长度(字节) 0x08 + * 宽度(直径1.2345mm) 0x3F 0x9E 0x04 0x19 + * 厚度(直径2.2345mm) 0x40 0x0F 0x02 0x0C + * 校验码0x57 数据之和取低八位数据 0x3f+0x9e+0x04+0x19+0x40+0x0f+0x02+0x0c=0x157 + * 结束帧 0xF6 0xF6 0xF6 + * + * @className: CaliperGateway + * @description: TODO + * @author: zhang.zhipeng + * @date: 2024/9/10 15:19 + */ +@Slf4j +public class CaliperGateway extends PulledSocketGatewayBase { + + private String deviceId; + private AtomicBoolean shouldRun = null; + private static final int Min_Pull_Internal = 1; // 允许最小的拉取间隔为1毫秒 + private static final int Min_Timeout = 1000; // 允许最小的超时时间间隔,为1000毫秒 + + @Override + public ExeResultVo sendReturnMessage(SocketReturnMessage returnMsg) { + throw new BusinessException("Caliper协议不支持下发命令。"); + } + + @Override + public void start() { + if (state == GatewayState.RUNNING) { + return; + } + + try { + startInternal(); + state = GatewayState.RUNNING; + log.info("Caliper socket拉取网关:{}启动成功", instanceVo.getRunId()); + } catch (Exception e) { + log.error("Caliper socket拉取网关:{}启动失败", instanceVo.getRunId()); + log.error(e.getLocalizedMessage(), e); + } + } + + private void startInternal() { + socketParam = JSON.parseObject(getInstanceVo().getParameter(), PullSocketParam.class); + + if (StringUtils.isEmpty(socketParam.getPeerIp()) || socketParam.getPeerPort() == null) { + throw new BusinessException("未正确配置拉取对端ip地址和对端端口。"); + } + + if (socketParam.getPullInternal() == null) { + throw new BusinessException("未正确配置拉取间隔"); + } + + if (socketParam.getTimeout() == null) { + throw new BusinessException("未正确配置拉取超时时间"); + } + + socketParam.setPullInternal(socketParam.getPullInternal() < Min_Pull_Internal ? Min_Pull_Internal : socketParam.getPullInternal()); + + socketParam.setTimeout(socketParam.getTimeout() < Min_Timeout ? Min_Timeout : socketParam.getTimeout()); + + RouteService routeService = (RouteService) serviceProvider.getByName(ServiceName.Route); + List deviceVos = routeService.findByRunId(instanceVo.getRunId()); + + if (deviceVos.size() > 1) { + throw new BusinessException("网关Id:" + instanceVo.getRunId() + "为Caliper socket拉取网关,只允许关联一个设备Id"); + } + + deviceId = !deviceVos.isEmpty() ? deviceVos.get(0).getDeviceId() : null; + + if (StringUtils.isEmpty(deviceId)) { + throw new BusinessException("runId=" + getInstanceVo().getRunId() + "未正确配置网关和设备关联"); + } + + log.info("Caliper socket拉取网关:{}读取启动配置参数:{}", getInstanceVo().getRunId(), socketParam); + + closePrevState(); + shouldRun = new AtomicBoolean(true); + + String parameter = socketParam.getOtherParameter(); + Set targetServerSet = new HashSet<>(); + targetServerSet.add(String.format("%s:%s", socketParam.getPeerIp(), socketParam.getPeerPort())); + CaliperCustomParam customParam = null; + if (parameter != null) { + customParam = JSON.parseObject(parameter, CaliperCustomParam.class); + if (customParam != null) { + //Caliper服务端参数去重 + customParam.setServerList(customParam.getServerList().stream().distinct().collect(Collectors.toList())); + for (String server : customParam.getServerList()) { + targetServerSet.add(server.trim()); + } + } + } + + int index = 1; + for (String server : targetServerSet) { + // 复制已有参数 + PullSocketParam serverParam = JSON.parseObject(JSON.toJSONString(socketParam), PullSocketParam.class); + + serverParam.setPeerIp(Strings.split(server, ":")[0]); + serverParam.setPeerPort(Integer.valueOf(Strings.split(server, ":")[1])); + + // 启动线程 + CaliperTaskStartVo startVo = new CaliperTaskStartVo(); + startVo.setDeviceId(deviceId); + startVo.setGatewayBase(this); + startVo.setServiceProvider(serviceProvider); + startVo.setSocketParam(serverParam); + startVo.setShouldRun(shouldRun); + startVo.setCustomParam(customParam); + + CaliperClient fireClient = new CaliperClient(startVo); + CaliperDaemonTask daemonTask = new CaliperDaemonTask(fireClient, startVo); + startVo.setRfidDaemonTask(daemonTask); + Thread thread = new Thread(daemonTask, "Fire-Daemon-Task-runid-" + instanceVo.getRunId() + "-s" + index++); + thread.start(); + } + + + state = GatewayState.RUNNING; + } + + @Override + public void shutdown() { + if (state == GatewayState.CLOSED) { + return; + } + + closePrevState(); + + log.info("Caliper socket拉取网关:{}关闭中", getInstanceVo().getRunId()); + + state = GatewayState.CLOSED; + + log.info("Caliper socket拉取网关:{}关闭成功", getInstanceVo().getRunId()); + } + + private void closePrevState() { + if (shouldRun != null) { + shouldRun.set(false); + shouldRun = null; + } + } + + @Override + public Set getActiveDeviceIds() { + Set set = new HashSet(); + if (deviceId != null) { + set.add(deviceId); + } + return set; + } +} diff --git a/src/main/java/com/cisdi/data/caliper/gateway/CaliperClient.java b/src/main/java/com/cisdi/data/caliper/gateway/CaliperClient.java new file mode 100644 index 0000000..da560a5 --- /dev/null +++ b/src/main/java/com/cisdi/data/caliper/gateway/CaliperClient.java @@ -0,0 +1,125 @@ +package com.cisdi.data.caliper.gateway; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; + +@Slf4j +public class CaliperClient { + @Setter + @Getter + private volatile Channel channel = null; + @Setter + @Getter + private Long lastHeartbeatTime = System.currentTimeMillis(); + private EventLoopGroup workerGroup = null; + private final CaliperTaskStartVo startVo; + + public CaliperClient(CaliperTaskStartVo startVo) { + this.startVo = startVo; + + if (Epoll.isAvailable()) { + log.info("Caliper enable epoll and have tcp SO_KEEPALIVE check"); + } else { + log.info("Caliper disable epoll and not have custom SO_KEEPALIVE check"); + } + } + + public boolean close() { + boolean result = false; + if (workerGroup == null && channel == null) { + return true; + } + + if (channel != null) { + try { + channel.close().sync(); + channel = null; + result = true; + } catch (Exception e) { + log.error(e.getLocalizedMessage(), e); + } + } + + if (workerGroup != null) { + try { + workerGroup.shutdownGracefully().sync(); + workerGroup = null; + result = true; + } catch (InterruptedException e) { + log.error("Caliper到 {} : {} 的连接关闭失败", + startVo.getSocketParam().getPeerIp(), startVo.getSocketParam().getPeerPort(), e); + } + } + + if (channel != null || workerGroup != null) { + log.info("channel:{},workgoup:{}", channel, workerGroup); + } + + return result; + } + + public void run() throws IOException { + start(); + } + + private void start() { + lastHeartbeatTime = System.currentTimeMillis(); + if (channel != null) { + close(); + } + + //设置一个多线程循环器 + //启动附注类 + Bootstrap bootstrap = new Bootstrap(); + bootstrap.option(ChannelOption.SO_KEEPALIVE, true); + + if (Epoll.isAvailable()) { + bootstrap.channel(EpollSocketChannel.class); + bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED); + bootstrap.option(EpollChannelOption.TCP_KEEPIDLE, 10); + bootstrap.option(EpollChannelOption.TCP_KEEPINTVL, 5); + bootstrap.option(EpollChannelOption.TCP_KEEPCNT, 2); + workerGroup = new EpollEventLoopGroup(2); + } else { + workerGroup = new NioEventLoopGroup(2); + bootstrap.channel(NioSocketChannel.class); + } + + bootstrap.group(workerGroup); + //指定所使用的NIO传输channel + //指定客户端初始化处理 + bootstrap.handler(new CaliperClientInitializer(startVo, this)); + try { + channel = bootstrap.connect(startVo.getSocketParam().getPeerIp(), + startVo.getSocketParam().getPeerPort()).sync().channel(); + //channel.closeFuture().sync(); // 如果会话关闭,会退出当前线程 + } catch (InterruptedException e) { + log.warn(e.getMessage(), e); + } + } + + public void reconnect() { + log.info("Caliper重新连接中......{}:{}", getIp(), getPort()); + close(); + start(); + } + + public String getIp() { + return startVo.getSocketParam().getPeerIp(); + } + + public int getPort() { + return startVo.getSocketParam().getPeerPort(); + } + +} diff --git a/src/main/java/com/cisdi/data/caliper/gateway/CaliperClientHandler.java b/src/main/java/com/cisdi/data/caliper/gateway/CaliperClientHandler.java new file mode 100644 index 0000000..5d4a60d --- /dev/null +++ b/src/main/java/com/cisdi/data/caliper/gateway/CaliperClientHandler.java @@ -0,0 +1,109 @@ +package com.cisdi.data.caliper.gateway; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.serializer.SerializerFeature; +import com.cisdi.data.sdk.consts.ServiceName; +import com.cisdi.data.sdk.gateway.message.SocketMessage; +import com.cisdi.data.sdk.service.SendService; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.Charset; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class CaliperClientHandler extends SimpleChannelInboundHandler { + + private static final Logger logger = LoggerFactory.getLogger(CaliperClientHandler.class); + private static final Charset Encode_Charset = Charset.forName("UTF-8"); + private static final String HEARTBEAT_CODE = "HeartBea"; + + private CaliperClient caliperClient; + + private CaliperTaskStartVo startVo; + + public CaliperClientHandler(CaliperTaskStartVo startVo, CaliperClient caliperClient) { + this.startVo = startVo; + this.caliperClient = caliperClient; + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) { + + try { + List decode = CaliperVoDecoder.Decode(msg); + if(decode.size() == 1 && HEARTBEAT_CODE.equalsIgnoreCase(decode.get(0).getCode())) { + logger.info("Caliper {},启用心跳:{} , 心跳 vo:{}", caliperClient.getChannel(), + startVo.getCustomParam() == null ? false : startVo.getCustomParam().getEnableHeartbeat(), + decode); + caliperClient.setLastHeartbeatTime(System.currentTimeMillis()); + return; + } + + decode.forEach((CaliperItemVo i) -> { + i.setServerIp(startVo.getSocketParam().getPeerIp()); + i.setServerPort(startVo.getSocketParam().getPeerPort()); + }); + + + if (startVo.getGatewayBase() != null && Boolean.TRUE.equals(startVo.getGatewayBase().getInstanceVo().getLogOpen())) { + logger.info("Caliper {} vo:{} ", caliperClient.getChannel(), decode); + } + + if (startVo.getGatewayBase() != null && startVo.getServiceProvider() != null) { + Map data = new HashMap<>(); + SendService service = (SendService) startVo.getServiceProvider().getByName(ServiceName.Send); + SocketMessage socketMessage = startVo.getGatewayBase().buildSocketMessage(); + socketMessage.setDeviceId(startVo.getDeviceId()); + socketMessage.setProviderCode(null); + socketMessage.setMsgKey("666"); + data.put("data", decode); + String jsonString = JSON.toJSONString(data, SerializerFeature.WriteMapNullValue); + byte[] body = jsonString.getBytes(Encode_Charset); + socketMessage.setData(body); + logger.info("socketMess" + socketMessage.toString()); + service.sendMessage(socketMessage); + } else { + logger.info("Caliper收到消息{}", decode); + } + } catch (Exception e) { + logger.error(e.getLocalizedMessage(), e); + } + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + logger.info("{} channelActive active", ctx.channel()); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + logger.info("{} channelInactive inactive", ctx.channel()); + + try { + caliperClient.setChannel(null); + logger.info("set channel to null raise reconnect"); + } finally { + // 释放引用 + caliperClient = null; + } + + super.channelInactive(ctx); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception { + logger.error("exceptionCaught:{}", t); + super.exceptionCaught(ctx, t); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + logger.error("userEventTriggered:{}", evt); + super.userEventTriggered(ctx, evt); + } +} diff --git a/src/main/java/com/cisdi/data/caliper/gateway/CaliperClientInitializer.java b/src/main/java/com/cisdi/data/caliper/gateway/CaliperClientInitializer.java new file mode 100644 index 0000000..4d4b16d --- /dev/null +++ b/src/main/java/com/cisdi/data/caliper/gateway/CaliperClientInitializer.java @@ -0,0 +1,24 @@ +package com.cisdi.data.caliper.gateway; + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; + +public class CaliperClientInitializer extends ChannelInitializer { + + private final CaliperClient caliperClient; + private final CaliperTaskStartVo startVo; + + public CaliperClientInitializer(CaliperTaskStartVo startVo, CaliperClient caliperClient) { + this.startVo = startVo; + this.caliperClient = caliperClient; + } + + @Override + protected void initChannel(SocketChannel socketChannel) throws Exception { + //注册管道 + ChannelPipeline pipeline = socketChannel.pipeline(); + pipeline.addLast(new CaliperFrameDecoder(startVo)); + pipeline.addLast("chat", new CaliperClientHandler(startVo, caliperClient)); + } +} diff --git a/src/main/java/com/cisdi/data/caliper/gateway/CaliperCustomParam.java b/src/main/java/com/cisdi/data/caliper/gateway/CaliperCustomParam.java new file mode 100644 index 0000000..d6e0811 --- /dev/null +++ b/src/main/java/com/cisdi/data/caliper/gateway/CaliperCustomParam.java @@ -0,0 +1,81 @@ +package com.cisdi.data.caliper.gateway; + +import com.alibaba.fastjson.JSON; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Rfid server有多个,所以需要支持配置多个服务端地址的 + * @author cup + * @date 2021/03/03 + */ +public class CaliperCustomParam implements Serializable { + private static final long serialVersionUID = -5017472517219960138L; + + /** + * 每个String 如 192.168.1.5:8823 形式 + */ + private List serverList = new ArrayList(); + + private Boolean enableHeartbeat = Boolean.FALSE; + + private Integer heartbeatTimeInMs = 5000; + private Integer heartbeatMaxLostCount = 3; + + public List getServerList() { + return serverList; + } + + public void setServerList(List serverList) { + this.serverList = serverList; + } + + public Boolean getEnableHeartbeat() { + return enableHeartbeat; + } + + public void setEnableHeartbeat(Boolean enableHeartbeat) { + this.enableHeartbeat = enableHeartbeat; + } + + public Integer getHeartbeatTimeInMs() { + return heartbeatTimeInMs; + } + + public void setHeartbeatTimeInMs(Integer heartbeatTimeInMs) { + this.heartbeatTimeInMs = heartbeatTimeInMs; + } + + public Integer getHeartbeatMaxLostCount() { + return heartbeatMaxLostCount; + } + + public void setHeartbeatMaxLostCount(Integer heartbeatMaxLostCount) { + this.heartbeatMaxLostCount = heartbeatMaxLostCount; + } + + @Override + public String toString() { + return JSON.toJSONString(this); + } + + public static void main(String[] args){ + CaliperCustomParam dto = new CaliperCustomParam(); + dto.getServerList().add("192.168.6.55:10000"); + dto.getServerList().add("192.168.6.56:10000"); + dto.getServerList().add("192.165.20.111:10000"); + dto.getServerList().add("192.165.20.112:10000"); + dto.getServerList().add("192.165.20.113:10000"); + dto.getServerList().add("192.165.20.119:10000"); + dto.getServerList().add("192.165.20.118:10000"); + dto.getServerList().add("192.165.20.120:10000"); + dto.getServerList().add("192.165.20.121:10000"); + dto.getServerList().add("192.165.20.121:10000"); + List customParams=dto.getServerList().stream().distinct().collect(Collectors.toList()); + System.out.println(customParams); + System.out.println(dto); + } +} diff --git a/src/main/java/com/cisdi/data/caliper/gateway/CaliperDaemonTask.java b/src/main/java/com/cisdi/data/caliper/gateway/CaliperDaemonTask.java new file mode 100644 index 0000000..59c4f29 --- /dev/null +++ b/src/main/java/com/cisdi/data/caliper/gateway/CaliperDaemonTask.java @@ -0,0 +1,159 @@ +package com.cisdi.data.caliper.gateway; + +import com.cisdi.data.common.dto.AlarmDto; +import com.cisdi.data.sdk.consts.ServiceName; +import com.cisdi.data.sdk.service.AlarmService; +import lombok.extern.slf4j.Slf4j; + +import java.util.Arrays; +import java.util.Date; +import java.util.Timer; +import java.util.TimerTask; + +@Slf4j +public class CaliperDaemonTask implements Runnable { + + private CaliperClient caliperClient; + private final CaliperTaskStartVo startVo; + + public CaliperDaemonTask(CaliperClient caliperClient, CaliperTaskStartVo startVo) { + this.caliperClient = caliperClient; + this.startVo = startVo; + } + + @Override + public void run() { + final String[] channelInfo = {null}; + Timer timer = new Timer(); + timer.schedule(new TimerTask() { + @Override + public void run() { + channelInfo[0] = startChannel(); + if (!startVo.getShouldRun().get()) { + timer.cancel(); + } + } + }, 0, startVo.getSocketParam().getPullInternal()); + if (caliperClient != null) { + caliperClient.close(); + caliperClient = null; + } + log.info("退出Caliper拉取连接线程 {}", channelInfo[0]); + } + + private String startChannel() { + String channelInfo = String.format("channel:%s, active:%s,open:%s,writable:%s", caliperClient.getChannel(), + caliperClient.getChannel() == null ? "" : caliperClient.getChannel().isActive(), + caliperClient.getChannel() == null ? "" : caliperClient.getChannel().isOpen(), + caliperClient.getChannel() == null ? "" : caliperClient.getChannel().isWritable()); + try { + if (caliperClient != null && caliperClient.getChannel() != null + && caliperClient.getChannel().isActive()) { + if (startVo.getGatewayBase() != null + && Boolean.TRUE.equals(startVo.getGatewayBase().getInstanceVo().getLogOpen())) { + log.info("当前Caliper拉取连接通道处于连接状态,连接id:{},连接信息:{}", + startVo.getGatewayBase().getInstanceVo().getRunId(), channelInfo); + } + + if (startVo.getCustomParam() != null + && Boolean.TRUE.equals(startVo.getCustomParam().getEnableHeartbeat()) + && Math.abs(System.currentTimeMillis() - caliperClient.getLastHeartbeatTime()) > + (long) startVo.getCustomParam().getHeartbeatMaxLostCount() * startVo.getCustomParam().getHeartbeatTimeInMs()) { + log.warn("当前Caliper拉取连接通道心跳启用,且心跳超时,触发重连,连接id:{},连接信息:{}", + startVo.getGatewayBase().getInstanceVo().getRunId(), channelInfo); + //启用了心跳,且心跳时间超时 + reconnectToRfid(channelInfo, true); + } + } else { + assert caliperClient != null; + if (caliperClient.getChannel() == null) { + String errorMsg = String.format("Caliper到%s:%s的连接未建立或已关闭, %s", caliperClient.getIp(), + caliperClient.getPort(), channelInfo); + log.warn(errorMsg); + sendAlarmToPlatform(errorMsg); + reconnectToRfid(channelInfo, false); + } else { + String errorMsg = String.format("Caliper到%s:%s的连接未建立或已关闭, %s", caliperClient.getIp(), + caliperClient.getPort(), channelInfo); + log.warn(errorMsg); + + sendAlarmToPlatform(errorMsg); + + if (startVo.getCustomParam() != null + && Boolean.TRUE.equals(startVo.getCustomParam().getEnableHeartbeat()) + && Math.abs(System.currentTimeMillis() - caliperClient.getLastHeartbeatTime()) > + (long) startVo.getCustomParam().getHeartbeatMaxLostCount() * startVo.getCustomParam().getHeartbeatTimeInMs()) { + log.warn("当前Caliper拉取连接通道心跳启用,且心跳超时,触发重连,连接id:{},连接信息:{}", + startVo.getGatewayBase().getInstanceVo().getRunId(), channelInfo); + //启用了心跳,且心跳时间超时 + reconnectToRfid(channelInfo, true); + } + } + } + } catch (Exception e) { + log.warn(e.getMessage(), e); + } + return channelInfo; + } + + private void reconnectToRfid(final String copyChannelInfo, boolean forceConnect) throws InterruptedException { + Thread.sleep(5000); + Thread thread = new Thread(() -> { + try { + log.info("Caliper触发重连强制:{}....{}:{} {}", forceConnect, caliperClient.getIp(), caliperClient.getPort(), + copyChannelInfo); + if (caliperClient.getChannel() == null || forceConnect) { + caliperClient.reconnect(); + } + } catch (Exception e) { + log.info(e.getMessage(), e); + } + }); + thread.start(); + } + + public void forceReconnectToRfid() { + String channelInfo = String.format("channel:%s, active:%s,open:%s,writable:%s", caliperClient.getChannel(), + caliperClient.getChannel() == null ? "" : caliperClient.getChannel().isActive(), + caliperClient.getChannel() == null ? "" : caliperClient.getChannel().isOpen(), + caliperClient.getChannel() == null ? "" : caliperClient.getChannel().isWritable()); + try { + Thread.sleep(500); + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + try { + log.info("Caliper触发重连强制:{}....{}:{} {}", true, caliperClient.getIp(), caliperClient.getPort(), + channelInfo); + caliperClient.reconnect(); + } catch (Exception e) { + log.info(e.getMessage(), e); + } + } + }); + thread.start(); + } catch (Exception e) { + log.error(e.getMessage(), e); + } + } + + private void sendAlarmToPlatform(String detail) { + if (startVo.getServiceProvider() != null && startVo.getGatewayBase() != null) { + AlarmService alarmService = (AlarmService) startVo.getServiceProvider().getByName(ServiceName.Alarm); + if (alarmService != null) { + AlarmDto alarmDto = new AlarmDto(); + + alarmDto.setAlarmType(AlarmDto.Connect_Fail_Alarm); + alarmDto.setDetail(detail); + alarmDto.setDeviceId(startVo.getDeviceId()); + alarmDto.setGatewayId(startVo.getGatewayBase().getInstanceVo().getRunId()); + alarmDto.setGatewayName(startVo.getGatewayBase().getInstanceVo().getName()); + alarmDto.setMsgKey(null); + alarmDto.setUpper(Boolean.TRUE); + alarmDto.setTime(new Date()); + + alarmService.raiseAlarm(Arrays.asList(alarmDto)); + } + } + } +} diff --git a/src/main/java/com/cisdi/data/caliper/gateway/CaliperFrameDecoder.java b/src/main/java/com/cisdi/data/caliper/gateway/CaliperFrameDecoder.java new file mode 100644 index 0000000..d9c9534 --- /dev/null +++ b/src/main/java/com/cisdi/data/caliper/gateway/CaliperFrameDecoder.java @@ -0,0 +1,64 @@ +package com.cisdi.data.caliper.gateway; + +import com.cisdi.data.common.exception.BusinessException; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import lombok.extern.slf4j.Slf4j; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * @Description: 获取完整的报文 + * @Creator: Mo + * @Date: 2021/6/2 23:37 + */ +@Slf4j +public class CaliperFrameDecoder extends ByteToMessageDecoder { + private static final Logger logger = LoggerFactory.getLogger(CaliperFrameDecoder.class); + + private CaliperTaskStartVo startVo; + + private final static Integer MIN_LENGTH = 10; + private final byte START_SPLIT = 0x02; + private final byte END_SPLIT = 0x03; + private final Set validDataSizeInByte = new HashSet(Arrays.asList(10)); + + public CaliperFrameDecoder(CaliperTaskStartVo startVo) { + this.startVo = startVo; + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf bufferIn, List out) throws Exception { + if (bufferIn.readableBytes() < MIN_LENGTH) { + return; + } + String channelInfo = ctx.toString(); + + // 心跳报文为 02 48 65 61 72 74 42 65 61 03 长度10个字节,拥有独立的起始符和结束符,不会和数据报文共享起始符和结束符 + // 数据体长度固定为 8,心跳报文数据内容为 HeartBea + int beginIndex = bufferIn.readerIndex(); + + for (int i = 0; i + 9 < bufferIn.readableBytes(); i += 10) { + byte curByte = bufferIn.getByte(beginIndex + i); + if(curByte != START_SPLIT){ + ctx.close(); + throw new BusinessException(channelInfo + "完整报文第一个字节内容不正确,期望: 0x02,实际:" + curByte); + } + + byte endByte = bufferIn.getByte(beginIndex + i + 9); + if(endByte != END_SPLIT){ + ctx.close(); + throw new BusinessException(channelInfo + "完整报文最后一个字节内容不正确,期望: 0x03,实际:" + endByte); + } + + out.add(bufferIn.retainedSlice(beginIndex + i, 10)); + bufferIn.readerIndex(beginIndex + i + 10); + } + } +} diff --git a/src/main/java/com/cisdi/data/caliper/gateway/CaliperItemVo.java b/src/main/java/com/cisdi/data/caliper/gateway/CaliperItemVo.java new file mode 100644 index 0000000..9f29ab0 --- /dev/null +++ b/src/main/java/com/cisdi/data/caliper/gateway/CaliperItemVo.java @@ -0,0 +1,58 @@ +package com.cisdi.data.caliper.gateway; + +import com.alibaba.fastjson.JSON; + +import java.io.Serializable; + +/** + * 读取到标签或标签离开,解析后的内容都一致 + * + * @author cup + * @date 2021/03/03 + */ +public class CaliperItemVo implements Serializable { + private static final long serialVersionUID = -4290553035349526464L; + /** + * rfid 服务器Ip + */ + private String serverIp; + + /** + * rfid 服务器端口 + */ + private Integer serverPort; + + /** + * 编号 + */ + private String code; + + public String getServerIp() { + return serverIp; + } + + public void setServerIp(String serverIp) { + this.serverIp = serverIp; + } + + public Integer getServerPort() { + return serverPort; + } + + public void setServerPort(Integer serverPort) { + this.serverPort = serverPort; + } + + public String getCode() { + return code; + } + + public void setCode(String code) { + this.code = code; + } + + @Override + public String toString() { + return JSON.toJSONString(this); + } +} diff --git a/src/main/java/com/cisdi/data/caliper/gateway/CaliperTaskStartVo.java b/src/main/java/com/cisdi/data/caliper/gateway/CaliperTaskStartVo.java new file mode 100644 index 0000000..c9e8537 --- /dev/null +++ b/src/main/java/com/cisdi/data/caliper/gateway/CaliperTaskStartVo.java @@ -0,0 +1,62 @@ +package com.cisdi.data.caliper.gateway; + +import com.cisdi.data.sdk.gateway.base.PulledSocketGatewayBase; +import com.cisdi.data.sdk.param.PullSocketParam; +import com.cisdi.data.sdk.service.ServiceProvider; + +import java.util.concurrent.atomic.AtomicBoolean; + +public class CaliperTaskStartVo { + private AtomicBoolean shouldRun; + private ServiceProvider serviceProvider; + private String deviceId; + private PulledSocketGatewayBase gatewayBase; + private PullSocketParam socketParam; + private CaliperCustomParam customParam; + private CaliperDaemonTask caliperDaemonTask; + + public AtomicBoolean getShouldRun() { + return shouldRun; + } + public void setShouldRun(AtomicBoolean shouldRun) { + this.shouldRun = shouldRun; + } + public ServiceProvider getServiceProvider() { + return serviceProvider; + } + public void setServiceProvider(ServiceProvider serviceProvider) { + this.serviceProvider = serviceProvider; + } + public String getDeviceId() { + return deviceId; + } + public void setDeviceId(String deviceId) { + this.deviceId = deviceId; + } + public PulledSocketGatewayBase getGatewayBase() { + return gatewayBase; + } + public void setGatewayBase(PulledSocketGatewayBase gatewayBase) { + this.gatewayBase = gatewayBase; + } + public PullSocketParam getSocketParam() { + return socketParam; + } + public void setSocketParam(PullSocketParam socketParam) { + this.socketParam = socketParam; + } + public CaliperCustomParam getCustomParam() { + return customParam; + } + public void setCustomParam(CaliperCustomParam customParam) { + this.customParam = customParam; + } + + public CaliperDaemonTask getRfidDaemonTask() { + return caliperDaemonTask; + } + + public void setRfidDaemonTask(CaliperDaemonTask caliperDaemonTask) { + this.caliperDaemonTask = caliperDaemonTask; + } +} diff --git a/src/main/java/com/cisdi/data/caliper/gateway/CaliperVoDecoder.java b/src/main/java/com/cisdi/data/caliper/gateway/CaliperVoDecoder.java new file mode 100644 index 0000000..d17cf90 --- /dev/null +++ b/src/main/java/com/cisdi/data/caliper/gateway/CaliperVoDecoder.java @@ -0,0 +1,87 @@ +package com.cisdi.data.caliper.gateway; + +import com.cisdi.data.common.exception.BusinessException; +import com.cisdi.data.util.HexUtil; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.util.CharsetUtil; +import io.netty.util.ReferenceCountUtil; + +import java.util.ArrayList; +import java.util.List; + +public class CaliperVoDecoder { + + public static List Decode(ByteBuf in) { + CaliperItemVo vo = new CaliperItemVo(); + List result = new ArrayList<>(); + //消息体部分 + int length; + if (in == null) { + throw new BusinessException("ByteBuf 入参为null"); + } else if (in.readableBytes() == 10) { + length = 8; + } + else { + throw new BusinessException("报文长度不匹配 ,期望:10,实际:" + in.readableBytes()); + } + //去掉报文头 和 结束符 + ByteBuf proceedIn = in.retainedSlice(1, length); + result.add(split(proceedIn)); + ReferenceCountUtil.release(proceedIn); + return result; + } + + + /** + * @param in 去掉头尾的报文 + * @Description: 拆分报文 + * @author: Mo + * @Date: 2021/6/2 10:51 + * @return: com.cisdi.data.RFID.gateway.RfidItemVo + */ + public static CaliperItemVo split(ByteBuf in) { + CaliperItemVo caliperItemVo = new CaliperItemVo(); + ByteBuf buf = in.readBytes(8); + // 编号 + String code = buf.toString(CharsetUtil.UTF_8); + caliperItemVo.setCode(code); + ReferenceCountUtil.release(buf); + return caliperItemVo; + } + + public static void main(String[] args) throws Exception { + // 标签读取 +// byte[] readBytes1 = HexUtil.hexStringToBytes("001c1952000b000e3000eabcdef01234567890003001000400000501"); +// ByteBuf copiedBuffer = Unpooled.copiedBuffer(readBytes1); +// RfidItemVo decoderVo = RfidVoDecoder.Decode(copiedBuffer); +// logger.info("vo1={}", decoderVo); +// +// // 标签离开 +// readBytes1 = HexUtil.hexStringToBytes("00161942050c000e3000eabcdef01234567890003001"); +// copiedBuffer = Unpooled.copiedBuffer(readBytes1); +// decoderVo = RfidVoDecoder.Decode(copiedBuffer); +// logger.info("vo1={}", decoderVo); + + //标签读取 + + + + byte[] readBytes1 = HexUtil.hexStringToBytes("02524630313030303103"); + ByteBuf copiedBuff = Unpooled.copiedBuffer(readBytes1); + List caliperItemVoList = CaliperVoDecoder.Decode(copiedBuff); + caliperItemVoList.forEach(System.out::println); + + + readBytes1 = HexUtil.hexStringToBytes("0248656172744265617403"); + copiedBuff = Unpooled.copiedBuffer(readBytes1); + caliperItemVoList = CaliperVoDecoder.Decode(copiedBuff); + caliperItemVoList.forEach(System.out::println); + + CaliperFrameDecoder decoder = new CaliperFrameDecoder(null); + readBytes1 = HexUtil.hexStringToBytes("0248656172744265617403"); + copiedBuff = Unpooled.copiedBuffer(readBytes1); + decoder.decode(null, copiedBuff, new ArrayList()); + + } +}