# LLM-Framework-New **Repository Path**: mstes/llm-framework-new ## Basic Information - **Project Name**: LLM-Framework-New - **Description**: LLM Message Processor 是基于 RabbitMQ、MinIO 和状态机的异步消息处理系统,支持消息消费 / 发布、对象存取、内存缓存与结构化日志。核心通过单条消息一次状态机流转(LOAD→HANDLER/EXCEPTION)实现可靠处理 - **Primary Language**: Python - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2026-02-22 - **Last Updated**: 2026-03-02 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # LLM Message Processor 系统文档 ## 1. 项目概述 LLM Message Processor 是一个基于 **RabbitMQ** 的异步消息处理系统,集成了 **MinIO 对象存储**、**本地内存缓存**和**结构化日志**,通过状态机模式实现消息的可靠处理与流转。 ### 核心功能 - RabbitMQ 消息消费与发布(支持死信队列) - MinIO 二进制/JSON 对象存取 - 内存缓存管理(支持视频与传感器数据) - 结构化日志(控制台+JSON文件,含异常堆栈) - 状态机驱动的消息处理流程 ### 技术栈 | 组件 | 依赖库 | 用途 | |---------------|-----------------|--------------------------| | 消息队列 | `pika` | RabbitMQ 客户端 | | 对象存储 | `minio` | MinIO 客户端 | | 配置管理 | `PyYAML` | YAML 配置文件解析 | | 数据结构 | `dataclasses` | 类型安全的数据类定义 | | 日志系统 | `logging` | 结构化日志记录 | --- ## 2. 快速开始 本节将指导你如何快速搭建并运行系统。 ### 2.1 环境要求 - Python 3.10+ - RabbitMQ 3.8+ (需提前启动) - MinIO Server (需提前启动) ### 2.2 安装依赖 ```bash pip install pika minio pyyaml ``` ### 2.3 准备配置文件 在项目根目录创建 `config.yaml`: ```yaml minio: host: "localhost:9000" access_key: "minioadmin" secret_key: "minioadmin" bucket_name: "exam-data" ssl: false Rabbitmq: host: "localhost" port: 5672 access_key: "guest" secret_key: "guest" virtual_host: "/" shared_exchange_name: "exam-exchange" shared_dead_exchange_name: "exam-dlx" heartbeat: 60 LLM: queue_name: "llm-queue" queue_routekey: "llm-rk" dead_queue_name: "llm-dlq" dead_queue_routekey: "llm-dl-rk" msg_ttl: 3600 ExamResult: queue_name: "exam-result-queue" queue_routekey: "exam-result-rk" dead_queue_name: "exam-result-dlq" dead_queue_routekey: "exam-result-dl-rk" msg_ttl: 86400 ``` ### 2.4 编写主程序 `main.py` ```python import loop import models.rabbitmq as rabbitmq import models.cache as cache import models.minio_client as minio import models.logger as logger import config as conf def on_message(cacher: cache.Cacher) -> loop.Result: """ 业务回调函数:在此处编写你的核心业务逻辑 Args: cacher: 缓存对象,包含已加载的消息、传感器数据和视频 Returns: Result: 处理结果对象 """ # TODO: 在此处添加你的业务逻辑 # 示例:从 cacher.get() 获取数据并处理 res = loop.Result( analysis="test", process_scores=[2, 2, 0, 0, 0, 0, 0], ) return res if __name__ == "__main__": # 1. 初始化日志系统 logger.Init("test", level=logger.LogLevel.DEBUG) # 2. 加载配置文件 config = conf.build("config.yaml") # 3. 初始化 RabbitMQ 客户端 (LLM 消费队列) llm_mq = rabbitmq.build(rabbitmq.Config( host=config.rabbitmq_host, port=config.rabbitmq_port, username=config.rabbitmq_access_key, password=config.rabbitmq_secret_key, virtual_host=config.rabbitmq_virtual_host, heartbeat=config.heartbeat, exchange_name=config.shared_exchange_name, queue_name=config.llm_queue_name, routing_key=config.llm_queue_routekey, dead_letter_exchange=config.shared_dead_exchange_name, dead_letter_routing_key=config.llm_dead_queue_routekey, dead_letter_queue=config.llm_dead_queue_name, message_ttl=config.llm_msg_ttl )) # 4. 初始化 RabbitMQ 客户端 (结果发送队列) result_mq = rabbitmq.build(rabbitmq.Config( host=config.rabbitmq_host, port=config.rabbitmq_port, username=config.rabbitmq_access_key, password=config.rabbitmq_secret_key, virtual_host=config.rabbitmq_virtual_host, heartbeat=config.heartbeat, exchange_name=config.shared_exchange_name, queue_name=config.exam_result_queue_name, routing_key=config.exam_result_queue_routekey, dead_letter_exchange=config.shared_dead_exchange_name, dead_letter_routing_key=config.exam_result_dead_queue_routekey, dead_letter_queue=config.exam_result_dead_queue_name, message_ttl=config.exam_result_msg_ttl )) # 5. 初始化缓存管理器 cacher = cache.build() # 6. 初始化 MinIO 客户端 minio_client = minio.build(minio.Config( host=config.minio_host, access_key=config.minio_access_key, secret_key=config.minio_secret_key, bucket_name=config.minio_bucket_name, ssl=config.minio_ssl )) # 7. 构建 Looper 状态机并启动消费 looper = loop.build(llm_mq, result_mq, cacher, minio_client, on_message) looper.run() ``` ### 2.5 运行系统 确保 RabbitMQ 和 MinIO 已启动,然后执行: ```bash python main.py ``` 系统将开始监听 `llm-queue` 队列。 --- ## 3. 系统架构 ```mermaid graph TD A[RabbitMQ
LLM Queue] -->|消费消息| B[Looper
状态机] B -->|LOAD| C[Cacher
缓存管理] C -->|获取对象| D[MinIO
对象存储] D -->|返回数据| C C -->|HANDLER| E[业务回调
on_message] E -->|返回结果| F[RabbitMQ
Result Queue] B -->|EXCEPTION| G[死信队列
Dead Letter] style B fill:#f9f,stroke:#333 style C fill:#bbf,stroke:#333 style D fill:#bfb,stroke:#333 ``` --- ## 4. 模块详解 ### 4.1 日志模块 (`logger.py`) 提供结构化日志记录,支持控制台输出与 JSON 文件输出,自动记录异常堆栈。 #### 核心类与函数 | 类/函数 | 说明 | |-----------------------|----------------------------------------------------------------------| | `LogLevel` | 日志级别枚举:`DEBUG`, `INFO`, `ERROR`, `FATAL` | | `LogRotationConfig` | 日志轮转配置:`max_bytes`(单文件大小)、`backup_count`(备份数) | | `JSONFormatter` | 自定义 JSON 格式化器,将日志序列化为 JSON 字符串(含异常堆栈) | | `Logger` | 日志管理核心类,支持控制台与文件双输出 | | `Init()` | 初始化全局日志器(必须在使用日志前调用) | | `debug()/info()/error()/fatal()` | 模块级日志记录函数,`ERROR/FATAL` 自动记录异常堆栈 | #### 日志格式示例 **控制台输出**: ``` 2026-02-22 10:00:00 [test] [DEBUG] [loop.py:42] Looper初始化完成 ``` **JSON 文件输出**: ```json { "asctime": "2026-02-22 10:00:00", "name": "test", "levelname": "ERROR", "caller": "[loop.py:85]", "message": "JSON 格式错误: Expecting value: line 1 column 1 (char 0)", "exception": [ "Traceback (most recent call last):", " File \"loop.py\", line 78, in _load", " json_dict = json.loads(msg.decode(encoding='utf-8'))", "json.JSONDecodeError: Expecting value: line 1 column 1 (char 0)" ] } ``` --- ### 4.2 缓存模块 (`cache.py`) 提供内存缓存管理,支持消息数据、传感器数据与视频二进制数据的存储。 #### 核心数据类 | 数据类 | 字段说明 | |----------------|--------------------------------------------------------------------------| | `BaseMessage` | 传感器基础消息:`sensor_type`(传感器类型)、`value`(MinIO 对象键) | | `CacheData` | 缓存核心数据:
- 基础信息:`message_id`, `exam_id`, `user_id`, `name`
- 传感器数据:`sensor_contents`(List[BaseMessage])、`sensor_map`(Dict[str, List[Dict]])
- 视频数据:`video_binary`(Optional[bytes]) | #### 核心类 `Cacher` | 方法 | 说明 | |-------------------------------|----------------------------------------------------------------------| | `set_from_dict(json_dict)` | 从 JSON 字典初始化缓存,返回 `bool`(是否成功) | | `set_sensor_from_raw_json(sensor_name, raw_json_str)` | 存入传感器 JSON Lines 数据到 `sensor_map` | | `set_video(video_data)` | 存入视频二进制数据(仅允许设置一次) | | `get()` | 获取当前缓存数据 `Optional[CacheData]` | | `clear()` | 清空缓存 | --- ### 4.3 MinIO 客户端模块 (`minio_client.py`) 封装 MinIO 客户端操作,支持视频与 JSON 对象的存取。 #### 核心类 `MinioClient` | 方法 | 说明 | |-------------------------------|----------------------------------------------------------------------| | `_init_client()` | 初始化 MinIO 客户端,自动创建 Bucket(若不存在) | | `get(object_key)` | 获取对象,自动判断视频文件(返回 `bytes`)或 JSON 文件(返回 `str`) | | `is_video_file(object_key)` | 判断是否为视频文件(检查 `.mp4`, `.webm` 后缀或 `video` 关键词) | --- ### 4.4 RabbitMQ 客户端模块 (`rabbitmq.py`) 封装 RabbitMQ 连接、交换机/队列声明绑定、消息消费与发布,支持死信队列。 #### 核心配置类 `Config` | 字段分类 | 关键字段 | |----------------|--------------------------------------------------------------------------| | 基础连接 | `host`, `port`, `username`, `password`, `virtual_host`, `heartbeat` | | 普通队列 | `exchange_name`, `queue_name`, `routing_key` | | 死信队列 | `dead_letter_exchange`, `dead_letter_queue`, `dead_letter_routing_key`, `message_ttl` | #### 核心类 `RabbitmqConnector` | 方法 | 说明 | |-------------------------------|----------------------------------------------------------------------| | `connect()` | 建立 RabbitMQ 连接与通道 | | `declareAndBindNormalQueue()` | 声明并绑定普通交换机/队列 + 死信交换机/队列 | | `set_on_message(callback)` | 设置消息处理回调 `OnMessageCallback`(签名:`bytes -> bool`) | | `consumer()` | 启动消费者循环,自动 ACK 成功消息 | | `publish(exchange_name, routing_key, msg)` | 发布消息(默认持久化) | --- ### 4.5 状态机处理模块 (`loop.py`) 核心消息处理引擎,通过状态机驱动单条消息的完整处理流程。 #### 状态机设计 ```mermaid stateDiagram-v2 [*] --> LOAD: 每条消息重置状态 LOAD --> HANDLER: 加载成功 LOAD --> EXCEPTION: 加载失败 HANDLER --> [*]: 业务成功 (返回True/ACK) HANDLER --> EXCEPTION: 业务失败 EXCEPTION --> [*]: 处理失败 (返回False/不ACK) ``` | 状态 | 说明 | |---------------|----------------------------------------------------------------------| | `LOAD` | 加载消息:解码 JSON → 初始化缓存 → 从 MinIO 拉取传感器/视频数据 | | `HANDLER` | 业务处理:调用 `on_message` 回调 → 填充结果字段 → 发送结果队列 | | `EXCEPTION` | 异常终态:记录日志 → 返回失败(不 ACK) | #### 核心数据类 `Result` 处理结果数据类,支持序列化为 JSON 字节流: ```python @dataclass class Result: process_scores: list[int] = None # 业务处理分数 analysis: str = "" # 分析结果 message_id: str = "" # 消息ID(自动填充) exam_id: str = "" # 考试ID(自动填充) # ... 更多自动填充字段:user_id, name, room_id 等 raw_file_id: str = "" # 原始文件ID(自动填充) def to_json_bytes(self) -> bytes: # 序列化为 JSON 字节流 ``` #### 核心类 `Looper` | 方法 | 说明 | |-------------------------------|----------------------------------------------------------------------| | `_load(msg)` | 状态 `LOAD` 逻辑:加载消息与 MinIO 数据,返回 `bool` | | `_message_cb()` | 状态 `HANDLER` 逻辑:调用业务回调,填充结果,返回 `Optional[Result]` | | `_run(msg)` | 状态机核心:单条消息执行一次,返回 `bool`(True=ACK, False=不ACK) | | `run()` | 启动消费者循环(持续监听队列) | --- ### 4.6 配置模块 (`config.py`) 解析 YAML 配置文件,统一管理 MinIO、RabbitMQ 与业务队列配置。 --- ## 5. 状态机流转示例 ### 成功流程 1. **LOAD**:接收消息 → 解码 JSON → 初始化缓存 → 从 MinIO 拉取 `sensor_contents` 中的视频与传感器数据 2. **HANDLER**:调用 `on_message(cacher)` → 填充 `Result` 字段 → 发送到结果队列 → 返回 `True`(ACK) 3. **退出**:状态机结束,等待下一条消息 ### 失败流程 1. **LOAD**:接收消息 → JSON 解码失败 → 跳转 `EXCEPTION` 2. **EXCEPTION**:记录错误日志(含堆栈)→ 返回 `False`(不 ACK) 3. **退出**:消息进入死信队列(根据 RabbitMQ 配置) --- ## 6. 扩展开发 ### 自定义业务回调 实现 `OnMessage` 类型的回调函数(即 `main.py` 中的 `on_message` 函数): ```python def on_message(cacher: cache.Cacher) -> loop.Result: # 1. 获取缓存数据 cache_data = cacher.get() print(f"处理消息 ID: {cache_data.message_id}") # 2. 获取传感器数据 (List[Dict]) if "some_sensor" in cache_data.sensor_map: sensor_data = cache_data.sensor_map["some_sensor"] # 处理传感器数据... # 3. 执行业务逻辑 # ... # 4. 返回结果 return loop.Result( analysis="自定义分析结果", process_scores=[1, 2, 3, 4, 5] ) ``` --- ## 7. 项目结构 ``` . ├── main.py # 系统入口 ├── config.py # 配置解析 ├── config.yaml # 配置文件 ├── models/ │ ├── logger.py # 日志模块 │ ├── cache.py # 缓存模块 │ ├── minio_client.py # MinIO 客户端 │ ├── rabbitmq.py # RabbitMQ 客户端 │ └── loop.py # 状态机处理模块 └── requirements.txt # 依赖列表 ```