init caliper

This commit is contained in:
zzp 2024-09-10 17:18:03 +08:00
parent a1adfd9cbe
commit 94b164062b
10 changed files with 951 additions and 0 deletions

View File

@ -0,0 +1,182 @@
package com.cisdi.data.caliper;
import com.alibaba.fastjson.JSON;
import com.cisdi.data.caliper.gateway.CaliperClient;
import com.cisdi.data.caliper.gateway.CaliperCustomParam;
import com.cisdi.data.caliper.gateway.CaliperDaemonTask;
import com.cisdi.data.caliper.gateway.CaliperTaskStartVo;
import com.cisdi.data.common.exception.BusinessException;
import com.cisdi.data.common.utils.Strings;
import com.cisdi.data.sdk.consts.ServiceName;
import com.cisdi.data.sdk.enums.GatewayState;
import com.cisdi.data.sdk.gateway.base.PulledSocketGatewayBase;
import com.cisdi.data.sdk.param.PullSocketParam;
import com.cisdi.data.sdk.procotol.message.SocketReturnMessage;
import com.cisdi.data.sdk.service.RouteService;
import com.cisdi.data.sdk.vo.DeviceVo;
import com.cisdi.data.sdk.vo.ExeResultVo;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
/**
* 采用TCP/IP通信本机做客户端连上服务器后主动上报采集数据数据采用大端单精度浮点数上传具体如下
* 上传数据: f5 f5 f5 02 3f 9e 04 19 40 0f 02 0c f6 f6 f6
* <p>
* 起始帧 0xF5 0xF5 0xF5
* 数据长度(字节) 0x08
* 宽度(直径1.2345mm) 0x3F 0x9E 0x04 0x19
* 厚度(直径2.2345mm) 0x40 0x0F 0x02 0x0C
* 校验码0x57 数据之和取低八位数据 0x3f+0x9e+0x04+0x19+0x40+0x0f+0x02+0x0c=0x157
* 结束帧 0xF6 0xF6 0xF6
*
* @className: CaliperGateway
* @description: TODO
* @author: zhang.zhipeng
* @date: 2024/9/10 15:19
*/
@Slf4j
public class CaliperGateway extends PulledSocketGatewayBase {
private String deviceId;
private AtomicBoolean shouldRun = null;
private static final int Min_Pull_Internal = 1; // 允许最小的拉取间隔为1毫秒
private static final int Min_Timeout = 1000; // 允许最小的超时时间间隔为1000毫秒
@Override
public ExeResultVo sendReturnMessage(SocketReturnMessage returnMsg) {
throw new BusinessException("Caliper协议不支持下发命令。");
}
@Override
public void start() {
if (state == GatewayState.RUNNING) {
return;
}
try {
startInternal();
state = GatewayState.RUNNING;
log.info("Caliper socket拉取网关:{}启动成功", instanceVo.getRunId());
} catch (Exception e) {
log.error("Caliper socket拉取网关:{}启动失败", instanceVo.getRunId());
log.error(e.getLocalizedMessage(), e);
}
}
private void startInternal() {
socketParam = JSON.parseObject(getInstanceVo().getParameter(), PullSocketParam.class);
if (StringUtils.isEmpty(socketParam.getPeerIp()) || socketParam.getPeerPort() == null) {
throw new BusinessException("未正确配置拉取对端ip地址和对端端口。");
}
if (socketParam.getPullInternal() == null) {
throw new BusinessException("未正确配置拉取间隔");
}
if (socketParam.getTimeout() == null) {
throw new BusinessException("未正确配置拉取超时时间");
}
socketParam.setPullInternal(socketParam.getPullInternal() < Min_Pull_Internal ? Min_Pull_Internal : socketParam.getPullInternal());
socketParam.setTimeout(socketParam.getTimeout() < Min_Timeout ? Min_Timeout : socketParam.getTimeout());
RouteService routeService = (RouteService) serviceProvider.getByName(ServiceName.Route);
List<DeviceVo> deviceVos = routeService.findByRunId(instanceVo.getRunId());
if (deviceVos.size() > 1) {
throw new BusinessException("网关Id:" + instanceVo.getRunId() + "为Caliper socket拉取网关只允许关联一个设备Id");
}
deviceId = !deviceVos.isEmpty() ? deviceVos.get(0).getDeviceId() : null;
if (StringUtils.isEmpty(deviceId)) {
throw new BusinessException("runId=" + getInstanceVo().getRunId() + "未正确配置网关和设备关联");
}
log.info("Caliper socket拉取网关:{}读取启动配置参数:{}", getInstanceVo().getRunId(), socketParam);
closePrevState();
shouldRun = new AtomicBoolean(true);
String parameter = socketParam.getOtherParameter();
Set<String> targetServerSet = new HashSet<>();
targetServerSet.add(String.format("%s:%s", socketParam.getPeerIp(), socketParam.getPeerPort()));
CaliperCustomParam customParam = null;
if (parameter != null) {
customParam = JSON.parseObject(parameter, CaliperCustomParam.class);
if (customParam != null) {
//Caliper服务端参数去重
customParam.setServerList(customParam.getServerList().stream().distinct().collect(Collectors.toList()));
for (String server : customParam.getServerList()) {
targetServerSet.add(server.trim());
}
}
}
int index = 1;
for (String server : targetServerSet) {
// 复制已有参数
PullSocketParam serverParam = JSON.parseObject(JSON.toJSONString(socketParam), PullSocketParam.class);
serverParam.setPeerIp(Strings.split(server, ":")[0]);
serverParam.setPeerPort(Integer.valueOf(Strings.split(server, ":")[1]));
// 启动线程
CaliperTaskStartVo startVo = new CaliperTaskStartVo();
startVo.setDeviceId(deviceId);
startVo.setGatewayBase(this);
startVo.setServiceProvider(serviceProvider);
startVo.setSocketParam(serverParam);
startVo.setShouldRun(shouldRun);
startVo.setCustomParam(customParam);
CaliperClient fireClient = new CaliperClient(startVo);
CaliperDaemonTask daemonTask = new CaliperDaemonTask(fireClient, startVo);
startVo.setRfidDaemonTask(daemonTask);
Thread thread = new Thread(daemonTask, "Fire-Daemon-Task-runid-" + instanceVo.getRunId() + "-s" + index++);
thread.start();
}
state = GatewayState.RUNNING;
}
@Override
public void shutdown() {
if (state == GatewayState.CLOSED) {
return;
}
closePrevState();
log.info("Caliper socket拉取网关:{}关闭中", getInstanceVo().getRunId());
state = GatewayState.CLOSED;
log.info("Caliper socket拉取网关:{}关闭成功", getInstanceVo().getRunId());
}
private void closePrevState() {
if (shouldRun != null) {
shouldRun.set(false);
shouldRun = null;
}
}
@Override
public Set<String> getActiveDeviceIds() {
Set<String> set = new HashSet<String>();
if (deviceId != null) {
set.add(deviceId);
}
return set;
}
}

