# pipe **Repository Path**: happyape/pipe ## Basic Information - **Project Name**: pipe - **Description**: 作为mysql的slave client ,可实时监控数据流入状态,可对被监控的表进行任意后续操作。 - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2021-07-13 - **Last Updated**: 2021-09-29 ## Categories & Tags **Categories**: Uncategorized **Tags**: Java, MySQL ## README # Pipe #### 介绍 - 这是一个mysql的slave client 的java网络接口, 实现了与master的连接,登录,注册,同步database的metainfo,以及binlog的insert和update事件的监控功能。 - 你可以利用此包,打开一个与mysql的master的连接,以实现你自己在业务上非侵入式的数仓数据收集接口,数据库异构,数据实时监控等功能。 如下所示,它可以帮助你专注于业务数据流的管理: ![输入图片说明](https://images.gitee.com/uploads/images/2021/0928/145033_3783417c_128629.png "1.png") #### 软件架构 基于Netty以及mysql-5.0+ row 模式 #### 安装教程 - 打包pipe-mysql-connector到你的环境里,参考下面的代码实例来使用。 - pipe-mysql-parser已废弃 #### 使用说明 - 目前版本仅支持insert和update, 视情况支持其他命令。 - 生产环境慎用, 想采用成熟的产品,请下载阿里巴巴的canal #### 代码示例 ```xml pipe-mysql-connector com.kiven.pipe 1.0-SNAPSHOT ``` ```java /**声明一个连接,需要: 地址,端口,用户名,密码*/ MysqlConnector connector = new MysqlConnector("127.0.0.1", 3306, "root", "test"); /**你要如何处理client的数据修改或插入事件,实现你自己的listener即可, 下面只是一个示例,如果你需要进行业务处理,比如同步数据到其他数据库,请务必采用线程池!*/ connector.addNewRowDataListener(rows -> { if (rows.getTableName().equals("your_table")){ /**由于在mysql协议里,多行修改是整合在一次传输里的,因此rows.getRows()里面其实只有一行数据, get(0)就可以了*/ WriteRowEventV2Rows.Row row = rows.getRows().get(0); /**这是一个update操作 * row.getValues()所得到的list内部,是按列的定义顺序存储的相关的内容 */ if (row.getUpdateValues().size() != 0){ //获取第一列的数据 Object columnOneValue = row.getvalues().get(0); //获取第二列的数据 Object columnTwoValue = row.getvalues().get(1); //循环获取全部数据 for (Object rowData : row.getUpdateValues()) { System.out.println("updated rowData:" + rowData); } } else { /**这是一个insert操作*/ for (Object rowData : row.getValues()) { System.out.println("inserted rowData:" + rowData); } } } }); /**请先用命令查询 show MASTER status 查询一下你的 binlog 信息*/ connector.startBinlogDumpTask("test", 11776, "mysql-bin.000024"); ``` ### 既然已经有了canal,为什么开发这个东东? 造轮子嘛,你懂的,自己写的东西,没那么复杂,也用的明白。 ### 其他想法! 我觉得基础组件搭建好之后,数据库异构是可以直接用类似DDL的脚本来搞定的。 比如: t_user表和t_order表由user_id互相连接起来之后。我想在从库把这两张表异构为一张新表t_user_order,并且同步他们的数据到这张新表。 我在想能不能用如下的类似语句来搞定即可: create t_user_order from t_user, t_order where t_user.user_id = t_order_user_id; 这样更省事,是不是,当然这是我脱离业务的随意猜想,欢迎大家拍砖。 #### 参考资料 1. [MySQL Internals Manual](https://dev.mysql.com/doc/internals/en/) 3. [网络上的优秀博客](https://cloud.tencent.com/developer/article/1768901)