# cold-chain
**Repository Path**: xray_0611/cold-chain
## Basic Information
- **Project Name**: cold-chain
- **Description**: No description available
- **Primary Language**: Unknown
- **License**: MIT
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2024-03-08
- **Last Updated**: 2024-05-27
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# 冷链监控系统
## 冷链行业
冷链(cold chain)是指某些食品原料、经过加工的食品或半成品、特殊的生物制品和药品在经过收购、加工、灭活后,在产品加工、贮藏、运输、分销和零售、使用过程中,其各个环节始终处于产品所 必需的特定低温环境下,减少损耗,防止污染和变质,以保证产品食品安全、生物安全、药品安全的特殊供应链系统。
冷链由冷冻加工、冷冻贮藏、冷藏运输及配送、冷冻销售四个方面构成:
1. 冷冻加工:包括肉禽类、鱼类和蛋类的冷却与冻结,以及在低温状态下的加工作业过程;也包括果 蔬的预冷;各种速冻食品和奶制品的低温加工等。在这个环节上主要涉及冷链装备有冷却、冻结装置和速冻装置
2. 冷冻贮藏:包括食品的冷却储藏和冻结储藏,以及水果蔬菜等食品的气调贮藏,它是保证食品在储 存和加工过程中的低温保鲜环境。在此环节主要涉及各类冷藏库/加工间、冷藏柜、冻结柜及家用冰箱等等。
3. 冷藏运输:包括食品的中、长途运输及短途配送等物流环节的低温状态。它主要涉及铁路冷藏车、 冷藏汽车、冷藏船、冷藏集装箱等低温运输工具。在冷藏运输过程中,温度波动是引起食品品质下 降的主要原因之一,所以运输工具应具有良好性能,在保持规定低温的同时,更要保持稳定的温 度,远途运输尤其重要。
4. 冷冻销售:包括各种冷链食品进入批发零售环节的冷冻储藏和销售,它由生产厂家、批发商和零售 商共同完成。随着大中城市各类连锁超市的快速发展,各种连锁超市正在成为冷链食品的主要销售 渠道,在这些零售终端中,大量使用了冷藏/冻陈列柜和储藏库,由此逐渐成为完整的食品冷链中不可或缺的重要环节。

冷链物流设备监控系统是一款应用于对食品药品冷链仓储、运输的环节中针对温度、湿度、电量等进行监控、预警和统计分析的系统。
实现了冷链监控环节的: 数据采集自动化 监控指标配置化 预警通知自动化 统计分析可视化 从而提升了生鲜、药品仓储、运输的安全管控水平,增强了政府、企业对业务各环节的了解和管理。
## 业务流程

## 数据流程

## 系统架构

## 数据库设计

# 数据采集
使用Netty开发数据采集服务器,接收报文,解析报文并序列化,最后发送给Kafka.