View File

@ -0,0 +1,125 @@
package com.cisdi.data.caliper.gateway;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
@Slf4j
public class CaliperClient {
@Setter
@Getter
private volatile Channel channel = null;
@Setter
@Getter
private Long lastHeartbeatTime = System.currentTimeMillis();
private EventLoopGroup workerGroup = null;
private final CaliperTaskStartVo startVo;
public CaliperClient(CaliperTaskStartVo startVo) {
this.startVo = startVo;
if (Epoll.isAvailable()) {
log.info("Caliper enable epoll and have tcp SO_KEEPALIVE check");
} else {
log.info("Caliper disable epoll and not have custom SO_KEEPALIVE check");
}
}
public boolean close() {
boolean result = false;
if (workerGroup == null && channel == null) {
return true;
}
if (channel != null) {
try {
channel.close().sync();
channel = null;
result = true;
} catch (Exception e) {
log.error(e.getLocalizedMessage(), e);
}
}
if (workerGroup != null) {
try {
workerGroup.shutdownGracefully().sync();
workerGroup = null;
result = true;
} catch (InterruptedException e) {
log.error("Caliper到 {} : {} 的连接关闭失败",
startVo.getSocketParam().getPeerIp(), startVo.getSocketParam().getPeerPort(), e);
}
}
if (channel != null || workerGroup != null) {
log.info("channel:{},workgoup:{}", channel, workerGroup);
}
return result;
}
public void run() throws IOException {
start();
}
private void start() {
lastHeartbeatTime = System.currentTimeMillis();
if (channel != null) {
close();
}
//设置一个多线程循环器
//启动附注类
Bootstrap bootstrap = new Bootstrap();
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
if (Epoll.isAvailable()) {
bootstrap.channel(EpollSocketChannel.class);
bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);
bootstrap.option(EpollChannelOption.TCP_KEEPIDLE, 10);
bootstrap.option(EpollChannelOption.TCP_KEEPINTVL, 5);
bootstrap.option(EpollChannelOption.TCP_KEEPCNT, 2);
workerGroup = new EpollEventLoopGroup(2);
} else {
workerGroup = new NioEventLoopGroup(2);
bootstrap.channel(NioSocketChannel.class);
}
bootstrap.group(workerGroup);
//指定所使用的NIO传输channel
//指定客户端初始化处理
bootstrap.handler(new CaliperClientInitializer(startVo, this));
try {
channel = bootstrap.connect(startVo.getSocketParam().getPeerIp(),
startVo.getSocketParam().getPeerPort()).sync().channel();
//channel.closeFuture().sync(); // 如果会话关闭会退出当前线程
} catch (InterruptedException e) {
log.warn(e.getMessage(), e);
}
}
public void reconnect() {
log.info("Caliper重新连接中......{}:{}", getIp(), getPort());
close();
start();
}
public String getIp() {
return startVo.getSocketParam().getPeerIp();
}
public int getPort() {
return startVo.getSocketParam().getPeerPort();
}
}

