• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Seata TCC 模式理论学习、生产级使用展示搭建和注意事项 | Spring Cloud55

武飞扬头像
gmHappy
帮助1

一、前言

通过以下系列章节:

docker-compose 实现Seata Server高可用部署 | Spring Cloud 51

Seata AT 模式理论学习、事务隔离及部分源码解析 | Spring Cloud 52

Spring Boot集成Seata利用AT模式分布式事务示例 | Spring Cloud 53

Seata XA 模式理论学习、使用及注意事项 | Spring Cloud54

我们对Seata及其AT事务模式、XA事务模式的理论、使用有了深入的了解,今天继续对SeataTCC事务模式进行理论学习;并区别与官网,我们利用openfeign进行生产级示例搭建,降低入门难度。

理论部分来自Seata官网:http://seata.io/zh-cn/docs/dev/mode/tcc-mode.html

二、整体机制

回顾前面章节的学习:一个分布式的全局事务,整体是 两阶段提交 的模型。全局事务是由若干分支事务组成的,分支事务要满足 两阶段提交 的模型要求,即需要每个分支事务都具备自己的:

  • 一阶段 prepare 行为
  • 二阶段 commitrollback 行为

学新通
上面这个流程中,一共涉及到了三个方法,preparecommit 以及 rollback,这三个方法都完全是用户自定义的方法,都是需要我们自己来实现的。相较于 AT 事务模式 TCC 这种模式其实是不依赖于底层数据库的事务支持的。

三、示例说明

这是一个商品下单的案例,一共有四个服务和一个公共模块:

  • account-tcc:账户服务,可以查询/修改用户的账户信息
  • order-tcc:订单服务,可以下订单。
  • storage-tcc:仓储服务,可以查询/修改商品的库存数量。
  • bussiness-tcc:业务服务,用户下单操作将在这里完成。
  • common-tcc:公共模块,包含:实体类、openfeign接口、统一异常处理等。

具体业务吊牌逻辑如下:

学新通

四、数据库设计

4.1 账户表

account-tcc 账户服务对应账户表:t_account