## 数据采集服务器
1) 创建SpringBoot项目,配置相关依赖
```
org.springframework.boot
spring-boot-starter-web
io.netty
netty-all
org.springframework.kafka
spring-kafka
com.cold
cold-common
3.8.4
```
2) 核心配置
```
server:
port: 8001
netty:
port: 11111
spring:
kafka:
bootstrap-servers: 192.168.223.177:9092
listener:
concurrency: 5
producer:
retries: 3
batch-size: 10000
buffer-memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: coldflink
```
3) Kafka配置类
```
@Configuration
public class KafkaConfig {
//消息主题 对应数据采集服务
public static final String TOPIC = "cold_chain_data_collection";
//消息主题 对应数据分析服务
public static final String TOPIC_DRUID = "cold_chain_data_druid";
@Bean
public NewTopic newDataCollectionTopic(){
return new NewTopic(TOPIC,1,(short) 1);
}
@Bean
public NewTopic newDruidTopic(){
return new NewTopic(TOPIC_DRUID,1,(short)1);
}
}
```
4) Spring工具
```
@Component
public class SpringContextUtil implements ApplicationContextAware {
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
SpringContextUtil.applicationContext = applicationContext;
}
public static T getBean(Class aClass){
Object bean = applicationContext.getBean(aClass);
return (T)bean;
}
}
```
5) 数据采集处理器
报警消息实体类
```\
/**
* 报警消息
*/
public class AlertMessage
{
private static final long serialVersionUID = 1L;
/** 消息id */
private Long id;
/** 设备代码 */
private String deviceCode;
/** 主机名称 */
private String hostName;
/**主机类型*/
private Long hostType;
/** 区域名称 */
private String areaName;
/** 公司名称 */
private String companyName;
/** 设备id */
private Long deviceId;
/** 主机id */
private Long hostId;
/** 区域id */
private Long areaId;
/** 公司id */
private Long companyId;
/** 温度 */
private Long tem;
/** 湿度 */
private Long hum;
/** 经度 */
private String lon;
/** 纬度 */
private String lat;
/** 最高温 */
private Long maxTem;
/** 最低温 */
private Long minTem;
/** 最大湿度 */
private Long maxHum;
/** 最小湿度 */
private Long minHum;
/** 温度报警 0 正常 1 高温 -1 低温 */
private Long alertTem;
/** 湿度报警 0 正常 1 湿度高 -1 湿度低 */
private Long alertHum;
/** 发送时间 */
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date sendTime;
/** 设备状态值 0 停用 1 正常 -1 异常 */
private Long deviceState;
/** 设备状态文字 停用 正常 异常 */
private String deviceStatus;
....
}
```
数据采集处理器
```
/**
* 数据采集处理器
*/
@Slf4j
public class DataCollectionHandler extends SimpleChannelInboundHandler {
private Logger logger = LoggerFactory.getLogger(DataCollectionHandler.class);
private KafkaTemplate kafkaTemplate = SpringContextUtil.getBean(KafkaTemplate.class);
private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
//解析报文 MSG|WHCC-A01|41|32|1|0.000000|0.000000|2024-03-01 10:22:56|V1.0
try {
logger.info("处理报文:" + s);
String[] strs = s.split("\\|");
AlertMessage message = new AlertMessage();
message.setDeviceCode(strs[1]);
message.setTem(Long.parseLong(strs[2]));
message.setHum(Long.parseLong(strs[3]));
message.setDeviceState(Long.parseLong(strs[4]));
message.setLon(strs[5]);
message.setLat(strs[6]);
message.setSendTime(format.parse(strs[7]));
//序列化为json
String json = new ObjectMapper().writeValueAsString(message);
//发送到kafka
kafkaTemplate.send(KafkaConfig.TOPIC, json);
logger.info("消息发送成功: " + json);
}catch (Exception ex){
ex.printStackTrace();
log.error("消息处理异常:" + s,ex);
}
}
}
```
6) 数据采集服务
```
/**
* 数据采集服务
*/
@Component
public class DataCollectionServer implements ApplicationListener {
@Value("${netty.port}")
private Integer port;
/**
* 启动数据采集服务
*/
public void start(){
NioEventLoopGroup bossGroup = new NioEventLoopGroup(2);
NioEventLoopGroup workGroup = new NioEventLoopGroup(10);
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,1024)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(new StringEncoder())
.addLast(new StringDecoder())
.addLast(new DataCollectionHandler());
}
});
ChannelFuture sync = serverBootstrap.bind(port).sync();
sync.channel().closeFuture().sync();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
@Override
public void onApplicationEvent(ApplicationContextEvent event) {
System.out.println("启动数据采集服务...");
start();
}
}
```
## 模拟报文
测试阶段无法使用真正的物联网设备,可以使用定时任务实现模拟报文
这里使用了若依系统结合Quarz框架做定时任务
1) 生成随机消息的类
```
/**
* 模拟硬件消息
*/
public class MessageTask {
private Logger logger = LoggerFactory.getLogger(MessageTask.class);
private Random random = new Random();
private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public void sendMessage(String deviceCode){
NioEventLoopGroup workGroup = new NioEventLoopGroup(4);
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(new StringEncoder())
.addLast(new StringDecoder());
}
});
ChannelFuture sync = bootstrap.connect("127.0.0.1", 11111).sync();
//随机数据报文
String msg = random(deviceCode);
sync.channel().writeAndFlush(msg);
sync.channel().closeFuture();
}catch (Exception ex){
logger.error("发送消息异常",ex);
}finally {
workGroup.shutdownGracefully();
}
}
public String random(String deviceCode){
//随机产生温度 -30 ~ 50
int tem = random.nextInt(80) - 30;
//随机湿度 0 ~ 90
int hum = random.nextInt(90);
//随机状态 0 停用 1 正常 -1 异常
int[] states = {0,1,-1};
//随机经度
double lon = 90 + random.nextInt(30) + random.nextDouble();
//随机纬度
double lat = 30 + random.nextInt(10) + random.nextDouble();
int state = states[random.nextInt(states.length)];
String message = String.format("MSG|%s|%d|%d|%d|%f|%f|%s|V1.0",
deviceCode,tem,hum,state,lon,lat,dateFormat.format(new Date()));
System.out.println(message);
return message;
}
}
```
2) 具体的模拟设备任务
```
@Component("taskCXASJ01")
public class TaskCXASJ01 extends MessageTask {
public void sendMessage(){
super.sendMessage("CXAS-J01");
}
}
```
3) 若依系统上配置定时任务