View File

@ -0,0 +1,109 @@
package com.cisdi.data.caliper.gateway;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.cisdi.data.sdk.consts.ServiceName;
import com.cisdi.data.sdk.gateway.message.SocketMessage;
import com.cisdi.data.sdk.service.SendService;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class CaliperClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
private static final Logger logger = LoggerFactory.getLogger(CaliperClientHandler.class);
private static final Charset Encode_Charset = Charset.forName("UTF-8");
private static final String HEARTBEAT_CODE = "HeartBea";
private CaliperClient caliperClient;
private CaliperTaskStartVo startVo;
public CaliperClientHandler(CaliperTaskStartVo startVo, CaliperClient caliperClient) {
this.startVo = startVo;
this.caliperClient = caliperClient;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
try {
List<CaliperItemVo> decode = CaliperVoDecoder.Decode(msg);
if(decode.size() == 1 && HEARTBEAT_CODE.equalsIgnoreCase(decode.get(0).getCode())) {
logger.info("Caliper {},启用心跳:{} 心跳 vo:{}", caliperClient.getChannel(),
startVo.getCustomParam() == null ? false : startVo.getCustomParam().getEnableHeartbeat(),
decode);
caliperClient.setLastHeartbeatTime(System.currentTimeMillis());
return;
}
decode.forEach((CaliperItemVo i) -> {
i.setServerIp(startVo.getSocketParam().getPeerIp());
i.setServerPort(startVo.getSocketParam().getPeerPort());
});
if (startVo.getGatewayBase() != null && Boolean.TRUE.equals(startVo.getGatewayBase().getInstanceVo().getLogOpen())) {
logger.info("Caliper {} vo:{} ", caliperClient.getChannel(), decode);
}
if (startVo.getGatewayBase() != null && startVo.getServiceProvider() != null) {
Map<String, Object> data = new HashMap<>();
SendService service = (SendService) startVo.getServiceProvider().getByName(ServiceName.Send);
SocketMessage socketMessage = startVo.getGatewayBase().buildSocketMessage();
socketMessage.setDeviceId(startVo.getDeviceId());
socketMessage.setProviderCode(null);
socketMessage.setMsgKey("666");
data.put("data", decode);
String jsonString = JSON.toJSONString(data, SerializerFeature.WriteMapNullValue);
byte[] body = jsonString.getBytes(Encode_Charset);
socketMessage.setData(body);
logger.info("socketMess" + socketMessage.toString());
service.sendMessage(socketMessage);
} else {
logger.info("Caliper收到消息{}", decode);
}
} catch (Exception e) {
logger.error(e.getLocalizedMessage(), e);
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.info("{} channelActive active", ctx.channel());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
logger.info("{} channelInactive inactive", ctx.channel());
try {
caliperClient.setChannel(null);
logger.info("set channel to null raise reconnect");
} finally {
// 释放引用
caliperClient = null;
}
super.channelInactive(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception {
logger.error("exceptionCaught:{}", t);
super.exceptionCaught(ctx, t);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
logger.error("userEventTriggered:{}", evt);
super.userEventTriggered(ctx, evt);
}
}

View File

@ -0,0 +1,24 @@
package com.cisdi.data.caliper.gateway;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
public class CaliperClientInitializer extends ChannelInitializer<SocketChannel> {
private final CaliperClient caliperClient;
private final CaliperTaskStartVo startVo;
public CaliperClientInitializer(CaliperTaskStartVo startVo, CaliperClient caliperClient) {
this.startVo = startVo;
this.caliperClient = caliperClient;
}
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//注册管道
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new CaliperFrameDecoder(startVo));
pipeline.addLast("chat", new CaliperClientHandler(startVo, caliperClient));
}
}