-- ----------------------------
-- Table structure for t_account
-- ----------------------------
DROP TABLE IF EXISTS `t_account`;
CREATE TABLE `t_account`  (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `user_id` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL COMMENT '用户ID',
  `money` decimal(10, 2) NULL DEFAULT 0.00 COMMENT '账户余额',
  `freeze_money` decimal(10, 2) NULL DEFAULT 0.00 COMMENT '冻结金额',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb3 COLLATE = utf8mb3_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of t_account
-- ----------------------------
INSERT INTO `t_account` VALUES (1, 'user1', 500.00, 0.00);

SET FOREIGN_KEY_CHECKS = 1;

4.2 仓储表

storage-tcc 仓储服务对应账户表:t_storage

-- ----------------------------
-- Table structure for t_storage
-- ----------------------------
DROP TABLE IF EXISTS `t_storage`;
CREATE TABLE `t_storage`  (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `commodity_code` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL,
  `count` int NULL DEFAULT 0,
  `freeze_count` int NULL DEFAULT 0,
  PRIMARY KEY (`id`) USING BTREE,
  UNIQUE INDEX `commodity_code`(`commodity_code` ASC) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb3 COLLATE = utf8mb3_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of t_storage
-- ----------------------------
INSERT INTO `t_storage` VALUES (1, 'iphone', 6, 0);

SET FOREIGN_KEY_CHECKS = 1;

4.3 订单表

order-tcc 订单服务对应账户表:t_order

-- ----------------------------
-- Table structure for t_order
-- ----------------------------
DROP TABLE IF EXISTS `t_order`;
CREATE TABLE `t_order`  (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `user_id` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL,
  `commodity_code` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL,
  `count` int NULL DEFAULT 0,
  `money` decimal(10, 2) NULL DEFAULT 0.00,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 9 CHARACTER SET = utf8mb3 COLLATE = utf8mb3_general_ci ROW_FORMAT = Dynamic;

4.4 事务控制表

请见:Seata TCC 模式下解决幂等、悬挂、空回滚问题 | Spring Cloud56

CREATE TABLE IF NOT EXISTS `tcc_fence_log`
(
    `xid`           VARCHAR(128)  NOT NULL COMMENT 'global id',
    `branch_id`     BIGINT        NOT NULL COMMENT 'branch id',
    `action_name`   VARCHAR(64)   NOT NULL COMMENT 'action name',
    `status`        TINYINT       NOT NULL COMMENT 'status(tried:1;committed:2;rollbacked:3;suspended:4)',
    `gmt_create`    DATETIME(3)   NOT NULL COMMENT 'create time',
    `gmt_modified`  DATETIME(3)   NOT NULL COMMENT 'update time',
    PRIMARY KEY (`xid`, `branch_id`),
    KEY `idx_gmt_modified` (`gmt_modified`),
    KEY `idx_status` (`status`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;

五、示例搭建

5.1 项目总体结构

学新通
学新通
学新通

5.2 common-tcc 搭建

5.2.1 实体类

com/gm/seata/openfeign/entity/Account.java

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.math.BigDecimal;

@Data
@TableName("t_account")
public class Account {

    @TableId(type = IdType.ASSIGN_ID)
    private long id;

    private String userId;

    private BigDecimal money;

    private BigDecimal freezeMoney;
}

com/gm/seata/openfeign/entity/Order.java

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.math.BigDecimal;

@Data
@TableName("t_order")
public class Order {

    @TableId(type = IdType.ASSIGN_ID)
    private long id;

    private String userId;

    private String commodityCode;

    private int count;

    private BigDecimal money;
}

com/gm/seata/openfeign/entity/Storage.java

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.math.BigDecimal;

@Data
@TableName("t_order")
public class Order {

    @TableId(type = IdType.ASSIGN_ID)
    private long id;

    private String userId;

    private String commodityCode;

    private int count;

    private BigDecimal money;
}

5.2.2 feign接口

com/gm/seata/openfeign/feign/AccountServiceApi.java

import com.gm.seata.openfeign.entity.Account;
import com.gm.seata.openfeign.util.R;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import java.math.BigDecimal;

@FeignClient(value = "account-tcc")
public interface AccountServiceApi {

    /**
     * 扣除账户余额
     * @param userId
     * @param money
     * @return
     */
    @RequestMapping(value = "deduct", method = RequestMethod.GET)
    R<Boolean> deduct(@RequestParam("userId") String userId, @RequestParam("money") BigDecimal money);
}

com/gm/seata/openfeign/feign/OrderServiceApi.java

import com.gm.seata.openfeign.util.R;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;

@FeignClient(value = "order-tcc")
public interface OrderServiceApi {

    /**
     * 创建订单
     * @param userId
     * @param commodityCode
     * @param count
     * @return
     */
    @RequestMapping(value = "createOrder", method = RequestMethod.GET)
    R<Boolean> createOrder(@RequestParam("userId") String userId, @RequestParam("commodityCode") String commodityCode, @RequestParam("count") Integer count);
}

com/gm/seata/openfeign/feign/StorageServiceApi.java

import com.gm.seata.openfeign.entity.Storage;
import com.gm.seata.openfeign.util.R;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;

@FeignClient(value = "storage-tcc")
public interface StorageServiceApi {

    /**
     * 扣除库存
     * @param commodityCode
     * @param count
     * @return
     */
    @RequestMapping(value = "deduct", method = RequestMethod.GET)
    R<Boolean> deduct(@RequestParam("commodityCode") String commodityCode, @RequestParam("count") Integer count);
}

5.2.3 feign 微服务调用异码处理

com/gm/seata/openfeign/handle/FeignErrorDecoder.java

import com.alibaba.fastjson.JSONObject;
import com.gm.seata.openfeign.util.ErrorEnum;
import feign.Response;
import feign.RetryableException;
import feign.Util;
import feign.codec.ErrorDecoder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;

import java.nio.charset.Charset;

@Slf4j
@Configuration
public class FeignErrorDecoder extends ErrorDecoder.Default {

    @Override
    public Exception decode(String methodKey, Response response) {
        try {
            // 可以自定义一些逻辑
            String message = Util.toString(response.body().asReader(Charset.forName("utf8")));
            JSONObject jsonObject = JSONObject.parseObject(message);
            int code = jsonObject.getInteger("code");
            ErrorEnum errorEnum = ErrorEnum.getEnumByCode(code);
            // 包装成自己自定义的异常
            return new RuntimeException(String.valueOf(errorEnum.getCode()));
        } catch (Exception e) {
            log.error("非已知异常", e.getMessage(), e);
        }

        Exception exception = super.decode(methodKey, response);
        // 如果是RetryableException,则返回继续重试
        if (exception instanceof RetryableException) {
            return exception;
        }
        return new RuntimeException(String.valueOf(ErrorEnum.UNKNOWN_EXCEPTION.getCode()));
    }
}

5.2.4 Controller 统一异常处理

com/gm/seata/openfeign/handle/GlobalBizExceptionHandler.java

import com.gm.seata.openfeign.util.ErrorEnum;
import com.gm.seata.openfeign.util.R;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.annotation.Order;
import org.springframework.http.HttpStatus;;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestControllerAdvice;

/**
 * 全局异常处理器
 */
@Slf4j
@Order(10000)
@RestControllerAdvice
public class GlobalBizExceptionHandler {

    /**
     * 全局异常.
     *
     * @param e the e
     * @return R
     */
    @ExceptionHandler(Exception.class)
    @ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
    public R handleGlobalException(Exception e) {
        log.error("全局异常信息 ex={}", e.getMessage(), e);
        R r = null;
        // 根据异常信息与已知异常进行匹配
        try {
            int code = Integer.parseInt(e.getLocalizedMessage());
            ErrorEnum errorEnum = ErrorEnum.getEnumByCode(code);
            if (errorEnum != null) {
                r = R.restResult(null, errorEnum.getCode(), errorEnum.getTitle());
            }
        } finally {
            if (e instanceof feign.FeignException) {
                ErrorEnum errorEnum = ErrorEnum.UNKNOWN_EXCEPTION;
                r = R.restResult(null, errorEnum.getCode(), errorEnum.getTitle());
            }
            if (r == null) {
                r = R.failed(e.getLocalizedMessage());
            }
        }
        return r;
    }
}

5.2.5 已知异常枚举类

com/gm/seata/openfeign/util/ErrorEnum.java

import lombok.AllArgsConstructor;
import lombok.Getter;

@Getter
@AllArgsConstructor
public enum ErrorEnum {

    NO_SUCH_COMMODITY(3000, "无此商品"),
    STORAGE_LOW_PREPARE(3001, "库存不足,预扣库存失败"),
    STORAGE_LOW_COMMIT(3002, "库存不足,扣库存失败"),
    NO_SUCH_ACCOUNT(4000, "无此账户"),
    ACCOUNT_LOW_PREPARE(4001, "余额不足,预扣款失败"),
    ACCOUNT_LOW_COMMIT(4002, "余额不足,扣款失败"),
    UNKNOWN_EXCEPTION(9999, "远程方法调用异常");

    private final Integer code;
    private final String title;

    public static ErrorEnum getEnumByCode(int code) {
        for (ErrorEnum error : ErrorEnum.values()) {
            if (error.getCode().equals(code)) {
                return error;
            }
        }
        return null;
    }
}

5.2.6 响应信息结构体

com/gm/seata/openfeign/util/R.java

import lombok.*;
import lombok.experimental.Accessors;
import java.io.Serializable;

/**
 * 响应信息主体
 *
 */
@ToString
@NoArgsConstructor
@AllArgsConstructor
@Accessors(chain = true)
public class R<T> implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * 成功标记
     */
    private static final Integer SUCCESS = 0;

    /**
     * 失败标记
     */
    private static final Integer FAIL = 1;

    @Getter
    @Setter
    private int code;

    @Getter
    @Setter
    private String msg;

    @Getter
    @Setter
    private T data;

    public static <T> R<T> ok() {
        return restResult(null, SUCCESS, null);
    }

    public static <T> R<T> ok(T data) {
        return restResult(data, SUCCESS, null);
    }

    public static <T> R<T> ok(T data, String msg) {
        return restResult(data, SUCCESS, msg);
    }

    public static <T> R<T> failed() {
        return restResult(null, FAIL, null);
    }

    public static <T> R<T> failed(String msg) {
        return restResult(null, FAIL, msg);
    }

    public static <T> R<T> failed(T data) {
        return restResult(data, FAIL, null);
    }

    public static <T> R<T> failed(T data, String msg) {
        return restResult(data, FAIL, msg);
    }

    public static <T> R<T> restResult(T data, int code, String msg) {
        R<T> apiResult = new R<>();
        apiResult.setCode(code);
        apiResult.setData(data);
        apiResult.setMsg(msg);
        return apiResult;
    }
}

5.2.7 自动配置实现

src/main/resources/META-INF/spring路径下新建文件org.springframework.boot.autoconfigure.AutoConfiguration.imports内容如下:

com.gm.seata.openfeign.handle.GlobalBizExceptionHandler
com.gm.seata.openfeign.handle.FeignErrorDecoder

新建文件org.springframework.cloud.openfeign.FeignClient.imports内容如下:

com.gm.seata.openfeign.feign.AccountServiceApi
com.gm.seata.openfeign.feign.OrderServiceApi
com.gm.seata.openfeign.feign.StorageServiceApi

通过上述方式实现自动配置。

5.3 account-tcc 搭建

5.3.1 完整依赖

seata/openfeign-tcc/account-tcc/pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>openfeign-tcc</artifactId>
        <groupId>com.gm</groupId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>account-tcc</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.gm</groupId>
            <artifactId>common-tcc</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bootstrap</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-loadbalancer</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>


        <!-- 注意一定要引入对版本,要引入spring-cloud版本seata,而不是springboot版本的seata-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
            <!-- 排除掉springcloud默认的seata版本,以免版本不一致出现问题-->
            <exclusions>
                <exclusion>
                    <groupId>io.seata</groupId>
                    <artifactId>seata-spring-boot-starter</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>io.seata</groupId>
                    <artifactId>seata-all</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- 上面排除掉了springcloud默认色seata版本,此处引入和seata-server版本对应的seata包-->
        <dependency>
            <groupId>io.seata</groupId>
            <artifactId>seata-spring-boot-starter</artifactId>
            <version>1.6.1</version>
        </dependency>

        <!--<dependency>
            <groupId>io.seata</groupId>
            <artifactId>seata-spring-boot-starter</artifactId>
        </dependency>-->
        <dependency>
            <groupId>com.mysql</groupId>
            <artifactId>mysql-connector-j</artifactId>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.5.3.1</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

5.3.2 配置文件

src/main/resources/bootstrap.yml

server:
  port: 3011

spring:
  application:
    name: @artifactId@
  cloud:
    nacos:
      username: @nacos.username@
      password: @nacos.password@
      discovery:
        server-addr: ${NACOS_HOST:nacos1.kc}:${NACOS_PORT:8848}
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://192.168.0.46:3306/seata-tcc?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true&allowMultiQueries=true&serverTimezone=Asia/Shanghai
    username: root
    password: '1qaz@WSX'
seata:
  # 是否开启spring-boot自动装配,seata-spring-boot-starter 专有配置,默认true
  enabled: true
  # 是否开启数据源自动代理,seata-spring-boot-starter专有配置,默认会开启数据源自动代理,可通过该配置项关闭
  enable-auto-data-source-proxy: true
  # 配置自定义事务组名称,需与下方server.vgroupMapping配置一致,程序会通过用户配置的配置中心去寻找service.vgroupMapping
  tx-service-group: mygroup
  config: # 从nacos配置中心获取client端配置
    type: nacos
    nacos:
      server-addr: ${NACOS_HOST:nacos1.kc}:${NACOS_PORT:8848}
      group : DEFAULT_GROUP
      namespace: a4c150aa-fd09-4595-9afe-c87084b22105
      dataId: seataServer.properties
      username: @nacos.username@
      password: @nacos.username@
  registry: # 通过服务中心通过服务发现获取seata-server服务地址
    type: nacos
    nacos:
      # 注:客户端注册中心配置的serverAddr和namespace与Server端一致,clusterName与Server端cluster一致
      application: seata-server # 此处与seata-server的application一致,才能通过服务发现获取服务地址
      group : DEFAULT_GROUP
      server-addr: ${NACOS_HOST:nacos1.kc}:${NACOS_PORT:8848}
      userName: @nacos.username@
      password: @nacos.username@
      namespace: a4c150aa-fd09-4595-9afe-c87084b22105
  service:
    # 应用程序(客户端)会通过用户配置的配置中心去寻找service.vgroupMapping.[事务分组配置项]
    vgroup-mapping:
      # 事务分组配置项[mygroup]对应的值为TC集群名[default],与Seata-Server中的seata.registry.nacos.cluster配置一致
      mygroup : default
    # 全局事务开关,默认false。false为开启,true为关闭
    disable-global-transaction: false
  client:
    rm:
      report-success-enable: true
management:
  endpoints:
    web:
      exposure:
        include: '*'

logging:
  level:
    io.seata: debug

# mybatis-plus配置控制台打印完整带参数SQL语句
mybatis-plus:
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl

5.3.3 功能搭建

5.3.3.1 启动类

com/gm/seata/openfeign/AccountTCCApplication.java

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;

@SpringBootApplication
@EnableDiscoveryClient
public class AccountTCCApplication {

    public static void main(String[] args) {
        SpringApplication.run(AccountTCCApplication.class, args);
    }
}
5.3.3.2 Mapper类

com/gm/seata/openfeign/mapper/AccountMapper.java

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.gm.seata.openfeign.entity.Account;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;

@Mapper
public interface AccountMapper extends BaseMapper<Account> {

    @Select("SELECT * FROM t_account WHERE user_id = #{userId} limit 1")
    Account getAccountByUserId(@Param("userId") String userId);
}

5.3.3.3 Service类

业务重点来了,敲黑板!!!

com/gm/seata/openfeign/service/AccountService.java

import com.gm.seata.openfeign.entity.Account;
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;
import java.math.BigDecimal;

@LocalTCC
public interface AccountService {

    /**
     * 执行资源检查及预业务操作
     */
    // @BusinessActionContextParameter 注解就是将对应的参数放入到 BusinessActionContext 中,将来可以从 BusinessActionContext 中取出对应的参数。
    @TwoPhaseBusinessAction(name = "accountService", commitMethod = "commit", rollbackMethod = "rollback", useTCCFence = true)
    boolean prepare(BusinessActionContext actionContext, @BusinessActionContextParameter(paramName = "userId") String userId, @BusinessActionContextParameter(paramName = "money") BigDecimal money);

    /**
     * 全局事物进行提交
     */
    boolean commit(BusinessActionContext actionContext);

    /**
     * 全局事务进行回滚
     */
    boolean rollback(BusinessActionContext actionContext);
}
  • 首先接口的定义上,需要加一个注解 @LocalTCC,这个表示开启 Seata 中的 TCC 模式。
  • 然后就是 @TwoPhaseBusinessAction 注解,两阶段提交的注解,这个注解有三个属性,第一个 name 就是处理两阶段提交的 bean 的名字,其实就是当前 bean 的名字,当前类名首字母小写。两阶段第一阶段就是 prepare 阶段,也就是预处理阶段 。@TwoPhaseBusinessAction 注解所在的方法,第二阶段则分为两种情况,提交或者回滚,分别对应了两个不同的方法,commitMethodrollbackMethod 就指明了相应的方法。
  • 一阶段的 prepare 需要开发者手动调用,二阶段的 commit 或者 rollback 则是系统自动调用。prepare 中的方法是由开发者来传递的,而在二阶段的方法中,相关的参数我们需要从 BusinessActionContext 中获取,@BusinessActionContextParameter 注解就是将对应的参数放入到 BusinessActionContext 中(注意需要给每一个参数取一个名字),将来可以从 BusinessActionContext 中取出对应的参数。
  • 通过在@TwoPhaseBusinessAction 注解中设置useTCCFence = true解决幂等、悬挂、空回滚问题
  • 另外需要注意,接口的返回值设计成 boolean,用以表示相应的操作执行成功还是失败,返回 false 表示执行失败,默认会有重试机制进行重试。

com/gm/seata/openfeign/service/impl/AccountServiceImpl.java

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.gm.seata.openfeign.entity.Account;
import com.gm.seata.openfeign.mapper.AccountMapper;
import com.gm.seata.openfeign.service.AccountService;
import com.gm.seata.openfeign.util.ErrorEnum;
import io.seata.rm.tcc.api.BusinessActionContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.math.BigDecimal;

@Slf4j
@Service
public class AccountServiceImpl implements AccountService {

    @Autowired
    AccountMapper accountMapper;

    /**
     * 预扣款阶段,检查账户余额
     *
     * @param userId
     * @param money
     * @return
     */
    @Transactional(rollbackFor = Exception.class)
    public boolean prepare(BusinessActionContext actionContext, String userId, BigDecimal money) {
        Account account = accountMapper.getAccountByUserId(userId);
        if (account == null) {
            //throw new RuntimeException("账户不存在");
            throw new RuntimeException(String.valueOf(ErrorEnum.NO_SUCH_ACCOUNT.getCode()));
        }
        // 账户余额 与 本次消费金额进行 比较
        if (account.getMoney().compareTo(money) < 0) {
            //throw new RuntimeException("余额不足,预扣款失败");
            throw new RuntimeException(String.valueOf(ErrorEnum.ACCOUNT_LOW_PREPARE.getCode()));
        }
        account.setFreezeMoney(account.getFreezeMoney().add(money));
        account.setMoney(account.getMoney().subtract(money));

        QueryWrapper query = new QueryWrapper();
        query.eq("user_id", userId);
        Integer i = accountMapper.update(account, query);
        log.info("{} 账户预扣款 {} 元", userId, money);
        return i == 1;
    }

    /**
     * 实际扣款阶段
     *
     * @param actionContext
     * @return
     */
    @Transactional(rollbackFor = Exception.class)
    public boolean commit(BusinessActionContext actionContext) {
        String userId = (String) actionContext.getActionContext("userId");
        BigDecimal money = new BigDecimal(actionContext.getActionContext("money").toString());

        Account account = accountMapper.getAccountByUserId(userId);
        // 账户冻结金额 与 本次消费金额进行 比较
        if (account.getFreezeMoney().compareTo(money) < 0) {
            // 抛出指定异常
            throw new RuntimeException(String.valueOf(ErrorEnum.ACCOUNT_LOW_COMMIT.getCode()));
        }
        account.setFreezeMoney(account.getFreezeMoney().subtract(money));

        QueryWrapper query = new QueryWrapper();
        query.eq("user_id", userId);
        Integer i = accountMapper.update(account, query);
        log.info("{} 账户扣款 {} 元", userId, money);
        return i == 1;
    }

    /**
     * 账户回滚阶段
     *
     * @param actionContext
     * @return
     */
    @Transactional(rollbackFor = Exception.class)
    public boolean rollback(BusinessActionContext actionContext) {
        String userId = (String) actionContext.getActionContext("userId");
        BigDecimal money = new BigDecimal(actionContext.getActionContext("money").toString());

        Account account = accountMapper.getAccountByUserId(userId);

        if (account.getFreezeMoney().compareTo(money) >= 0) {
            account.setFreezeMoney(account.getFreezeMoney().subtract(money));
            account.setMoney(account.getMoney().add(money));

            QueryWrapper query = new QueryWrapper();
            query.eq("user_id", userId);
            Integer i = accountMapper.update(account, query);

            log.info("{} 账户释放冻结金额 {} 元", userId, money);
            return i == 1;
        }
        log.info("{} 账户资金已释放", userId);
        // 说明prepare中抛出异常,未冻结资金
        return true;
    }
}

5.3.3.4 Controller类

com/gm/seata/openfeign/controller/AccountController.java

import com.gm.seata.openfeign.service.AccountService;
import com.gm.seata.openfeign.util.R;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.math.BigDecimal;

@RestController
public class AccountController {

    @Autowired
    AccountService accountService;

    /**
     * 扣除账户余额
     *
     * @param userId
     * @param money
     * @return
     */
    @RequestMapping(value = "deduct", method = RequestMethod.GET)
    public R<Boolean> deduct(@RequestParam("userId") String userId, @RequestParam("money") BigDecimal money) {
        return R.ok(accountService.prepare(null, userId, money));
    }
}

5.4 storage-tcc 搭建

5.4.1 完整依赖

减少重复内容,请参考 5.3.1 部分,自动修改

5.4.2 配置文件

减少重复内容,请参考 5.3.2 部分,自动修改

5.4.3 功能搭建

5.4.3.1 启动类

com/gm/seata/openfeign/StorageTCCApplication.java

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;

@SpringBootApplication
@EnableDiscoveryClient
public class StorageTCCApplication {

    public static void main(String[] args) {
        SpringApplication.run(StorageTCCApplication.class, args);
    }
}
5.4.3.2 Mapper类

com/gm/seata/openfeign/mapper/StorageMapper.java

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.gm.seata.openfeign.entity.Storage;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;

@Mapper
public interface StorageMapper extends BaseMapper<Storage> {

    @Select("SELECT * FROM t_storage WHERE commodity_code = #{commodityCode} limit 1")
    Storage getStorageByCommodityCode(@Param("commodityCode") String commodityCode);
}
5.4.3.3 Service类

com/gm/seata/openfeign/service/StorageService.java

import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;

@LocalTCC
public interface StorageService {

    /**
     * 执行资源检查及预业务操作
     */
    // @BusinessActionContextParameter 注解就是将对应的参数放入到 BusinessActionContext 中,将来可以从 BusinessActionContext 中取出对应的参数。
    @TwoPhaseBusinessAction(name = "storageService", commitMethod = "commit", rollbackMethod = "rollback", useTCCFence = true)
    boolean prepare(BusinessActionContext actionContext, @BusinessActionContextParameter(paramName = "commodityCode") String commodityCode, @BusinessActionContextParameter(paramName = "count") Integer  count);

    /**
     * 全局事物进行提交
     */
    boolean commit(BusinessActionContext actionContext);

    /**
     * 全局事务进行回滚
     */
    boolean rollback(BusinessActionContext actionContext);
}

减少重复内容,请参考 5.3.3 部分说明

com/gm/seata/openfeign/service/impl/StorageServiceImpl.java

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.gm.seata.openfeign.entity.Storage;
import com.gm.seata.openfeign.mapper.StorageMapper;
import com.gm.seata.openfeign.service.StorageService;
import com.gm.seata.openfeign.util.ErrorEnum;
import io.seata.rm.tcc.api.BusinessActionContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Slf4j
@Service
public class StorageServiceImpl implements StorageService {

    @Autowired
    StorageMapper storageMapper;

    /**
     * 扣除商品库存预处理阶段,进行商品库存冻结
     *
     * @param commodityCode
     * @param count
     * @return
     */
    @Transactional(rollbackFor = Exception.class)
    public boolean prepare(BusinessActionContext actionContext, String commodityCode, Integer count) {
        Storage storage = storageMapper.getStorageByCommodityCode(commodityCode);
        if (storage == null) {
            //throw new RuntimeException("商品不存在");
            throw new RuntimeException(String.valueOf(ErrorEnum.NO_SUCH_COMMODITY.getCode()));
        }
        if (storage.getCount() < count) {
            //throw new RuntimeException("库存不足,预扣库存失败");
            throw new RuntimeException(String.valueOf(ErrorEnum.STORAGE_LOW_PREPARE.getCode()));
        }
        storage.setFreezeCount(storage.getFreezeCount()   count);
        storage.setCount(storage.getCount() - count);

        QueryWrapper query = new QueryWrapper();
        query.eq("commodity_code", commodityCode);
        Integer i = storageMapper.update(storage, query);
        log.info("{} 商品库存冻结 {} 个", commodityCode, count);
        return i == 1;
    }


    /**
     * 扣除商品库存提交阶段,进行商品库存扣除
     *
     * @param actionContext
     * @return
     */
    @Transactional(rollbackFor = Exception.class)
    public boolean commit(BusinessActionContext actionContext) {
        String commodityCode = (String) actionContext.getActionContext("commodityCode");
        Integer count = (Integer) actionContext.getActionContext("count");

        Storage storage = storageMapper.getStorageByCommodityCode(commodityCode);
        if (storage.getFreezeCount() < count) {
            //throw new RuntimeException("库存不足,扣库存失败");
            throw new RuntimeException(String.valueOf(ErrorEnum.STORAGE_LOW_COMMIT.getCode()));
        }
        storage.setFreezeCount(storage.getFreezeCount() - count);

        QueryWrapper query = new QueryWrapper();
        query.eq("commodity_code", commodityCode);
        int i = storageMapper.update(storage, query);
        log.info("{} 商品库存扣除 {} 个", commodityCode, count);
        return i == 1;
    }


    /**
     * 扣除商品库存回滚阶段
     *
     * @param actionContext
     * @return
     */
    @Transactional(rollbackFor = Exception.class)
    public boolean rollback(BusinessActionContext actionContext) {
        String commodityCode = (String) actionContext.getActionContext("commodityCode");
        Integer count = (Integer) actionContext.getActionContext("count");
        Storage storage = storageMapper.getStorageByCommodityCode(commodityCode);
        if (storage.getFreezeCount() >= count) {
            storage.setFreezeCount(storage.getFreezeCount() - count);
            storage.setCount(storage.getCount()   count);

            QueryWrapper query = new QueryWrapper();
            query.eq("commodity_code", commodityCode);
            int i = storageMapper.update(storage, query);
            log.info("{} 商品释放库存 {} 个", commodityCode, count);
            return i == 1;
        }
        // 说明 prepare 阶段就没有冻结
        return true;
    }
}
5.4.3.4 Controller类

com/gm/seata/openfeign/controller/StorageController.java

import com.gm.seata.openfeign.service.StorageService;
import com.gm.seata.openfeign.util.R;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class StorageController {

    @Autowired
    StorageService storageService;

    /**
     * 扣除商品库存
     *
     * @param commodityCode
     * @param count
     * @return
     */
    @RequestMapping(value = "deduct", method = RequestMethod.GET)
    public R<Boolean> deduct(@RequestParam("commodityCode") String commodityCode, @RequestParam("count") Integer count) {
        return R.ok(storageService.prepare(null, commodityCode, count));
    }
}

5.5 order-tcc 搭建

5.5.1 完整依赖

减少重复内容,请参考 5.3.1 部分,自动修改

5.5.2 配置文件

减少重复内容,请参考 5.3.2 部分,自动修改

5.5.3 功能搭建

5.5.3.1 启动类

com/gm/seata/openfeign/OrderTCCApplication.java

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;

@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients("com.gm.seata.openfeign.feign")
public class OrderTCCApplication {

    public static void main(String[] args) {
        SpringApplication.run(OrderTCCApplication.class, args);
    }
}
5.5.3.2 Mapper类

com/gm/seata/openfeign/mapper/OrderMapper.java

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.gm.seata.openfeign.entity.Order;
import org.apache.ibatis.annotations.Mapper;

@Mapper
public interface OrderMapper extends BaseMapper<Order> {

}
5.5.3.3 Service类

com/gm/seata/openfeign/service/OrderService.java

import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;

@LocalTCC
public interface OrderService {

    /**
     * 执行资源检查及预业务操作
     */
    // @BusinessActionContextParameter 注解就是将对应的参数放入到 BusinessActionContext 中,将来可以从 BusinessActionContext 中取出对应的参数。
    @TwoPhaseBusinessAction(name = "orderService", commitMethod = "commit", rollbackMethod = "rollback", useTCCFence = true)
    boolean prepare(BusinessActionContext actionContext, @BusinessActionContextParameter(paramName = "userId") String userId, @BusinessActionContextParameter(paramName = "commodityCode") String commodityCode, @BusinessActionContextParameter(paramName = "count") Integer count);

    /**
     * 全局事物进行提交
     */
    boolean commit(BusinessActionContext actionContext);

    /**
     * 全局事务进行回滚
     */
    boolean rollback(BusinessActionContext actionContext);
}

减少重复内容,请参考 5.3.3 部分说明

com/gm/seata/openfeign/service/impl/OrderServiceImpl.java

import com.gm.seata.openfeign.entity.Order;
import com.gm.seata.openfeign.feign.AccountServiceApi;
import com.gm.seata.openfeign.mapper.OrderMapper;
import com.gm.seata.openfeign.service.OrderService;
import com.gm.seata.openfeign.util.R;
import io.seata.rm.tcc.api.BusinessActionContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.math.BigDecimal;

@Slf4j
@Service
public class OrderServiceImpl implements OrderService {

    @Autowired
    OrderMapper orderMapper;

    @Autowired
    AccountServiceApi accountServiceApi;

    /**
     * 创建商品订单预处理阶段,扣除账户余额
     *
     * @param commodityCode
     * @param count
     * @return
     */
    @Transactional(rollbackFor = Exception.class)
    public boolean prepare(BusinessActionContext actionContext, String userId, String commodityCode, Integer count) {
        //先去扣款,假设每个产品100块钱
        R<Boolean> result = null;
        try {
            result = accountServiceApi.deduct(userId, new BigDecimal(count * 100.0));
        } catch (Exception e) {
            e.printStackTrace();
            // 远程方法调用失败
            throw new RuntimeException(e.getMessage());
        }

        log.info("{} 用户购买的 {} 商品共计 {} 件,预下单成功", userId, commodityCode, count);

        return result.getData();
    }

    /**
     * 创建商品订单提交阶段,创建订单记录
     *
     * @param actionContext
     * @return
     */
    @Transactional(rollbackFor = Exception.class)
    public boolean commit(BusinessActionContext actionContext) {
        String userId = (String) actionContext.getActionContext("userId");
        String commodityCode = (String) actionContext.getActionContext("commodityCode");
        Integer count = (Integer) actionContext.getActionContext("count");

        Order order = new Order();
        order.setCount(count);
        order.setCommodityCode(commodityCode);
        order.setUserId(userId);
        order.setMoney(new BigDecimal(count * 100.0));

        int i = orderMapper.insert(order);
        log.info("{} 用户购买的 {} 商品共计 {} 件,下单成功", userId, commodityCode, count);
        return i == 1;

    }

    /**
     * 创建商品订单回滚阶段,暂无业务操作
     *
     * @param actionContext
     * @return
     */
    @Transactional(rollbackFor = Exception.class)
    public boolean rollback(BusinessActionContext actionContext) {
        String userId = (String) actionContext.getActionContext("userId");
        String commodityCode = (String) actionContext.getActionContext("commodityCode");
        Integer count = (Integer) actionContext.getActionContext("count");
        log.info("{} 用户购买的 {} 商品共计 {} 件,订单回滚成功", userId, commodityCode, count);
        return true;
    }
}
5.5.3.4 Controller类

com/gm/seata/openfeign/controller/OrderController.java

import com.gm.seata.openfeign.service.OrderService;
import com.gm.seata.openfeign.util.R;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class OrderController {

    @Autowired
    OrderService orderService;

    /**
     * 创建订单
     *
     * @param userId
     * @param commodityCode
     * @param count
     * @return
     */
    @RequestMapping(value = "createOrder", method = RequestMethod.GET)
    public R<Boolean> createOrder(@RequestParam("userId") String userId, @RequestParam("commodityCode") String commodityCode, @RequestParam("count") Integer count) {
       return R.ok(orderService.prepare(null, userId, commodityCode, count));
    }
}

5.6 business-tcc 搭建

5.6.1 完整依赖

减少重复内容,请参考 5.3.1 部分,自动修改

5.6.2 配置文件

减少重复内容,请参考 5.3.2 部分,自动修改

5.6.3 功能搭建

5.6.3.1 启动类

com/gm/seata/openfeign/BusinessTCCApplication.java

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;

@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients("com.gm.seata.openfeign.feign")
public class BusinessTCCApplication {

    public static void main(String[] args) {
        SpringApplication.run(BusinessTCCApplication.class, args);
    }
}

5.6.3.2 Service类

com/gm/seata/openfeign/service/BusinessService.java

public interface BusinessService {
    void buy(String userId, String commodityCode, Integer count);
}

com/gm/seata/openfeign/service/impl/BusinessServiceImpl.java

import com.gm.seata.openfeign.feign.OrderServiceApi;
import com.gm.seata.openfeign.feign.StorageServiceApi;
import com.gm.seata.openfeign.service.BusinessService;
import io.seata.core.context.RootContext;
import io.seata.spring.annotation.GlobalTransactional;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class BusinessServiceImpl implements BusinessService {

    @Autowired
    StorageServiceApi storageServiceApi;
    @Autowired
    OrderServiceApi orderServiceApi;

    /**
     * 下单购买,先扣除库存再创建订单
     *
     * @param userId
     * @param commodityCode
     * @param count
     */
    @GlobalTransactional
    public void buy(String userId, String commodityCode, Integer count) {
        String xid = RootContext.getXID();
        log.info("xid={}", xid);

        /**
         * 扣除库存
         */
        // 只有抛出异常,才能触发GlobalTransactional 的回滚逻辑处理
        try {
            storageServiceApi.deduct(commodityCode, count);
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage());
        }

        /**
         * 创建订单
         */
        try {
            orderServiceApi.createOrder(userId, commodityCode, count);
        } catch (Exception e) {
            // 远程方法调用失败
            throw new RuntimeException(e.getMessage());
        }
    }
}
5.6.3.4 Controller类

com/gm/seata/openfeign/controller/OrderController.java

import com.gm.seata.openfeign.service.BusinessService;
import com.gm.seata.openfeign.util.ErrorEnum;
import com.gm.seata.openfeign.util.R;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class BusinessController {

    @Autowired
    BusinessService businessService;

    /**
     * 商品下单购买
     *
     * @param userId
     * @param commodityCode
     * @param count
     * @return
     */
    @RequestMapping(value = "buy", method = RequestMethod.GET)
    public R<String> buy(@RequestParam("userId") String userId, @RequestParam("commodityCode") String commodityCode, @RequestParam("count") Integer count) {
        try {
            businessService.buy(userId, commodityCode, count);
            return R.ok("下单成功", "");
        } catch (Exception e) {
            e.printStackTrace();
            int code = Integer.parseInt(e.getMessage());
            return R.restResult("下单失败", code, ErrorEnum.getEnumByCode(code).getTitle());
        }
    }
}

六、示例说明

由第四章节可知:账户余额500,库存6,每件商品单价100元。

请求地址:http://127.0.0.1:4000/buy?userId=user1&count=2&commodityCode=iphone

每请求一次,扣除余额200元,扣除库存2个,已知可正常下单2次

学新通

第三次请求因余额不足,进行全局事务回滚

学新通

以下为仓储服务回滚信息,释放已冻结的2个库存

学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgaacfi
系列文章
更多 icon
同类精品
更多 icon
继续加载