# nestjs-scheduler-queue **Repository Path**: ukiot/nestjs-scheduler-queue ## Basic Information - **Project Name**: nestjs-scheduler-queue - **Description**: 基于 BullMQ 的任务队列系统,为每个调度器提供独立的任务处理队列。 - 🚀 **动态队列创建**: 为每个调度器动态生成独立的处理队列 - 🔄 **任务类型支持**: 支持取档、归档、移动、维护、盘点等多种任务类型 - 🎯 优先级调度: - 📊 **统计监控**: - 🎮 **队列控制**: 支持暂停、恢复、清空队列操作 - 🔌 可扩展: 支持自定义任务处理器 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-09-11 - **Last Updated**: 2025-09-11 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 调度器队列系统 基于 BullMQ 的任务队列系统,为每个调度器提供独立的任务处理队列。 ## 功能特性 - 🚀 **动态队列创建**: 为每个调度器动态生成独立的处理队列 - 🔄 **任务类型支持**: 支持取档、归档、移动、维护、盘点等多种任务类型 - 🎯 **优先级调度**: 支持任务优先级设置,高优先级任务优先执行 - 🔁 **失败重试**: 自动重试失败任务,支持指数退避策略 - 📊 **统计监控**: 提供详细的队列统计信息和健康检查 - 🎮 **队列控制**: 支持暂停、恢复、清空队列操作 - 🔌 **可扩展**: 支持自定义任务处理器 - 🎯 **事件驱动**: 完整的事件监听机制 ## 目录结构 ``` src/scheduler/queue/ ├── interfaces/ # 接口定义 │ ├── task.interface.ts # 任务相关接口 │ └── queue-manager.interface.ts # 队列管理器接口 ├── services/ # 服务实现 │ ├── queue-manager.service.ts # 队列管理核心服务 │ └── scheduler-queue.service.ts # 调度器队列服务 ├── processors/ # 任务处理器 │ └── default-task.processor.ts # 默认任务处理器 ├── queue.module.ts # 队列模块 ├── index.ts # 导出索引 └── README.md # 使用文档 ``` ## 快速开始 ### 1. 安装依赖 ```bash pnpm add bullmq ioredis pnpm add -D @types/ioredis ``` ### 2. 环境配置 在 `.env` 文件中配置 Redis 连接: ```env REDIS_HOST=localhost REDIS_PORT=6379 REDIS_PASSWORD= REDIS_DB=0 ``` ### 3. 导入模块 在 `app.module.ts` 或相关模块中导入: ```typescript import { Module } from '@nestjs/common'; import { QueueModule } from './scheduler/queue'; @Module({ imports: [QueueModule], // ... }) export class AppModule {} ``` ### 4. 使用服务 ```typescript import { Injectable } from '@nestjs/common'; import { SchedulerQueueService, TaskPriority } from './scheduler/queue'; @Injectable() export class MyService { constructor( private readonly queueService: SchedulerQueueService, ) {} async initializeScheduler(schedulerId: number) { // 初始化调度器队列 await this.queueService.initializeSchedulerQueue(schedulerId); } async addRetrieveTask(schedulerId: number) { // 添加取档任务 const job = await this.queueService.addRetrieveTask(schedulerId, { taskId: 'TASK_001', archiveId: 123, robotId: 1, fromLocation: { zone: 1, column: 2, section: 3, layer: 4, slot: 5, side: 'left' }, toLocation: { zone: 2, column: 1, section: 1, layer: 1, slot: 1, side: 'right' }, priority: TaskPriority.HIGH, description: '紧急取档任务', }); console.log('任务已添加:', job.id); } } ``` ## API 参考 ### SchedulerQueueService 主要的队列服务类,提供高级别的队列操作。 #### 队列管理 ```typescript // 初始化调度器队列 await queueService.initializeSchedulerQueue(schedulerId: number, processor?: TaskProcessor); // 暂停队列 await queueService.pauseScheduler(schedulerId: number); // 恢复队列 await queueService.resumeScheduler(schedulerId: number); // 清空队列 await queueService.clearSchedulerQueue(schedulerId: number); // 销毁队列 await queueService.destroySchedulerQueue(schedulerId: number); ``` #### 任务操作 ```typescript // 添加取档任务 await queueService.addRetrieveTask(schedulerId, taskData); // 添加归档任务 await queueService.addReturnTask(schedulerId, taskData); // 添加移动任务 await queueService.addMoveTask(schedulerId, taskData); // 添加维护任务 await queueService.addMaintenanceTask(schedulerId, taskData); // 添加盘点任务 await queueService.addInventoryTask(schedulerId, taskData); // 批量添加任务 await queueService.addBatchTasks(schedulerId, tasks); // 取消任务 await queueService.cancelTask(schedulerId, jobId); // 重试失败任务 await queueService.retryFailedTask(schedulerId, jobId); ``` #### 监控和查询 ```typescript // 获取任务状态 const status = await queueService.getTaskStatus(schedulerId, jobId); // 获取队列统计 const stats = await queueService.getSchedulerStats(schedulerId); // 获取所有队列统计 const allStats = await queueService.getAllSchedulerStats(); // 获取队列任务列表 const tasks = await queueService.getQueueTasks(schedulerId, 'waiting', page, pageSize); // 检查队列健康状态 const health = await queueService.checkSchedulerHealth(schedulerId); ``` ### 任务数据结构 ```typescript interface TaskData { taskId: string; // 任务ID type: TaskType; // 任务类型 schedulerId: number; // 调度器ID priority: TaskPriority; // 优先级 archiveId?: number; // 档案ID robotId?: number; // 机器人ID deviceId?: number; // 设备ID fromLocation?: StorageLocation; // 起始位置 toLocation?: StorageLocation; // 目标位置 coordinate?: DeviceCoordinate; // 设备坐标 parameters?: Record; // 任务参数 description?: string; // 任务描述 createdAt: Date; // 创建时间 timeout?: number; // 超时时间 maxRetries?: number; // 最大重试次数 } ``` ### 任务类型 ```typescript enum TaskType { RETRIEVE = 'retrieve', // 取档任务 RETURN = 'return', // 归档任务 MOVE = 'move', // 移动任务 MAINTENANCE = 'maintenance', // 维护任务 INVENTORY = 'inventory', // 盘点任务 } ``` ### 任务优先级 ```typescript enum TaskPriority { LOW = 1, // 低优先级 NORMAL = 5, // 普通优先级 HIGH = 10, // 高优先级 CRITICAL = 15, // 紧急优先级 } ``` ## 自定义任务处理器 创建自定义处理器来实现特定的业务逻辑: ```typescript import { Injectable } from '@nestjs/common'; import { TaskProcessor, TaskData, TaskResult, TaskStatus } from '../interfaces/task.interface'; @Injectable() export class CustomTaskProcessor implements TaskProcessor { async process(taskData: TaskData): Promise { const startTime = new Date(); try { // 实现自定义任务处理逻辑 const result = await this.executeCustomLogic(taskData); return { taskId: taskData.taskId, status: TaskStatus.COMPLETED, data: result, startTime, endTime: new Date(), duration: Date.now() - startTime.getTime(), }; } catch (error) { return { taskId: taskData.taskId, status: TaskStatus.FAILED, error: error.message, startTime, endTime: new Date(), duration: Date.now() - startTime.getTime(), }; } } async executeCustomLogic(taskData: TaskData): Promise { // 实现具体的业务逻辑 return { message: '自定义处理完成' }; } async onCompleted(taskData: TaskData, result: TaskResult): Promise { // 任务完成回调 console.log(`任务 ${taskData.taskId} 完成`); } async onFailed(taskData: TaskData, error: Error): Promise { // 任务失败回调 console.error(`任务 ${taskData.taskId} 失败:`, error); } async onProgress(taskData: TaskData, progress: number): Promise { // 任务进度回调 console.log(`任务 ${taskData.taskId} 进度: ${progress}%`); } } // 使用自定义处理器 await queueService.initializeSchedulerQueue(schedulerId, new CustomTaskProcessor()); ``` ## 事件监听 监听队列事件以实现更细粒度的控制: ```typescript // 注册事件监听器 queueService.registerEventListener(schedulerId, 'completed', (job, result) => { console.log('任务完成:', job.id, result); }); queueService.registerEventListener(schedulerId, 'failed', (job, error) => { console.error('任务失败:', job.id, error); }); queueService.registerEventListener(schedulerId, 'progress', (job, progress) => { console.log('任务进度:', job.id, progress); }); // 移除事件监听器 queueService.removeEventListener(schedulerId, 'completed', callback); ``` ## 配置选项 ### Redis 配置 ```typescript // 在环境变量中配置 REDIS_HOST=localhost REDIS_PORT=6379 REDIS_PASSWORD=your_password REDIS_DB=0 ``` ### 队列默认配置 队列会使用以下默认配置: ```typescript { defaultJobOptions: { removeOnComplete: 50, // 保留已完成任务数量 removeOnFail: 100, // 保留失败任务数量 attempts: 3, // 最大尝试次数 backoff: { type: 'exponential', // 指数退避 delay: 2000, // 基础延迟时间 }, }, concurrency: 3, // 并发处理数量 settings: { stalledInterval: 30000, // 停滞检查间隔 maxStalledCount: 1, // 最大停滞次数 }, } ``` ## 监控和调试 ### 队列统计信息 ```typescript const stats = await queueService.getSchedulerStats(schedulerId); console.log('队列统计:', { name: stats.name, waiting: stats.waiting, // 等待中任务数 active: stats.active, // 活跃任务数 completed: stats.completed, // 已完成任务数 failed: stats.failed, // 失败任务数 delayed: stats.delayed, // 延迟任务数 isPaused: stats.isPaused, // 是否暂停 processingRate: stats.processingRate, // 处理速率 avgProcessingTime: stats.avgProcessingTime, // 平均处理时间 }); ``` ### 健康检查 ```typescript const health = await queueService.checkSchedulerHealth(schedulerId); console.log('队列健康状态:', { isHealthy: health.isHealthy, message: health.message, lastCheck: health.lastCheck, stats: health.stats, }); ``` ## 最佳实践 1. **队列命名**: 每个调度器使用唯一的队列名称 `scheduler-{schedulerId}` 2. **任务ID**: 使用有意义且唯一的任务ID 3. **优先级设置**: 合理设置任务优先级,避免低优先级任务被饿死 4. **错误处理**: 实现完善的错误处理和重试机制 5. **监控告警**: 定期检查队列健康状态和统计信息 6. **资源清理**: 及时清理不需要的队列和任务 7. **性能优化**: 根据业务需求调整并发数和其他配置参数 ## 注意事项 - 确保 Redis 服务可用且配置正确 - 任务处理器应该是幂等的,支持重复执行 - 长时间运行的任务应该定期报告进度 - 生产环境中应该启用持久化和集群模式 - 定期清理已完成和失败的任务以节省内存 ## 故障排除 ### 常见问题 1. **Redis 连接失败** - 检查 Redis 服务是否运行 - 验证连接配置是否正确 2. **任务处理缓慢** - 增加并发处理数量 - 检查任务处理逻辑是否有性能瓶颈 3. **任务频繁失败** - 检查任务处理器的错误处理逻辑 - 增加重试次数或调整退避策略 4. **内存使用过高** - 减少保留的已完成任务数量 - 定期清理不需要的任务和队列