# STS **Repository Path**: lsl6958/sts ## Basic Information - **Project Name**: STS - **Description**: 数据采集 - **Primary Language**: Java - **License**: LGPL-3.0 - **Default Branch**: dev - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 6 - **Forks**: 1 - **Created**: 2025-06-26 - **Last Updated**: 2025-12-12 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # STS 介绍 使用 jdk24 编写的文件和 jdbc 数据库之间数据迁移的程序 喜欢看视频的可以前往 b 站观看视频 [讲解视频](https://www.bilibili.com/video/BV1rqN7zUEic) ## Feature 1. 低内存占用 1. 基于 Java stream 流实现低内存的数据迁移 2. 内存可控制在 30m 之内完成超大数据迁移(实际可以更低) 2. 高性能(详情可见 [性能测试](#性能测试) 章节) 3. 支持超大数据量 4. 支持多种不同数据源之间的数据迁移(详情可见 [功能](#功能) 章节) 5. 支持使用规则引擎([aviatorscript](https://github.com/killme2008/aviatorscript))进行数据的加工处理 1. mock 数据 2. 数据脱敏 3. 数据过滤 6. 提供良好的扩展性, 方便新增新的实现 ## 功能 **读取支持列表:** 1. csv 2. excel 3. txt(普通文本) 4. jdbc(理论支持 jdbc 的数据库都支持) **写入支持列表:** 1. csv 2. excel 3. txt(普通文本) 4. jdbc(理论支持 jdbc 的数据库都支持, 目前支持数据库类型如下, jdbc 支持 upsert) 1. Mysql 2. Postgresql 3. OpenGauss ## 使用教程 ### sts-core #### 文件到文件 > 例如 csv <----> csv, csv <----> excel, csv <----> txt 等等 ```java // source CsvSourceConfig csvSourceConfig = new CsvSourceConfig("source.csv"); // sink CsvSinkConfig csvSinkConfig = new CsvSinkConfig("sink.csv"); // task StsTask stsTask = new StsTask(csvSourceConfig, csvSinkConfig); stsTask.run(); ``` #### 文件到数据库 ```java // source CsvSourceConfig csvSourceConfig = new CsvSourceConfig("source.csv"); // sink JdbcConnectionInfo jdbcConnectionInfo = new JdbcConnectionInfo(); jdbcConnectionInfo.setName("test"); jdbcConnectionInfo.setDbType("mysql"); jdbcConnectionInfo.setUrl("jdbc:mysql://ip:port/database?rewriteBatchedStatements=true"); jdbcConnectionInfo.setUsername("root"); jdbcConnectionInfo.setPassword("123456"); JdbcSinkConfig jdbcSinkConfig = new JdbcSinkConfig(jdbcConnectionInfo, "schema", "table"); // task StsTask stsTask = new StsTask(csvSourceConfig, jdbcSinkConfig); // 如果要使用 upsert 导入数据库可使用 // StsTask stsTask = new StsTask(csvSourceConfig, jdbcSinkConfig, SinkTypeEnum.UPSERT); stsTask.run(); ``` #### 数据库到文件 ```java // source JdbcConnectionInfo jdbcConnectionInfo = new JdbcConnectionInfo(); jdbcConnectionInfo.setName("test"); jdbcConnectionInfo.setDbType("mysql"); jdbcConnectionInfo.setUrl("jdbc:mysql://ip:port/database?rewriteBatchedStatements=true"); jdbcConnectionInfo.setUsername("root"); jdbcConnectionInfo.setPassword("123456"); JdbcSourceConfig jdbcSourceConfig = new JdbcSourceConfig(jdbcConnectionInfo, "select * from table", Integer.MIN_VALUE); //sink CsvSinkConfig csvSinkConfig = new CsvSinkConfig("sink.csv"); // task StsTask stsTask = new StsTask(jdbcSourceConfig, csvSinkConfig); // 如果要使用 upsert 导入数据库可使用 // StsTask stsTask = new StsTask(csvSourceConfig, jdbcSinkConfig, SinkTypeEnum.UPSERT); stsTask.run(); ``` #### 数据库到数据库 ```java // source JdbcConnectionInfo jdbcConnectionInfo = new JdbcConnectionInfo(); jdbcConnectionInfo.setName("test"); jdbcConnectionInfo.setDbType("mysql"); jdbcConnectionInfo.setUrl("jdbc:mysql://ip:port/database?rewriteBatchedStatements=true"); jdbcConnectionInfo.setUsername("root"); jdbcConnectionInfo.setPassword("123456"); JdbcSourceConfig jdbcSourceConfig = new JdbcSourceConfig(jdbcConnectionInfo, "select * from table1", Integer.MIN_VALUE); //sink JdbcSinkConfig jdbcSinkConfig = new JdbcSinkConfig(jdbcConnectionInfo, "schema", "table2"); // task StsTask stsTask = new StsTask(jdbcSourceConfig, jdbcSinkConfig); stsTask.run(); ``` #### Source 自定义 header > 某些时候文件中只有数据, 可以在构造方法中指定 header ```java // source CsvSourceConfig csvSourceConfig = new CsvSourceConfig(false, List.of("column1", "column2", "column3"), "source.csv"); ``` #### 注册 jdbc sink > 如果想使用自己实现的其他数据库类型, 或者内置的数据库类型无法满足需求, 想进行覆盖 ```java //在 stsTask.run(); 之前调用如下举例代码 DbPairUtil.registryPair("oracle", Pair.of(OracleSinkImpl.class, OracleOperationImpl.class)) ``` #### 规则引擎使用 可以前往 b 站观看视频 [讲解视频](https://www.bilibili.com/video/BV1wkGHz7E5D/) #### 注册自定义函数 > 在 Transform 中使用自定义的规则引擎函数 ```java //在 stsTask.run(); 之前调用如下举例代码 FunctionUtil.registryFunction(new CustomFunction()); ``` ### sts-swing #### Source Source 文件源: 文件源通过文件的后缀去判断是 csv, excel 还是普通 text 文件 .csv 结尾统一识别为 csv 文件, .xlsx 结尾统一识别为 excel 文件, 其他结尾的文件统一识别为普通 text 文件 ![image-20250805111953079](./README.images/image-20250805111953079.png) Source jdbc 源: 除了 File 之外的其他源都是 jdbc 源, jdbc 源就是在配置文件中配置的 ![image-20250805112606792](./README.images/image-20250805112606792.png) jdbc 需要通过写 select sql 去抽取数据, 如果 Source 和 Sink 两边的列不一致需要在 select 语句中通过 as 去映射对应的字段名 ![image-20250805112521722](./README.images/image-20250805112521722.png) #### Transform Transform 在基于规则生成的情况下需要指定开始符号用于表示该内容为一个规则, 开始符号可以自定义 ![image-20250805111609430](./README.images/image-20250805111609430.png) ##### 内置函数 aviatorscript 也内置了很多函数, 可以在下方网址查询: https://www.yuque.com/boyan-avfmj/aviatorscript/ashevw ##### 内置自定义函数 | 函数 | 生成值 | 功能说明 | 值示例 | | ---- | -------- | -------- | ---- | | id() | 15 位数值 | 随机生成唯一键 id, 使用的是 [IdGenerator](https://github.com/yitter/IdGenerator) 开源项目生成的 | 705433731031045 | | uuid() | 36 位 uuid | | 3167e27e-7e70-4e2d-bd87-89efa2a07e5a | | snowId() | 19 位数值 | 使用雪花算法生成唯一键 id | 1952559794698604544 | | randCnName() | 中文姓名 | 随机生成一个中文名 | 华怡冉 | | randCnPhone() | 11 位电话号码 | 随机生成电话号码 | 14717031094 | | randDate() | 随机生成日期 | 随机生成指定范围内的日期, 左闭右开
参数:
参数一: 起始日期(字符串类型, 必填)
参数二: 截止日期(字符串类型, 必填)
参数三: 生成日期格式化类型(字符串类型, 非必填, 仅支持 yyyy-MM-dd 默认和 yyyyMMdd 两种类型)
示例: randDate("2000-01-01", "2000-12-31")
解释: 随机生成 2000-01-01(包含)到 2000-12-31(不包含)范围内的随机日期
示例: randDate("2000-01-01", "2000-12-31", "yyyyMMdd")
解释: 会将生成的随机日期格式化为 8 位 | 2000-10-12
20001012 | | randDecimal() | 随机生成小数 | 随机生成指定范围内的小数, 左闭右开
参数:
参数一: 起始值(数值类型, 必填)
参数二: 终止值(数值类型, 必填)
示例: randDecimal(10, 100)
解释: 随机生成 10(包含)到 100(不包含)范围内的随机小数 | 17.028227454264353 | | randEle() | 随机生成指定列表内的值 | 随机生成指定列表内的值
参数:
参数列表: 字符串类型, 必填
示例: randEle("A", "B", "C", "D", "E")
解释: 随机生成 ABCDE 中的任意一个值 | C | | randEmail() | 随机生成邮箱地址 | 随机生成邮箱地址
参数:
参数一: 生成邮箱地址中@符之前的最长长度(字符串类型, 必填)
参数二: @符之后的固定内容(字符串类型, 非必填)
示例: randomEmail(10)
解释: 参数表示生成邮箱地址中@符之前的最长长度
示例: randomEmail(10, "163.com")
解释: 第二个参数表示@符之后的固定内容 | r2qojh@55v.me
aq3lgswgmn@163.com | | randInt() | 随机生成一个整数 | 随机生成一个整数, 左闭右开
参数:
参数一: 起始值(整数类型, 必填)
参数二: 终止值(整数类型, 必填)
示例: randInt(-10, 10)
解释: 随机生成 -10(包含)到 10(不包含)范围内的随机整数 | -7 | | round() | 保留固定位数的小数 | 保留固定位数的小数
参数:
参数一: 需要保留的小数(数值类型, 必填)
参数二: 需要保留的位数(整数类型, 必填)
参数三: 保留的模式(整数类型, 非必填, 不填默认为四舍五入模式)
示例: round(10.3523452345, 2, 4)
解释: 使用四舍五入的保留方式将 10.3523452345 保留 2 位小数 | 10.35 | | row.get() | 获取当前行指定单元格的值 | 获取当前行指定单元格的值
参数:
参数一: 固定内容 row, 指代当前行(必填)
参数二: 单元格的下标, 从 0 开始(整数类型, 必填)
示例: row.get(row, 0)
解释: 获取当前行第一个单元的内容 | | | seqEle() | 按照给定顺序赋值其中的值 | 按照给定顺序赋值其中的值
参数:
参数列表: 字符串类型, 必填
示例: seqEle("xx", "yy"), 第一次获取到的就是 xx, 第二次是 yy, 第三次是 xx, 第四次是 yy, 第五次是 xx......
注意: 当前该函数只支持单线程, 多线程会有问题 | xx | #### Sink Sink 文件源: 文件源通过文件的后缀去判断保存为 csv, excel 还是普通 text 文件 .csv 结尾统一识别为 csv 文件, .xlsx 结尾统一识别为 excel 文件, 其他结尾的文件统一识别为普通 text 文件 ![image-20250805112954216](./README.images/image-20250805112954216.png) Sink jdbc 源: 除了 File 之外的其他源都是 jdbc 源, jdbc 源就是在配置文件中配置的 jdbc 源需要指定到表, 需要先填写模式, 再填写表名, 可以通过按钮获取 jdbc 源下所有的模式和表名 ![image-20250805113202995](./README.images/image-20250805113202995.png) Sink 提供了一个清空表的按钮, 方便在导入前清空表数据 Sink 配置按钮目前可以配置线程数和批量数 ![image-20250805113322800](./README.images/image-20250805113322800.png) #### Insert 和 Upsert Insert 所有的 Sink 源都支持, Upsert 只有 jdbc 源才支持, Upsert 如果 Sink 选择的文件源默认也会使用 Insert 执行 Upsert 的使用要求 Sink jdbc 源的表必须有主键 ## 性能测试 ### 场景一 | 数据量 | 103 万(csv) | | ---------- | ------------------------------------------------------- | | 时间 | 3.5s | | 数据库硬件 | 本机(mac mini m4 16+256) | | 数据库 | Mysql-9.3.0(docker arm 版本) | | 表 | 6 字段, 单主键(bigint 类型, 雪花算法), 无索引(除主键外) | | 程序 | zulu jdk24, 20 线程 insert 写入, 单批提交 1000 条 | | 程序硬件 | mac mini m4 16+256, 和数据库在同一物理机上 | ### 场景二 | 数据量 | 1.2 亿(csv, 70G+) | | ---------- | ----------------------------------------------------------- | | 时间 | 接近 80 分钟 | | 数据库硬件 | 服务器(未知) | | 数据库 | GaussDB(版本未知) | | 表 | 20+ 字段, 单主键(varchar 类型, uuid 无序), 无索引(除主键外) | | 程序 | zulu jdk21, 10 线程 insert 写入, 单批提交 2000 条 | | 程序硬件 | 虚拟主机(2 逻辑核心, 3.0GHz 主频, 无睿频) + 16G 内存 | ## 注意事项 **问: Mysql 为什么导入特别慢?** 答: 查看 url 中是否开启了批处理, 加上 rewriteBatchedStatements = true 参数 **问: Mysql 多线程导入没有单线程快?** 答: sts-core 中使用了虚拟线程, mysql-9.0(具体版本忘记了)版本之前对虚拟线程存在兼容性问题, 如果想使用 mysql 多线程, 建议修改源码在 sink 中使用平台线程 **问: Excel 导出数据丢失?** 答: Excel 单个 sheet 页最多存放 1048576 条数据, 因此 ExcelSinkImpl 中存在 limit 的限制 ## 开源库 项目中使用的第三方开源库列表如下(sts-core 中核心三方库): 1. [eec](https://gitee.com/wangguanquan/eec) - 读取 excel 数据为 stream 流 2. [fastcsv](https://github.com/osiegmar/FastCSV) - 读取 csv 数据为 stream 流 3. [aviatorscript](https://github.com/killme2008/aviatorscript) - 规则引擎 4. [hutool](https://gitee.com/chinabugotech/hutool) - 工具库 5. [beecp](https://gitee.com/Chris2018998/BeeCP) - 高性能数据库连接池 ## 分支说明 | 分支 | 说明 | | ------ | ----------------------------------------- | | master | 主分支,保护分支,此分支不接受 PR。 | | dev | 开发分支,接受 PR,PR 请提交到 dev 分支。 | ## 参与贡献 ### source 扩展方式 ```java // 1. 创建 CustomSourceConfig 继承 BaseSourceConfig, 实现 1 个方法即可 public class CustomSourceConfig extends BaseSourceConfig { @Override public StsSource buildImpl() { // 返回对应的 CustomSourceImpl 实现类 } } // 2. 创建 CustomSourceImpl 实现 StsSource 接口, 重写 3 个方法即可 public class CustomSourceImpl implements StsSource { @Override public List getHeader() { } @Override public Stream> getDataStream() { } @Override public void close() throws Exception { } } ``` ### 普通 sink 扩展方式 ```java // 1. 创建 CustomSinkConfig 继承 BaseSinkConfig, 实现 1 个方法即可 public class CustomSinkConfig extends BaseSinkConfig { @Override public StsSink buildImpl(StsSource stsSource, StsTask stsTask) { // 返回对应的 CustomSinkImpl 实现类 } } // 2. 创建 CustomSinkImpl 实现 StsSink 接口, 重写 2 个方法即可 public class CustomSinkImpl implements StsSink { @Override public void insert() { } @Override public void upsert() { } } ``` ### Jdbc Sink 扩展方式 > 如果某个数据库是基于已有实现的数据库进行二开的, 如果有细微区别可以继承具体对应的 JdbcSinkImpl, 重写有区别的方法即可 ```java // 1. 创建 CustomJdbcSinkImpl 继承 JdbcSinkImpl, 重写 3 个方法即可 public class CustomJdbcSinkImpl extends JdbcSinkImpl { // 提供构造函数 public CustomJdbcSinkImpl(StsSource stsSource, JdbcSinkConfig jdbcSinkConfig, StsTask stsTask, JdbcOperation jdbcOperation) { super(stsSource, jdbcSinkConfig, stsTask, jdbcOperation); } // insert 需要的方法 @Override public void insertSetCellValue(PreparedStatement preparedStatement, List columnInfoList, List rowValueList) throws Exception { } // 构建 upsert sql @Override public String buildUpsertSql(List columnInfoList, Map tableAllPrimaryKeyColumnMap) { } // upsert 需要的方法 @Override public void upsertSetCellValue(PreparedStatement preparedStatement, List columnInfoList, List rowValueList, Map tableAllPrimaryKeyColumnMap) throws Exception { } } // 2. 创建 CustomOperationImpl 实现 JdbcOperation 接口, 实现里面的方法即可 public class CustomOperationImpl implements JdbcOperation { } // 3. 在 DbPairUtil 的静态代码块中加入新的实现类 public class DbPairUtil { private static final Map, Class>> DB_PAIR_MAP = new HashMap<>(); static { DB_PAIR_MAP.put("MYSQL", Pair.of(MysqlSinkImpl.class, MysqlOperationImpl.class)); DB_PAIR_MAP.put("OPENGAUSS", Pair.of(OpenGaussSinkImpl.class, PostgresqlOperationImpl.class)); DB_PAIR_MAP.put("POSTGRESQL", Pair.of(PostgresqlSinkImpl.class, PostgresqlOperationImpl.class)); // 将自己实现的 put 进 DB_PAIR_MAP 中 } } ``` ## 构建 本人使用组件版本如下, 仅供参考: 1. zulu jdk24 2. maven 3.9.9 3. idea 2025.1.3 Swing 程序 JVM 参数如下: ``` -Xms128m -Xmx1024m -XX:MetaspaceSize=64m --enable-native-access=ALL-UNNAMED -XX:+UnlockExperimentalVMOptions -XX:+UseCompactObjectHeaders ```