# matrix_swoole **Repository Path**: adminmatrix/matrix_swoole ## Basic Information - **Project Name**: matrix_swoole - **Description**: 基于 Swoole 扩展的高性能 PHP 服务框架,专为 AdminMatrix 生态设计,提供异步任务处理、WebSocket 实时通信、HTTP 服务加速等能力,助力构建高并发、低延迟的企业级应用。 核心特性: 基于 Swoole 4.x+ 构建,支持协程、异步 IO 内置 HTTP/WebSocket 服务,开箱即用 与 ThinkPHP 等主流框架深度兼容 提供任务队列、定时任务、进程管理 - **Primary Language**: PHP - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 1 - **Forks**: 0 - **Created**: 2025-05-14 - **Last Updated**: 2026-02-27 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # AdminMatrix Swoole 扩展 [English](./README.md) | 中文 基于 [think-swoole](https://github.com/top-think/think-swoole) 二次开发的 Swoole 扩展,支持 HTTP、WebSocket、RPC 等功能,并新增了 Gateway 服务支持。 ## 交流群 [点击加群](https://jq.qq.com/?_wv=1027&k=VRcdnUKL) ## 功能特性 ### 核心功能 - **HTTP 服务** - 高性能 HTTP Server,支持协程风格 - **WebSocket 服务** - 支持路由调度方式的 WebSocket 实现 - **RPC 远程调用** - 支持 JSON 格式的 RPC 协议 - **Gateway 服务** - 基于 GatewayWorker 架构的 WebSocket Gateway - **热更新** - 文件修改自动重载 - **队列支持** - 集成 think-queue ### 新增 Gateway 服务 基于 GatewayWorker 架构实现的 WebSocket Gateway 服务,支持多 Gateway 服务配置和事件监听机制。 #### 特性 - 支持多个 Gateway 服务实例 - GatewayWorker 风格的静态方法 - 事件监听机制(onopen, onmessage, onclose) - 独立进程启动 ## 安装 ```bash composer require adminmatrix/matrix_swoole ``` 需要先安装 [swoole](https://www.swoole.com/) 扩展。 ## 配置 在 `config/swoole.php` 中配置服务参数: ```php return [ 'gateway' => [ 'enable' => true, 'registerAddress' => '127.0.0.1', 'registerPort' => 1236, 'registerName' => 'websocket_register', 'registerType' => 'text', 'services' => [ [ 'enable' => true, 'name' => '聊天服务', 'port' => 2346, 'address' => '0.0.0.0', 'handler' => \adminmatrix\swoole\gateway\Event::class, 'worker_num' => 2, ], ], ], ]; ``` ## 使用方法 ### 启动服务 ```bash php think swoole ``` 启动后默认在 `0.0.0.0:8080` 启动 HTTP Server。 ### Gateway 使用示例 ```php use adminmatrix\swoole\gateway\Gateway; // 注册事件监听器 Gateway::getInstance()->on('gateway.onopen', function ($clientId, $request) { echo "客户端连接: {$clientId}\n"; }); Gateway::getInstance()->on('gateway.onmessage', function ($clientId, $message) { echo "收到消息: {$clientId} -> {$message}\n"; // 广播消息 Gateway::sendToAll([ 'type' => 'message', $clientId, 'content' => $message ]); }); Gateway 'from' =>::getInstance()->on('gateway.onclose', function ($clientId) { echo "客户端断开: {$clientId}\n"; }); ``` #### 支持的静态方法 - `Gateway::sendToAll($send_data, $client_id_array, $exclude_client_id, $raw)` - 向所有客户端发送消息 - `Gateway::sendToClient($client_id, $send_data, $raw)` - 向指定客户端发送消息 - `Gateway::isOnline($client_id)` - 检查客户端是否在线 - `Gateway::getAllClientCount()` - 获取在线客户端数量 - `Gateway::getAllClientIdList()` - 获取所有在线客户端 ID 列表 ### WebSocket 使用 ```php use \think\swoole\Websocket; use \think\swoole\websocket\Event; use \Swoole\WebSocket\Frame; Route::get('ws', 'websocket/index'); class IndexController extends Controller { public function index() { return \think\swoole\helper\websocket() ->onOpen(function(Websocket $websocket, Request $request) { echo "WebSocket 连接打开\n"; }) ->onMessage(function(Websocket $websocket, Frame $frame) { $websocket->push(['type' => 'reply', 'data' => $frame->data]); }) ->onClose(function(Websocket $websocket) { echo "WebSocket 连接关闭\n"; }); } } ``` ### 流式输出 ```php class IndexController extends Controller { public function stream() { return \think\swoole\helper\iterator(value(function(){ foreach(range(1, 10) as $i) { yield $i; sleep(1); } })); } } ``` ## 进程配置建议 ### Gateway 进程 - 每个 Gateway 进程可处理 5000-10000 连接 - 在线连接 < 10000:2 个 Gateway 进程 - 每增加 10000 连接:增加 1 个 Gateway 进程 ### BusinessWorker 进程 - 阻塞式 IO:CPU 核心数的 1-4 倍 - 非阻塞式 IO:CPU 核心数的 4-8 倍 ## 项目结构 ``` src/ ├── command/ # 命令行相关 ├── concerns/ # Traits 特性 ├── config/ # 配置文件 ├── contract/ # 接口定义 ├── coroutine/ # 协程工具 ├── exception/ # 异常类 ├── gateway/ # Gateway 服务 ├── helpers.php # 辅助函数 ├── ipc/ # 进程间通信 ├── listen/ # 事件监听 ├── lock/ # 分布式锁 ├── message/ # 消息类 ├── middleware/ # 中间件 ├── packet/ # 数据包处理 ├── pool/ # 连接池 ├── response/ # 响应类 ├── resetters/ # 重置器 ├── rpc/ # RPC 实现 ├── support/ # 工具支持 ├── watcher/ # 文件监控 └── websocket/ # WebSocket 实现 ``` ## 注意事项 - 4.0 版本开始协程风格服务端默认不支持静态文件访问,建议使用 nginx 处理静态文件 - 4.0 版本没有 task 进程,使用 think-queue 代替 ## License # 使用方法 ## geteway ### 配置 > config/swoole.php 配置文件 ``` 'gateway' => [ 'enable'=> true, // 开启服务 'services' => [ [ 'enable' => true, 'name' => '聊天服务', 'port' => 2346, 'address' => '0.0.0.0', 'handler' => \adminmatrix\swoole\gateway\Event::class, 'worker_num' => swoole_cpu_num(), 'options' => [], "eventHandler" => '', "pingInterval" => 30, "pingNotResponseLimit" => 1, "pingData" => '{"type":"ping"}', ], ... 更多服务 ] ] ``` ## rpc 服务 ### RPC 服务核心 并发核心:PHP 的传统 FPM 模式不支持真正的并发,必须依赖 Swoole/Workerman 等协程 / 异步扩展,通过协程(Coroutine)实现 “同时发起多个 RPC 请求”。 耗时边界:总耗时≈单个调用的最长耗时(比如如果其中一个调用是 12 秒,另外两个是 10 秒,总耗时就是 12 秒),而非累加。 RPC 的作用:RPC 本身是 “跨服务调用的协议 / 方式”,解决的是不同服务之间的通信问题;而并发是 “调用方式”,二者结合才能实现你说的 “10 秒返回 3 个结果”。 串行执行(原本 30 秒的逻辑) 假设你要做三件事,每件都需要 10 分钟: 给快递员打电话(10 分钟):必须等打完这个电话,才能打下一个 给外卖员打电话(10 分钟):等快递电话打完,再打外卖电话 给维修师傅打电话(10 分钟):等外卖电话打完,再打维修电话 总耗时:10+10+10=30 分钟 → 对应你说的 “3 个 new 各 10 秒,总计 30 秒”。 RPC 并发调用(优化后 10 秒的逻辑) 还是这三件事,但你用了 “免提 + 同时拨号” 的方式: 先拨快递员电话(不挂,放免提),10 分钟后等回复 不等快递员回复,立刻拨外卖员电话(同样放免提),10 分钟后等回复 再立刻拨维修师傅电话(放免提),10 分钟后等回复 你只需要坐在旁边等,10 分钟后三个人几乎同时回复你,总耗时就是 10 分钟 → 对应 “RPC 并发调用,3 个操作同时执行,总耗时 10 秒”。 ### 注意 如果需要使用RPC 必须要开启 swoole:run > php think swoole:run > 运行后配置文件 > config/swoole.php ```php 'rpc' => [ 'server' => [ 'enable' => true, 'host' => '0.0.0.0', 'port' => 9000, 'worker_num' => swoole_cpu_num(), 'services' => [ \app\api\logic\v1\OrderLogic::class, ], ], 'client' => [ ], ], ``` ### 实现实例 > 比如 我们要加载一个用户的订单数据 > RPC ```php // app/rpc/interface/OrderService.php 业务逻辑 实际可以使用模型 ```php // app/api/logic/OrderLogic.php 200, 'msg' => 'success', 'data' => [ 'list' => [ ['id' => 1, 'order_no' => 'ORDER_001', 'amount' => 99.99], ['id' => 2, 'order_no' => 'ORDER_002', 'amount' => 199.99], ], 'page' => $page, 'pageSize' => $pageSize, 'total' => 2, ], ]; } } ``` > 使用RPC 请求返回 ```php getOrderList(1, 10)); } } ``` ## 携程 ## 锁 > 「锁」的作用就是:让同一时间只有一个进程能操作这个资源,其他进程必须等它用完,避免并发冲突。 > 1. enable => false:锁的总开关 这个值是false时,整个锁机制都不生效,多进程可以随意竞争资源;设为true才会启用锁。 类比:办公室里的打印机(共享资源),如果没锁(enable=false),所有人都能同时去用,容易卡纸 / 打错;开了锁(enable=true),只有拿到钥匙的人能先用,其他人排队。 > 1. type => 'table':本机进程的 “轻量锁” table指的是Swoole Table,是 Swoole 内置的共享内存表,基于它实现的锁只适用于「同一台机器」的多进程。 特点:速度极快(内存级操作),但只能管本机的进程,跨机器无效。 类比:办公室内部的打印机钥匙,只有本办公室的人能用来排队,其他办公室的人管不到。 > 1. redis配置:跨机器的 “分布式锁” 当你的 Swoole 服务部署在多台机器上(比如 2 台服务器都跑 Swoole),本机的table锁就没用了(A 机器的进程管不到 B 机器的进程),这时需要用 Redis 实现「分布式锁」—— 所有机器的进程都去 Redis 里 “抢锁”,谁抢到谁用资源。 配置里的max_active/max_wait_time:和之前 IPC 里的 Redis 配置逻辑一样,控制 Redis 连接池的大小和等待时间,避免 Redis 连接不够用导致锁机制卡壳。 类比:总公司的共享打印机,多个分公司(多台机器)的人都要用,就需要一个统一的 “锁管理中心”(Redis),所有人都去这抢钥匙,不管在哪办公都得排队。 ## 协程 ### 并发执行 #### 协程说明 > 协程的核心思维(先理解本质,再看代码) > 协程(Coroutine)是 Swoole 实现 “并发” 的核心,你可以把它理解为: 轻量级的 “微线程”,由 PHP 代码层面调度(而非操作系统),多个协程在同一个进程里 “交替执行”,遇到耗时操作(比如 sleep、网络请求)时,协程会 “让出 CPU” 给其他协程,从而实现 “看似同时执行” 的效果。 类比理解(和你之前的 “打电话” 例子呼应) 串行执行:你挨个打 3 个电话,每个打 1 分钟,总共 3 分钟(必须等上一个挂了才能打下一个); 协程并发:你用 “免提 + 多线拨号”,同时拨 3 个电话,每个电话都要等 1 分钟对方接,但你不用等,拨完就切到下一个,最终 1 分钟后 3 个电话都接通了,总耗时 1 分钟。 ```php /** * 示例1: 协程并发执行多个任务 * 使用 go() 创建协程,可以并发执行多个任务 */ protected function asyncMultipleTasks() { $results = []; $channel = new Channel(3); // 并发执行3个任务 go(function () use ($channel) { // 模拟耗时任务1 Coroutine::sleep(1); // 模拟1秒延迟 $channel->push(['task' => '任务1', 'result' => '完成任务1', 'time' => microtime(true)]); }); go(function () use ($channel) { // 模拟耗时任务2 Coroutine::sleep(1); $channel->push(['task' => '任务2', 'result' => '完成任务2', 'time' => microtime(true)]); }); go(function () use ($channel) { // 模拟耗时任务3 Coroutine::sleep(1); $channel->push(['task' => '任务3', 'result' => '完成任务3', 'time' => microtime(true)]); }); // 接收所有任务的结果(总共需要约1秒,而不是3秒) for ($i = 0; $i < 3; $i++) { $results[] = $channel->pop(); } $channel->close(); return [ 'type' => '并发多任1务', 'total_time' => '约1秒(串行需要3秒)', 'results' => $results ]; } ``` > go() 函数: 作用:创建一个新的协程,立即执行里面的匿名函数; 特点:调用 go () 后不会阻塞当前流程,3 个 go () 会在瞬间全部启动,相当于 “同时按下 3 个任务的启动键”。 > Channel 作用:协程间的 “通信桥梁”,用于接收协程的执行结果(因为协程是异步的,不能直接 return 结果); 容量 3:表示最多能存 3 个结果,push () 会在通道满时阻塞,pop () 会在通道空时阻塞; 核心逻辑:每个协程完成后把结果 “推” 进通道,主流程通过 “取” 通道里的结果,等待所有协程完成。 > Coroutine::sleep(1): 注意:这不是 PHP 原生的 sleep ()!原生 sleep () 会阻塞整个进程,而 Coroutine::sleep () 是 “协程休眠”,只会让当前协程暂停,CPU 会切给其他协程执行; 类比:你打第一个电话时,对方说 “等 1 分钟回你”,你不用干等,而是立刻打第二个电话,这就是协程 sleep 的作用 > Coroutine\run(): 作用:启动 Swoole 的协程调度器,所有协程必须在这个调度器里运行(相当于给协程提供 “执行环境”)。 #### 注意点 1、协程的 “轻量性”:一个进程可以创建上万甚至十万个协程(远多于线程),因为协程是用户态调度,没有操作系统切换的开销; 2、Channel 的容量:如果创建 Channel 时容量设为 1,3 个协程的 push () 会排队,失去并发效果(所以容量要≥协程数); 3、协程安全:多个协程如果操作同一个变量(比如 $results 直接赋值),会出现 “竞态条件”(数据混乱),所以必须用 Channel 传递结果; 4、耗时操作必须是 “协程化” 的:只有 Swoole 提供的协程化函数(比如 Coroutine::sleep ()、Swoole\Coroutine\MySQL、Swoole\Coroutine\Redis)才会让出 CPU,原生的 file_get_contents ()、sleep () 依然会阻塞整个进程。 #### 进阶 协程 可以与 RPC调用 > 可以尝试把示例里的 Coroutine::sleep (1) 换成实际的 RPC 调用 / 数据库查询 ### queue 消息列队 > 首先需要安装 thinkphp 的 topthink/think-queue > composer require topthink/think-queue 使用 命令行后; 可以不用再使用进程守护开启 php think queue:listen 开消息列队了 > php think swoole:run MIT License # 测试数据结果 # =========================================== === 最终压测结果 === CPU核心数:物理8核 / 逻辑16核 压测配置:并发线程数10 | 总请求数10000 成功请求数:9999 | 失败请求数:1 | 成功率:99.99% 总耗时:51.21秒 | 平均响应时间:34.23毫秒 最终QPS:195.25 【系统资源峰值统计】 CPU占用:最高67.7% | 最低26.3% | 最终8.2% 内存占用:最高31.6% | 最低31.5% | 最终31.6% =============================================================================