# LabInfer
**Repository Path**: zwb1998/LabInfer
## Basic Information
- **Project Name**: LabInfer
- **Description**: No description available
- **Primary Language**: Unknown
- **License**: MIT
- **Default Branch**: main
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2025-05-06
- **Last Updated**: 2025-06-23
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# LABInfer
[roadmap](https://github.com/users/Zhuwenbopro/projects/1)
目标是创建异构、分布式大语言模型推理框架。尽量使用最直白的实现方式实现各个模块,并提供设计实现思路,方便各位实验室的同学在此基础上修改代码,融入自己的 idea。
```
# 安装 OpenBLAS
sudo apt-get install libopenblas-dev
sudo apt-get install libopencv-dev
```
# 任务描述(Prompt)
我在设计大语言模型的推理引擎,C++ 实现,支持多节点多卡并行计算,节点内 TP + PP,节点间只 PP。
* Server 使用 engine.add_request() 添加请求,随即 Engine 内部进行运算,每一轮的 step 存到某个队列内,由 Server 自行取用。
* Engine 利用 Scheduler 将 request 组成 request_batch 送给 Worker。(Scheduler 负责决策,Engine 负责根据决策准备 ModelInputBatch(包含 tokens, Block Tables 等)并分发给 Worker(s)。)
* Worker 利用 Communicator 和 Layer 进行节点内的 TP 计算(Worker 执行其拥有的 Layer,而这些 Layer 如果是张量并行的,会使用注入的 IntraNodeTensorParallelCommunicator 来进行 TP 组内部的通信(如 AllReduce, AllGather)。)
* Worker 完成其节点内的 TP 计算后,将其(作为流水线一个阶段的)输出结果报告/返回给其所在节点的 Engine
* 不同节点上的 Engine 实例之间。一个节点的 Engine (我们称之为 Engine_A,负责阶段 S) 在收到其内部 Worker 的结果后,会使用 InterNodePipelineCommunicator 将这个结果 Send 给负责下一个阶段 S+1 的另一个节点上的 Engine (我们称之为 Engine_B)。同样,Engine_B 会使用这个 Communicator 来 Recv 来自 Engine_A 的数据。
## 整体框架
# LABInfer
* Engine 管理一个 Worker 线程池,并告诉线程池里的 Worker 它的 rank。由线程池内的 Worker 自己控制自己需要的参数、设备上下文等。
* Engine 持有 `std::vector> worker_threads_`。
* 持有任务队列(供外部请求或上游 PP stage 放入)
* 持有结果队列(供下游 PP stage 或最终 Server 拉取)
* WorkerThread 有自己的持久计算线程、输入任务队列(Engine 向其推送)、自己的 CUDA 环境、TP 通信器等,通过回调或共享队列将结果/状态返回给 Engine。
### 任务分发 (Task Dispatching)
* 任务来源:Engine 的 Scheduler(首个 PP Stage Engine)、上游 Engine(中间/后续 PP Stage Engine)。
* Engine 将 ModelInputBatch 的元数据放到共享内存中,
### 结果聚合 (Result Aggregation)
PP(Pipeline Parallelism)之间的通信发生在 Engine 层。内部的 WorkerThread 通过回调或共享队列把结果(WorkerOutput)放给 Engine。Engine 需要根据结果的状态来判断是将结果发给下一个 Stage 还是回传给 Server。
### 错误处理 (Error Handling)
* WorkerOutput 中应包含 status 字段 (WorkerStatus::FAILED)、error_code (枚举类型) 和 error_message (字符串)。
* WorkerThread 在捕获到异常或检测到错误后,应构造这样的 WorkerOutput 并通过回调或共享队列返回给 Engine。
* Engine::on_worker_result()
```
if (output.status == WorkerStatus::FAILED) {
handle_failed_batch(output.bid, output.req_ids, output.error_message, output.error_code);
// 可能还需要标记该 WorkerThread 状态为 ERROR 或需要重启/检查
// mark_worker_as_failed(worker_rank);
}
```
# 接口定义
```
// common.h ----------------------------------------
using RequestId = uint64_t;
using BatchId = uint64_t;
using StageId = uint32_t; // PP stage idx
using TPGroupId = uint32_t;
enum class Precision { FP16, BF16, INT8 };
enum class DeviceType { GPU, NPU, CPU };
enum class EngineErr { OK, TIMEOUT, OOM, INTERNAL };
// INTERNAL: 内部错误,表示引擎内部发生了一些未预料到的问题。
// -------- Model inputs / outputs ---------------
//
struct TokenBlockTable {
std::vector tokens; // flat - row-major 所有序列的token被连续存储
std::vector row_ptr; // CSR-style
};
// 输入给模型的一个批次数据
struct ModelInputBatch {
BatchId bid;
std::vector req_ids;
TokenBlockTable tbl;
};
// 模型对一个输入批次产生的输出。
struct ModelOutputBatch {
BatchId bid;
std::vector req_ids;
std::vector logits; // flattened [B, V]
};
// -------- TP / PP payload ----------------------
// 张量的切片
struct TensorSlices {
BatchId bid;
StageId stage;
/* contiguous slices in column-parallel layout */
std::vector data;
};
// -----------------------------------------------
```
## 接口表
| 组件 | 主要职责 | 关键方法(同步/异步) | 说明 |
| ---------------- | ----------------------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------- | -------------------------------------- |
| **Engine** | 全局协调;PP stage 控制;结果队列 | `add_request(const Request&)` 异步
`poll_result(EngineResult&) -> bool` 非阻塞
`on_worker_done(const ModelOutputBatch&)`
`on_stage_recv(const TensorSlices&)` | 封装 FSM/Actor;内部线程安全 |
| **Scheduler** | 批处理策略(prefill + decode 拆分、KV 预算、padding 控制) | `schedule(const Request&) -> void`
`try_make_batch() -> std::optional` | 仅暴露给同节点 Engine |
| **Worker** | 负责一个或多层;节点内 TP | `launch(const ModelInputBatch&)` 异步
`progress() -> WorkerStatus` 轮询
`stop()` | Worker 内部持有 `IntraNodeTPCommunicator*` |
| **Communicator** | **Intra-Node TP**:AllReduce/AllGather
**Inter-Node PP**:Send/Recv 激活 | `all_reduce(TensorSlices&)`
`all_gather(TensorSlices&)`
`send(StageId, const TensorSlices&)`
`recv(StageId, TensorSlices&)` | 为方便测试,可提供 Mock |
## 时序图
### 单节点内部:Server → Engine → Scheduler → Worker
```mermaid
sequenceDiagram
participant S as Server
participant E as Engine (Stage S)
participant Sch as Scheduler
participant W as Worker[Layer × k]
S ->> E: add_request(req)
loop (event loop)
E ->> Sch: enqueue(req)
Sch ->> Sch: try_make_batch()
Sch -->> E: ModelInputBatch (bid)
E ->> W: launch(batch)
W ->> W: TP compute + IntraNode AllReduce
W -->> E: ModelOutputBatch (bid)
E ->> S: push_result()
end
```
### 跨节点流水线:Stage S → Stage S+1
```mermaid
sequenceDiagram
participant E_A as Engine_A (Stage S)
participant Cpp as InterNodePipelineComm
participant E_B as Engine_B (Stage S+1)
note right of E_A: 收到 Worker 结果
E_A ->> Cpp: send(stage=S, slices)
Cpp -->> E_B: recv(stage=S, slices)
E_B ->> E_B: schedule\n→ launch to Worker
```
### Engine / Worker 状态机(简版)
```mermaid
stateDiagram-v2
[*] --> Idle
Idle --> Scheduling : batch_ready
Scheduling --> WaitingWorker : dispatch_ok
WaitingWorker --> Idle : worker_done
WaitingWorker --> Error : timeout / oom
Error --> Idle : rollback_done
```
## Back-pressure & 异常流约定
| 场景 | 触发方 | 处理策略 |
| -------------------------- | ------------ | --------------------------------------------------------------------------------------------------------------- |
| **Engine → Scheduler 队列满** | Scheduler | `schedule()` 返回 `false`,Engine 将请求放入“溢出 list”并通过 `Server.reply_queue_full(req_id)` 反馈客户端可重试或等待 |
| **Worker OOM / GPU Reset** | Worker | 1. 触发 `WorkerError{OOM, id}` 事件
2. Engine 标记该 batch 失败,写入统一 `ErrorQueue`
3. Supervisor 可选择重启 Worker 或降级精度 |
| **Inter-Node Send 超时** | Communicator | Engine 进入 `Error` 状态 → 上报监控 → 自动重连或切换备用链路 |
# 详细设计
## Communicator
## KVCacheManager
## Scheduler
* Scheduler 层可以基于 Token-level Cost/资源消耗动态调整 batch,大幅提升 decode 阶段利用率。
# 接下来的方向
1. 具体的 Communicator 实现细节:
* 如何基于 NCCL 或 MPI 实现 IntraNodeTensorParallelCommunicator 和 InterNodePipelineCommunicator?
* 通信组 (Communicator Groups) 的创建和管理。
* 异步通信操作与 CUDA Stream 的结合。
* 错误处理和容错。
2. KVCacheManager 和 PagedAttention 的具体实现:
* 物理块的分配/释放策略 (e.g., best-fit, first-fit)。
* Block Table 的数据结构和在 GPU 上的管理。
* 如何处理 KV Cache 的 swapping (换入换出 GPU/CPU)。
* 与 Scheduler 的交互接口。
3. Scheduler 的具体调度策略实现:
* FCFS (First-Come, First-Served) 的简单实现。
* 更高级的策略如连续批处理 (Continuous Batching) 的核心逻辑,如何优先处理 prefill,如何平衡吞吐量和延迟。
* 如何考虑 KV Cache 占用作为调度决策的一部分。
* 动态批处理大小的调整。
4. Engine 的流水线调度逻辑 (Pipelining Scheduler):
* 微批次 (micro-batch) 的切分和管理。
* 不同流水线调度算法 (如 GPipe, PipeDream, 1F1B/Interleaved) 的实现细节和状态机。
* 如何管理流水线中的“气泡” (bubbles) 以提高效率。
* 前向传播和反向传播(如果考虑训练或微调)在流水线中的协调。
5. Worker 的内部实现:
* 如何加载和管理模型分片 (shards of layers)。
* 任务队列和内部线程池的设计。
* CUDA Stream 和 Event 的使用,以实现计算和通信的重叠。
* 错误处理和向 Engine 报告状态。
6. 数据结构和序列化/反序列化:
* ModelInputBatch, ModelOutputBatch, SequenceEvent 等核心数据结构的具体字段和设计。
* 在跨节点 PP 通信时,张量 (Tensor) 和其他元数据如何进行高效的序列化和反序列化。
7. 性能优化技巧:
* 算子融合 (Operator Fusion)。
* 量化 (Quantization)。
* FlashAttention 等高效 Attention 实现的集成。
* 减少数据拷贝和同步开销。
8. 容错和弹性:
* 如何处理节点故障或 Worker 故障?
* 请求重试机制。
* 状态的持久化和恢复(如果需要)。
9. 配置和部署:
* 系统如何通过配置文件来定义分布式拓扑(节点数、GPU数、TP/PP 维度等)?
* 部署到 Kubernetes 或其他集群管理系统。
1. 先从 Worker 层试点 FSM + Actor:
* 把 Worker 改写成单线程 Actor;Engine 暂保持旧实现,先观测稳定性收益。
* 用 CAF 或自研轻量框架都行,关键是 显式列状态 + 明确转移。
2. 逐步引入动态属性:
* 先只把 precision 做成可热切换(FP16 ↔ INT8),验证 KernelRegistry 路径。
* 再扩展到 quant_scheme、kv_cache_layout 等更细粒度属性。
3. 监控与回滚:
* FSM/Actor 转入 Error 时,用监控上报 + 自动降级(e.g. fallback FP16)机制,而不是直接崩溃服务。