View File

@ -0,0 +1,81 @@
package com.cisdi.data.caliper.gateway;
import com.alibaba.fastjson.JSON;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
* Rfid server有多个所以需要支持配置多个服务端地址的
* @author cup
* @date 2021/03/03
*/
public class CaliperCustomParam implements Serializable {
private static final long serialVersionUID = -5017472517219960138L;
/**
* 每个String 192.168.1.5:8823 形式
*/
private List<String> serverList = new ArrayList<String>();
private Boolean enableHeartbeat = Boolean.FALSE;
private Integer heartbeatTimeInMs = 5000;
private Integer heartbeatMaxLostCount = 3;
public List<String> getServerList() {
return serverList;
}
public void setServerList(List<String> serverList) {
this.serverList = serverList;
}
public Boolean getEnableHeartbeat() {
return enableHeartbeat;
}
public void setEnableHeartbeat(Boolean enableHeartbeat) {
this.enableHeartbeat = enableHeartbeat;
}
public Integer getHeartbeatTimeInMs() {
return heartbeatTimeInMs;
}
public void setHeartbeatTimeInMs(Integer heartbeatTimeInMs) {
this.heartbeatTimeInMs = heartbeatTimeInMs;
}
public Integer getHeartbeatMaxLostCount() {
return heartbeatMaxLostCount;
}
public void setHeartbeatMaxLostCount(Integer heartbeatMaxLostCount) {
this.heartbeatMaxLostCount = heartbeatMaxLostCount;
}
@Override
public String toString() {
return JSON.toJSONString(this);
}
public static void main(String[] args){
CaliperCustomParam dto = new CaliperCustomParam();
dto.getServerList().add("192.168.6.55:10000");
dto.getServerList().add("192.168.6.56:10000");
dto.getServerList().add("192.165.20.111:10000");
dto.getServerList().add("192.165.20.112:10000");
dto.getServerList().add("192.165.20.113:10000");
dto.getServerList().add("192.165.20.119:10000");
dto.getServerList().add("192.165.20.118:10000");
dto.getServerList().add("192.165.20.120:10000");
dto.getServerList().add("192.165.20.121:10000");
dto.getServerList().add("192.165.20.121:10000");
List<String> customParams=dto.getServerList().stream().distinct().collect(Collectors.toList());
System.out.println(customParams);
System.out.println(dto);
}
}

View File

