欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

SpringCloudAlibabaSeata分布式事务解决方案

时间:2023-06-13
Spring Cloud Alibabaseata 分布式事务解决方案

一、分布式事务问题二、Seata简介三、Seata的部署

3.1 Seata Server端配置

3.1.1 修改配置文件3.1.2 MySQL 数据库配置3.1.3 启动Seata Server端 3.2 Seata Client 客户端配置

3.2.1 业务前置准备3.2.2 创建undo_log表3.2.3 Seata Client配置文件3.2.4 pom.xml文件3.2.5 application.yml文件3.2.6 业务代码编写:加@GlobalTransactional3.2.7 config代码编写3.2.8 主启动类 3.3 测试 一、分布式事务问题

一次业务操作需要跨多个数据源或者需要跨多个系统进行远程调用,就会产生分布式事务问题。

举例:

用户购买商品的业务逻辑。整个业务逻辑由3个微服务提供支持:

仓储服务:对给定的商品扣除仓储数量。订单服务:根据采购需求创建订单。帐户服务:从用户帐户中扣除余额。

单体应用被拆分成微服务应用,原来的三个模块被拆分成三个独立的应用,分别使用三个独立的数据源,业务操作需要调用三个服务来完成。此时每个服务内部的数据一致性由本地事务来保证,但是全局数据一致性问题是无法保证的。

二、Seata简介

Seata官网地址:http://seata.io/zh-cn/

Seata是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。

Seata术语:

TC (Transaction Coordinator) - 事务协调者
维护全局和分支事务的状态,驱动全局事务提交或回滚。

TM (Transaction Manager) - 事务管理器
定义全局事务的范围:开始全局事务、提交或回滚全局事务。

RM (Resource Manager) - 资源管理器
管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

Seata处理过程:

TM向TC申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的XID;XID在微服务调用链路的上下文传播;RM向TC注册分支事务,将其纳入XID对应全局事务的管辖;TM向TC发起针对XID的全局提交或回滚决议;TC调度XID下管辖的全部分支事务完成提交或者回滚请求 三、Seata的部署

Seata的版本选择: Spring Cloud Alibaba与Spring Boot、Spring Cloud版本对应关系

官网版本说明

本次使用:Seata 1.2.0

3.1 Seata Server端配置

官方配置文档:Seata部署指南

下载seata-server-1.2.0和seata-1.2.0源码

seate-server下载: https://seata.io/zh-cn/blog/download.htmlseata-1.2.0源码下载: https://github.com/seata/seata/releases 3.1.1 修改配置文件

解压后,进入 conf 目录开始参数的配置。我们修改 file.conf 和 registry.conf 这两个文件。

1、进入conf文件夹,修改file.conf文件

2、修改registry.conf文件

如果config设置成了file,则不需要网上的设置nacos的配置 3.1.2 MySQL 数据库配置

我们先创建数据库seata(数据库要与file.conf中db设置那里对应),数据库的建表语句在README文件的server连接中

让后执行mysql.sql

-- -------------------------------- The script used when storeMode is 'db' ---------------------------------- the table to store GlobalSession dataCREATE TABLE IF NOT EXISTS `global_table`( `xid` VARCHAR(128) NOT NULL, `transaction_id` BIGINT, `status` TINYINT NOT NULL, `application_id` VARCHAR(32), `transaction_service_group` VARCHAR(32), `transaction_name` VARCHAR(128), `timeout` INT, `begin_time` BIGINT, `application_data` VARCHAR(2000), `gmt_create` DATETIME, `gmt_modified` DATETIME, PRIMARY KEY (`xid`), KEY `idx_gmt_modified_status` (`gmt_modified`, `status`), KEY `idx_transaction_id` (`transaction_id`)) ENGINE = InnoDB DEFAULT CHARSET = utf8;-- the table to store BranchSession dataCREATE TABLE IF NOT EXISTS `branch_table`( `branch_id` BIGINT NOT NULL, `xid` VARCHAR(128) NOT NULL, `transaction_id` BIGINT, `resource_group_id` VARCHAR(32), `resource_id` VARCHAR(256), `branch_type` VARCHAR(8), `status` TINYINT, `client_id` VARCHAR(64), `application_data` VARCHAR(2000), `gmt_create` DATETIME(6), `gmt_modified` DATETIME(6), PRIMARY KEY (`branch_id`), KEY `idx_xid` (`xid`)) ENGINE = InnoDB DEFAULT CHARSET = utf8;-- the table to store lock dataCREATE TABLE IF NOT EXISTS `lock_table`( `row_key` VARCHAR(128) NOT NULL, `xid` VARCHAR(96), `transaction_id` BIGINT, `branch_id` BIGINT NOT NULL, `resource_id` VARCHAR(256), `table_name` VARCHAR(32), `pk` VARCHAR(36), `gmt_create` DATETIME, `gmt_modified` DATETIME, PRIMARY KEY (`row_key`), KEY `idx_branch_id` (`branch_id`)) ENGINE = InnoDB DEFAULT CHARSET = utf8;