# 实时数据计算
## Flink概述
**数据流与流计算**
数据流是一串连续不断的数据的集合,就象水管里的水流,在水管的一端一点一点地供水,而在水管的
另一端看到的是一股连续不断的水流。类似于人们对河流的理解本质上也就是流的概念,但是这条河没
有开始也没有结束,数据流非常适合于离散的、没有开头或结尾的数据。例如,交通信号灯的数据是连
续的,没有“开始”或“结束”,是连续的过程而不是分批发送的数据记录。通常情况下,数据流对于在生
成连续数据流中以小尺寸(通常以KB字节为单位)发送数据的数据源类型是有用的。这包括各种各样的
数据源,例如来自连接设备的遥测,客户访问的Web应用时生成的日志文件、电子商务交易或来自社交
网络或地理LBS服务的信息等。
传统上,数据是分批移动的,批处理通常同时处理大量数据,具有较长时间的延迟。例如,该复制过程
每24小时运行一次。虽然这可以是处理大量数据的有效方法,但它不适用于流式传输的数据,因为数据
在处理时已经是旧的内容。
采用数据流是时间序列和随时间检测模式的最佳选择。例如,跟踪Web会话的时间。大多数物联网产生
的数据非常适合数据流处理,包括交通传感器,健康传感器,交易日志和活动日志等都是数据流的理想
选择。
流数据通常用于实时聚合和关联、过滤或采样。通过数据流,我们可以实时分析数据,并深入了解各种
行为,例如统计,服务器活动,设备地理位置或网站点击量等。
**数据流整合技术的解决方案**
- 金融机构跟踪市场变化,并可根据配置约束(例如达到特定股票价格时出售)调整客户组合的配
置。
- 电网监控吞吐量并在达到某些阈值时生成警报。
- 新闻资讯APP从各种平台进行流式传输时,产生的点击记录,实时统计信息数据,以便它可以提供
与受众人口相关的文章推荐。
- 电子商务站点以数据流传输点击记录,可以检查数据流中的异常行为,并在点击流显示异常行为时
发出安全警报。
**数据流带给我们的挑战**
数据流是一种功能强大的工具,但在使用流数据源时,有一些常见的挑战。以下的列表显示了要规划数
据流的一些事项:
- 可扩展性规划
- 数据持久性规划
- 如何在存储层和处理层中加入容错机制
**数据流的管理工具**
随着数据流的不断增长,出现了许多合适的大数据流解决方案。我们总结了一个列表,这些都是用于处
理流数据的常用工具:
- Apache Kafka
是一个分布式发布/订阅消息传递系统,它集成了应用程序和数据流处理。
- Apache Storm
Apache Storm是一个分布式实时计算系统。Storm用于分布式机器学习、实时分析处理,尤其是其具
有超高数据处理的能力。
- Apache Flink
Apache Flink是一种数据流引擎,为数据流上的分布式计算提供了诸多便利。
**Flink简介**
Apache Flink 是一个开源的分布式流式处理框架,是新的流数据计算引擎,用java实现。Flink可以:
- 提供准确的结果,甚至在出现无序或者延迟加载的数据的情况下。
- 它是状态化的容错的,同时在维护一次完整的的应用状态时,能无缝修复错误。
- 大规模运行,在上千个节点运行时有很好的吞吐量和低延迟。

Flink的核心组件:

**应用场景**
Apache Flink 功能强大,支持开发和运行多种不同种类的应用程序。它的主要特性包括:

Flink 不仅可以运行在包括 YARN、 Mesos、Kubernetes 在内的多种资源管理框架上,还支持在裸机集群上独立部署。在启用高可用选项的情况下,它不存在单点失效问题。事实证明,Flink 已经可以扩展到数千核心,其状态可以达到 TB 级别,且仍能保持高吞吐、低延迟的特性。世界各地有很多要求严苛的流处理应用都运行在 Flink 之上。
Flink适用的应用场景包括:
1. 事件驱动型应用
反欺诈
异常检测
基于规则的报警
业务流程监控
社交网络Web 应用
2. 数据分析应用
电信网络质量监控
移动应用中的产品更新及实验评估分析
消费者技术中的实时数据即席分析
大规模图分析
3. 数据管道应用
电商中的实时查询索引构建
电商中的持续 ETL
**Flink架构**
Flink在运行中主要有三个组件组成,JobClient,JobManager 和 TaskManager。
- Program Code:我们编写的 Flink 应用程序代码。
- Job Client:Job Client 不是 Flink 程序执行的内部部分,但它是任务执行的起点。 Job Client 负责
接受用户的程序代码,然后创建数据流,将数据流提交给 Job Manager 以便进一步执行。 执行完
成后,Job Client 将结果返回给用户。
- Job Manager:主进程(也称为作业管理器)协调和管理程序的执行。 它的主要职责包括安排任
务,管理checkpoint ,故障恢复等。机器集群中至少要有一个 master,master 负责调度 task,
协调 checkpoints 和容灾,高可用设置的话可以有多个 master,但要保证一个是 leader, 其他是
standby; Job Manager 包含 Actor system、Scheduler、Check pointing 三个重要的组件。
- Task Manager:从 Job Manager 处接收需要部署的 Task。
Task Manager 是在 JVM 中的一个或多个线程中执行任务的工作节点。
## Flink安装
Flink的运行一般分为三种模式,即local、Standalone、On Yarn。
**下载程序:**
1. **Local****模式**
Local模式比较简单,用于本地测试,安装过程也比较简单,只需在主节点上解压安装包就代表成功安
装了,在flink安装目录下使用**./bin/start-cluster.sh**(windows环境下是.bat)命令,就可以通过
master:8081监控集群状态,关闭集群命令:**./bin/stop-cluster.sh**(windows环境下是.bat)。
2. **Standalone****模式**
Standalone模式顾名思义,是在本地集群上调度执行,不依赖于外部调度机制例如YARN。此时需要对
配置文件进行一些简单的修改,我们预计使用当前服务器充当Job manager和Task Manager,一般情
况下需要多台机器。
在安装Flink之前,需要对安装环境进行检查,对于Standalone模式,需要提前安装好zookeeper。
1) 修改配置
```
tar -xzf flink-1.9.1-bin-scala_2.12.tgz
cd /opt/flink-1.9.1/conf
vim flink-conf.yaml
# 主要更改的位置有:
jobmanager.rpc.address: 172.17.0.143
taskmanager.numberOfTaskSlots: 2
parallelism.default: 4
#取消下面两行的注释
rest.port: 8081
rest.address: 0.0.0.0
```
2) 启动flink集群 (因为在环境变量中已经指定了flink的bin位置,因此可以直接输入start-cluster.sh)
3) 验证flink进程,登录web界面,查看Web界面是否正常。至此,standalone模式已成功安装。

