package com.cisdi.data.AMETEKSurfaceVision.gateway;

import com.alibaba.fastjson.JSON;
import com.cisdi.data.AMETEKSurfaceVision.DynamicCodeActionParamDTO;
import com.cisdi.data.AMETEKSurfaceVision.DynamicCodeActionParamDTO.PropertyConfig;
import com.cisdi.data.common.exception.BusinessException;
import com.cisdi.data.sdk.consts.ServiceName;
import com.cisdi.data.sdk.gateway.message.SocketMessage;
import com.cisdi.data.sdk.gateway.netty.IoSession;
import com.cisdi.data.sdk.gateway.netty.impl.AbstractIoSession;
import com.cisdi.data.sdk.service.DynamicCodecService;
import com.cisdi.data.sdk.service.SendService;
import com.cisdi.data.sdk.service.Service;
import com.cisdi.data.sdk.vo.ExeResultVo;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Protocol session for the surface inspection device (AMETEK SurfaceVision).
 *
 * <p>The session decodes inbound frames in {@link #onRead(Object)}, forwards unsolicited
 * reports to the platform, and matches command replies against the command most recently
 * written by {@link #send(Object)}.
 *
 * <p>NOTE(review): {@code send} and {@code onRead} are presumably invoked from different
 * threads (caller thread vs. Netty I/O thread); the fields shared between them are therefore
 * declared {@code volatile}. Confirm against the gateway's threading model.
 *
 * @author tzz
 */
public class SurfaceVisionIoSession extends AbstractIoSession implements IoSession {

    private static final Logger logger = LoggerFactory.getLogger(SurfaceVisionIoSession.class);

    /** Fixed heartbeat frame written by {@link #sendBeat()}. */
    private final static byte[] beat = {0x07, 0x7C, 0x24, 0x0D, 0x0A};

    /** Timestamp (epoch millis) of session creation or of the last heartbeat received. */
    private volatile long lastAliveTime = System.currentTimeMillis();

    private String deviceId = null;

    /** Lazily built log prefix, see {@link #gwPrefix()}. */
    private String gwPrefixCache = null;

    /**
     * Plain counter used as the replayId of outgoing commands. Starts at 1 and wraps back to 1
     * once it reaches {@code SurfaceVisionConstants.MAX_COUNT}.
     */
    private int count = 1;

    /** replayId of the command currently awaiting a reply; -1 until the first command is sent. */
    private volatile Integer currentCount = -1;

    /** Command currently awaiting a reply, or {@code null} when none is pending. */
    private volatile SurfaceVisionSocketReturnMessage currentSend = null;

    public SurfaceVisionIoSession(String deviceId) {
        this.deviceId = deviceId;
    }

    public void setCurrentSend(SurfaceVisionSocketReturnMessage currentSend) {
        this.currentSend = currentSend;
    }

    /** Advances the replayId counter by one, wrapping to 1 after {@code MAX_COUNT}. */
    public void increment() {
        if (count < SurfaceVisionConstants.MAX_COUNT) {
            count++;
        } else {
            count = 1;
        }
    }

    /** @return the current value of the replayId counter */
    public int getCount() {
        return count;
    }

    @Override
    public String[] getDeviceIds() {
        return new String[0];
    }

    /**
     * Loads the property (point) configuration for the given telegram key.
     *
     * <p>NOTE(review): the raw {@code List} is kept as found; its element type is presumably
     * {@code PropertyConfig} — confirm and parameterize.
     *
     * @param msgKeyName telegram key whose configuration is looked up
     * @return the non-empty property list configured for {@code msgKeyName}
     * @throws BusinessException if the codec service is unavailable, the stored configuration
     *     cannot be parsed as JSON, or the property list is missing or empty
     */
    private List getPropertyConfig(String msgKeyName) {
        // instanceof is false for null, so a separate null check is unnecessary.
        Service service = serviceProvider.getByName(ServiceName.DynamicCodec);
        if (!(service instanceof DynamicCodecService)) {
            throw new BusinessException("获取DynamicService失败");
        }
        DynamicCodecService dynamicCodecService = (DynamicCodecService) service;

        // The gateway run id scopes the configuration lookup.
        String runId = socketGateway.getInstanceVo().getRunId();
        String actionParam = dynamicCodecService.findByRunIdAndMsgKey(runId, msgKeyName);

        DynamicCodeActionParamDTO dto;
        try {
            dto = JSON.parseObject(actionParam, DynamicCodeActionParamDTO.class);
        } catch (Exception e) {
            throw new BusinessException("连接id:" + runId + "电文号:" + msgKeyName + "json转换错误", e);
        }
        if (dto == null || dto.getPropertyList() == null || dto.getPropertyList().isEmpty()) {
            throw new BusinessException("连接id:" + runId + "电文号:" + msgKeyName + "属性列表为空");
        }
        return dto.getPropertyList();
    }

    /**
     * Handles one inbound frame from the device.
     *
     * <p>The frame header is decoded first; depending on it the frame is treated as a heartbeat,
     * a normal command reply, an error reply, a query reply, or an unsolicited report that is
     * forwarded to the platform. Failures while handling the decoded frame are logged and not
     * propagated; a failure of the initial header decode propagates to the caller.
     *
     * @param message the inbound {@link ByteBuf}
     */
    @Override
    public void onRead(Object message) {
        ByteBuf inBuf = (ByteBuf) message;
        int length = inBuf.readableBytes();

        // Split header from body and parse the header.
        SurfaceVisionFrameInputVo vo = SurfaceVisionVoDecoder.Decode(inBuf);
        logger.info("AMETEKSurfaceVision收到 msg: {}", vo);

        // The header doubles as the telegram key.
        String msgKey = vo.getMsgKey();
        try {
            if (vo.getBeatType()) {
                // Heartbeat: only refresh the liveness timestamp.
                lastAliveTime = System.currentTimeMillis();
                logger.info("接收到心跳消息,当前时间为:{}", lastAliveTime);
            } else if (msgKey.startsWith(SurfaceVisionConstants.RETURN_HEADER1)) {
                // Normal reply to a non-query command.
                SurfaceVisionVoDecoder.DecodeReturn(vo);
                ExeResultVo sendResult = new ExeResultVo();
                if (!currentCount.equals(vo.getReplayId())) {
                    // The reply does not belong to the command we last sent.
                    logger.info("AMETEKSurfaceVision收到 msgkey: {} 消息:{}, ReplayId:{} 与实际发送下去的:{},不一致",
                            msgKey, vo, vo.getReplayId(), currentCount);
                    sendResult.setSuccess(false);
                    sendResult.setMessage("返回消息replayId与发送消息replayId不符");
                } else {
                    sendResult.setSuccess(vo.getStatus());
                    sendResult.setMessage(vo.getDescription());
                }
                returnData(sendResult);
            } else if (SurfaceVisionConstants.RETURN_HEADER2.equals(msgKey)) {
                // Error reply to a command.
                SurfaceVisionVoDecoder.DecodeReturnError(vo);
                ExeResultVo sendResult = new ExeResultVo();
                sendResult.setSuccess(vo.getStatus());
                sendResult.setMessage(vo.getDescription());
                returnData(sendResult);
            } else if (SurfaceVisionConstants.RETURN_HEADER3.equals(msgKey)) {
                // Reply to a query command; carries a data payload.
                SurfaceVisionVoDecoder.DecodeReturnQuery(vo);
                ExeResultVo sendResult = new ExeResultVo();
                sendResult.setSuccess(vo.getStatus());
                sendResult.setMessage(vo.getDescription());
                Map queryData = new HashMap<>();
                queryData.putAll(vo.getReturnData());
                sendResult.setData(queryData);
                returnData(sendResult);
            } else {
                // Unsolicited report from the device: decode and forward to the platform.
                SurfaceVisionVoDecoder.DecodeData(vo, getPropertyConfig(msgKey));
                Map realDataMap = vo.getReturnData();
                if (serviceProvider != null && socketGateway != null) {
                    SendService service = (SendService) serviceProvider.getByName(ServiceName.Send);
                    SocketMessage socketMessage = socketGateway.buildSocketMessage();
                    socketMessage.setDeviceId(deviceId);
                    socketMessage.getPropsMap().put(SurfaceVisionConstants.S_V_SEND_PLATFORM_DATA, realDataMap);
                    socketMessage.setMsgKey(msgKey);
                    service.sendMessage(socketMessage);
                } else {
                    logger.info("AMETEKSurfaceVision收到 msgkey: {} 消息:{}, 但是不能发到平台",
                            msgKey, JSON.toJSONString(realDataMap));
                }
            }
        } catch (Exception e) {
            logger.error("AMETEKSurfaceVision接收消息匹配错误, msgKey: {}, msg:{}, error", msgKey, vo, e);
        }

        // Optional verbose frame logging, enabled per gateway instance.
        if (socketGateway != null
                && socketGateway.getInstanceVo() != null
                && Boolean.TRUE.equals(socketGateway.getInstanceVo().getLogOpen())) {
            logger.info("AMETEKSurfaceVision {} 报文长度:{} vo:{} ", getChannel(), length, JSON.toJSONString(vo));
        }
    }

    /**
     * Attaches the reply to the pending command, flags it as answered and clears the pending slot.
     *
     * @param sendResult result built from the device's reply
     * @throws BusinessException if no command is currently pending
     */
    private void returnData(ExeResultVo sendResult) {
        // Read the shared field once so the null check and the use see the same object.
        SurfaceVisionSocketReturnMessage pending = currentSend;
        if (pending == null) {
            throw new BusinessException("下发返回消息没有对应的下发命令");
        }
        pending.setRealData(sendResult);
        pending.setSendReturnFlag(true);
        currentSend = null;
    }

    @Override
    public void onOpen() {
        super.onOpen();
        logger.info("建立连接,channel:{}", getChannel());
    }

    @Override
    public void onClose() {
        super.onClose();
    }

    /** Closes the underlying channel, if any, and waits for the close to complete. */
    public void close() {
        if (getChannel() != null) {
            try {
                getChannel().close().sync();
            } catch (InterruptedException e) {
                logger.warn(e.getLocalizedMessage(), e);
                // Restore the interrupt status so callers further up can observe it.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Returns a log prefix identifying the gateway (when known) and the channel. The prefix is
     * built on first use and cached afterwards.
     *
     * @return the cached log prefix
     */
    public String gwPrefix() {
        if (gwPrefixCache != null) {
            return gwPrefixCache;
        }
        if (socketGateway != null && socketGateway.getInstanceVo() != null) {
            gwPrefixCache = "网关Id:" + socketGateway.getInstanceVo().getRunId() + "连接:" + getChannel() + " ";
        } else {
            gwPrefixCache = "连接:" + getChannel() + " ";
        }
        return gwPrefixCache;
    }

    public long getLastAliveTime() {
        return lastAliveTime;
    }

    /**
     * Encodes and writes a command to the device and registers it as the pending command whose
     * reply {@link #onRead(Object)} will match by replayId.
     *
     * @param message a {@link SurfaceVisionSocketReturnMessage} describing the command
     * @throws BusinessException if the property configuration for the command cannot be loaded
     */
    @Override
    public void send(Object message) {
        SurfaceVisionSocketReturnMessage returnMsg = (SurfaceVisionSocketReturnMessage) message;
        String msgKey = returnMsg.getMsgKey();
        // Key matches the one used by AMETEKSurfaceVisionSocketProtocol.
        Map propsMap = (Map) returnMsg.getPropsMap().get(SurfaceVisionConstants.S_V_SEND_DATA);
        List propertyConfigList = getPropertyConfig(msgKey);

        // The current counter value becomes this command's replayId.
        int replayId = getCount();
        logger.info("下发数据, 数据:{}, 点位配置:{}, 返回ID:{}", propsMap, propertyConfigList, replayId);
        ByteBuf byteBuf = SurfaceVisionEncoder.Encode(msgKey, propsMap, propertyConfigList, replayId);
        logger.info("下发数据的byteBuf:{}", byteBuf);

        // Register the pending command BEFORE writing: a reply handled in onRead must never
        // find the pending slot empty or carrying a stale replayId.
        SurfaceVisionSocketReturnMessage previousSend = currentSend;
        Integer previousCount = currentCount;
        currentCount = replayId;
        currentSend = returnMsg;
        try {
            getChannel().writeAndFlush(byteBuf);
        } catch (RuntimeException e) {
            // The write never left; restore the previous pending state and rethrow unchanged.
            currentSend = previousSend;
            currentCount = previousCount;
            throw e;
        }
        logger.info("下发数据成功");

        increment();
    }

    /** Writes the fixed heartbeat frame to the channel. */
    public void sendBeat() {
        logger.info("发送心跳");
        ByteBuf byteBuf = Unpooled.wrappedBuffer(beat);
        getChannel().writeAndFlush(byteBuf);
        logger.info("心跳发送成功");
    }
}