# Zi **Repository Path**: dunimd/zi ## Basic Information - **Project Name**: Zi - **Description**: 统一的大模型数据集处理引擎 — 数据质量评估、清洗、转换、采样与增强框架。 - **Primary Language**: Rust - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: https://gitee.com/dunimd/zi - **GVP Project**: No ## Statistics - **Stars**: 1 - **Forks**: 0 - **Created**: 2025-11-15 - **Last Updated**: 2026-02-23 ## Categories & Tags **Categories**: big-data **Tags**: dataset, AI, PiscesLx, Dunimd ## README
# Zi [English](README.md) | 简体中文 BiliBili Gitee GitHub Hugging Face ModelScope **统一的大模型数据集处理引擎 — 数据质量评估、清洗、转换、采样与增强框架。**

🏗️ 核心架构

### 📐 模块化设计 Zi 采用针对 LLM 数据处理工作流优化的模块化架构:
| 模块 | 描述 | |:--------|:-------------| | **pipeline** | 通过可配置算子进行顺序/并行/条件处理 | | **dag** | 基于 DAG 的执行,支持拓扑排序实现并行优化 | | **operator** | 基于 trait 的类型安全算子系统 | | **operators** | 算子实现(过滤、质量、语言、LLM 等) | | **ingest** | 数据摄入(JSONL/JSON/CSV/Parquet 流式读取) | | **export** | 数据导出(压缩、分片、Manifest 清单) | | **inspect** | 数据检查(Profile、Diff、Statistics) | | **enrich** | 数据增强(合成、标注、增强) | | **dsl** | DSL 解析器(YAML/JSON 配置) | | **version** | 三哈希版本控制(数据/代码/环境) | | **orbit** | 用于动态加载算子的插件系统 | | **distributed** | 分布式处理支持 | | **context** | DMSC 集成(日志/缓存/指标/追踪) |
### 🚀 核心特性 #### 🔍 管道处理 - 通过可配置算子进行顺序/并行/条件处理 - 基于 DAG 的执行,支持拓扑排序 - 使用三哈希的内容寻址缓存 - 支持增量处理 #### 📊 质量评估 - 多指标文本质量评分(ASCII 比例、熵、可读性) - 使用内置词典的毒性检测 - 基于脚本分析的语言检测 - 可配置的质量阈值和过滤 #### 🔧 数据转换 - 丰富的过滤算子(等于、包含、正则、范围等) - 元数据丰富和操作 - 支持自定义模式的 PII 编辑 - 文本规范化和标准化 #### 📝 去重 - 基于 SimHash 的近重复检测 - 基于 MinHash 的相似度估计 - 支持语义去重 #### 🤖 LLM 专用算子 - Token 统计(支持中英文混合估算) - 对话格式转换(ChatML、ShareGPT、Alpaca、OpenAI) - 上下文长度过滤/截断/分割 - QA 对提取(Markdown、编号、自动检测) - 指令微调数据格式化(Alpaca、Vicuna、Llama2、ChatML) #### 📥 数据摄入/导出 - 流式读取(支持大文件) - 格式自动检测(JSONL/JSON/CSV/Parquet) - 压缩文件支持(Gzip、Zstd) - 分片写入、原子写入 - Manifest 清单与血缘追踪 #### 🔬 数据检查 - 数据 Profile(字段统计、频率分布、异常检测) - 数据集 Diff(记录级、字段级对比) - 文本统计(词频、N-gram) #### ✨ 数据增强 - 模板化数据合成 - 规则驱动数据生成(随机数、UUID、Faker) - LLM 辅助合成接口

⚡ 快速开始

### Rust ```rust use serde_json::json; use Zi::pipeline::ZiCPipelineBuilder; use Zi::record::ZiCRecord; let records = vec![ ZiCRecord::ZiFNew(Some("1".into()), json!({"text": "Hello world"})), ZiCRecord::ZiFNew(Some("2".into()), json!({"text": "你好世界"})), ]; let steps = [ json!({"operator": "lang.detect", "config": {"path": "payload.text"}}), json!({"operator": "quality.score", "config": {"path": "payload.text"}}), json!({"operator": "llm.token_count", "config": {"text_field": "payload.text"}}), json!({"operator": "quality.filter", "config": {"min": 0.5}}), ]; let pipeline = ZiCPipelineBuilder::with_defaults() .build_from_config(&steps) .expect("合法的管道配置"); let result = pipeline.run(records).expect("管道执行成功"); ``` ### 数据摄入与导出 ```rust use Zi::ingest::{ZiCStreamReader, ZiCReaderConfig}; use Zi::export::{ZiCStreamWriter, ZiCWriterConfig, ZiCOutputFormat}; use std::path::Path; // 读取数据 let reader = ZiCStreamReader::ZiFNew() .ZiFWithConfig(ZiCReaderConfig { batch_size: 10000, skip_errors: true, ..Default::default() }); let batch = reader.ZiFReadPath(Path::new("data.jsonl"))?; // 导出数据 let mut writer = ZiCStreamWriter::ZiFNew(); let config = ZiCWriterConfig { format: ZiCOutputFormat::Jsonl, compression: ZiCCompression::Gzip, split_by_count: Some(100000), ..Default::default() }; let stats = writer.ZiFWrite(&batch, Path::new("output.jsonl.gz"))?; ``` ### DSL 配置 ```yaml # pipeline.yaml steps: - operator: lang.detect config: path: payload.text - operator: quality.score config: path: payload.text - operator: llm.token_count config: text_field: payload.text output_field: metadata.token_count - operator: llm.context_length config: text_field: payload.text max_tokens: 8192 action: Filter - operator: quality.filter config: min: 0.5 ``` ```rust use Zi::dsl::{ZiCDSLParser, ZiCDSLCompiler}; let parser = ZiCDSLParser::ZiFNew(); let result = parser.ZiFParseFile(Path::new("pipeline.yaml"))?; let compiler = ZiCDSLCompiler::ZiFNew(); let pipeline = compiler.ZiFCompile(&result.program)?; let output = pipeline.ZiFRun(batch)?; ```