**示例演示**
Flink安装目录下的example目录里有一些Flink程序示例,我们可以使用这些示例来感受一下Flink的功能。
下面的步骤是我们演示SocketWindowWordCount这个应用的过程,这个应用的作用是监听某个socket
服务器端口,实时计算这个端口数据的单词数量。
1. 打开端口
```
# 在nc命令行中输入文本, 必要时需要安装nc命令, yum -y install nc
[root@node2-vm06 streaming]# nc -l 9010
```
2. 提交示例应用
有两种方式提交应用:
1)使用flink命令提交应用
```
[root@node2-vm06 streaming]# /opt/flink-1.9.1/bin/flink run /opt/flink-
1.9.1/examples/streaming/SocketWindowWordCount.jar --port 9010
```
2)页面上选择应用的jar文件

3. 在nc命令行中输入单词
```
nc -l 9010
abc abc def dfs def
ttt ttt ggg ggg
```
4. 查看结果
```
def : 2
dfs : 1
abc : 2
ttt : 2
ggg : 2
```
## 项目开发
### 缓存设备详情
查询设备详情信息缓存到Redis中,在实时消息处理时作为报警状态设置的依据
```
/**
* 设备详情
*/
public interface DeviceDetailsMapper {
@Select("select d.code device_code,d.min_tem,d.max_tem,d.min_hum,d.max_hum,d.status device_status,\n" +
"c.abbr company_name,h.code host_code,h.type host_type,a.name area_name\n" +
"from cc_monitor_device d,cc_company c,cc_area a,cc_host h \n" +
"where d.host_id = h.id and h.area_id = a.id and a.company_id = c.id;")
@Results({
@Result(property = "deviceCode",column = "device_code"),
@Result(property = "hostName",column = "host_code"),
@Result(property = "hostType",column = "host_type"),
@Result(property = "areaName",column = "area_name"),
@Result(property = "companyName",column = "company_name"),
@Result(property = "minTem",column = "min_tem"),
@Result(property = "maxTem",column = "max_tem"),
@Result(property = "minHum",column = "min_hum"),
@Result(property = "maxHum",column = "max_hum"),
@Result(property = "deviceStatus",column = "device_status")
})
List selectDeviceDetails();
}
```
设备详细业务
```
/**
* 设备详细业务
*/
public interface IDeviceDetailsService {
/**
* 查询数据库中的设备详情保存到Redis中
*/
void saveDeviceDetailsToRedis();
}
@Service
public class DeviceDetailsServiceImpl implements IDeviceDetailsService {
@Resource
private DeviceDetailsMapper deviceDetailsMapper;
@Override
public void saveDeviceDetailsToRedis() {
List list = deviceDetailsMapper.selectDeviceDetails();
Jedis jedis = JedisUtils.getInstance().getJedis();
list.forEach(deviceDetails -> {
jedis.set(deviceDetails.getDeviceCode(),JSON.toJSONString(deviceDetails));
});
}
}
```
### 数据收集
1) 创建Maven项目cold-data-collection
2) 配置pom
```
UTF-8
1.9.1
1.8
2.11
${java.version}
${java.version}
org.apache.flink
flink-avro
${flink.version}
org.apache.flink
flink-java
${flink.version}
org.apache.flink
flink-streaming-java_${scala.binary.version}
${flink.version}
org.apache.flink
flink-connector-kafka_${scala.binary.version}
${flink.version}
org.apache.kafka
kafka-clients
2.2.1
com.cold
cold-common
3.8.4
```
3) 编写实时计算
```
/**
* 报警消息实时计算 填充温度和湿度报警属性
*/
public class AlertMessageMap implements MapFunction {
@Override
public AlertMessage map(AlertMessage value){
JedisUtils jedisUtils = JedisUtils.getInstance();
Jedis jedis = jedisUtils.getJedis();
try {
//读取redis中的设备数据
String json = jedis.get(value.getDeviceCode());
DeviceDetails device = JSON.parseObject(json, DeviceDetails.class);
//把设备数据填充到消息中
value.setAreaName(device.getAreaName());
value.setCompanyName(device.getCompanyName());
value.setHostName(device.getHostName());
value.setMaxHum(device.getMaxHum());
value.setMinHum(device.getMinHum());
value.setMaxTem(device.getMaxTem());
value.setMinTem(device.getMinTem());
value.setHostType(device.getHostType());
if(device.getDeviceStatus() == 0){
value.setDeviceStatus("停用");
}else if(device.getDeviceStatus() == 1){
value.setDeviceStatus("正常");
}else if(device.getDeviceStatus() == -1){
value.setDeviceStatus("异常");
}
//设置温度湿度报警
if (value.getTem() > value.getMaxTem()) {
value.setAlertTem(1L); //高温
} else if (value.getTem() < value.getMinTem()) {
value.setAlertTem(-1L); //低温
} else {
value.setAlertTem(0L); //正常
}
if (value.getHum() > value.getMaxHum()) {
value.setAlertHum(1L);
} else if (value.getHum() < value.getMinHum()) {
value.setAlertHum(-1L);
} else {
value.setAlertHum(0L);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
jedisUtils.returnJedis(jedis);
}
System.out.println("转换消息:" + value);
return value;
}
}
```
4) 启动实时计算任务
```
/**
* Flink处理实时报警消息的任务
*/
public class MessageStreamingJob {
//消息来源主题 对应数据采集服务
public static final String FROM_TOPIC = "cold_chain_data_collection";
//消息目标主题
public static final String TO_TOPIC = "cold_chain_data_druid";
public static void main(String[] args) throws Exception {
//获得运行环境 createLocalEnvironment 本地运行 getExecutionEnvironment 部署服务器
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
//设置检查点容错
env.enableCheckpointing(5000);
//检查点模式
env.getCheckpointConfig().setCheckpointingMode(CheckpointConfig.DEFAULT_MODE);
//设置重启策略
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 1000));
//消费者kafka队列属性
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.223.177:9092");
properties.setProperty("group.id", "coldflink");
properties.setProperty("auto.offset.reset", "earliest");
//创建Flink消费者,将从队列中读取消息
FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<>(FROM_TOPIC,new SimpleStringSchema(),properties);
consumer.setStartFromLatest();
//输入数据流,进行数据转换 json --> AlertMessage --> 填充报警信息
DataStream stream = env.addSource(consumer)
.setParallelism(1)
.map(msg -> JSON.parseObject(msg,AlertMessage.class))
.map(new AlertMessageMap());
//创建生产者,将计算完的数据发到下个队列
FlinkKafkaProducer producer = new FlinkKafkaProducer<>(TO_TOPIC,new SimpleStringSchema(),properties);
//将流数据转json发给kafka
stream.map((alertMessage -> JSON.toJSONString(alertMessage))).addSink(producer);
producer.close();
env.execute("冷链物流实时监控任务2.0");
}
}
```
# 实时数据分析
## Apache Druid
**Apache Druid 简介**
Apache Druid是一个拥有大数据实时查询和分析的高容错、高性能开源分布式系统,旨在快速处理大
规模的数据,并能够实现快速查询和分析。尤其是当发生代码部署、机器故障以及其他产品系统遇到宕
机等情况时,Druid仍然能够保持100%正常运行。创建Druid的最初意图主要是为了解决查询延时问
题,当时试图使用hadoop来实现交互式查询分析,但是很难满足实时分析的需要。而Druid提供了以交
互方式访问数据的能力,并权衡了查询的灵活性和性能二采取了特殊的存储格式。
Druid允许以类似Dremel和PowerDrill的方式进行单表查询,同时还增加了一些新特性,如为局部嵌套
数据结构提供列式存储格式、为快速过滤做索引、实时摄取和查询、高容错的分布式体系架构等。
**特性**
- 为分析而设计:为OLAP工作流的探索性分析而构建,支持各种过滤、聚合和查询等类;
- 快速的交互式查询:Druid的低延迟数据摄取架构允许事件在他们创建后毫秒内可被查询到;
- 高可用性:Druid的数据在系统更新时依然可用,规模的扩大和缩小都不会造成数据丢失;
- 可扩展:Druid已实现每天能够处理数十亿事件和TB级数据。
Druid设计时的三个原则:
- 快速查询 : 部分数据聚合 + 内存化 + 索引
- 水平拓展能力: 分布式数据 + 并行化查询
- 实时分析 : 数据不可更改 、只能追加
**适用场景**
- 需要交互式聚合和快速探究大量数据时;
- 需要实时查询分析时;
- 具有大量数据时,如每天数亿事件的新增、每天数10T数据的增加;
- 对数据尤其是大数据进行实时分析时;
- 需要一个高可用、高容错、高性能数据库时。
**不适用场景**
- Druid支持流式插入,但是不支持流式更新(更新需要通过后台批处理任务)。
- 需要建立一个离线的报表系统,因此查询延迟并不是系统的关键因素。
- 需要对多个大表进行join操作。
**架构**
- Overlord
Overlord负责接受任务、协调任务的分配、创建任务锁以及收集、返回任务运行状态给调用者。当集群中有多个Overlord时,则通过选举算法产生Leader,其他Follower作为备份。
- MiddleManager
MiddleManager负责接收Overlord分配的实时任务,同时创建新的进程用于启动Peon来执行实时任务,每一个MiddleManager可以运行多个Peon实例,每个实时Peon既提供实时数据查询也负责实时数据的摄入工作。
- Coordinator
Coordinator 主要负责Druid集群中Segment的管理与发布(主要是管理历史Segment),包括加载新Segment、丢弃不符合规则的Segment、管理Segment副本以及Segment负载均衡等。如果集群中存在多个Coordinator Node,则通过选举算法产生Leader,其他Follower作为备份。
- Historical
Historical 的职责是负责加载Druid中非实时窗口内且满足加载规则的所有历史数据的Segment。每一个Historical Node只与Zookeeper保持同步,会把加载完成的Segment同步到Zookeeper。
- Broker
Broker Node 是整个集群查询的入口,Broker 实时同步Zookeeper上保存的集群内所有已发布的Segment的元信息,即每个Segment保存在哪些存储节点上,Broker 为Zookeeper中每个dataSource创建一个timeline,timeline按照时间顺序描述了每个Segment的存放位置。