@ -0,0 +1,159 @@
package com.cisdi.data.caliper.gateway;
import com.cisdi.data.common.dto.AlarmDto;
import com.cisdi.data.sdk.consts.ServiceName;
import com.cisdi.data.sdk.service.AlarmService;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;
@Slf4j
public class CaliperDaemonTask implements Runnable {
private CaliperClient caliperClient;
private final CaliperTaskStartVo startVo;
public CaliperDaemonTask(CaliperClient caliperClient, CaliperTaskStartVo startVo) {
this.caliperClient = caliperClient;
this.startVo = startVo;
}
@Override
public void run() {
final String[] channelInfo = {null};
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
channelInfo[0] = startChannel();
if (!startVo.getShouldRun().get()) {
timer.cancel();
}
}
}, 0, startVo.getSocketParam().getPullInternal());
if (caliperClient != null) {
caliperClient.close();
caliperClient = null;
}
log.info("退出Caliper拉取连接线程 {}", channelInfo[0]);
}
private String startChannel() {
String channelInfo = String.format("channel:%s, active:%s,open:%s,writable:%s", caliperClient.getChannel(),
caliperClient.getChannel() == null ? "" : caliperClient.getChannel().isActive(),
caliperClient.getChannel() == null ? "" : caliperClient.getChannel().isOpen(),
caliperClient.getChannel() == null ? "" : caliperClient.getChannel().isWritable());
try {
if (caliperClient != null && caliperClient.getChannel() != null
&& caliperClient.getChannel().isActive()) {
if (startVo.getGatewayBase() != null
&& Boolean.TRUE.equals(startVo.getGatewayBase().getInstanceVo().getLogOpen())) {
log.info("当前Caliper拉取连接通道处于连接状态连接id:{},连接信息:{}",
startVo.getGatewayBase().getInstanceVo().getRunId(), channelInfo);
}
if (startVo.getCustomParam() != null
&& Boolean.TRUE.equals(startVo.getCustomParam().getEnableHeartbeat())
&& Math.abs(System.currentTimeMillis() - caliperClient.getLastHeartbeatTime()) >
(long) startVo.getCustomParam().getHeartbeatMaxLostCount() * startVo.getCustomParam().getHeartbeatTimeInMs()) {
log.warn("当前Caliper拉取连接通道心跳启用且心跳超时触发重连连接id:{},连接信息:{}",
startVo.getGatewayBase().getInstanceVo().getRunId(), channelInfo);
//启用了心跳且心跳时间超时
reconnectToRfid(channelInfo, true);
}
} else {
assert caliperClient != null;
if (caliperClient.getChannel() == null) {
String errorMsg = String.format("Caliper到%s:%s的连接未建立或已关闭, %s", caliperClient.getIp(),
caliperClient.getPort(), channelInfo);
log.warn(errorMsg);
sendAlarmToPlatform(errorMsg);
reconnectToRfid(channelInfo, false);
} else {
String errorMsg = String.format("Caliper到%s:%s的连接未建立或已关闭, %s", caliperClient.getIp(),
caliperClient.getPort(), channelInfo);
log.warn(errorMsg);
sendAlarmToPlatform(errorMsg);
if (startVo.getCustomParam() != null
&& Boolean.TRUE.equals(startVo.getCustomParam().getEnableHeartbeat())
&& Math.abs(System.currentTimeMillis() - caliperClient.getLastHeartbeatTime()) >
(long) startVo.getCustomParam().getHeartbeatMaxLostCount() * startVo.getCustomParam().getHeartbeatTimeInMs()) {
log.warn("当前Caliper拉取连接通道心跳启用且心跳超时触发重连连接id:{},连接信息:{}",
startVo.getGatewayBase().getInstanceVo().getRunId(), channelInfo);
//启用了心跳且心跳时间超时
reconnectToRfid(channelInfo, true);
}
}
}
} catch (Exception e) {
log.warn(e.getMessage(), e);
}
return channelInfo;
}
private void reconnectToRfid(final String copyChannelInfo, boolean forceConnect) throws InterruptedException {
Thread.sleep(5000);
Thread thread = new Thread(() -> {
try {
log.info("Caliper触发重连强制:{}....{}:{} {}", forceConnect, caliperClient.getIp(), caliperClient.getPort(),
copyChannelInfo);
if (caliperClient.getChannel() == null || forceConnect) {
caliperClient.reconnect();
}
} catch (Exception e) {
log.info(e.getMessage(), e);
}
});
thread.start();
}
public void forceReconnectToRfid() {
String channelInfo = String.format("channel:%s, active:%s,open:%s,writable:%s", caliperClient.getChannel(),
caliperClient.getChannel() == null ? "" : caliperClient.getChannel().isActive(),
caliperClient.getChannel() == null ? "" : caliperClient.getChannel().isOpen(),
caliperClient.getChannel() == null ? "" : caliperClient.getChannel().isWritable());
try {
Thread.sleep(500);
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
log.info("Caliper触发重连强制:{}....{}:{} {}", true, caliperClient.getIp(), caliperClient.getPort(),
channelInfo);
caliperClient.reconnect();
} catch (Exception e) {
log.info(e.getMessage(), e);
}
}
});
thread.start();
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
private void sendAlarmToPlatform(String detail) {
if (startVo.getServiceProvider() != null && startVo.getGatewayBase() != null) {
AlarmService alarmService = (AlarmService) startVo.getServiceProvider().getByName(ServiceName.Alarm);
if (alarmService != null) {
AlarmDto alarmDto = new AlarmDto();
alarmDto.setAlarmType(AlarmDto.Connect_Fail_Alarm);
alarmDto.setDetail(detail);
alarmDto.setDeviceId(startVo.getDeviceId());
alarmDto.setGatewayId(startVo.getGatewayBase().getInstanceVo().getRunId());
alarmDto.setGatewayName(startVo.getGatewayBase().getInstanceVo().getName());
alarmDto.setMsgKey(null);
alarmDto.setUpper(Boolean.TRUE);
alarmDto.setTime(new Date());
alarmService.raiseAlarm(Arrays.asList(alarmDto));
}
}
}
}

