# cloud-seata **Repository Path**: ly-springcloud/cloud-seata ## Basic Information - **Project Name**: cloud-seata - **Description**: SpringCloud + ZK + Redis + Seata + Mybatis-Plus 分布式框架及分布式事务 - **Primary Language**: Java - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2021-09-17 - **Last Updated**: 2021-11-01 ## Categories & Tags **Categories**: Uncategorized **Tags**: SpringCloud ## README ##### 一. 框架构成     本教程,采用SpringCloud Hoxton.SR8版 为分布式底层框架,持久层使用Mybatis-Plus,缓存为Redis,注册中心为阿里巴巴的Zookeeper(zk),各个服务间的通信使用的是openfeign,使用阿里巴巴Seata作为分布式事务,本教程采用分布式的默认AT模式。 ##### 二. 简单介绍阿里巴巴分布式事务Seata     Seata 是阿里开源的分布式事务框架,属于二阶段提交模式,首先简单了解下数据库层面上的分布式解决方案 a.数据库层面的XA方案 ``` RM(Resource Manager): 用于直接执行本地事务的提交和回滚。在分布式集群中,一台MySQL服务器就是一个RM。 TM(Transaction Manager): TM是分布式事务的核心管理者。事务管理器与每个RM进行通信,协调并完成分布式事务的处理。 发起一个分布式事务的MySQL客户端就是一个TM ``` XA的两阶段提交分为Prepare阶段和Commit阶段,过程如下: ``` 准备(prepare)阶段: 即所有的RM锁住需要的资源,在本地执行这个事务(执行sql,写redo/undo log等), 但不提交,然后向Transaction Manager报告已准备就绪。 提交(commit)阶段: 当Transaction Manager确认所有参与者都ready后,向所有参与者发送commit命令。 ``` 如图所示: ![输入图片说明](https://images.gitee.com/uploads/images/2021/0916/155034_1692541e_1198099.png "微信截图_20210916155015.png") b.seata Seata是阿里开源的分布式事务解决方案中间件,对业务侵入小,核心概念包含三个角色: ``` TM:事务发起者。用来告诉TC全局事务的开始,提交,回滚。 RM:事务资源,每一个RM都会作为一个分支事务注册在TC。 TC:事务协调者,即独立运行的seata-server,用于接收事务注册,提交和回滚。 ``` ![输入图片说明](https://images.gitee.com/uploads/images/2021/0916/154954_04db50af_1198099.png "20190818220614755.png") 具体的执行流程如下: ``` 用户服务的 TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的XID。 用户服务的 RM 向 TC 注册 分支事务,该分支事务在用户服务执行新增用户逻辑,并将其纳入 XID 对应全局事务的管辖。 用户服务执行分支事务,向用户表插入一条记录。 逻辑执行到远程调用积分服务时(XID 在微服务调用链路的上下文中传播)。积分服务的RM 向 TC 注册分支事务,该分支事务执行增加积分的逻辑,并将其纳入 XID 对应全局事务的管辖。 积分服务执行分支事务,向积分记录表插入一条记录,执行完毕后,返回用户服务。 用户服务分支事务执行完毕。 TM 向 TC 发起针对 XID 的全局提交或回滚决议。 TC 调度 XID 下管辖的全部分支事务完成提交或回滚请求。 ``` ##### 三. 环境准备 ###### 1.下载注册中心zk(这里统一在win环境下安装部署) [点击官网下载](http://zookeeper.apache.org/releases.html),我这里使用的是最新版3.7版本 ``` 解压到目录下,例如:D:\Program Files\apache-zookeeper-3.7.0 进入到conf目录下,将zoo_simple.cfg重命名为zoo.cfg,并使用默认配置,不更改文件内的内容 然后进入到bin目录下双击zkServer.cmd文件,启动zk服务 ``` 如下,则表示zk服务启动成功,zk服务的默认端口号是2181 ![输入图片说明](https://images.gitee.com/uploads/images/2021/0917/102902_c3fcb04f_1198099.png "微信截图_20210917102842.png") ###### 2.下载seata,[下载地址](https://github.com/seata/seata/releases),同样选择的是当时最新的版本1.4.2,这里下载seata-server和Source code 源码两个压缩包,一个是seata服务,另外一个压缩包初始化zk服务时会用到 ![输入图片说明](https://images.gitee.com/uploads/images/2021/0917/103757_5ca6b289_1198099.png "微信截图_20210914111144.png") ``` 解压seata-server到目录下,例如D:\Program Files\seata-server-1.4.2 进入到conf目录下,修改file.conf.example 为file.conf,并编辑file.conf文件和registry.conf文件 首先编辑file.conf,因为seata服务需要有数据库支撑,因此我们需要新建一个数据库,数据库名(自定义),但是要保证和配置文件内保持一致。 我这里使用seata作为数据库名 ``` a.编辑file.conf文件 ![输入图片说明](https://images.gitee.com/uploads/images/2021/0917/112959_4ee89825_1198099.png "微信截图_20210917112940.png") b.编辑registry.conf文件,主要修改该文件下的registry对象和config对象内的内容。完整的配置文件在config的server目录下 ![输入图片说明](https://images.gitee.com/uploads/images/2021/0917/113348_9d7fd490_1198099.png "微信截图_20210917113335.png") ![输入图片说明](https://images.gitee.com/uploads/images/2021/0917/113727_5ee38988_1198099.png "微信截图_20210917113714.png") ###### 3.将seata 节点初始化到zk 服务中。 ``` 1.解压刚刚下载的源码文件seata-1.4.2,例如解压到D:\Program Files\seata-1.4.2 2.进入script\config-center,修改config.txt文件 3.修改zk服务的地址以及seata 服务的数据库地址和密码。(如下图),如果没有redis服务可以把redis配置删除,有的话修改。 4.在zk服务的bin目录下找到zkCli.cmd双击(zk服务要先启动)新建seata节点,命令为 create /seata 5.因为我们使用的是zk注册中心,进入到script\config-center\zk目录下,执行zk-config.sh 脚本,再win下可以使用git执行 ``` ![输入图片说明](https://images.gitee.com/uploads/images/2021/0917/135803_c1080b5a_1198099.png "微信截图_20210917135555.png") ###### 4.初始化seata服务的数据库 ``` 此3个表是seata服务的默认表结构和目录,可在github的seata仓库内找到 DROP TABLE IF EXISTS `branch_table`; CREATE TABLE `branch_table` ( `branch_id` bigint(20) NOT NULL, `xid` varchar(128) NOT NULL, `transaction_id` bigint(20) DEFAULT NULL, `resource_group_id` varchar(32) DEFAULT NULL, `resource_id` varchar(256) DEFAULT NULL, `lock_key` varchar(128) DEFAULT NULL, `branch_type` varchar(8) DEFAULT NULL, `status` tinyint(4) DEFAULT NULL, `client_id` varchar(64) DEFAULT NULL, `application_data` varchar(2000) DEFAULT NULL, `gmt_create` datetime DEFAULT NULL, `gmt_modified` datetime DEFAULT NULL, PRIMARY KEY (`branch_id`), KEY `idx_xid` (`xid`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; DROP TABLE IF EXISTS `global_table`; CREATE TABLE `global_table` ( `xid` varchar(128) NOT NULL, `transaction_id` bigint(20) DEFAULT NULL, `status` tinyint(4) NOT NULL, `application_id` varchar(64) DEFAULT NULL, `transaction_service_group` varchar(64) DEFAULT NULL, `transaction_name` varchar(64) DEFAULT NULL, `timeout` int(11) DEFAULT NULL, `begin_time` bigint(20) DEFAULT NULL, `application_data` varchar(2000) DEFAULT NULL, `gmt_create` datetime DEFAULT NULL, `gmt_modified` datetime DEFAULT NULL, PRIMARY KEY (`xid`), KEY `idx_gmt_modified_status` (`gmt_modified`,`status`), KEY `idx_transaction_id` (`transaction_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; DROP TABLE IF EXISTS `lock_table`; CREATE TABLE `lock_table` ( `row_key` varchar(128) NOT NULL, `xid` varchar(96) DEFAULT NULL, `transaction_id` mediumtext, `branch_id` mediumtext, `resource_id` varchar(256) DEFAULT NULL, `table_name` varchar(32) DEFAULT NULL, `pk` varchar(32) DEFAULT NULL, `gmt_create` datetime DEFAULT NULL, `gmt_modified` datetime DEFAULT NULL, PRIMARY KEY (`row_key`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; ``` ###### 5.启动seata服务 ``` 进入到D:\Program Files\seata-server-1.4.2的bin目录下,双击seata-server.bat 启动seata服务 ``` ##### 四. 后端框架服务准备 ###### 1.zk依赖及配置 ``` pom文件内引入zk依赖 org.springframework.cloud spring-cloud-starter-zookeeper-all org.apache.zookeeper zookeeper org.apache.zookeeper zookeeper 3.4.12 org.slf4j slf4j-log4j12 spring: cloud: config: enabled: false zookeeper: #注册中心地址 connect-string: 127.0.0.1:2181 config: enabled: true root: configuration defaultContext: apps profileSeparator: '::' #分布式服务的地址,即软件部署的地址 discovery: instance-host: 127.0.0.1 ``` ###### 2.分布式事务seata的配置 ``` com.alibaba.cloud spring-cloud-alibaba-seata 2.2.0.RELEASE io.seata seata-spring-boot-starter io.seata seata-spring-boot-starter 1.4.2 io.seata seata-config-zk 1.4.2 seata: enabled: true application-id: ${spring.application.name} #my_test_tx_group 必须和前面config.txt初始化zk 配置的一致 tx-service-group: my_test_tx_group enable-auto-data-source-proxy: true #zk服务的地址 config: type: zk zk: server-addr: 127.0.0.1:2181 session-timeout: 6000 connect-timeout: 2000 username: "" password: "" registry: type: zk zk: server-addr: 127.0.0.1:2181 session-timeout: 6000 connect-timeout: 2000 username: "" password: "" ``` ###### 3.新建file.conf 在后端框架的classpath资源目录下新建file.conf文件 ``` transport { # tcp udt unix-domain-socket type = "TCP" #NIO NATIVE server = "NIO" #enable heartbeat heartbeat = true #thread factory for netty thread-factory { boss-thread-prefix = "NettyBoss" worker-thread-prefix = "NettyServerNIOWorker" server-executor-thread-prefix = "NettyServerBizHandler" share-boss-worker = false client-selector-thread-prefix = "NettyClientSelector" client-selector-thread-size = 1 client-worker-thread-prefix = "NettyClientWorkerThread" # netty boss thread size,will not be used for UDT boss-thread-size = 1 #auto default pin or 8 worker-thread-size = 8 } } service { #vgroup->rgroup vgroup_mapping.middle-goods-seata-service-group = "default" #only support single node default.grouplist = "127.0.0.1:8091" #degrade current not support enableDegrade = false #disable disable = false } client { async.commit.buffer.limit = 10000 lock { retry.internal = 10 retry.times = 30 } } ``` ###### 4.新建registry.conf 在后端框架的classpath资源目录下新建registry.conf文件,主要是zk 的服务,可以把当前的两个文件配置看成为客户端配置 ``` registry { # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa type = "zk" zk { cluster = "default" serverAddr = "127.0.0.1:2181" sessionTimeout = 6000 connectTimeout = 5000 username = "" password = "" } } config { # file、nacos 、apollo、zk、consul、etcd3 type = "zk" zk { serverAddr = "127.0.0.1:2181" sessionTimeout = 6000 connectTimeout = 2000 username = "" password = "" } } ``` ##### 五. Mybatis-Plus 的配置 因为seata需要代理才能真正的使用,[因此查看了github的seata分支](https://github.com/seata/seata-samples/blob/master/springcloud-eureka-feign-mybatis-seata),但都是mybatis的,我这里使用的是mybatis-plus,所以只能进行改造。启动类的@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)排除调自动配置,使用配置文件走代理 ``` @Configuration public class DruidConfig { /** * 初始化druid 数据库 */ @Bean(name = "druidDataSource") @ConfigurationProperties(prefix = "spring.datasource") public DataSource druidDataSource(){ DruidDataSource druidDataSource = new DruidDataSource(); return druidDataSource; } /** * init datasource proxy * * @Param: druidDataSource datasource bean instance * @Return: DataSourceProxy datasource proxy */ @Primary @Bean("dataSource") public DataSourceProxy dataSource(DataSource druidDataSource){ return new DataSourceProxy(druidDataSource); } /** * init global transaction scanner * applicationId 这里我使用的是该程序的名称即配置文件内的spring.application.name * 使用注册中心时,程序的名称是必须的,为了区分不同的服务 * my_test_tx_group 是上述配置的,必须和前面保持一致 * @Return: GlobalTransactionScanner */ @Bean public GlobalTransactionScanner globalTransactionScanner() { logger.info("配置seata........"); return new GlobalTransactionScanner(applicationId,"my_test_tx_group"); } } ``` ##### 六. 程序启动前的准备 启动zkCli.cmd 输入命令 ls /seata 查看该seata节点下是否有数据,如果没有,证明初始化seata配置时没有生效,可使用代码执行上述config.txt文件 ``` /** * @author 蚂蚁会花呗 * @date 2021/9/8 11:03 */ public class ZkDataInit { /** * 节点路径 */ private static final String NODE_PATH = "/seata"; /** * zk 服务地址 */ private static final String CONNECT_TOSTRING = "127.0.0.1:2181"; public static ExecutorService executorService = Executors.newCachedThreadPool(); public static void main(String[] args) throws Exception { try { RetryPolicy retryPolicy2 = new RetryUntilElapsed(5000, 1000); CuratorFramework curatorFramework = CuratorFrameworkFactory.builder() .connectString(CONNECT_TOSTRING) .sessionTimeoutMs(5000) .connectionTimeoutMs(5000) .retryPolicy(retryPolicy2) .build(); curatorFramework.start(); Stat stat1 = curatorFramework.checkExists().forPath(NODE_PATH); if(Objects.isNull(stat1)){ curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(NODE_PATH, null); } Properties properties = new Properties(); try { //config.txt的绝对路径 File file=new File("D:/Program Files/seata-1.4.2/script/config-center/config.txt"); InputStream in = new FileInputStream(file); properties.load(in); Set keys = properties.keySet(); for (Object key : keys) { putConfig(curatorFramework,key.toString(), properties.get(key).toString()); } } catch (IOException e) { e.printStackTrace(); } } catch (Exception e) { e.printStackTrace(); } finally { executorService.shutdown(); } } /** * * @param curatorFramework * @param dataId * @param content * @return */ public static boolean putConfig(CuratorFramework curatorFramework, final String dataId, final String content) { Boolean flag = false; String path = "/seata/" + dataId; try { Stat stat1 = curatorFramework.checkExists().forPath(path); if (Objects.isNull(stat1)) { curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, content.getBytes()); flag = true; } return flag; } catch (Exception e) { e.printStackTrace(); } return false; } } ``` 执行之后再次使用 ls /seata 命令,可查看节点路径下的配置信息 ##### 七. 为每个服务都初始化 undo_log 数据表 ``` 比如我这里有三个服务程序对应三个不同的库: 账户服务:cloud-seata-account 订单服务:cloud-seata-order 仓储服务:cloud-seata-storage 为每个服务的数据库都创建undo_log表 DROP TABLE IF EXISTS `undo_log`; CREATE TABLE `undo_log` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `branch_id` bigint(20) NOT NULL, `xid` varchar(100) NOT NULL, `context` varchar(128) NOT NULL, `rollback_info` longblob NOT NULL, `log_status` int(11) NOT NULL, `log_created` datetime NOT NULL, `log_modified` datetime NOT NULL, `ext` varchar(100) DEFAULT NULL, PRIMARY KEY (`id`) USING BTREE, UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC; ``` ##### 八. 模拟实际应用 模拟一个普通的订单库存交易流程,用户创建订单后扣减库存,并扣减账户。在service层加上@GlobalTransactional注解即可,如下是部分代码,完整代码可去 cloud-seata-order程序中查看,这里服务间的通讯使用的是feign ``` @GlobalTransactional(rollbackFor = Exception.class) public void create(OrderCreateDto createDto) { log.info("---------->开始交易"); //创建订单 log.info("创建订单开始!"); this.save(createDto); log.info("创建订单结束!"); //扣减库存,调用cloud-seata-storage服务 log.info("扣减库存开始"); storageFeignService.decreaseStorage(new DecreaseStorageDto(createDto.getProductId(),createDto.getCount())); log.info("扣减库存结束"); //扣减账户,调用cloud-seata-account服务 log.info("开始扣减账户!"); BigDecimal money = createDto.getMoney().multiply(new BigDecimal(createDto.getCount())); DecreaseMoneyDto decreaseMoneyDto = new DecreaseMoneyDto(createDto.getUserId(),money); accountFeignService.decreaseMoney(decreaseMoneyDto); log.info("扣减账户结束!"); log.info("交易结束!"); } ``` ##### 九. 启动及注意事项 ###### 1.启动 分别启动每个服务,如果出现如下,则启动成功 ![输入图片说明](https://images.gitee.com/uploads/images/2021/0917/170610_3cbe3786_1198099.png "微信截图_20210910142900.png") ###### 2.注意事项 分别模拟该流程成功和失败的情况,观察数据库是否回滚。发现正常情况下肯定没问题,但是失败的情况下,只有当前程序回滚,而其他服务没有回滚,这完全不正确,理应是各个服务都回滚才对。虽然理解seata服务时通过xid保证全局唯一的,但是找不到解决的办法,能排除的是程序没问题,而是服务间的调用出了问题,查阅资料才发现因为使用了feign各个服务间没有传递xid导致seata通讯异常([这里用到了feign但是没有提到解决办法,在issues下会看到有人提问该问题](https://github.com/seata/seata-samples/blob/master/springcloud-eureka-feign-mybatis-seata)),解决办法是,在各个服务传递间加入请求头xid,并使用拦截器绑定各个服务传递的xid 实现RequestInterceptor 在请求头内加入xid ``` /** * @author 蚂蚁会花呗 * @date 2021/9/9 20:39 */ @Component public class FeignHeaderInterceptor implements RequestInterceptor { // 这里在feign请求的header中加入xid // 注意:这里一定要将feign.hystrix.enabled设为false,因为为true时feign是通过线程池调用,而XID并不是一个InheritablThreadLocal变量。 @Override public void apply(RequestTemplate requestTemplate) { String xid = RootContext.getXID(); if (StringUtils.isNotBlank(xid)) { requestTemplate.header("xid", xid); } } } ``` 拦截器内接收xid并进行绑定 ``` /** * @author 蚂蚁会花呗 * @date 2021/9/10 14:11 */ @Component public class SeataXidFilter extends OncePerRequestFilter { @Override protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException { String restXid = request.getHeader("xid"); if (StringUtils.isNotBlank(restXid)) { RootContext.bind(restXid); } filterChain.doFilter(request, response); } } ``` 经过以上配置后,重新启动各个服务,并模拟失败的情况,发现能正常回滚了,至此,SpringCloud、Zk、Mybati-Plus、Seata(AT模式)框架完成