## 安装Druid
1)安装jdk配置JAVA_HOME
2)安装zookeeper+kafka
3)下载 apache-druid-0.15.1-incubating-bin.tar.gz
[https://archive.apache.org](https://archive.apache.org/dist/druid/0.15.1-incubating/apache-druid-0.15.1-incubating-bin.tar.gz)
4)解压到/usr/local/druid
5)复制mysql驱动包到druid的mysql-metadata-storage下
cp /usr/local/mysql-connector-java-5.1.44-bin.jar /usr/local/druid/extensions/mysql-metadata-storage/
6)修改 /bin/verify-default-ports ,把2181端口检测去掉
```
my @ports = (1527, 8081, 8082, 8083, 8090, 8091, 8200, 9095);
vi druid/conf/supervisor/single-server/micro-quickstart.conf
#:verify bin/verify-default-ports
#!p10 zk bin/run-zk conf
```
7) 修改配置
```
vi conf/druid/single-server/micro-quickstart/_common/common.runtime.properties
druid.extensions.loadList=["druid-hdfs-storage", "druid-kafka-indexing-service", "druid-datasketches","mysql-metadata-storage"]
druid.host=192.168.222.222
druid.zk.service.host=192.168.222.222
druid.zk.paths.base=/druid
# 配置数据源为mysql
# For Derby server on your Druid Coordinator (only viable in a cluster with a single Coordinator, no fail-over):
#druid.metadata.storage.type=derby
#druid.metadata.storage.connector.connectURI=jdbc:derby://localhost:1527/var/druid/metadata.db;create=true
#druid.metadata.storage.connector.host=localhost
#druid.metadata.storage.connector.port=1527
# For MySQL (make sure to include the MySQL JDBC driver on the classpath):
druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc:mysql://192.168.222.222:3306/druid?serverTimezone=UTC
druid.metadata.storage.connector.user=root
druid.metadata.storage.connector.password=123456
```
8) 启动druid
```
/bin/start-micro-quickstart
```
9) 打开8888端口网页,添加superverst
```
{
"type": "kafka",
"dataSchema": {
"dataSource": "cold_chain_data_druid",
"parser": {
"type": "string",
"parseSpec": {
"format": "json",
"timestampSpec": {
"column": "sendTime",
"format": "yyyy-MM-dd HH:mm:ss"
},
"dimensionsSpec": {
"dimensions": [
"companyName",
"sendTime",
"hostName",
"areaName",
"deviceCode",
"lat",
"lon",
"tem",
"maxTem",
"minTem",
"hum",
"maxHum",
"minHum",
"alertTem",
"alertHum",
"deviceStatus"
]
}
}
},
"metricsSpec": [],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": {
"type": "none"
},
"rollup": false,
"intervals": null
},
"transformSpec": {
"filter": null,
"transforms": []
}
},
"tuningConfig": {
"type": "kafka",
"maxRowsInMemory": 1000000,
"maxBytesInMemory": 0,
"maxRowsPerSegment": 5000000,
"maxTotalRows": null,
"intermediatePersistPeriod": "PT10M",
"basePersistDirectory": "/usr/local/druid/var/tmp",
"maxPendingPersists": 0,
"indexSpec": {
"bitmap": {
"type": "concise"
},
"dimensionCompression": "lz4",
"metricCompression": "lz4",
"longEncoding": "longs"
},
"buildV9Directly": true,
"reportParseExceptions": false,
"handoffConditionTimeout": 0,
"resetOffsetAutomatically": false,
"segmentWriteOutMediumFactory": null,
"workerThreads": null,
"chatThreads": null,
"chatRetries": 8,
"httpTimeout": "PT10S",
"shutdownTimeout": "PT80S",
"offsetFetchPeriod": "PT30S",
"intermediateHandoffPeriod": "P2147483647D",
"logParseExceptions": false,
"maxParseExceptions": 2147483647,
"maxSavedParseExceptions": 0,
"skipSequenceNumberAvailabilityCheck": false
},
"ioConfig": {
"topic": "cold_chain_data_druid",
"replicas": 1,
"taskCount": 1,
"taskDuration": "PT600S",
"consumerProperties": {
"bootstrap.servers": "192.168.223.177:9092"
},
"pollTimeout": 100,
"startDelay": "PT5S",
"period": "PT30S",
"useEarliestOffset": false,
"completionTimeout": "PT1200S",
"lateMessageRejectionPeriod": null,
"earlyMessageRejectionPeriod": null,
"stream": "cold_chain_data_druid",
"useEarliestSequenceNumber": false
},
"context": null,
"suspended": false
}
```
## 实时消息查询
```
public class DruidHelper {
private String url = "jdbc:avatica:remote:url=http://192.168.223.177:8888/druid/v2/sql/avatica/";
private Properties conf = new Properties();
private Connection connection;
/**
* 获得Druid连接
*/
public Connection getConnection() {
try {
if (null == connection) {
connection = DriverManager.getConnection(url, conf);
}
} catch (SQLException e) {
e.printStackTrace();
}
return connection;
}
/**
* 关闭Druid连接
*/
public void close(Connection connection, Statement st, ResultSet rs) {
try {
if (rs!=null) {
rs.close();
}
if (st!=null) {
st.close();
}
if (connection!=null) {
connection.close();
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
```
```
public interface IDruidService {
/**
* 查询实时消息
* @param params
* @return
*/
List selectMessages(Map params);
/**
* 查询温度湿度图表数据
* @param params
* @return
*/
Map selectChartTemHum(Map params);
/**
* 查询所有主机名
* @return
*/
List selectHostNames();
/**
* 所有主机的最近消息
* @return
*/
List selectLatestMessages();
/**
* 查询某台主机的最近消息
* @param hostName
* @return
*/
AlertMessage selectHostLatestMessages(String hostName);
}
```
```
/**
* 实时消息查询
*/
@Service
public class DruidServiceImpl implements IDruidService {
private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
/**
* 多条件查询消息
* @param params
* @return
*/
@Override
public List selectMessages(Map params) {
DruidHelper helper = new DruidHelper();
Connection connection = helper.getConnection();
Statement statement = null;
ResultSet resultSet = null;
try {
statement = connection.createStatement();
resultSet = statement.executeQuery(querySQL(params));
List messages = new ArrayList<>();
while(resultSet.next()){
AlertMessage message = readMessage(resultSet);
messages.add(message);
}
return messages;
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
helper.close(connection,statement,resultSet);
}
}
/**
* 查询温度湿度图表数据
* @param params
* @return
*/
@Override
public Map selectChartTemHum(Map params) {
List messages = selectMessages(params);
List times = new ArrayList<>();
List tems = new ArrayList<>();
List hums = new ArrayList<>();
//给数组赋值
for (AlertMessage message : messages) {
String strTime = format.format(message.getSendTime());
times.add(strTime.substring(11));
tems.add(message.getTem());
hums.add(message.getHum());
}
//按时间重新排序
Collections.reverse(times);
Collections.reverse(tems);
Collections.reverse(hums);
Map map = new HashMap();
map.put("times", times);
map.put("tems", tems);
map.put("hums", hums);
return map;
}
/**
* 查询所有主机名称
* @return
*/
@Override
public List selectHostNames() {
String sql = "select hostName from cold_chain_data_druid group by hostName";
DruidHelper helper = new DruidHelper();
Connection connection = helper.getConnection();
Statement statement = null;
ResultSet resultSet = null;
try {
statement = connection.createStatement();
resultSet = statement.executeQuery(sql);
List messages = new ArrayList<>();
while(resultSet.next()){
messages.add(resultSet.getString("hostName"));
}
return messages;
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
helper.close(connection,statement,resultSet);
}
}
/**
* 查询所有主机的最近消息
* @return
*/
@Override
public List selectLatestMessages() {
//获得所有主机名
List hosts = selectHostNames();
List latestMsgs = new ArrayList<>();
for(String host : hosts){
//找到每个主机的最近一条消息
AlertMessage message = selectHostLatestMessages(host);
latestMsgs.add(message);
}
return latestMsgs;
}
/**
* 查询主机最近消息
* @param hostName
* @return
*/
@Override
public AlertMessage selectHostLatestMessages(String hostName) {
String sql = "select * from cold_chain_data_druid where hostName = '"+ hostName +"' order by __time desc limit 1";
DruidHelper helper = new DruidHelper();
Connection connection = helper.getConnection();
Statement statement = null;
ResultSet resultSet = null;
try {
statement = connection.createStatement();
resultSet = statement.executeQuery(sql);
if(resultSet.next()){
return readMessage(resultSet);
}
return null;
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
helper.close(connection,statement,resultSet);
}
}
/**
* ResultSet转消息
* @param rs
* @return
* @throws Exception
*/
private AlertMessage readMessage(ResultSet rs) throws Exception {
AlertMessage messageEntity = new AlertMessage();
messageEntity.setDeviceCode(rs.getString("deviceCode"));
messageEntity.setHostName(rs.getString("hostName"));
messageEntity.setCompanyName(rs.getString("companyName"));
messageEntity.setTem(Long.valueOf(rs.getString("tem")));
messageEntity.setMaxTem(Long.valueOf(rs.getString("maxTem")));
messageEntity.setMinTem(Long.valueOf(rs.getString("minTem")));
messageEntity.setHum(Long.valueOf(rs.getString("hum")));
messageEntity.setMaxHum(Long.valueOf(rs.getString("maxHum")));
messageEntity.setMinHum(Long.valueOf(rs.getString("minHum")));
messageEntity.setAlertTem(Long.valueOf(rs.getString("alertTem")));
messageEntity.setAlertHum(Long.valueOf(rs.getString("alertHum")));
messageEntity.setDeviceStatus(rs.getString("deviceStatus"));
messageEntity.setLat(rs.getString("lat"));
messageEntity.setLon(rs.getString("lon"));
messageEntity.setSendTime(format.parse(rs.getString("sendTime")));
return messageEntity;
}
/**
* 拼接SQL
* @param params
* @return
*/
private String querySQL(Map params) {
StringBuffer stringBuffer = new StringBuffer();
stringBuffer.append("SELECT * from cold_chain_data_druid where 1= 1 ");
//设备编码
String hostName = (String) params.get("hostName");
if (!StringUtils.isEmpty(hostName)) {
stringBuffer.append(" and hostName = '");
stringBuffer.append(hostName);
stringBuffer.append("' ");
}
//前台传入开始、结束时间
String startTime = (String) params.get("startTime");
if (!StringUtils.isEmpty(startTime)) {
stringBuffer.append(" and __time > '");
stringBuffer.append(startTime);
stringBuffer.append("' ");
}
String endTime = (String) params.get("endTime");
if (!StringUtils.isEmpty(endTime)) {
stringBuffer.append(" and __time < '");
stringBuffer.append(endTime);
stringBuffer.append("' ");
}
stringBuffer.append(" order by __time desc");
//查询条数
String limit = params.get("limit") != null ? params.get("limit").toString().trim() : "10";
stringBuffer.append(" limit ");
stringBuffer.append(limit);
System.out.println(stringBuffer.toString());
return stringBuffer.toString();
}
}
```
```
/**
* Druid实时数据查询
*/
@RestController
@RequestMapping("/manage/druid")
public class DruidMessageController {
@Resource
private IDruidService druidService;
/**
* 查询温度湿度消息
* @param hostName
* @param limit
* @return
*/
@RequestMapping("messages")
public AjaxResult getTemHumMessage(String hostName,@RequestParam(required = false,defaultValue = "20") Integer limit){
Map params = new HashMap<>();
params.put("hostName",hostName);
params.put("limit",limit);
Map map = druidService.selectChartTemHum(params);
return AjaxResult.success(map);
}
/**
* 查询每个设备最近的消息
* @return
*/
@RequestMapping("latest-messages")
public AjaxResult getLatestMessage(){
List messages = druidService.selectLatestMessages();
return AjaxResult.success(messages);
}
}
```
# 数据可视化
Superset 是Apache顶级项目,是由 Airbnb 开源的数据分析与可视化平台,同时也是由 Python 语言构建的轻量级 BI 系统。Superset 可实现对 TB 量级数据进行处理,兼容常见的数十种关系或非关系型数据库,并在内部实现 SQL 编辑查询等操作。
## Superset安装
1) 拉取镜像
```
docker pull amancevice/superset:0.37.2
```
2) 运行镜像
```
docker run -d -p 8088:8088 -v /opt/docker/superset:/home/superset amancevice/superset
```
3) 更新容器
```
docker exec -it 0d6482595313 superset db upgrade
```
4) 初始化
```
docker exec -it 0d6482595313 superset init
```
5)设置账号密码
```
docker exec -it 0d6482595313 bash
export FLASK_APP=superset
flask fab create-admin
```
## Superset使用
1) 输入IP:8088登录Superset页面,账号密码admin/123456,在mysql中创建person表