View File

@ -0,0 +1,64 @@
package com.cisdi.data.caliper.gateway;
import com.cisdi.data.common.exception.BusinessException;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* @Description: 获取完整的报文
* @Creator: Mo
* @Date: 2021/6/2 23:37
*/
@Slf4j
public class CaliperFrameDecoder extends ByteToMessageDecoder {
private static final Logger logger = LoggerFactory.getLogger(CaliperFrameDecoder.class);
private CaliperTaskStartVo startVo;
private final static Integer MIN_LENGTH = 10;
private final byte START_SPLIT = 0x02;
private final byte END_SPLIT = 0x03;
private final Set<Integer> validDataSizeInByte = new HashSet<Integer>(Arrays.asList(10));
public CaliperFrameDecoder(CaliperTaskStartVo startVo) {
this.startVo = startVo;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf bufferIn, List<Object> out) throws Exception {
if (bufferIn.readableBytes() < MIN_LENGTH) {
return;
}
String channelInfo = ctx.toString();
// 心跳报文为 02 48 65 61 72 74 42 65 61 03 长度10个字节拥有独立的起始符和结束符不会和数据报文共享起始符和结束符
// 数据体长度固定为 8心跳报文数据内容为 HeartBea
int beginIndex = bufferIn.readerIndex();
for (int i = 0; i + 9 < bufferIn.readableBytes(); i += 10) {
byte curByte = bufferIn.getByte(beginIndex + i);
if(curByte != START_SPLIT){
ctx.close();
throw new BusinessException(channelInfo + "完整报文第一个字节内容不正确,期望: 0x02,实际:" + curByte);
}
byte endByte = bufferIn.getByte(beginIndex + i + 9);
if(endByte != END_SPLIT){
ctx.close();
throw new BusinessException(channelInfo + "完整报文最后一个字节内容不正确,期望: 0x03,实际:" + endByte);
}
out.add(bufferIn.retainedSlice(beginIndex + i, 10));
bufferIn.readerIndex(beginIndex + i + 10);
}
}
}

View File

@ -0,0 +1,58 @@
package com.cisdi.data.caliper.gateway;
import com.alibaba.fastjson.JSON;
import java.io.Serializable;
/**
* 读取到标签或标签离开解析后的内容都一致
*
* @author cup
* @date 2021/03/03
*/
public class CaliperItemVo implements Serializable {
private static final long serialVersionUID = -4290553035349526464L;
/**
* rfid 服务器Ip
*/
private String serverIp;
/**
* rfid 服务器端口
*/
private Integer serverPort;
/**
* 编号
*/
private String code;
public String getServerIp() {
return serverIp;
}
public void setServerIp(String serverIp) {
this.serverIp = serverIp;
}
public Integer getServerPort() {
return serverPort;
}
public void setServerPort(Integer serverPort) {
this.serverPort = serverPort;
}
public String getCode() {
return code;
}
public void setCode(String code) {
this.code = code;
}
@Override
public String toString() {
return JSON.toJSONString(this);
}
}

View File