🔧 配置

### 配置格式 ```json [ { "operator": "operator.name", "config": { "path": "payload.text", "key": "field_name" } } ] ``` ### 字段路径语法 - `payload.text` — 访问 payload 字段 - `metadata.field` — 访问元数据字段 - `payload.nested.field` — 访问嵌套字段 ### 特性标志 ```toml [features] default = ["full"] full = ["parquet", "csv", "parallel", "domain", "distributed", "plugin", "compression"] parquet = ["dep:parquet", "dep:arrow"] csv = ["dep:csv"] parallel = ["rayon"] domain = [] distributed = [] plugin = ["wasmtime"] compression = ["dep:flate2", "dep:zstd"] pyo3 = ["dep:pyo3", "pyo3/extension-module"] ```

🧪 安装与环境

### 前置要求 - **Rust**: 1.70+ - **Cargo**: 1.70+ - **平台**: Linux、macOS、Windows ### 快速安装 在项目的 `Cargo.toml` 中添加 Zi: ```toml [dependencies] zi = { git = "https://github.com/mf2023/Zi" } ``` 或使用 cargo add: ```bash cargo add zi --git https://github.com/mf2023/Zi ``` ### 构建 ```bash # 默认(完整功能) cargo build --release # 显式完整功能 cargo build --release --features full # 包含 Python 绑定 cargo build --release --features pyo3 cargo test cargo bench ```

🛠️ 插件系统

### 插件使用 通过共享库动态加载算子: ```rust let mut builder = ZiCPipelineBuilder::with_defaults(); builder.load_plugin("path/to/plugin.so")?; ``` 插件必须实现 `zi_register_operators` 函数。

🔒 版本管理

### 三哈希版本控制 Zi 使用三哈希版本控制实现可重复处理: - **数据哈希** — 输入数据哈希 - **代码哈希** — 算子代码哈希 - **环境哈希** — 执行环境哈希 这实现了精确的数据血缘追踪和结果精确重现。

📋 算子列表

### 过滤算子 (filter.*) | 算子 | 描述 | |:-----|:-----| | `filter.equals` | 字段相等过滤 | | `filter.not_equals` | 字段不等过滤 | | `filter.in` / `filter.not_in` | 包含/排除过滤 | | `filter.contains` | 字符串包含过滤 | | `filter.regex` | 正则表达式过滤 | | `filter.range` | 数值范围过滤 | | `filter.exists` / `filter.not_exists` | 字段存在检查 | ### 质量算子 (quality.*) | 算子 | 描述 | |:-----|:-----| | `quality.score` | 文本质量评分 | | `quality.filter` | 质量阈值过滤 | | `quality.toxicity` | 毒性检测 | ### 去重算子 (dedup.*) | 算子 | 描述 | |:-----|:-----| | `dedup.simhash` | SimHash 去重 | | `dedup.minhash` | MinHash 去重 | | `dedup.semantic` | 语义去重 | ### LLM 算子 (llm.*) | 算子 | 描述 | |:-----|:-----| | `llm.token_count` | Token 统计 | | `llm.conversation_format` | 对话格式转换 | | `llm.context_length` | 上下文长度过滤 | | `llm.qa_extract` | QA 对提取 | | `llm.instruction_format` | 指令格式化 | ### 其他算子 | 算子 | 描述 | |:-----|:-----| | `lang.detect` | 语言检测 | | `metadata.enrich` | 元数据丰富 | | `limit` | 记录数量限制 | | `sample.random` | 随机采样 | | `pii.redact` | PII 脱敏 |

❓ 常见问题

**Q: 如何添加新算子?** A: 实现 `ZiCOperator` trait 并通过算子注册表注册。 **Q: 如何启用并行执行?** A: 启用 `parallel` 特性标志并配置 DAG 调度器进行并行执行。 **Q: 如何处理大文件?** A: 使用 `ZiCRecordIterator` 进行流式批处理。 **Q: 如何使用 DSL 配置?** A: 使用 `ZiCDSLParser` 解析 YAML/JSON 配置文件。 **Q: 如何追踪数据血缘?** A: 使用 `ZiCManifest` 和 `ZiCLineage` 记录处理过程。

🌏 社区

- GitHub: https://github.com/mf2023/Zi - Gitee: https://gitee.com/dunimd/zi
## 📄 许可证与开源协议 ### 🏛️ 项目许可证

Apache License 2.0

本项目使用 **Apache License 2.0** 开源协议,详见 [LICENSE](LICENSE) 文件。 ### 📋 依赖包开源协议
| 📦 包 | 📜 许可证 | |:-----------|:-----------| | dmsc | Apache 2.0 | | serde | Apache 2.0 / MIT | | serde_json | MIT | | serde_yaml | MIT / Apache 2.0 | | regex | MIT | | rayon | Apache 2.0 / MIT | | pyo3 | Apache 2.0 / MIT | | arrow | Apache 2.0 | | parquet | Apache 2.0 | | csv | MIT | | blake3 | Apache 2.0 / MIT | | chrono | MIT / Apache 2.0 | | tokio | MIT | | rand | MIT / Apache 2.0 | | flate2 | MIT | | zstd | MIT | | thiserror | MIT | | anyhow | MIT |