# cold-chain **Repository Path**: xray_0611/cold-chain ## Basic Information - **Project Name**: cold-chain - **Description**: No description available - **Primary Language**: Unknown - **License**: MIT - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2024-03-08 - **Last Updated**: 2024-05-27 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 冷链监控系统 ## 冷链行业 冷链(cold chain)是指某些食品原料、经过加工的食品或半成品、特殊的生物制品和药品在经过收购、加工、灭活后,在产品加工、贮藏、运输、分销和零售、使用过程中,其各个环节始终处于产品所 必需的特定低温环境下,减少损耗,防止污染和变质,以保证产品食品安全、生物安全、药品安全的特殊供应链系统。 冷链由冷冻加工、冷冻贮藏、冷藏运输及配送、冷冻销售四个方面构成: 1. 冷冻加工:包括肉禽类、鱼类和蛋类的冷却与冻结,以及在低温状态下的加工作业过程;也包括果 蔬的预冷;各种速冻食品和奶制品的低温加工等。在这个环节上主要涉及冷链装备有冷却、冻结装置和速冻装置 2. 冷冻贮藏:包括食品的冷却储藏和冻结储藏,以及水果蔬菜等食品的气调贮藏,它是保证食品在储 存和加工过程中的低温保鲜环境。在此环节主要涉及各类冷藏库/加工间、冷藏柜、冻结柜及家用冰箱等等。 3. 冷藏运输:包括食品的中、长途运输及短途配送等物流环节的低温状态。它主要涉及铁路冷藏车、 冷藏汽车、冷藏船、冷藏集装箱等低温运输工具。在冷藏运输过程中,温度波动是引起食品品质下 降的主要原因之一,所以运输工具应具有良好性能,在保持规定低温的同时,更要保持稳定的温 度,远途运输尤其重要。 4. 冷冻销售:包括各种冷链食品进入批发零售环节的冷冻储藏和销售,它由生产厂家、批发商和零售 商共同完成。随着大中城市各类连锁超市的快速发展,各种连锁超市正在成为冷链食品的主要销售 渠道,在这些零售终端中,大量使用了冷藏/冻陈列柜和储藏库,由此逐渐成为完整的食品冷链中不可或缺的重要环节。 ![冷链物流方案策划与设计-上海威士达冷链物流研究院](%E5%86%B7%E9%93%BE%E6%8A%A5%E8%AD%A6%E7%9B%91%E6%8E%A7%E7%B3%BB%E7%BB%9F.assets/R-C.jfif) 冷链物流设备监控系统是一款应用于对食品药品冷链仓储、运输的环节中针对温度、湿度、电量等进行监控、预警和统计分析的系统。 实现了冷链监控环节的: 数据采集自动化 监控指标配置化 预警通知自动化 统计分析可视化 从而提升了生鲜、药品仓储、运输的安全管控水平,增强了政府、企业对业务各环节的了解和管理。 ## 业务流程 ![1709087001830](%E5%86%B7%E9%93%BE%E6%8A%A5%E8%AD%A6%E7%9B%91%E6%8E%A7%E7%B3%BB%E7%BB%9F.assets/1709087001830.png) ## 数据流程 ![1709087463968](%E5%86%B7%E9%93%BE%E6%8A%A5%E8%AD%A6%E7%9B%91%E6%8E%A7%E7%B3%BB%E7%BB%9F.assets/1709087463968.png) ## 系统架构 ![1709089178258](%E5%86%B7%E9%93%BE%E6%8A%A5%E8%AD%A6%E7%9B%91%E6%8E%A7%E7%B3%BB%E7%BB%9F.assets/1709089178258.png) ## 数据库设计 ![image-20240314202200837](冷链报警监控系统.assets/image-20240314202200837.png) # 数据采集 使用Netty开发数据采集服务器,接收报文,解析报文并序列化,最后发送给Kafka. ![image-20240314192520619](冷链报警监控系统.assets/image-20240314192520619.png) ## 数据采集服务器 1) 创建SpringBoot项目,配置相关依赖 ``` org.springframework.boot spring-boot-starter-web io.netty netty-all org.springframework.kafka spring-kafka com.cold cold-common 3.8.4 ``` 2) 核心配置 ``` server: port: 8001 netty: port: 11111 spring: kafka: bootstrap-servers: 192.168.223.177:9092 listener: concurrency: 5 producer: retries: 3 batch-size: 10000 buffer-memory: 33554432 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: coldflink ``` 3) Kafka配置类 ``` @Configuration public class KafkaConfig { //消息主题 对应数据采集服务 public static final String TOPIC = "cold_chain_data_collection"; //消息主题 对应数据分析服务 public static final String TOPIC_DRUID = "cold_chain_data_druid"; @Bean public NewTopic newDataCollectionTopic(){ return new NewTopic(TOPIC,1,(short) 1); } @Bean public NewTopic newDruidTopic(){ return new NewTopic(TOPIC_DRUID,1,(short)1); } } ``` 4) Spring工具 ``` @Component public class SpringContextUtil implements ApplicationContextAware { private static ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { SpringContextUtil.applicationContext = applicationContext; } public static T getBean(Class aClass){ Object bean = applicationContext.getBean(aClass); return (T)bean; } } ``` 5) 数据采集处理器 报警消息实体类 ```\ /** * 报警消息 */ public class AlertMessage { private static final long serialVersionUID = 1L; /** 消息id */ private Long id; /** 设备代码 */ private String deviceCode; /** 主机名称 */ private String hostName; /**主机类型*/ private Long hostType; /** 区域名称 */ private String areaName; /** 公司名称 */ private String companyName; /** 设备id */ private Long deviceId; /** 主机id */ private Long hostId; /** 区域id */ private Long areaId; /** 公司id */ private Long companyId; /** 温度 */ private Long tem; /** 湿度 */ private Long hum; /** 经度 */ private String lon; /** 纬度 */ private String lat; /** 最高温 */ private Long maxTem; /** 最低温 */ private Long minTem; /** 最大湿度 */ private Long maxHum; /** 最小湿度 */ private Long minHum; /** 温度报警 0 正常 1 高温 -1 低温 */ private Long alertTem; /** 湿度报警 0 正常 1 湿度高 -1 湿度低 */ private Long alertHum; /** 发送时间 */ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") private Date sendTime; /** 设备状态值 0 停用 1 正常 -1 异常 */ private Long deviceState; /** 设备状态文字 停用 正常 异常 */ private String deviceStatus; .... } ``` 数据采集处理器 ``` /** * 数据采集处理器 */ @Slf4j public class DataCollectionHandler extends SimpleChannelInboundHandler { private Logger logger = LoggerFactory.getLogger(DataCollectionHandler.class); private KafkaTemplate kafkaTemplate = SpringContextUtil.getBean(KafkaTemplate.class); private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception { //解析报文 MSG|WHCC-A01|41|32|1|0.000000|0.000000|2024-03-01 10:22:56|V1.0 try { logger.info("处理报文:" + s); String[] strs = s.split("\\|"); AlertMessage message = new AlertMessage(); message.setDeviceCode(strs[1]); message.setTem(Long.parseLong(strs[2])); message.setHum(Long.parseLong(strs[3])); message.setDeviceState(Long.parseLong(strs[4])); message.setLon(strs[5]); message.setLat(strs[6]); message.setSendTime(format.parse(strs[7])); //序列化为json String json = new ObjectMapper().writeValueAsString(message); //发送到kafka kafkaTemplate.send(KafkaConfig.TOPIC, json); logger.info("消息发送成功: " + json); }catch (Exception ex){ ex.printStackTrace(); log.error("消息处理异常:" + s,ex); } } } ``` 6) 数据采集服务 ``` /** * 数据采集服务 */ @Component public class DataCollectionServer implements ApplicationListener { @Value("${netty.port}") private Integer port; /** * 启动数据采集服务 */ public void start(){ NioEventLoopGroup bossGroup = new NioEventLoopGroup(2); NioEventLoopGroup workGroup = new NioEventLoopGroup(10); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup,workGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,1024) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline() .addLast(new StringEncoder()) .addLast(new StringDecoder()) .addLast(new DataCollectionHandler()); } }); ChannelFuture sync = serverBootstrap.bind(port).sync(); sync.channel().closeFuture().sync(); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } @Override public void onApplicationEvent(ApplicationContextEvent event) { System.out.println("启动数据采集服务..."); start(); } } ``` ## 模拟报文 测试阶段无法使用真正的物联网设备,可以使用定时任务实现模拟报文 这里使用了若依系统结合Quarz框架做定时任务 1) 生成随机消息的类 ``` /** * 模拟硬件消息 */ public class MessageTask { private Logger logger = LoggerFactory.getLogger(MessageTask.class); private Random random = new Random(); private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); public void sendMessage(String deviceCode){ NioEventLoopGroup workGroup = new NioEventLoopGroup(4); try{ Bootstrap bootstrap = new Bootstrap(); bootstrap.group(workGroup) .channel(NioSocketChannel.class) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline() .addLast(new StringEncoder()) .addLast(new StringDecoder()); } }); ChannelFuture sync = bootstrap.connect("127.0.0.1", 11111).sync(); //随机数据报文 String msg = random(deviceCode); sync.channel().writeAndFlush(msg); sync.channel().closeFuture(); }catch (Exception ex){ logger.error("发送消息异常",ex); }finally { workGroup.shutdownGracefully(); } } public String random(String deviceCode){ //随机产生温度 -30 ~ 50 int tem = random.nextInt(80) - 30; //随机湿度 0 ~ 90 int hum = random.nextInt(90); //随机状态 0 停用 1 正常 -1 异常 int[] states = {0,1,-1}; //随机经度 double lon = 90 + random.nextInt(30) + random.nextDouble(); //随机纬度 double lat = 30 + random.nextInt(10) + random.nextDouble(); int state = states[random.nextInt(states.length)]; String message = String.format("MSG|%s|%d|%d|%d|%f|%f|%s|V1.0", deviceCode,tem,hum,state,lon,lat,dateFormat.format(new Date())); System.out.println(message); return message; } } ``` 2) 具体的模拟设备任务 ``` @Component("taskCXASJ01") public class TaskCXASJ01 extends MessageTask { public void sendMessage(){ super.sendMessage("CXAS-J01"); } } ``` 3) 若依系统上配置定时任务 ![image-20240318153253343](冷链报警监控系统.assets/image-20240318153253343.png) # 实时数据计算 ## Flink概述 **数据流与流计算** 数据流是一串连续不断的数据的集合,就象水管里的水流,在水管的一端一点一点地供水,而在水管的 另一端看到的是一股连续不断的水流。类似于人们对河流的理解本质上也就是流的概念,但是这条河没 有开始也没有结束,数据流非常适合于离散的、没有开头或结尾的数据。例如,交通信号灯的数据是连 续的,没有“开始”或“结束”,是连续的过程而不是分批发送的数据记录。通常情况下,数据流对于在生 成连续数据流中以小尺寸(通常以KB字节为单位)发送数据的数据源类型是有用的。这包括各种各样的 数据源,例如来自连接设备的遥测,客户访问的Web应用时生成的日志文件、电子商务交易或来自社交 网络或地理LBS服务的信息等。 传统上,数据是分批移动的,批处理通常同时处理大量数据,具有较长时间的延迟。例如,该复制过程 每24小时运行一次。虽然这可以是处理大量数据的有效方法,但它不适用于流式传输的数据,因为数据 在处理时已经是旧的内容。 采用数据流是时间序列和随时间检测模式的最佳选择。例如,跟踪Web会话的时间。大多数物联网产生 的数据非常适合数据流处理,包括交通传感器,健康传感器,交易日志和活动日志等都是数据流的理想 选择。 流数据通常用于实时聚合和关联、过滤或采样。通过数据流,我们可以实时分析数据,并深入了解各种 行为,例如统计,服务器活动,设备地理位置或网站点击量等。 **数据流整合技术的解决方案** - 金融机构跟踪市场变化,并可根据配置约束(例如达到特定股票价格时出售)调整客户组合的配 置。 - 电网监控吞吐量并在达到某些阈值时生成警报。 - 新闻资讯APP从各种平台进行流式传输时,产生的点击记录,实时统计信息数据,以便它可以提供 与受众人口相关的文章推荐。 - 电子商务站点以数据流传输点击记录,可以检查数据流中的异常行为,并在点击流显示异常行为时 发出安全警报。 **数据流带给我们的挑战** 数据流是一种功能强大的工具,但在使用流数据源时,有一些常见的挑战。以下的列表显示了要规划数 据流的一些事项: - 可扩展性规划 - 数据持久性规划 - 如何在存储层和处理层中加入容错机制 **数据流的管理工具** 随着数据流的不断增长,出现了许多合适的大数据流解决方案。我们总结了一个列表,这些都是用于处 理流数据的常用工具: - Apache Kafka 是一个分布式发布/订阅消息传递系统,它集成了应用程序和数据流处理。 - Apache Storm Apache Storm是一个分布式实时计算系统。Storm用于分布式机器学习、实时分析处理,尤其是其具 有超高数据处理的能力。 - Apache Flink Apache Flink是一种数据流引擎,为数据流上的分布式计算提供了诸多便利。 **Flink简介** Apache Flink 是一个开源的分布式流式处理框架,是新的流数据计算引擎,用java实现。Flink可以: - 提供准确的结果,甚至在出现无序或者延迟加载的数据的情况下。 - 它是状态化的容错的,同时在维护一次完整的的应用状态时,能无缝修复错误。 - 大规模运行,在上千个节点运行时有很好的吞吐量和低延迟。 ![image-20240314185140540](冷链报警监控系统.assets/image-20240314185140540.png) Flink的核心组件: ![image-20240314185156862](冷链报警监控系统.assets/image-20240314185156862.png) **应用场景** Apache Flink 功能强大,支持开发和运行多种不同种类的应用程序。它的主要特性包括: ![image-20240314185317894](冷链报警监控系统.assets/image-20240314185317894.png) Flink 不仅可以运行在包括 YARN、 Mesos、Kubernetes 在内的多种资源管理框架上,还支持在裸机集群上独立部署。在启用高可用选项的情况下,它不存在单点失效问题。事实证明,Flink 已经可以扩展到数千核心,其状态可以达到 TB 级别,且仍能保持高吞吐、低延迟的特性。世界各地有很多要求严苛的流处理应用都运行在 Flink 之上。 Flink适用的应用场景包括: 1. 事件驱动型应用 反欺诈 异常检测 基于规则的报警 业务流程监控 社交网络Web 应用 2. 数据分析应用 电信网络质量监控 移动应用中的产品更新及实验评估分析 消费者技术中的实时数据即席分析 大规模图分析 3. 数据管道应用 电商中的实时查询索引构建 电商中的持续 ETL **Flink架构** Flink在运行中主要有三个组件组成,JobClient,JobManager 和 TaskManager。 - Program Code:我们编写的 Flink 应用程序代码。 - Job Client:Job Client 不是 Flink 程序执行的内部部分,但它是任务执行的起点。 Job Client 负责 接受用户的程序代码,然后创建数据流,将数据流提交给 Job Manager 以便进一步执行。 执行完 成后,Job Client 将结果返回给用户。 - Job Manager:主进程(也称为作业管理器)协调和管理程序的执行。 它的主要职责包括安排任 务,管理checkpoint ,故障恢复等。机器集群中至少要有一个 master,master 负责调度 task, 协调 checkpoints 和容灾,高可用设置的话可以有多个 master,但要保证一个是 leader, 其他是 standby; Job Manager 包含 Actor system、Scheduler、Check pointing 三个重要的组件。 - Task Manager:从 Job Manager 处接收需要部署的 Task。 Task Manager 是在 JVM 中的一个或多个线程中执行任务的工作节点。 ## Flink安装 Flink的运行一般分为三种模式,即local、Standalone、On Yarn。 **下载程序:** 1. **Local****模式** Local模式比较简单,用于本地测试,安装过程也比较简单,只需在主节点上解压安装包就代表成功安 装了,在flink安装目录下使用**./bin/start-cluster.sh**(windows环境下是.bat)命令,就可以通过 master:8081监控集群状态,关闭集群命令:**./bin/stop-cluster.sh**(windows环境下是.bat)。 2. **Standalone****模式** Standalone模式顾名思义,是在本地集群上调度执行,不依赖于外部调度机制例如YARN。此时需要对 配置文件进行一些简单的修改,我们预计使用当前服务器充当Job manager和Task Manager,一般情 况下需要多台机器。 在安装Flink之前,需要对安装环境进行检查,对于Standalone模式,需要提前安装好zookeeper。 1) 修改配置 ``` tar -xzf flink-1.9.1-bin-scala_2.12.tgz cd /opt/flink-1.9.1/conf vim flink-conf.yaml # 主要更改的位置有: jobmanager.rpc.address: 172.17.0.143 taskmanager.numberOfTaskSlots: 2 parallelism.default: 4 #取消下面两行的注释 rest.port: 8081 rest.address: 0.0.0.0 ``` 2) 启动flink集群 (因为在环境变量中已经指定了flink的bin位置,因此可以直接输入start-cluster.sh) 3) 验证flink进程,登录web界面,查看Web界面是否正常。至此,standalone模式已成功安装。 ![image-20240314190346170](冷链报警监控系统.assets/image-20240314190346170.png) **示例演示** Flink安装目录下的example目录里有一些Flink程序示例,我们可以使用这些示例来感受一下Flink的功能。 下面的步骤是我们演示SocketWindowWordCount这个应用的过程,这个应用的作用是监听某个socket 服务器端口,实时计算这个端口数据的单词数量。 1. 打开端口 ``` # 在nc命令行中输入文本, 必要时需要安装nc命令, yum -y install nc [root@node2-vm06 streaming]# nc -l 9010 ``` 2. 提交示例应用 有两种方式提交应用: 1)使用flink命令提交应用 ``` [root@node2-vm06 streaming]# /opt/flink-1.9.1/bin/flink run /opt/flink- 1.9.1/examples/streaming/SocketWindowWordCount.jar --port 9010 ``` 2)页面上选择应用的jar文件 ![image-20240314190505468](冷链报警监控系统.assets/image-20240314190505468.png) 3. 在nc命令行中输入单词 ``` nc -l 9010 abc abc def dfs def ttt ttt ggg ggg ``` 4. 查看结果 ``` def : 2 dfs : 1 abc : 2 ttt : 2 ggg : 2 ``` ## 项目开发 ### 缓存设备详情 查询设备详情信息缓存到Redis中,在实时消息处理时作为报警状态设置的依据 ``` /** * 设备详情 */ public interface DeviceDetailsMapper { @Select("select d.code device_code,d.min_tem,d.max_tem,d.min_hum,d.max_hum,d.status device_status,\n" + "c.abbr company_name,h.code host_code,h.type host_type,a.name area_name\n" + "from cc_monitor_device d,cc_company c,cc_area a,cc_host h \n" + "where d.host_id = h.id and h.area_id = a.id and a.company_id = c.id;") @Results({ @Result(property = "deviceCode",column = "device_code"), @Result(property = "hostName",column = "host_code"), @Result(property = "hostType",column = "host_type"), @Result(property = "areaName",column = "area_name"), @Result(property = "companyName",column = "company_name"), @Result(property = "minTem",column = "min_tem"), @Result(property = "maxTem",column = "max_tem"), @Result(property = "minHum",column = "min_hum"), @Result(property = "maxHum",column = "max_hum"), @Result(property = "deviceStatus",column = "device_status") }) List selectDeviceDetails(); } ``` 设备详细业务 ``` /** * 设备详细业务 */ public interface IDeviceDetailsService { /** * 查询数据库中的设备详情保存到Redis中 */ void saveDeviceDetailsToRedis(); } @Service public class DeviceDetailsServiceImpl implements IDeviceDetailsService { @Resource private DeviceDetailsMapper deviceDetailsMapper; @Override public void saveDeviceDetailsToRedis() { List list = deviceDetailsMapper.selectDeviceDetails(); Jedis jedis = JedisUtils.getInstance().getJedis(); list.forEach(deviceDetails -> { jedis.set(deviceDetails.getDeviceCode(),JSON.toJSONString(deviceDetails)); }); } } ``` ### 数据收集 1) 创建Maven项目cold-data-collection 2) 配置pom ``` UTF-8 1.9.1 1.8 2.11 ${java.version} ${java.version} org.apache.flink flink-avro ${flink.version} org.apache.flink flink-java ${flink.version} org.apache.flink flink-streaming-java_${scala.binary.version} ${flink.version} org.apache.flink flink-connector-kafka_${scala.binary.version} ${flink.version} org.apache.kafka kafka-clients 2.2.1 com.cold cold-common 3.8.4 ``` 3) 编写实时计算 ``` /** * 报警消息实时计算 填充温度和湿度报警属性 */ public class AlertMessageMap implements MapFunction { @Override public AlertMessage map(AlertMessage value){ JedisUtils jedisUtils = JedisUtils.getInstance(); Jedis jedis = jedisUtils.getJedis(); try { //读取redis中的设备数据 String json = jedis.get(value.getDeviceCode()); DeviceDetails device = JSON.parseObject(json, DeviceDetails.class); //把设备数据填充到消息中 value.setAreaName(device.getAreaName()); value.setCompanyName(device.getCompanyName()); value.setHostName(device.getHostName()); value.setMaxHum(device.getMaxHum()); value.setMinHum(device.getMinHum()); value.setMaxTem(device.getMaxTem()); value.setMinTem(device.getMinTem()); value.setHostType(device.getHostType()); if(device.getDeviceStatus() == 0){ value.setDeviceStatus("停用"); }else if(device.getDeviceStatus() == 1){ value.setDeviceStatus("正常"); }else if(device.getDeviceStatus() == -1){ value.setDeviceStatus("异常"); } //设置温度湿度报警 if (value.getTem() > value.getMaxTem()) { value.setAlertTem(1L); //高温 } else if (value.getTem() < value.getMinTem()) { value.setAlertTem(-1L); //低温 } else { value.setAlertTem(0L); //正常 } if (value.getHum() > value.getMaxHum()) { value.setAlertHum(1L); } else if (value.getHum() < value.getMinHum()) { value.setAlertHum(-1L); } else { value.setAlertHum(0L); } } catch (Exception e) { e.printStackTrace(); } finally { jedisUtils.returnJedis(jedis); } System.out.println("转换消息:" + value); return value; } } ``` 4) 启动实时计算任务 ``` /** * Flink处理实时报警消息的任务 */ public class MessageStreamingJob { //消息来源主题 对应数据采集服务 public static final String FROM_TOPIC = "cold_chain_data_collection"; //消息目标主题 public static final String TO_TOPIC = "cold_chain_data_druid"; public static void main(String[] args) throws Exception { //获得运行环境 createLocalEnvironment 本地运行 getExecutionEnvironment 部署服务器 StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); //设置检查点容错 env.enableCheckpointing(5000); //检查点模式 env.getCheckpointConfig().setCheckpointingMode(CheckpointConfig.DEFAULT_MODE); //设置重启策略 env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 1000)); //消费者kafka队列属性 Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "192.168.223.177:9092"); properties.setProperty("group.id", "coldflink"); properties.setProperty("auto.offset.reset", "earliest"); //创建Flink消费者,将从队列中读取消息 FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<>(FROM_TOPIC,new SimpleStringSchema(),properties); consumer.setStartFromLatest(); //输入数据流,进行数据转换 json --> AlertMessage --> 填充报警信息 DataStream stream = env.addSource(consumer) .setParallelism(1) .map(msg -> JSON.parseObject(msg,AlertMessage.class)) .map(new AlertMessageMap()); //创建生产者,将计算完的数据发到下个队列 FlinkKafkaProducer producer = new FlinkKafkaProducer<>(TO_TOPIC,new SimpleStringSchema(),properties); //将流数据转json发给kafka stream.map((alertMessage -> JSON.toJSONString(alertMessage))).addSink(producer); producer.close(); env.execute("冷链物流实时监控任务2.0"); } } ``` # 实时数据分析 ## Apache Druid **Apache Druid 简介** Apache Druid是一个拥有大数据实时查询和分析的高容错、高性能开源分布式系统,旨在快速处理大 规模的数据,并能够实现快速查询和分析。尤其是当发生代码部署、机器故障以及其他产品系统遇到宕 机等情况时,Druid仍然能够保持100%正常运行。创建Druid的最初意图主要是为了解决查询延时问 题,当时试图使用hadoop来实现交互式查询分析,但是很难满足实时分析的需要。而Druid提供了以交 互方式访问数据的能力,并权衡了查询的灵活性和性能二采取了特殊的存储格式。 Druid允许以类似Dremel和PowerDrill的方式进行单表查询,同时还增加了一些新特性,如为局部嵌套 数据结构提供列式存储格式、为快速过滤做索引、实时摄取和查询、高容错的分布式体系架构等。 **特性** - 为分析而设计:为OLAP工作流的探索性分析而构建,支持各种过滤、聚合和查询等类; - 快速的交互式查询:Druid的低延迟数据摄取架构允许事件在他们创建后毫秒内可被查询到; - 高可用性:Druid的数据在系统更新时依然可用,规模的扩大和缩小都不会造成数据丢失; - 可扩展:Druid已实现每天能够处理数十亿事件和TB级数据。 Druid设计时的三个原则: - 快速查询 : 部分数据聚合 + 内存化 + 索引 - 水平拓展能力: 分布式数据 + 并行化查询 - 实时分析 : 数据不可更改 、只能追加 **适用场景** - 需要交互式聚合和快速探究大量数据时; - 需要实时查询分析时; - 具有大量数据时,如每天数亿事件的新增、每天数10T数据的增加; - 对数据尤其是大数据进行实时分析时; - 需要一个高可用、高容错、高性能数据库时。 **不适用场景** - Druid支持流式插入,但是不支持流式更新(更新需要通过后台批处理任务)。 - 需要建立一个离线的报表系统,因此查询延迟并不是系统的关键因素。 - 需要对多个大表进行join操作。 **架构** - Overlord Overlord负责接受任务、协调任务的分配、创建任务锁以及收集、返回任务运行状态给调用者。当集群中有多个Overlord时,则通过选举算法产生Leader,其他Follower作为备份。 - MiddleManager MiddleManager负责接收Overlord分配的实时任务,同时创建新的进程用于启动Peon来执行实时任务,每一个MiddleManager可以运行多个Peon实例,每个实时Peon既提供实时数据查询也负责实时数据的摄入工作。 - Coordinator Coordinator 主要负责Druid集群中Segment的管理与发布(主要是管理历史Segment),包括加载新Segment、丢弃不符合规则的Segment、管理Segment副本以及Segment负载均衡等。如果集群中存在多个Coordinator Node,则通过选举算法产生Leader,其他Follower作为备份。 - Historical Historical 的职责是负责加载Druid中非实时窗口内且满足加载规则的所有历史数据的Segment。每一个Historical Node只与Zookeeper保持同步,会把加载完成的Segment同步到Zookeeper。 - Broker Broker Node 是整个集群查询的入口,Broker 实时同步Zookeeper上保存的集群内所有已发布的Segment的元信息,即每个Segment保存在哪些存储节点上,Broker 为Zookeeper中每个dataSource创建一个timeline,timeline按照时间顺序描述了每个Segment的存放位置。 ![image-20240318085853436](冷链报警监控系统.assets/image-20240318085853436.png) ## 安装Druid 1)安装jdk配置JAVA_HOME 2)安装zookeeper+kafka 3)下载 apache-druid-0.15.1-incubating-bin.tar.gz [https://archive.apache.org](https://archive.apache.org/dist/druid/0.15.1-incubating/apache-druid-0.15.1-incubating-bin.tar.gz) 4)解压到/usr/local/druid 5)复制mysql驱动包到druid的mysql-metadata-storage下 cp /usr/local/mysql-connector-java-5.1.44-bin.jar /usr/local/druid/extensions/mysql-metadata-storage/ 6)修改 /bin/verify-default-ports ,把2181端口检测去掉 ``` my @ports = (1527, 8081, 8082, 8083, 8090, 8091, 8200, 9095); vi druid/conf/supervisor/single-server/micro-quickstart.conf #:verify bin/verify-default-ports #!p10 zk bin/run-zk conf ``` 7) 修改配置 ``` vi conf/druid/single-server/micro-quickstart/_common/common.runtime.properties druid.extensions.loadList=["druid-hdfs-storage", "druid-kafka-indexing-service", "druid-datasketches","mysql-metadata-storage"] druid.host=192.168.222.222 druid.zk.service.host=192.168.222.222 druid.zk.paths.base=/druid # 配置数据源为mysql # For Derby server on your Druid Coordinator (only viable in a cluster with a single Coordinator, no fail-over): #druid.metadata.storage.type=derby #druid.metadata.storage.connector.connectURI=jdbc:derby://localhost:1527/var/druid/metadata.db;create=true #druid.metadata.storage.connector.host=localhost #druid.metadata.storage.connector.port=1527 # For MySQL (make sure to include the MySQL JDBC driver on the classpath): druid.metadata.storage.type=mysql druid.metadata.storage.connector.connectURI=jdbc:mysql://192.168.222.222:3306/druid?serverTimezone=UTC druid.metadata.storage.connector.user=root druid.metadata.storage.connector.password=123456 ``` 8) 启动druid ``` /bin/start-micro-quickstart ``` 9) 打开8888端口网页,添加superverst ``` { "type": "kafka", "dataSchema": { "dataSource": "cold_chain_data_druid", "parser": { "type": "string", "parseSpec": { "format": "json", "timestampSpec": { "column": "sendTime", "format": "yyyy-MM-dd HH:mm:ss" }, "dimensionsSpec": { "dimensions": [ "companyName", "sendTime", "hostName", "areaName", "deviceCode", "lat", "lon", "tem", "maxTem", "minTem", "hum", "maxHum", "minHum", "alertTem", "alertHum", "deviceStatus" ] } } }, "metricsSpec": [], "granularitySpec": { "type": "uniform", "segmentGranularity": "DAY", "queryGranularity": { "type": "none" }, "rollup": false, "intervals": null }, "transformSpec": { "filter": null, "transforms": [] } }, "tuningConfig": { "type": "kafka", "maxRowsInMemory": 1000000, "maxBytesInMemory": 0, "maxRowsPerSegment": 5000000, "maxTotalRows": null, "intermediatePersistPeriod": "PT10M", "basePersistDirectory": "/usr/local/druid/var/tmp", "maxPendingPersists": 0, "indexSpec": { "bitmap": { "type": "concise" }, "dimensionCompression": "lz4", "metricCompression": "lz4", "longEncoding": "longs" }, "buildV9Directly": true, "reportParseExceptions": false, "handoffConditionTimeout": 0, "resetOffsetAutomatically": false, "segmentWriteOutMediumFactory": null, "workerThreads": null, "chatThreads": null, "chatRetries": 8, "httpTimeout": "PT10S", "shutdownTimeout": "PT80S", "offsetFetchPeriod": "PT30S", "intermediateHandoffPeriod": "P2147483647D", "logParseExceptions": false, "maxParseExceptions": 2147483647, "maxSavedParseExceptions": 0, "skipSequenceNumberAvailabilityCheck": false }, "ioConfig": { "topic": "cold_chain_data_druid", "replicas": 1, "taskCount": 1, "taskDuration": "PT600S", "consumerProperties": { "bootstrap.servers": "192.168.223.177:9092" }, "pollTimeout": 100, "startDelay": "PT5S", "period": "PT30S", "useEarliestOffset": false, "completionTimeout": "PT1200S", "lateMessageRejectionPeriod": null, "earlyMessageRejectionPeriod": null, "stream": "cold_chain_data_druid", "useEarliestSequenceNumber": false }, "context": null, "suspended": false } ``` ## 实时消息查询 ``` public class DruidHelper { private String url = "jdbc:avatica:remote:url=http://192.168.223.177:8888/druid/v2/sql/avatica/"; private Properties conf = new Properties(); private Connection connection; /** * 获得Druid连接 */ public Connection getConnection() { try { if (null == connection) { connection = DriverManager.getConnection(url, conf); } } catch (SQLException e) { e.printStackTrace(); } return connection; } /** * 关闭Druid连接 */ public void close(Connection connection, Statement st, ResultSet rs) { try { if (rs!=null) { rs.close(); } if (st!=null) { st.close(); } if (connection!=null) { connection.close(); } } catch (SQLException e) { e.printStackTrace(); } } } ``` ``` public interface IDruidService { /** * 查询实时消息 * @param params * @return */ List selectMessages(Map params); /** * 查询温度湿度图表数据 * @param params * @return */ Map selectChartTemHum(Map params); /** * 查询所有主机名 * @return */ List selectHostNames(); /** * 所有主机的最近消息 * @return */ List selectLatestMessages(); /** * 查询某台主机的最近消息 * @param hostName * @return */ AlertMessage selectHostLatestMessages(String hostName); } ``` ``` /** * 实时消息查询 */ @Service public class DruidServiceImpl implements IDruidService { private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); /** * 多条件查询消息 * @param params * @return */ @Override public List selectMessages(Map params) { DruidHelper helper = new DruidHelper(); Connection connection = helper.getConnection(); Statement statement = null; ResultSet resultSet = null; try { statement = connection.createStatement(); resultSet = statement.executeQuery(querySQL(params)); List messages = new ArrayList<>(); while(resultSet.next()){ AlertMessage message = readMessage(resultSet); messages.add(message); } return messages; } catch (Exception e) { throw new RuntimeException(e); } finally { helper.close(connection,statement,resultSet); } } /** * 查询温度湿度图表数据 * @param params * @return */ @Override public Map selectChartTemHum(Map params) { List messages = selectMessages(params); List times = new ArrayList<>(); List tems = new ArrayList<>(); List hums = new ArrayList<>(); //给数组赋值 for (AlertMessage message : messages) { String strTime = format.format(message.getSendTime()); times.add(strTime.substring(11)); tems.add(message.getTem()); hums.add(message.getHum()); } //按时间重新排序 Collections.reverse(times); Collections.reverse(tems); Collections.reverse(hums); Map map = new HashMap(); map.put("times", times); map.put("tems", tems); map.put("hums", hums); return map; } /** * 查询所有主机名称 * @return */ @Override public List selectHostNames() { String sql = "select hostName from cold_chain_data_druid group by hostName"; DruidHelper helper = new DruidHelper(); Connection connection = helper.getConnection(); Statement statement = null; ResultSet resultSet = null; try { statement = connection.createStatement(); resultSet = statement.executeQuery(sql); List messages = new ArrayList<>(); while(resultSet.next()){ messages.add(resultSet.getString("hostName")); } return messages; } catch (Exception e) { throw new RuntimeException(e); } finally { helper.close(connection,statement,resultSet); } } /** * 查询所有主机的最近消息 * @return */ @Override public List selectLatestMessages() { //获得所有主机名 List hosts = selectHostNames(); List latestMsgs = new ArrayList<>(); for(String host : hosts){ //找到每个主机的最近一条消息 AlertMessage message = selectHostLatestMessages(host); latestMsgs.add(message); } return latestMsgs; } /** * 查询主机最近消息 * @param hostName * @return */ @Override public AlertMessage selectHostLatestMessages(String hostName) { String sql = "select * from cold_chain_data_druid where hostName = '"+ hostName +"' order by __time desc limit 1"; DruidHelper helper = new DruidHelper(); Connection connection = helper.getConnection(); Statement statement = null; ResultSet resultSet = null; try { statement = connection.createStatement(); resultSet = statement.executeQuery(sql); if(resultSet.next()){ return readMessage(resultSet); } return null; } catch (Exception e) { throw new RuntimeException(e); } finally { helper.close(connection,statement,resultSet); } } /** * ResultSet转消息 * @param rs * @return * @throws Exception */ private AlertMessage readMessage(ResultSet rs) throws Exception { AlertMessage messageEntity = new AlertMessage(); messageEntity.setDeviceCode(rs.getString("deviceCode")); messageEntity.setHostName(rs.getString("hostName")); messageEntity.setCompanyName(rs.getString("companyName")); messageEntity.setTem(Long.valueOf(rs.getString("tem"))); messageEntity.setMaxTem(Long.valueOf(rs.getString("maxTem"))); messageEntity.setMinTem(Long.valueOf(rs.getString("minTem"))); messageEntity.setHum(Long.valueOf(rs.getString("hum"))); messageEntity.setMaxHum(Long.valueOf(rs.getString("maxHum"))); messageEntity.setMinHum(Long.valueOf(rs.getString("minHum"))); messageEntity.setAlertTem(Long.valueOf(rs.getString("alertTem"))); messageEntity.setAlertHum(Long.valueOf(rs.getString("alertHum"))); messageEntity.setDeviceStatus(rs.getString("deviceStatus")); messageEntity.setLat(rs.getString("lat")); messageEntity.setLon(rs.getString("lon")); messageEntity.setSendTime(format.parse(rs.getString("sendTime"))); return messageEntity; } /** * 拼接SQL * @param params * @return */ private String querySQL(Map params) { StringBuffer stringBuffer = new StringBuffer(); stringBuffer.append("SELECT * from cold_chain_data_druid where 1= 1 "); //设备编码 String hostName = (String) params.get("hostName"); if (!StringUtils.isEmpty(hostName)) { stringBuffer.append(" and hostName = '"); stringBuffer.append(hostName); stringBuffer.append("' "); } //前台传入开始、结束时间 String startTime = (String) params.get("startTime"); if (!StringUtils.isEmpty(startTime)) { stringBuffer.append(" and __time > '"); stringBuffer.append(startTime); stringBuffer.append("' "); } String endTime = (String) params.get("endTime"); if (!StringUtils.isEmpty(endTime)) { stringBuffer.append(" and __time < '"); stringBuffer.append(endTime); stringBuffer.append("' "); } stringBuffer.append(" order by __time desc"); //查询条数 String limit = params.get("limit") != null ? params.get("limit").toString().trim() : "10"; stringBuffer.append(" limit "); stringBuffer.append(limit); System.out.println(stringBuffer.toString()); return stringBuffer.toString(); } } ``` ``` /** * Druid实时数据查询 */ @RestController @RequestMapping("/manage/druid") public class DruidMessageController { @Resource private IDruidService druidService; /** * 查询温度湿度消息 * @param hostName * @param limit * @return */ @RequestMapping("messages") public AjaxResult getTemHumMessage(String hostName,@RequestParam(required = false,defaultValue = "20") Integer limit){ Map params = new HashMap<>(); params.put("hostName",hostName); params.put("limit",limit); Map map = druidService.selectChartTemHum(params); return AjaxResult.success(map); } /** * 查询每个设备最近的消息 * @return */ @RequestMapping("latest-messages") public AjaxResult getLatestMessage(){ List messages = druidService.selectLatestMessages(); return AjaxResult.success(messages); } } ``` # 数据可视化 Superset 是Apache顶级项目,是由 Airbnb 开源的数据分析与可视化平台,同时也是由 Python 语言构建的轻量级 BI 系统。Superset 可实现对 TB 量级数据进行处理,兼容常见的数十种关系或非关系型数据库,并在内部实现 SQL 编辑查询等操作。 ## Superset安装 1) 拉取镜像 ``` docker pull amancevice/superset:0.37.2 ``` 2) 运行镜像 ``` docker run -d -p 8088:8088 -v /opt/docker/superset:/home/superset amancevice/superset ``` 3) 更新容器 ``` docker exec -it 0d6482595313 superset db upgrade ``` 4) 初始化 ``` docker exec -it 0d6482595313 superset init ``` 5)设置账号密码 ``` docker exec -it 0d6482595313 bash export FLASK_APP=superset flask fab create-admin ``` ## Superset使用 1) 输入IP:8088登录Superset页面,账号密码admin/123456,在mysql中创建person表 ![image-20240318135930795](冷链报警监控系统.assets/image-20240318135930795.png) 2) 点击Databases,创建数据源,点击Test提示OK ![image-20240318092413261](冷链报警监控系统.assets/image-20240318092413261.png) 3) 点击tables,创建表 ![image-20240318092527627](冷链报警监控系统.assets/image-20240318092527627.png) 4) 编辑表格,添加出生年份作为计算字段 ![image-20240318093710297](冷链报警监控系统.assets/image-20240318093710297.png) ![image-20240318093741014](冷链报警监控系统.assets/image-20240318093741014.png) 4) 点击Charts,创建图表 ![image-20240318093818458](冷链报警监控系统.assets/image-20240318093818458.png) 5) 设置图表参数如下 ![image-20240318114511887](冷链报警监控系统.assets/image-20240318114511887.png) 6) 图表可以生成iframe代码,嵌入到网页中 ![image-20240318114545673](冷链报警监控系统.assets/image-20240318114545673.png) 7) 分享的iframe可能出现跨域问题,需要设置角色,给Public角色添加权限 ![image-20240318114421825](冷链报警监控系统.assets/image-20240318114421825.png) 8) 在服务器的/opt/docker/superset目录下,创建superset_config.py文件,添加内容后重启容器 ```\ WTF_CSRF_ENABLED = False PUBLIC_ROLE_LIKE_GAMMA = True HTTP_HEADERS = {} WTF_CSRF_EXEMPT_LIST = [ "superset.views.core.explore_json" ] ``` ## 项目应用 1) 创建druid数据源 ![image-20240318142316866](冷链报警监控系统.assets/image-20240318142316866.png) 2) 创建表 ![image-20240318142410154](冷链报警监控系统.assets/image-20240318142410154.png) 3) 创建冷藏车温度报警折线图 ![image-20240318143759557](冷链报警监控系统.assets/image-20240318143759557.png) 4) 按需求创建其它的图表后嵌入到项目中 # 新增业务 添加库存管理业务 1. 入库:创建入库单后包括如下几个状态:未发货、在途(已发货未入库)、部分入库、作废、入库完成,入库类型包括:采购入库、外协入库、退货入库,入库单支持lodop和网页打印 ![1716190130108](README.assets/1716190130108.png) 2. 出库:创建出库单后包括如下几个状态:未发货、部分发货、已发货、作废,入库类型包括:销售出库、外协出库、调拨出库,出库单支持lodop和网页打印 ![1716190139798](README.assets/1716190139798.png) ![1716190693984](README.assets/1716190693984.png)