# LLM-Framework-New
**Repository Path**: mstes/llm-framework-new
## Basic Information
- **Project Name**: LLM-Framework-New
- **Description**: LLM Message Processor 是基于 RabbitMQ、MinIO 和状态机的异步消息处理系统,支持消息消费 / 发布、对象存取、内存缓存与结构化日志。核心通过单条消息一次状态机流转(LOAD→HANDLER/EXCEPTION)实现可靠处理
- **Primary Language**: Python
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2026-02-22
- **Last Updated**: 2026-03-02
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# LLM Message Processor 系统文档
## 1. 项目概述
LLM Message Processor 是一个基于 **RabbitMQ** 的异步消息处理系统,集成了 **MinIO 对象存储**、**本地内存缓存**和**结构化日志**,通过状态机模式实现消息的可靠处理与流转。
### 核心功能
- RabbitMQ 消息消费与发布(支持死信队列)
- MinIO 二进制/JSON 对象存取
- 内存缓存管理(支持视频与传感器数据)
- 结构化日志(控制台+JSON文件,含异常堆栈)
- 状态机驱动的消息处理流程
### 技术栈
| 组件 | 依赖库 | 用途 |
|---------------|-----------------|--------------------------|
| 消息队列 | `pika` | RabbitMQ 客户端 |
| 对象存储 | `minio` | MinIO 客户端 |
| 配置管理 | `PyYAML` | YAML 配置文件解析 |
| 数据结构 | `dataclasses` | 类型安全的数据类定义 |
| 日志系统 | `logging` | 结构化日志记录 |
---
## 2. 快速开始
本节将指导你如何快速搭建并运行系统。
### 2.1 环境要求
- Python 3.10+
- RabbitMQ 3.8+ (需提前启动)
- MinIO Server (需提前启动)
### 2.2 安装依赖
```bash
pip install pika minio pyyaml
```
### 2.3 准备配置文件
在项目根目录创建 `config.yaml`:
```yaml
minio:
host: "localhost:9000"
access_key: "minioadmin"
secret_key: "minioadmin"
bucket_name: "exam-data"
ssl: false
Rabbitmq:
host: "localhost"
port: 5672
access_key: "guest"
secret_key: "guest"
virtual_host: "/"
shared_exchange_name: "exam-exchange"
shared_dead_exchange_name: "exam-dlx"
heartbeat: 60
LLM:
queue_name: "llm-queue"
queue_routekey: "llm-rk"
dead_queue_name: "llm-dlq"
dead_queue_routekey: "llm-dl-rk"
msg_ttl: 3600
ExamResult:
queue_name: "exam-result-queue"
queue_routekey: "exam-result-rk"
dead_queue_name: "exam-result-dlq"
dead_queue_routekey: "exam-result-dl-rk"
msg_ttl: 86400
```
### 2.4 编写主程序 `main.py`
```python
import loop
import models.rabbitmq as rabbitmq
import models.cache as cache
import models.minio_client as minio
import models.logger as logger
import config as conf
def on_message(cacher: cache.Cacher) -> loop.Result:
"""
业务回调函数:在此处编写你的核心业务逻辑
Args:
cacher: 缓存对象,包含已加载的消息、传感器数据和视频
Returns:
Result: 处理结果对象
"""
# TODO: 在此处添加你的业务逻辑
# 示例:从 cacher.get() 获取数据并处理
res = loop.Result(
analysis="test",
process_scores=[2, 2, 0, 0, 0, 0, 0],
)
return res
if __name__ == "__main__":
# 1. 初始化日志系统
logger.Init("test", level=logger.LogLevel.DEBUG)
# 2. 加载配置文件
config = conf.build("config.yaml")
# 3. 初始化 RabbitMQ 客户端 (LLM 消费队列)
llm_mq = rabbitmq.build(rabbitmq.Config(
host=config.rabbitmq_host,
port=config.rabbitmq_port,
username=config.rabbitmq_access_key,
password=config.rabbitmq_secret_key,
virtual_host=config.rabbitmq_virtual_host,
heartbeat=config.heartbeat,
exchange_name=config.shared_exchange_name,
queue_name=config.llm_queue_name,
routing_key=config.llm_queue_routekey,
dead_letter_exchange=config.shared_dead_exchange_name,
dead_letter_routing_key=config.llm_dead_queue_routekey,
dead_letter_queue=config.llm_dead_queue_name,
message_ttl=config.llm_msg_ttl
))
# 4. 初始化 RabbitMQ 客户端 (结果发送队列)
result_mq = rabbitmq.build(rabbitmq.Config(
host=config.rabbitmq_host,
port=config.rabbitmq_port,
username=config.rabbitmq_access_key,
password=config.rabbitmq_secret_key,
virtual_host=config.rabbitmq_virtual_host,
heartbeat=config.heartbeat,
exchange_name=config.shared_exchange_name,
queue_name=config.exam_result_queue_name,
routing_key=config.exam_result_queue_routekey,
dead_letter_exchange=config.shared_dead_exchange_name,
dead_letter_routing_key=config.exam_result_dead_queue_routekey,
dead_letter_queue=config.exam_result_dead_queue_name,
message_ttl=config.exam_result_msg_ttl
))
# 5. 初始化缓存管理器
cacher = cache.build()
# 6. 初始化 MinIO 客户端
minio_client = minio.build(minio.Config(
host=config.minio_host,
access_key=config.minio_access_key,
secret_key=config.minio_secret_key,
bucket_name=config.minio_bucket_name,
ssl=config.minio_ssl
))
# 7. 构建 Looper 状态机并启动消费
looper = loop.build(llm_mq, result_mq, cacher, minio_client, on_message)
looper.run()
```
### 2.5 运行系统
确保 RabbitMQ 和 MinIO 已启动,然后执行:
```bash
python main.py
```
系统将开始监听 `llm-queue` 队列。
---
## 3. 系统架构
```mermaid
graph TD
A[RabbitMQ
LLM Queue] -->|消费消息| B[Looper
状态机]
B -->|LOAD| C[Cacher
缓存管理]
C -->|获取对象| D[MinIO
对象存储]
D -->|返回数据| C
C -->|HANDLER| E[业务回调
on_message]
E -->|返回结果| F[RabbitMQ
Result Queue]
B -->|EXCEPTION| G[死信队列
Dead Letter]
style B fill:#f9f,stroke:#333
style C fill:#bbf,stroke:#333
style D fill:#bfb,stroke:#333
```
---
## 4. 模块详解
### 4.1 日志模块 (`logger.py`)
提供结构化日志记录,支持控制台输出与 JSON 文件输出,自动记录异常堆栈。
#### 核心类与函数
| 类/函数 | 说明 |
|-----------------------|----------------------------------------------------------------------|
| `LogLevel` | 日志级别枚举:`DEBUG`, `INFO`, `ERROR`, `FATAL` |
| `LogRotationConfig` | 日志轮转配置:`max_bytes`(单文件大小)、`backup_count`(备份数) |
| `JSONFormatter` | 自定义 JSON 格式化器,将日志序列化为 JSON 字符串(含异常堆栈) |
| `Logger` | 日志管理核心类,支持控制台与文件双输出 |
| `Init()` | 初始化全局日志器(必须在使用日志前调用) |
| `debug()/info()/error()/fatal()` | 模块级日志记录函数,`ERROR/FATAL` 自动记录异常堆栈 |
#### 日志格式示例
**控制台输出**:
```
2026-02-22 10:00:00 [test] [DEBUG] [loop.py:42] Looper初始化完成
```
**JSON 文件输出**:
```json
{
"asctime": "2026-02-22 10:00:00",
"name": "test",
"levelname": "ERROR",
"caller": "[loop.py:85]",
"message": "JSON 格式错误: Expecting value: line 1 column 1 (char 0)",
"exception": [
"Traceback (most recent call last):",
" File \"loop.py\", line 78, in _load",
" json_dict = json.loads(msg.decode(encoding='utf-8'))",
"json.JSONDecodeError: Expecting value: line 1 column 1 (char 0)"
]
}
```
---
### 4.2 缓存模块 (`cache.py`)
提供内存缓存管理,支持消息数据、传感器数据与视频二进制数据的存储。
#### 核心数据类
| 数据类 | 字段说明 |
|----------------|--------------------------------------------------------------------------|
| `BaseMessage` | 传感器基础消息:`sensor_type`(传感器类型)、`value`(MinIO 对象键) |
| `CacheData` | 缓存核心数据:
- 基础信息:`message_id`, `exam_id`, `user_id`, `name`
- 传感器数据:`sensor_contents`(List[BaseMessage])、`sensor_map`(Dict[str, List[Dict]])
- 视频数据:`video_binary`(Optional[bytes]) |
#### 核心类 `Cacher`
| 方法 | 说明 |
|-------------------------------|----------------------------------------------------------------------|
| `set_from_dict(json_dict)` | 从 JSON 字典初始化缓存,返回 `bool`(是否成功) |
| `set_sensor_from_raw_json(sensor_name, raw_json_str)` | 存入传感器 JSON Lines 数据到 `sensor_map` |
| `set_video(video_data)` | 存入视频二进制数据(仅允许设置一次) |
| `get()` | 获取当前缓存数据 `Optional[CacheData]` |
| `clear()` | 清空缓存 |
---
### 4.3 MinIO 客户端模块 (`minio_client.py`)
封装 MinIO 客户端操作,支持视频与 JSON 对象的存取。
#### 核心类 `MinioClient`
| 方法 | 说明 |
|-------------------------------|----------------------------------------------------------------------|
| `_init_client()` | 初始化 MinIO 客户端,自动创建 Bucket(若不存在) |
| `get(object_key)` | 获取对象,自动判断视频文件(返回 `bytes`)或 JSON 文件(返回 `str`) |
| `is_video_file(object_key)` | 判断是否为视频文件(检查 `.mp4`, `.webm` 后缀或 `video` 关键词) |
---
### 4.4 RabbitMQ 客户端模块 (`rabbitmq.py`)
封装 RabbitMQ 连接、交换机/队列声明绑定、消息消费与发布,支持死信队列。
#### 核心配置类 `Config`
| 字段分类 | 关键字段 |
|----------------|--------------------------------------------------------------------------|
| 基础连接 | `host`, `port`, `username`, `password`, `virtual_host`, `heartbeat` |
| 普通队列 | `exchange_name`, `queue_name`, `routing_key` |
| 死信队列 | `dead_letter_exchange`, `dead_letter_queue`, `dead_letter_routing_key`, `message_ttl` |
#### 核心类 `RabbitmqConnector`
| 方法 | 说明 |
|-------------------------------|----------------------------------------------------------------------|
| `connect()` | 建立 RabbitMQ 连接与通道 |
| `declareAndBindNormalQueue()` | 声明并绑定普通交换机/队列 + 死信交换机/队列 |
| `set_on_message(callback)` | 设置消息处理回调 `OnMessageCallback`(签名:`bytes -> bool`) |
| `consumer()` | 启动消费者循环,自动 ACK 成功消息 |
| `publish(exchange_name, routing_key, msg)` | 发布消息(默认持久化) |
---
### 4.5 状态机处理模块 (`loop.py`)
核心消息处理引擎,通过状态机驱动单条消息的完整处理流程。
#### 状态机设计
```mermaid
stateDiagram-v2
[*] --> LOAD: 每条消息重置状态
LOAD --> HANDLER: 加载成功
LOAD --> EXCEPTION: 加载失败
HANDLER --> [*]: 业务成功 (返回True/ACK)
HANDLER --> EXCEPTION: 业务失败
EXCEPTION --> [*]: 处理失败 (返回False/不ACK)
```
| 状态 | 说明 |
|---------------|----------------------------------------------------------------------|
| `LOAD` | 加载消息:解码 JSON → 初始化缓存 → 从 MinIO 拉取传感器/视频数据 |
| `HANDLER` | 业务处理:调用 `on_message` 回调 → 填充结果字段 → 发送结果队列 |
| `EXCEPTION` | 异常终态:记录日志 → 返回失败(不 ACK) |
#### 核心数据类 `Result`
处理结果数据类,支持序列化为 JSON 字节流:
```python
@dataclass
class Result:
process_scores: list[int] = None # 业务处理分数
analysis: str = "" # 分析结果
message_id: str = "" # 消息ID(自动填充)
exam_id: str = "" # 考试ID(自动填充)
# ... 更多自动填充字段:user_id, name, room_id 等
raw_file_id: str = "" # 原始文件ID(自动填充)
def to_json_bytes(self) -> bytes: # 序列化为 JSON 字节流
```
#### 核心类 `Looper`
| 方法 | 说明 |
|-------------------------------|----------------------------------------------------------------------|
| `_load(msg)` | 状态 `LOAD` 逻辑:加载消息与 MinIO 数据,返回 `bool` |
| `_message_cb()` | 状态 `HANDLER` 逻辑:调用业务回调,填充结果,返回 `Optional[Result]` |
| `_run(msg)` | 状态机核心:单条消息执行一次,返回 `bool`(True=ACK, False=不ACK) |
| `run()` | 启动消费者循环(持续监听队列) |
---
### 4.6 配置模块 (`config.py`)
解析 YAML 配置文件,统一管理 MinIO、RabbitMQ 与业务队列配置。
---
## 5. 状态机流转示例
### 成功流程
1. **LOAD**:接收消息 → 解码 JSON → 初始化缓存 → 从 MinIO 拉取 `sensor_contents` 中的视频与传感器数据
2. **HANDLER**:调用 `on_message(cacher)` → 填充 `Result` 字段 → 发送到结果队列 → 返回 `True`(ACK)
3. **退出**:状态机结束,等待下一条消息
### 失败流程
1. **LOAD**:接收消息 → JSON 解码失败 → 跳转 `EXCEPTION`
2. **EXCEPTION**:记录错误日志(含堆栈)→ 返回 `False`(不 ACK)
3. **退出**:消息进入死信队列(根据 RabbitMQ 配置)
---
## 6. 扩展开发
### 自定义业务回调
实现 `OnMessage` 类型的回调函数(即 `main.py` 中的 `on_message` 函数):
```python
def on_message(cacher: cache.Cacher) -> loop.Result:
# 1. 获取缓存数据
cache_data = cacher.get()
print(f"处理消息 ID: {cache_data.message_id}")
# 2. 获取传感器数据 (List[Dict])
if "some_sensor" in cache_data.sensor_map:
sensor_data = cache_data.sensor_map["some_sensor"]
# 处理传感器数据...
# 3. 执行业务逻辑
# ...
# 4. 返回结果
return loop.Result(
analysis="自定义分析结果",
process_scores=[1, 2, 3, 4, 5]
)
```
---
## 7. 项目结构
```
.
├── main.py # 系统入口
├── config.py # 配置解析
├── config.yaml # 配置文件
├── models/
│ ├── logger.py # 日志模块
│ ├── cache.py # 缓存模块
│ ├── minio_client.py # MinIO 客户端
│ ├── rabbitmq.py # RabbitMQ 客户端
│ └── loop.py # 状态机处理模块
└── requirements.txt # 依赖列表
```