@ -0,0 +1,62 @@
package com.cisdi.data.caliper.gateway;
import com.cisdi.data.sdk.gateway.base.PulledSocketGatewayBase;
import com.cisdi.data.sdk.param.PullSocketParam;
import com.cisdi.data.sdk.service.ServiceProvider;
import java.util.concurrent.atomic.AtomicBoolean;
public class CaliperTaskStartVo {
private AtomicBoolean shouldRun;
private ServiceProvider serviceProvider;
private String deviceId;
private PulledSocketGatewayBase gatewayBase;
private PullSocketParam socketParam;
private CaliperCustomParam customParam;
private CaliperDaemonTask caliperDaemonTask;
public AtomicBoolean getShouldRun() {
return shouldRun;
}
public void setShouldRun(AtomicBoolean shouldRun) {
this.shouldRun = shouldRun;
}
public ServiceProvider getServiceProvider() {
return serviceProvider;
}
public void setServiceProvider(ServiceProvider serviceProvider) {
this.serviceProvider = serviceProvider;
}
public String getDeviceId() {
return deviceId;
}
public void setDeviceId(String deviceId) {
this.deviceId = deviceId;
}
public PulledSocketGatewayBase getGatewayBase() {
return gatewayBase;
}
public void setGatewayBase(PulledSocketGatewayBase gatewayBase) {
this.gatewayBase = gatewayBase;
}
public PullSocketParam getSocketParam() {
return socketParam;
}
public void setSocketParam(PullSocketParam socketParam) {
this.socketParam = socketParam;
}
public CaliperCustomParam getCustomParam() {
return customParam;
}
public void setCustomParam(CaliperCustomParam customParam) {
this.customParam = customParam;
}
public CaliperDaemonTask getRfidDaemonTask() {
return caliperDaemonTask;
}
public void setRfidDaemonTask(CaliperDaemonTask caliperDaemonTask) {
this.caliperDaemonTask = caliperDaemonTask;
}
}

View File

@ -0,0 +1,87 @@
package com.cisdi.data.caliper.gateway;
import com.cisdi.data.common.exception.BusinessException;
import com.cisdi.data.util.HexUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import java.util.ArrayList;
import java.util.List;
public class CaliperVoDecoder {
public static List<CaliperItemVo> Decode(ByteBuf in) {
CaliperItemVo vo = new CaliperItemVo();
List<CaliperItemVo> result = new ArrayList<>();
//消息体部分
int length;
if (in == null) {
throw new BusinessException("ByteBuf 入参为null");
} else if (in.readableBytes() == 10) {
length = 8;
}
else {
throw new BusinessException("报文长度不匹配 ,期望10实际" + in.readableBytes());
}
//去掉报文头 结束符
ByteBuf proceedIn = in.retainedSlice(1, length);
result.add(split(proceedIn));
ReferenceCountUtil.release(proceedIn);
return result;
}
/**
* @param in 去掉头尾的报文
* @Description: 拆分报文
* @author: Mo
* @Date: 2021/6/2 10:51
* @return: com.cisdi.data.RFID.gateway.RfidItemVo
*/
public static CaliperItemVo split(ByteBuf in) {
CaliperItemVo caliperItemVo = new CaliperItemVo();
ByteBuf buf = in.readBytes(8);
// 编号
String code = buf.toString(CharsetUtil.UTF_8);
caliperItemVo.setCode(code);
ReferenceCountUtil.release(buf);
return caliperItemVo;
}
public static void main(String[] args) throws Exception {
// 标签读取
// byte[] readBytes1 = HexUtil.hexStringToBytes("001c1952000b000e3000eabcdef01234567890003001000400000501");
// ByteBuf copiedBuffer = Unpooled.copiedBuffer(readBytes1);
// RfidItemVo decoderVo = RfidVoDecoder.Decode(copiedBuffer);
// logger.info("vo1={}", decoderVo);
//
// // 标签离开
// readBytes1 = HexUtil.hexStringToBytes("00161942050c000e3000eabcdef01234567890003001");
// copiedBuffer = Unpooled.copiedBuffer(readBytes1);
// decoderVo = RfidVoDecoder.Decode(copiedBuffer);
// logger.info("vo1={}", decoderVo);
//标签读取
byte[] readBytes1 = HexUtil.hexStringToBytes("02524630313030303103");
ByteBuf copiedBuff = Unpooled.copiedBuffer(readBytes1);
List<CaliperItemVo> caliperItemVoList = CaliperVoDecoder.Decode(copiedBuff);
caliperItemVoList.forEach(System.out::println);
readBytes1 = HexUtil.hexStringToBytes("0248656172744265617403");
copiedBuff = Unpooled.copiedBuffer(readBytes1);
caliperItemVoList = CaliperVoDecoder.Decode(copiedBuff);
caliperItemVoList.forEach(System.out::println);
CaliperFrameDecoder decoder = new CaliperFrameDecoder(null);
readBytes1 = HexUtil.hexStringToBytes("0248656172744265617403");
copiedBuff = Unpooled.copiedBuffer(readBytes1);
decoder.decode(null, copiedBuff, new ArrayList<Object>());
}
}