# delay_message
**Repository Path**: yushengruohui/delay_message
## Basic Information
- **Project Name**: delay_message
- **Description**: 基于 sqlite 实现 kafka 延时消息,实现思路适用于所有分布式消息队列,如 RocketMQ、ActiveMQ。性能表现:单线程 1.5w/s
- **Primary Language**: Java
- **License**: Apache-2.0
- **Default Branch**: main
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 4
- **Forks**: 0
- **Created**: 2022-01-08
- **Last Updated**: 2024-12-19
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# 1、需求
延时消息(或者说定时消息)是业务系统里一个常见的功能点。常用业务场景如:
1) 订单超时取消
2) 离线超过指定时间的用户,召回通知
3) 手机消失多久后通知监护人……
现流行的实现方案主要有:
1)数据库定时轮询,扫描到达到延时时间的记录,业务处理,删除该记录
2)jdk 自带延时队列(DelayQueue),或优化的时间轮算法
3)redis 有序集合
4)支持延时消息的分布式消息队列
但以上方案,都存在各种缺陷:
1)定时轮询间隔小,则对数据库造成很大压力,分布式微服务架构不好适配。
2)jdk 自带延时队列,占用内存高,服务重启则丢失消息,分布式微服务架构不好适配。
3)redis 有序集合比较合适,但内存贵,分布式微服务架构不好适配。
4)现在主流的 RocketMQ 不支持任意延时时间的延时消息,RabbitMQ或ActiveMQ 性能不够好,发送配置麻烦,kafka不支持延时消息。
因此,我想实现一个**适配分布式**微服务架构、**高性能**、**方便业务系统使用**的延时消息转发中间件。
# 2、实现思路
要保证高性能,推荐使用 kafka 或者 RocketMQ 做分布式消息队列。当前是基于 sqlite 实现 kafka 延时消息。
当前实现思路是基于kafka的,实际适用于任意MQ产品。
## 2.1 整体实现思路

