Published on

DTM分布式事务的二阶段消息模式实现与异常处理全解析

Authors
  • avatar
    Name
    Liant
    Twitter

Dtm简介

DTM是一款开源的分布式事务管理器,解决跨数据库、跨服务、跨语言栈更新数据的一致性问题。

DTM开源项目文档

GitHub地址

一、Dtm 分布式事务 二阶段消息事务

Saga事务只适用于串行的多个接口,接口之间没有调用结果依赖的场景

图片
  1. 在正向操作中查询事务ID的状态是否已经执行过,保证接口的幂等性
  2. 在逆向操作中查询事务ID的状态是否已经执行过,保证接口的幂等性
  3. DTM使用返回的Json是否包含"SUCCESS"字符串来判断接口调用是否成功,在项目中,如果代码执行成功,或者不希望DTM再次重复调用,都应该在返回的Json中包含"SUCCESS"
  4. 项目的执行超时时间应该从网关超时时间->Nginx超时时间->PHP超时时间依次递减,防止网关超时但是脚本还在执行的情况,配合事务ID状态来保证幂等

二阶段消息事务正向请求不允许失败(必须有正确的返回FAILURESUCCESS),没有回滚操作,以下是测试代码

  • 安装依赖

    composer require sett/dtmcli-php
    
  • 二阶段消息业务入口

    <?php
    require_once __DIR__ . "/vendor/autoload.php";
    
    use Medoo\Medoo;
    use Sett\Dtmcli\transaction\MsgTrans;
    
    error_reporting(E_ALL);
    ini_set("display_errors", "on");
    try {
        $trans             = new MsgTrans("127.0.0.1:36789");
        $trans->waitResult = false;
        $gid               = $trans->createNewGid();
        // 开启本地事务
        $trans->withGid($gid)
            ->withOperate("http://127.0.0.1:9999/transIn.php", ["amount" => 30])
            ->doAndSubmit("http://127.0.0.1:9999/query.php", function () use ($gid) {
                $database     = new Medoo([
                    'type'     => 'mysql',
                    'host'     => '127.0.0.1',
                    'database' => 'dtm_busi',
                    'username' => 'root',
                    'password' => 'root'
                ]);
                throw new Exception("xxx");
                $updateBalance = "UPDATE `user` SET balance=balance-30 WHERE  `user_id`=1";
                // 修改用户A余额
                $database->query($updateBalance)->errorCode();
                error_log("【本地逻辑】扣除用户A余额:" . $database->errorInfo[1]);
            });
        // 提交本地事务
    } catch (Exception $exception) {
        // 回滚本地事务
        echo "exception with error " . $exception->getMessage();
    }
    
    // 本地逻辑失败:会执行正向回查
    // 本地逻辑成功、正向接口失败、执行正向回查,最终返回rollback
    // 本地逻辑成功、正向接口超时,DTM超时触发正向回查,最终返回rollback
    
  • 二阶段消息正向处理接口

    <?php
    require_once __DIR__ . "/vendor/autoload.php";
    
    use Medoo\Medoo;
    
    error_reporting(E_ALL);
    ini_set("display_errors", "on");
    
    // 用户B账户加余额
    $database      = new Medoo([
        'type'     => 'mysql',
        'host'     => '127.0.0.1',
        'database' => 'dtm_busi',
        'username' => 'root',
        'password' => 'root'
    ]);
    error_log("abc");
    $updateBalance = "UPDATE `user` SET balance=balance+30 WHERE  `user_id`=2";
    $database->query($updateBalance)->errorCode();
    error_log("【正向接口】增加用户B余额:" .  $database->errorInfo[1]);
    
    $params   = $_GET;
    $current      = time();
    $insertIgnore = "INSERT IGNORE INTO `local_msg_trans` VALUES(null ,'msg','{$params["gid"]}','','00','','committed',{$current},{$current})";
    // 插入事务记录到数据表
    $database->query($insertIgnore)->errorCode();
    error_log("【正向接口】插入数据库事务记录:" . $database->errorInfo[1]);
    
    // 返回成功
    echo json_encode(["code" => 200, "data" => [], "msg" => "SUCCESS"]);
    
    
  • 二阶段消息正向查询接口

    <?php
    
    use Medoo\Medoo;
    
    require_once __DIR__ . "/vendor/autoload.php";
    
    error_reporting(E_ALL);
    ini_set("display_errors", "on");
    
    // 新增一条事务记录
    try {
        $database = new Medoo([
            'type'     => 'mysql',
            'host'     => '127.0.0.1',
            'database' => 'dtm_busi',
            'username' => 'root',
            'password' => 'root'
        ]);
        error_log("456");
        $params   = $_GET;
        error_log("【正向回查】回查接口请求参数:" . json_encode($params, 256));
        $current      = time();
        $insertIgnore = "INSERT IGNORE INTO `local_msg_trans` VALUES(null ,'msg','{$params["gid"]}','','00','','rollback',{$current},{$current})";
        // 插入事务记录到数据表
        $database->query($insertIgnore);
        error_log("【正向回查】插入数据库事务记录:" . $database->errorInfo[1]);
        // 通过gid查询事务记录
        $trans = $database->get("local_msg_trans", "*", ["gid" => $params["gid"]]);
        // 如果事务不存在,说明整个事务都没有被记录,返回失败
        if (!$trans) {
            error_log("【正向回查】数据库事务记录不存在:返回失败");
            echo json_encode(["code" => 200, "data" => [], "msg" => "FAILURE"]);
            return;
        }
        // 如果原因是rollback,说明第一次事务没有被记录,返回失败
        if ($trans["reason"] == "rollback") {
            error_log("【正向回查】数据库事务记录结果:rollback");
            echo json_encode(["code" => 200, "data" => [], "msg" => "FAILURE"]);
            return;
        }
        // 如果原因是rollback,说明第一次事务没有被记录,返回失败
        if ($trans["reason"] == "ongoing") {
            error_log("【正向回查】数据库事务记录结果:ongoing");
            echo json_encode(["code" => 200, "data" => [], "msg" => "ONGOING"]);
            return;
        }
        error_log("【正向回查】数据库事务记录正常:返回成功");
        echo json_encode(["code" => 200, "data" => [], "msg" => "SUCCESS"]);
    } catch (Throwable $throwable) {
        error_log("【正向回查】代码执行异常:".$throwable->getMessage());
        var_dump($throwable->getTraceAsString());
        echo json_encode(["code" => 200, "data" => [], "msg" => "FAILURE"]);
    }