3.1.3 启动Seata Server端 3.2 Seata Client 客户端配置 3.2.1 业务前置准备

业务场景:用户购买商品的业务逻辑。整个业务逻辑由3个微服务提供支持:

用户A购买商品,调用 A服务 创建订单完成,调用 B服务 扣减库存,然后调用 C服务 扣减账户余额。每个服务内部的数据一致性由本地事务来保证,多个服务调用来完成业务,全局事务数据一致性则由 Seata 来保证。

订单服务A:根据采购需求创建订单。

仓储服务B:对给定的商品扣除仓储数量。

帐户服务C:从用户帐户中扣除余额。

**业务数据库准备:**配置三个业务分别对应各自的数据库。

A服务 对应数据库:seata_order ;表:t_orderB服务 对应数据库:seata_storage ;表:t_storageC服务 对应数据库:seata_account ;表:t_account

建表语句:

# 创建seata_order数据库CREATE DATAbase seata_order;# 创建t_order表CREATE TABLE seata_order.t_order( `id` BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY, `user_id` BIGINT(11) DEFAULT NULL COMMENT '用户id', `product_id` BIGINT(11) DEFAULT NULL COMMENT '产品id', `count` INT(11) DEFAULT NULL COMMENT '数量', `money` DECIMAL(11,0) DEFAULT NULL COMMENT '金额', `status` INT(1) DEFAULT NULL COMMENT '订单状态:0:创建中; 1:已完结') ENGINE=INNODB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;

# 创建seata_storage数据库CREATE DATAbase seata_storage;# 创建t_storage表CREATE TABLE seata_storage.t_storage( `id` BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY, `product_id` BIGINT(11) DEFAULT NULL COMMENT '产品id', `total` INT(11) DEFAULT NULL COMMENT '总库存', `used` INT(11) DEFAULT NULL COMMENT '已用库存', `residue` INT(11) DEFAULT NULL COMMENT '剩余库存') ENGINE=INNODB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;# 插入一条数据 INSERT INTO seata_storage.t_storage(`id`,`product_id`,`total`,`used`,`residue`)VALUES('1','1','100','0','100');

# 创建seata_account数据库CREATE DATAbase seata_account;# 创建t_account表CREATE TABLE seata_account.t_account( `id` BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY COMMENT 'id', `user_id` BIGINT(11) DEFAULT NULL COMMENT '用户id', `total` DECIMAL(10,0) DEFAULT NULL COMMENT '总额度', `used` DECIMAL(10,0) DEFAULT NULL COMMENT '已用余额', `residue` DECIMAL(10,0) DEFAULT '0' COMMENT '剩余可用额度') ENGINE=INNODB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;# 插入一条数据 INSERT INTO seata_account.t_account(`id`,`user_id`,`total`,`used`,`residue`) VALUES('1','1','1000','0','1000');

3.2.2 创建undo_log表

按照上述3库分别建对应的回滚日志表,建表文件在README_ZH文件中的client中

undo_log表建表语句如下:

