# ai-service **Repository Path**: fatcatkk/ai-service ## Basic Information - **Project Name**: ai-service - **Description**: No description available - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2026-03-15 - **Last Updated**: 2026-03-17 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # AI Video Analysis Service 基于 FastAPI + LangGraph 构建的实时视频流分析服务,用于工业场景下的合规性监控。通过 YOLO 目标检测与视觉语言模型(VLM)的协同驱动多步骤工作流,实现复杂的合规性检测与告警。 --- ## 目录 - [功能特性](#功能特性) - [系统架构](#系统架构) - [快速开始](#快速开始) - [项目结构](#项目结构) - [工作流引擎](#工作流引擎) - [API 文档](#api-文档) - [算法包](#算法包) - [配置说明](#配置说明) - [外部依赖](#外部依赖) --- ## 功能特性 - **多步骤工作流编排**:基于 LangGraph 的状态机,支持顺序步骤、条件跳转、超时重试 - **混合检测策略**:YOLO 目标检测 + 视觉语言模型语义分析 - **视频流复用**:单路视频源支持多任务并发订阅,避免重复解码 - **WebSocket 实时推送**:逐帧检测结果、步骤进度、告警通知 - **可插拔算法包**:动态加载 ONNX 模型,支持两阶段检测流水线 - **工作流持久化**:工作流定义以 JSON 文件存储,支持热更新 --- ## 系统架构 ``` 视频流输入 │ ▼ StreamProcessor (帧解码线程) │ ▼ FrameDistributor (发布-订阅) │ │ ▼ ▼ LangGraphTask LangGraphTask ...(多任务并发) │ ▼ ThreadPoolExecutor (4 推理 Worker) │ ▼ CompiledGraph.invoke() ├── entry_node → 初始化步骤状态 ├── step_router_node → 加载当前步骤配置 ├── detect_node → YOLO 检测 ├── analyze_node → VLM 语义分析(条件执行) ├── rule_evaluator → 规则评估 └── transition_node → 决定下一步骤 │ ▼ WebSocket 推送 → 客户端 ``` --- ## 快速开始 ### 1. 安装依赖 ```bash pip install -r requirements.txt ``` ### 2. 配置环境变量 复制 `.env` 并按需修改: ```bash cp .env .env.local ``` 关键配置项: ```env # vLLM 推理服务地址 LLM_API_URL=http://localhost:6006/v1 LLM_MODEL_NAME=Qwen3-VL-8B-Instruct-4bit-GPTQ # RabbitMQ 告警通道 RABBITMQ_HOST=localhost RABBITMQ_PORT=5672 # 推理并发数 INFERENCE_POOL_SIZE=4 ``` ### 3. 启动服务 ```bash python run.py ``` 服务默认监听 `http://0.0.0.0:8000`。 ### 4. 启动任务示例 ```bash # 使用已存储的工作流 curl -X POST http://localhost:8000/api/ai/start \ -H "Content-Type: application/json" \ -d '{"taskId": "task-001", "workflowId": "gesture_test"}' # 停止任务 curl -X POST http://localhost:8000/api/ai/stop \ -H "Content-Type: application/json" \ -d '{"taskId": "task-001"}' ``` --- ## 项目结构 ``` ai-service/ ├── app/ │ ├── main.py # FastAPI 入口,路由注册,应用初始化 │ ├── websocket.py # WebSocket 连接管理 │ │ │ ├── core/ # 核心服务 │ │ ├── task_manager.py # 任务生命周期管理(LangGraph V2) │ │ ├── llm_service.py # VLM 服务封装(LangChain + Qwen VL) │ │ ├── model_manager.py # ONNX 模型加载与缓存 │ │ ├── stream_processor.py # 视频流解码线程 │ │ ├── frame_distributor.py # 帧分发(发布-订阅) │ │ ├── alert_service.py # 告警与 RabbitMQ 集成 │ │ ├── workflow_file_store.py # 工作流 JSON 持久化 │ │ ├── workflow_config_service.py # 工作流配置缓存 │ │ └── task_status_service.py # 任务状态心跳追踪 │ │ │ ├── langgraph_engine/ # LangGraph 状态机 │ │ ├── state.py # VideoAnalysisState TypedDict 定义 │ │ ├── graph_builder.py # 图编译(简单图 / 多步骤增强图) │ │ ├── nodes.py # 处理节点:检测、分析、规则评估、跳转 │ │ └── edges.py # 条件边:是否调用 LLM、是否触发告警 │ │ │ ├── models/ │ │ └── workflow.py # Pydantic 数据模型:工作流、步骤、规则 │ │ │ ├── routes/ │ │ └── workflow_routes.py # 工作流 CRUD API │ │ │ ├── services/ │ │ └── algorithm_package_service.py # 算法包动态加载 │ │ │ └── algorithm_packages/ # 可插拔 ONNX 算法包 │ ├── first/ # 两阶段联合检测 │ ├── combined-detection/ # 联合检测变体 │ ├── compliance/ # 车辆/人员合规检测 │ ├── loading-unloading-compliance/ # 装卸作业合规 │ ├── vehicle_person_detector/ # 车辆+人员检测 │ └── yolo_person_detection/ # YOLO 人员检测 │ ├── workflows/ # 工作流定义(JSON) │ └── gesture_test.json │ ├── tests/ # 单元测试 ├── requirements.txt ├── run.py # 启动脚本 └── .env # 环境配置 ``` --- ## 工作流引擎 ### 工作流定义 工作流以 JSON 格式定义,存储于 `workflows/` 目录,支持通过 API 管理。 ```json { "workflow_id": "gesture_test", "name": "手势动作检测流程", "entry_step": "person_appear", "exit_steps": ["end"], "steps": [ { "step_id": "person_appear", "name": "等待人员出现", "detection_rules": [ { "type": "yolo", "model": "vehicle_person_detector", "objects": ["person"], "confidence_threshold": 0.6 } ], "duration_limit": 60.0, "transition": { "on_success": "open_palm", "on_timeout": "end" } }, { "step_id": "open_palm", "name": "张开手掌确认", "detection_rules": [ { "type": "vlm", "prompt": "请判断画面中的人是否张开了手掌,返回 JSON:{\"detected\": true/false}", "confidence_threshold": 0.7 } ], "duration_limit": 30.0, "transition": { "on_success": "end", "on_timeout": "end" } } ], "global_config": { "frame_interval": 1.0, "alert_channels": ["websocket", "log"] } } ``` ### 支持的检测规则类型 | 类型 | 说明 | |------|------| | `yolo` | ONNX 目标检测,支持类别过滤、置信度阈值、ROI 区域 | | `vlm` | 视觉语言模型语义分析,Prompt 自定义,解析 JSON 输出 | | `timer` | 基于时间的触发规则 | | `rule_engine` | 条件表达式规则 | | `sensor` | 外部传感器 API 集成 | ### 图执行流程 ``` entry → step_router → detect → [analyze?] → rule_evaluator → transition → step_router / END │ alert (可选) ``` --- ## API 文档 ### 任务管理 | 方法 | 路径 | 说明 | |------|------|------| | POST | `/api/ai/start` | 启动分析任务 | | POST | `/api/ai/stop` | 停止分析任务 | | GET | `/api/tasks/active` | 获取活跃任务列表 | **启动任务请求体:** ```json { "taskId": "task-001", "workflowId": "gesture_test", "videoSource": "rtsp://192.168.1.100/stream" } ``` ### 工作流管理 | 方法 | 路径 | 说明 | |------|------|------| | GET | `/api/workflows` | 获取所有工作流 | | GET | `/api/workflows/{id}` | 获取工作流详情 | | POST | `/api/workflows` | 创建工作流 | | PUT | `/api/workflows/{id}` | 更新工作流 | | DELETE | `/api/workflows/{id}` | 删除工作流 | ### 算法包管理 | 方法 | 路径 | 说明 | |------|------|------| | GET | `/algorithms/list` | 列出所有算法包 | | GET | `/algorithms/info/{name}` | 获取算法包信息 | | POST | `/algorithms/upload` | 上传算法包(.zip) | | POST | `/algorithms/delete` | 删除算法包 | ### 测试接口 | 方法 | 路径 | 说明 | |------|------|------| | POST | `/test/algorithm-package` | 单图测试算法包 | | POST | `/test/task-with-video` | 视频文件测试工作流 | ### WebSocket ``` ws://localhost:8000/ws/tasks/{task_id} ``` **消息类型:** | 类型 | 说明 | |------|------| | `status_update` | 当前步骤及进度 | | `step_completed` | 步骤完成(含耗时和结果) | | `detection_result` | 单帧检测对象列表 | | `alert` | 告警通知 | | `task_completed` | 工作流一轮执行完成 | | `error` | 错误通知 | --- ## 算法包 算法包存放于 `app/algorithm_packages/`,每个包为独立目录,包含: ``` my_algorithm/ ├── package.json # 元数据与模型配置 ├── inference_onnx.py # 推理实现(必须包含算法类) ├── algorithm_logic.py # 算法逻辑 └── models/ └── detector/ ├── model.onnx ├── classes.txt ├── preprocess.py └── postprocess.py ``` **package.json 格式:** ```json { "name": "my_algorithm", "version": "1.0.0", "type": "object_detection", "framework": "onnx", "models": { "detector": { "model": "models/detector/model.onnx" } } } ``` --- ## 配置说明 所有配置通过 `.env` 文件或环境变量注入: | 变量 | 默认值 | 说明 | |------|--------|------| | `HOST` | `0.0.0.0` | 监听地址 | | `PORT` | `8000` | 监听端口 | | `LLM_API_URL` | `http://localhost:6006/v1` | vLLM 服务地址 | | `LLM_MODEL_NAME` | `Qwen3-VL-8B-Instruct-4bit-GPTQ` | VLM 模型名 | | `LLM_TIMEOUT` | `60` | LLM 请求超时(秒) | | `RABBITMQ_HOST` | `localhost` | RabbitMQ 主机 | | `RABBITMQ_PORT` | `5672` | RabbitMQ 端口 | | `INFERENCE_POOL_SIZE` | `4` | 推理线程池大小 | | `FRAME_BUFFER_SIZE` | `100` | 帧缓冲区大小 | | `LOG_LEVEL` | `INFO` | 日志级别 | | `ENABLE_METRICS` | `true` | 启用 Prometheus 指标 | | `METRICS_PORT` | `9090` | 指标监听端口 | --- ## 外部依赖 | 服务 | 用途 | 默认地址 | |------|------|----------| | vLLM | Qwen VL 视觉语言模型推理 | `http://localhost:6006` | | RabbitMQ | 告警消息队列 | `localhost:5672` | | 视频流源 | RTSP / 本地视频文件 | 任务配置中指定 | **Python 主要依赖:** - `fastapi` / `uvicorn` — Web 框架 - `langgraph` / `langchain` — 工作流编排与 LLM 集成 - `onnxruntime` — ONNX 模型推理 - `opencv-python` — 视频流处理 - `pika` — RabbitMQ 客户端 - `pydantic` — 数据验证