- Published on
DTM分布式事务的二阶段消息模式实现与异常处理全解析
- Authors
- Name
- Liant
Dtm简介
DTM是一款开源的分布式事务管理器,解决跨数据库、跨服务、跨语言栈更新数据的一致性问题。
一、Dtm 分布式事务 二阶段消息事务
Saga事务只适用于串行的多个接口,接口之间没有调用结果依赖的场景

- 在正向操作中查询事务ID的状态是否已经执行过,保证接口的幂等性
- 在逆向操作中查询事务ID的状态是否已经执行过,保证接口的幂等性
- DTM使用返回的Json是否包含"SUCCESS"字符串来判断接口调用是否成功,在项目中,如果代码执行成功,或者不希望DTM再次重复调用,都应该在返回的Json中包含"SUCCESS"
- 项目的执行超时时间应该从网关超时时间->Nginx超时时间->PHP超时时间依次递减,防止网关超时但是脚本还在执行的情况,配合事务ID状态来保证幂等
二阶段消息事务正向请求不允许失败(必须有正确的返回FAILURE
或SUCCESS
),没有回滚操作,以下是测试代码
安装依赖
composer require sett/dtmcli-php
二阶段消息业务入口
<?php require_once __DIR__ . "/vendor/autoload.php"; use Medoo\Medoo; use Sett\Dtmcli\transaction\MsgTrans; error_reporting(E_ALL); ini_set("display_errors", "on"); try { $trans = new MsgTrans("127.0.0.1:36789"); $trans->waitResult = false; $gid = $trans->createNewGid(); // 开启本地事务 $trans->withGid($gid) ->withOperate("http://127.0.0.1:9999/transIn.php", ["amount" => 30]) ->doAndSubmit("http://127.0.0.1:9999/query.php", function () use ($gid) { $database = new Medoo([ 'type' => 'mysql', 'host' => '127.0.0.1', 'database' => 'dtm_busi', 'username' => 'root', 'password' => 'root' ]); throw new Exception("xxx"); $updateBalance = "UPDATE `user` SET balance=balance-30 WHERE `user_id`=1"; // 修改用户A余额 $database->query($updateBalance)->errorCode(); error_log("【本地逻辑】扣除用户A余额:" . $database->errorInfo[1]); }); // 提交本地事务 } catch (Exception $exception) { // 回滚本地事务 echo "exception with error " . $exception->getMessage(); } // 本地逻辑失败:会执行正向回查 // 本地逻辑成功、正向接口失败、执行正向回查,最终返回rollback // 本地逻辑成功、正向接口超时,DTM超时触发正向回查,最终返回rollback
二阶段消息正向处理接口
<?php require_once __DIR__ . "/vendor/autoload.php"; use Medoo\Medoo; error_reporting(E_ALL); ini_set("display_errors", "on"); // 用户B账户加余额 $database = new Medoo([ 'type' => 'mysql', 'host' => '127.0.0.1', 'database' => 'dtm_busi', 'username' => 'root', 'password' => 'root' ]); error_log("abc"); $updateBalance = "UPDATE `user` SET balance=balance+30 WHERE `user_id`=2"; $database->query($updateBalance)->errorCode(); error_log("【正向接口】增加用户B余额:" . $database->errorInfo[1]); $params = $_GET; $current = time(); $insertIgnore = "INSERT IGNORE INTO `local_msg_trans` VALUES(null ,'msg','{$params["gid"]}','','00','','committed',{$current},{$current})"; // 插入事务记录到数据表 $database->query($insertIgnore)->errorCode(); error_log("【正向接口】插入数据库事务记录:" . $database->errorInfo[1]); // 返回成功 echo json_encode(["code" => 200, "data" => [], "msg" => "SUCCESS"]);
二阶段消息正向查询接口
<?php use Medoo\Medoo; require_once __DIR__ . "/vendor/autoload.php"; error_reporting(E_ALL); ini_set("display_errors", "on"); // 新增一条事务记录 try { $database = new Medoo([ 'type' => 'mysql', 'host' => '127.0.0.1', 'database' => 'dtm_busi', 'username' => 'root', 'password' => 'root' ]); error_log("456"); $params = $_GET; error_log("【正向回查】回查接口请求参数:" . json_encode($params, 256)); $current = time(); $insertIgnore = "INSERT IGNORE INTO `local_msg_trans` VALUES(null ,'msg','{$params["gid"]}','','00','','rollback',{$current},{$current})"; // 插入事务记录到数据表 $database->query($insertIgnore); error_log("【正向回查】插入数据库事务记录:" . $database->errorInfo[1]); // 通过gid查询事务记录 $trans = $database->get("local_msg_trans", "*", ["gid" => $params["gid"]]); // 如果事务不存在,说明整个事务都没有被记录,返回失败 if (!$trans) { error_log("【正向回查】数据库事务记录不存在:返回失败"); echo json_encode(["code" => 200, "data" => [], "msg" => "FAILURE"]); return; } // 如果原因是rollback,说明第一次事务没有被记录,返回失败 if ($trans["reason"] == "rollback") { error_log("【正向回查】数据库事务记录结果:rollback"); echo json_encode(["code" => 200, "data" => [], "msg" => "FAILURE"]); return; } // 如果原因是rollback,说明第一次事务没有被记录,返回失败 if ($trans["reason"] == "ongoing") { error_log("【正向回查】数据库事务记录结果:ongoing"); echo json_encode(["code" => 200, "data" => [], "msg" => "ONGOING"]); return; } error_log("【正向回查】数据库事务记录正常:返回成功"); echo json_encode(["code" => 200, "data" => [], "msg" => "SUCCESS"]); } catch (Throwable $throwable) { error_log("【正向回查】代码执行异常:".$throwable->getMessage()); var_dump($throwable->getTraceAsString()); echo json_encode(["code" => 200, "data" => [], "msg" => "FAILURE"]); }