2) 点击Databases,创建数据源,点击Test提示OK

3) 点击tables,创建表

4) 编辑表格,添加出生年份作为计算字段


4) 点击Charts,创建图表

5) 设置图表参数如下

6) 图表可以生成iframe代码,嵌入到网页中

7) 分享的iframe可能出现跨域问题,需要设置角色,给Public角色添加权限

8) 在服务器的/opt/docker/superset目录下,创建superset_config.py文件,添加内容后重启容器
```\
WTF_CSRF_ENABLED = False
PUBLIC_ROLE_LIKE_GAMMA = True
HTTP_HEADERS = {}
WTF_CSRF_EXEMPT_LIST = [
"superset.views.core.explore_json"
]
```
## 项目应用
1) 创建druid数据源

2) 创建表

3) 创建冷藏车温度报警折线图

4) 按需求创建其它的图表后嵌入到项目中
# 新增业务
添加库存管理业务
1. 入库:创建入库单后包括如下几个状态:未发货、在途(已发货未入库)、部分入库、作废、入库完成,入库类型包括:采购入库、外协入库、退货入库,入库单支持lodop和网页打印

2. 出库:创建出库单后包括如下几个状态:未发货、部分发货、已发货、作废,入库类型包括:销售出库、外协出库、调拨出库,出库单支持lodop和网页打印