## 2.2 程序业务逻辑
1)业务系统先推送延时消息到统一延时消息队列
2)定时读取延时消息队列的延时消息,保存于本地,提交偏移量
3)定时扫描本地到达延时期限的消息,转发到实际业务消息队列
4)删除本地延时消息
## 2.3 实现细节
1)一个业务处理流程使用一个本地数据库,可并发执行提高性能。
2)参考雪花算法,去除机器码,高效生成 id 。
3)批量处理
4)本地存储使用 sqlite | rocksdb ,高效读写
## 2.4 依赖框架
1)kafka-client
2)sqlite或者rocksdb
3)slf4j+log4j2
4)jackson
# 3、性能测试
测试机器: i5-6500,16GB内存,机械硬盘
延时消息大小: 1kb
并发处理数:1
已本地简单测试[sqlite案例],性能表现:
1) 1个并发处理数就可以达到1秒存储、转发、删除 约15000条延时消息,2 个可以达到 30000条/s……
2) 一次性处理1万条记录,是经过多次对比试验得出的合适批次大小
# 4、部署
## 4.1 系统环境依赖
1)jdk 1.8
2)kafka 1.1.0
可以自行替换为符合实际kafka版本的jar包(不会有冲突的,jar包版本和kafka服务版本不一致可能会有异常[无法拉取消息、提交失败等])。
可修改pom.xml内的 kafka_version
```xml
1.1.0
```
重新打包即可。当前程序可以独立部署,对现有工程项目无侵入性。
## 4.2 安装
1)在项目根目录执行 mvn clean package 打包后,会在 target 目录下生成 delay-message-sqlite-1.0.0.tar.gz 文件
2)解压 delay-message-sqlite-1.0.0.tar.gz 到指定安装目录(推荐: /usr/local/delay_msg ) ,执行启动脚本即可
解压指令:
```shell
tar -zxvf delay-message-sqlite-1.0.0.tar.gz
```
执行启动脚本:
```shell
sh delay-message-sqlite-1.0.0/bin/start.sh
```
3)如需修改配置,可修改 delay-message-sqlite-1.0.0/resources/kafka.properties文件,设置后需要重启服务,配置才生效
默认配置如下:
```properties
# kafka 连接url [ip:port,ip:port……]
kafka.url=127.0.0.1:9092
# 延时消息本地存储路径,建议使用绝对值
kafka.delay.store.path=/var/delay_message/db
# 统一延时消息topic
kafka.delay.topic=common_delay_msg
# 消费者组id
kafka.delay.group.id=common_delay_app
# 并发处理数。限制条件: workers 小于等于topic分区数
kafka.delay.workers=2
```
4)业务方发送 kafka 消息到 topic (common_delay_msg)
消息体参数说明:
```json
{
"topic": "实际业务topic",
"messageKey": "消息的key,影响发送到那个分区",
"message": "业务消息内容",
"delayTime": 1641470704
}
```
delayTime: 指定延时时限,**秒级别时间戳**
消息体案例:
```json
{
"topic": "cancel_order",
"messageKey": "123456",
"message": "{\"orderId\":123456789123456,\"userId\":\"yhh\"}",
"delayTime": 1641470704
}
```
## 4.3 程序迁移
复制 **延时消息保存目录** 到新机器,重启部署、启动程序即可。(该配置项所在目录 kafka.delay.store.path=/var/delay_message/db)
## 4.4 排查日志
日志默认输出到 /var/delay_message/log ,日志输出方式为异步输出。
system.log 记录了系统 info 级别以上的日志,info级别日志不是立刻输出的,所以程序重启时,可能会丢失部分日志
exception.log 记录了系统 warn 级别以上的日志,日志配置为立即输出,程序正常重启,不会丢失日志,重点关注这个日志即可。
如需自定义日志配置,可以在 log4j2.xml 进行配置。
如果要进行本地调试,可以解开注释,否则控制台没有日志输出:
```xml
```
# 5、注意事项
1) 当前程序严格依赖于系统时钟,注意配置程序部署服务器的时钟和业务服务器时钟一致
2) 建议配置统一延时消息队列(common_delay_msg)的分区数为 2 的倍数
3) 每个 kafka.delay.workers 约需要 200 mb 内存,默认配置为2 , jvm 建议配置 1 GB 以上内存,避免频繁gc 。
**workers 增大后,不要再减小**,否则会导致部分 sqlite 数据库没有线程访问,消息丢失。
并发处理数越大,延时消息处理效率越高,但需要注意不要大于topic的分区数。
需要自行测试多少个并发处理数就会达到磁盘io、网络带宽上限。
当前程序主要瓶颈在于磁盘io和网络带宽,实际内存和cpu资源占用极低。
4) 程序运行时,不要操作延时消息保存目录即里面的文件
5) 当前配置为正常情况下不会抛弃消息模式,但程序重启时,存在重复发送消息的可能,下游业务系统需要做好幂等性处理。
如果kafka集群异常,当前配置为重新发送16次,如果仍不能恢复过来,则抛弃当前消息,实际生产环境里,基本不可能出现该场景。
如果确定消息不能抛弃,需要自行修改源码(MsgTransferTask.run,KafkaUtils.send(……)),重新打包、部署。
6) 程序出现未知异常(sqlite被手动修改、磁盘满了……),会直接结束程序运行。
# 6、闲聊
整体思路,实现,源码里都比较清晰,如果 RocketMQ 也有自定义延时需求,参考着修改源码即可,实现逻辑是一样的。
如果要尽可能的实现延时消息的最终处理,可以再额外采用2个延迟消息处理方案:
1、每天扫描一次数据库,把符合延时条件的记录统一处理一次
2、惰性处理,当用户再次访问某功能点时,再修改相关符合延时条件的记录
作者邮箱:1950249908@qq.com ,如有问题,欢迎骚扰。
源码路径:
https://gitee.com/yushengruohui/delay_message
https://github.com/yushengruohui/delay_message
转发请备注好原作者:yushengruohui 。欢迎转发。