-- for AT mode you must to init this sql for you business database、the seata server not need it.CREATE TABLE IF NOT EXISTS `undo_log`( `id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT 'increment id', `branch_id` BIGINT(20) NOT NULL COMMENT 'branch transaction id', `xid` VARCHAR(100) NOT NULL COMMENT 'global transaction id', `context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization', `rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info', `log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status', `log_created` DATETIME NOT NULL COMMENT 'create datetime', `log_modified` DATETIME NOT NULL COMMENT 'modify datetime', PRIMARY KEY (`id`), UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';

完成图示:

以下配置以seata-order-service2001为例

项目结构:

3.2.3 Seata Client配置文件

配置文件的编写参考README_ZH文件中的client中:

1、file.conf,修改这一处

transport { # tcp, unix-domain-socket type = "TCP" #NIO, NATIVE server = "NIO" #enable heartbeat heartbeat = true # the tm client batch send request enable enableTmClientBatchSendRequest = false # the rm client batch send request enable enableRmClientBatchSendRequest = true # the rm client rpc request timeout rpcRmRequestTimeout = 2000 # the tm client rpc request timeout rpcTmRequestTimeout = 10000 # the tc client rpc request timeout rpcTcRequestTimeout = 5000 #thread factory for netty threadFactory { bossThreadPrefix = "NettyBoss" workerThreadPrefix = "NettyServerNIOWorker" serverExecutorThread-prefix = "NettyServerBizHandler" shareBossWorker = false clientSelectorThreadPrefix = "NettyClientSelector" clientSelectorThreadSize = 1 clientWorkerThreadPrefix = "NettyClientWorkerThread" # netty boss thread size bossThreadSize = 1 #auto default pin or 8 workerThreadSize = "default" } shutdown { # when destroy server, wait seconds wait = 3 } serialization = "seata" compressor = "none"}service { #transaction service group mapping vgroupMapping.my_test_tx_group = "default" #only support when registry.type=file, please don't set multiple addresses default.grouplist = "127.0.0.1:8091" #degrade, current not support enableDegrade = false #disable seata disableGlobalTransaction = false}client { rm { asyncCommitBufferLimit = 10000 lock { retryInterval = 10 retryTimes = 30 retryPolicyBranchRollbackonConflict = true } reportRetryCount = 5 tablemetaCheckEnable = false tablemetaCheckerInterval = 60000 reportSuccessEnable = false sagaBranchRegisterEnable = false sagaJsonParser = "fastjson" sagaRetryPersistModeUpdate = false sagaCompensatePersistModeUpdate = false tccActionInterceptorOrder = -2147482648 #Ordered.HIGHEST_PRECEDENCE + 1000 } tm { commitRetryCount = 5 rollbackRetryCount = 5 defaultGlobalTransactionTimeout = 60000 degradeCheck = false degradeCheckPeriod = 2000 degradeCheckAllowTimes = 10 interceptorOrder = -2147482648 #Ordered.HIGHEST_PRECEDENCE + 1000 } undo { dataValidation = true onlyCareUpdateColumns = true logSerialization = "jackson" logTable = "undo_log" compress { enable = true # allow zip, gzip, deflater, 7z, lz4, bzip2, zstd default is zip type = zip # if rollback info size > threshold, then will be compress # allow k m g t threshold = 64k } } loadBalance { type = "RandomLoadBalance" virtualNodes = 10 }}log { exceptionRate = 100}tcc { fence { # tcc fence log table name logTableName = tcc_fence_log # tcc fence log clean period cleanPeriod = 1h }}

2、registry.conf,修改这一处

registry { # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa、custom type = "nacos" nacos { application = "seata-server" serverAddr = "127.0.0.1:8848" namespace = "" username = "" password = "" ##if use MSE Nacos with auth, mutex with username/password attribute #accessKey = "" #secretKey = "" } eureka { serviceUrl = "http://localhost:8761/eureka" weight = "1" } redis { serverAddr = "localhost:6379" db = "0" password = "" timeout = "0" } zk { serverAddr = "127.0.0.1:2181" sessionTimeout = 6000 connectTimeout = 2000 username = "" password = "" } consul { serverAddr = "127.0.0.1:8500" aclToken = "" } etcd3 { serverAddr = "http://localhost:2379" } sofa { serverAddr = "127.0.0.1:9603" region = "DEFAULT_ZONE" datacenter = "DefaultDataCenter" group = "SEATA_GROUP" addressWaitTime = "3000" } file { name = "file.conf" } custom { name = "" }}config { # file、nacos 、apollo、zk、consul、etcd3、springCloudConfig、custom type = "file" nacos { serverAddr = "127.0.0.1:8848" namespace = "" group = "SEATA_GROUP" username = "" password = "" ##if use MSE Nacos with auth, mutex with username/password attribute #accessKey = "" #secretKey = "" dataId = "seata.properties" } consul { serverAddr = "127.0.0.1:8500"key = "seata.properties" aclToken = "" } apollo { appId = "seata-server" apollometa = "http://192.168.1.204:8801" namespace = "application" apolloAccesskeySecret = "" } zk { serverAddr = "127.0.0.1:2181" sessionTimeout = 6000 connectTimeout = 2000 username = "" password = "" nodePath = "/seata/seata.properties" } etcd3 { serverAddr = "http://localhost:2379" key = "seata.properties" } file { name = "file.conf" } custom { name = "" }}

3.2.4 pom.xml文件

com.alibaba.cloud spring-cloud-starter-alibaba-nacos-discovery com.alibaba.cloud spring-cloud-starter-alibaba-seata seata-all io.seata io.seata seata-all 1.2.0 org.springframework.cloud spring-cloud-starter-openfeign org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-actuator mysql mysql-connector-java 5.1.37 com.alibaba druid-spring-boot-starter 1.1.10 org.mybatis.spring.boot mybatis-spring-boot-starter 2.0.0 org.springframework.boot spring-boot-starter-test test org.projectlombok lombok true

3.2.5 application.yml文件

server: port: 2001spring: application: name: seata-order-service cloud: alibaba: seata: # 自定义事务组名称需要与seata-server中的对应 tx-service-group: my_test_tx_group #因为seata的file.conf文件中没有service模块,事务组名默认为my_test_tx_group #service要与tx-service-group对齐,vgroupMapping和grouplist在service的下一级,my_test_tx_group在再下一级 service: vgroupMapping: #要和tx-service-group的值一致 my_test_tx_group: default grouplist: # seata seaver的 地址配置,此处可以集群配置是个数组 default: 127.0.0.1:8091 nacos: discovery: server-addr: localhost:8848 datasource: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://localhost:3306/seata_order?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=GMT%2B8 username: root password: rootfeign: hystrix: enabled: falselogging: level: io: seata:mybatis: mapperLocations: classpath:mapper/*.xml

3.2.6 业务代码编写:加@GlobalTransactional

dao层、mapper层、controller层代码省略

业务代码使用OpenFeign 的方式进行服务间的调用,来实现一个简单的业务功能。AT模式只用一个 @GlobalTransactional 注解即可实现分布式事务。name 属性为事务唯一性表示,可以随意定义。rollbackFor 属性为指定Exception异常才进行事务回滚。

OrderServiceImpl.java

@Service@Slf4jpublic class OrderServiceImpl implements OrderService { @Resource private OrderDao orderDao; @Resource private StorageService storageService; @Resource private AccountService accountService; @Override @GlobalTransactional(name = "fsp-create-order",rollbackFor = Exception.class) public void create(Order order) { log.info("------->下单开始"); //本应用创建订单 orderDao.create(order); //远程调用库存服务扣减库存 log.info("------->order-service中扣减库存开始"); storageService.decrease(order.getProductId(),order.getCount()); log.info("------->order-service中扣减库存结束"); //远程调用账户服务扣减余额 log.info("------->order-service中扣减余额开始"); accountService.decrease(order.getUserId(),order.getMoney()); log.info("------->order-service中扣减余额结束"); //修改订单状态为已完成 log.info("------->order-service中修改订单状态开始"); orderDao.update(order.getUserId(),0); log.info("------->order-service中修改订单状态结束"); log.info("------->下单结束"); }}

3.2.7 config代码编写

1、MybatisConfig.java

@Configuration@MapperScan({"com.zb.springcloud.dao"})public class MyBatisConfig {}

2、DataSourceProxyConfig.java

package com.zb.springcloud.config;import com.alibaba.druid.pool.DruidDataSource;import io.seata.rm.datasource.DataSourceProxy;import org.apache.ibatis.session.SqlSessionFactory;import org.mybatis.spring.SqlSessionFactoryBean;import org.mybatis.spring.transaction.SpringManagedTransactionFactory;import org.springframework.beans.factory.annotation.Value;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.core.io.support.PathMatchingResourcePatternResolver;import javax.sql.DataSource;@Configurationpublic class DataSourceProxyConfig { @Value("${mybatis.mapperLocations}") private String mapperLocations; @Bean @ConfigurationProperties(prefix = "spring.datasource") public DataSource druidDataSource(){ return new DruidDataSource(); } @Bean public DataSourceProxy dataSourceProxy(DataSource dataSource) { return new DataSourceProxy(dataSource); } @Bean public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception { SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean(); sqlSessionFactoryBean.setDataSource(dataSourceProxy); sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocations)); sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory()); return sqlSessionFactoryBean.getObject(); }}

3.2.8 主启动类

@EnableDiscoveryClient@EnableFeignClients@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)//取消数据源的自动创建public class SeataOrderMainApp2001 { public static void main(String[] args) { SpringApplication.run(SeataOrderMainApp2001.class, args); }}

3.3 测试

分别启三个服务,可以看到在项目启动过程中,Seata Sever 服务端会有相对应提示

发送请求 http://localhost:2001/order/create?userId=1&productId=1&count=10&money=100 来模拟下单请求。发送多次请求,可以看到下单都成功了,Seata Server 端也有事务在进行处理的过程。(此时数据库中,因为事务处理完成,并没有任何数据)

参考博客:

Spring Cloud Alibabaseata 分布式事务解决方案简介

SpringCloud 整合Seata 解决分布式事务

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。