???关注微信公众号有福利:
- RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
- RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
- 您对于源码的疑问每条留言都将得到认真回复。甚至不知道如何读源码也可以请教噢。
- 新的源码解析文章实时收到通知。每周更新一篇左右。
1. 概述
数据库拆分后,业务上会碰到需要分布式事务的场景。MyCAT 基于 XA 实现分布式事务。国内目前另外一款很火的数据库中间件 Sharding-JDBC 准备基于 TCC 实现分布式事务。
本文内容分成三部分:
- XA 概念简述
- MyCAT 代码如何实现 XA
- MyCAT 在实现 XA 存在的一些缺陷
2. XA 概念
>
X/Open 组织(即现在的 Open Group )定义了分布式事务处理模型。 X/Open DTP 模型( 1994 )包括:- 应用程序( AP )
- 事务管理器( TM )
- 资源管理器( RM )
- 通信资源管理器( CRM )一般,常见的事务管理器( TM )是交易中间件,常见的资源管理器( RM )是数据库,常见的通信资源管理器( CRM )是消息中间件,下图是X/Open DTP模型:
一般的编程方式是这样的:
- 配置 TM ,通过 TM 或者 RM 提供的方式,把 RM 注册到 TM。可以理解为给 TM 注册 RM 作为数据源。一个 TM 可以注册多个 RM。
- AP 从 TM 获取资源管理器的代理(例如:使用JTA接口,从TM管理的上下文中,获取出这个TM所管理的RM的JDBC连接或JMS连接)AP 向 TM 发起一个全局事务。这时,TM 会通知各个 RM。XID(全局事务ID)会通知到各个RM。
- AP 通过 TM 中获取的连接,间接操作 RM 进行业务操作。这时,TM 在每次 AP 操作时把 XID(包括所属分支的信息)传递给 RM,RM 正是通过这个 XID 关联来操作和事务的关系的。
- AP 结束全局事务时,TM 会通知 RM 全局事务结束。开始二段提交,也就是prepare - commit的过程。
XA协议指的是TM(事务管理器)和RM(资源管理器)之间的接口。目前主流的关系型数据库产品都是实现了XA接口的。JTA(Java Transaction API)是符合X/Open DTP模型的,事务管理器和资源管理器之间也使用了XA协议。 本质上也是借助两阶段提交协议来实现分布式事务的,下面分别来看看XA事务成功和失败的模型图:
? 看到这里是不是有种黑人问号的感觉?淡定!我们接下来看 MyCAT 代码层面是如何实现 XA 的。另外,有兴趣对概念了解更多的,可以参看如下文章:
3. MyCAT 代码实现
- MyCAT :TM,协调者。
- 数据节点 :RM,参与者。
3.1 JDBC Demo 代码
public class MyCATXAClientDemo { public static void main(String[] args) throws ClassNotFoundException, SQLException { // 1. 获得数据库连接 Class.forName("com.mysql.jdbc.Driver"); Connection conn = DriverManager.getConnection("jdbc:mysql://127.0.0.1:8066/dbtest", "root", "123456"); conn.setAutoCommit(false); // 2. 开启 MyCAT XA 事务 conn.prepareStatement("set xa=on").execute(); // 3. 插入 SQL // 3.1 SQL1 A库 long uid = Math.abs(new Random().nextLong()); String username = UUID.randomUUID().toString(); String password = UUID.randomUUID().toString(); String sql1 = String.format("insert into t_user(id, username, password) VALUES (%d, '%s', '%s')", uid, username, password); conn.prepareStatement(sql1).execute(); // 3.2 SQL2 B库 long orderId = Math.abs(new Random().nextLong()); String nickname = UUID.randomUUID().toString(); String sql2 = String.format("insert into t_order(id, uid, nickname) VALUES(%d, %s, '%s')", orderId, uid, nickname); conn.prepareStatement(sql2).execute(); // 4. 提交 XA 事务 conn.commit(); }}复制代码
set xa=on
MyCAT 开启 XA 事务。conn.commit
提交 XA 事务。
3.2 MyCAT 开启 XA 事务
当 MyCAT 接收到 set xa = on
命令时,开启 XA 事务,并生成 XA 事务编号。XA 事务编号生成算法为 UUID。核心代码如下:
// SetHandler.javapublic static void handle(String stmt, ServerConnection c, int offset) { int rs = ServerParseSet.parse(stmt, offset); switch (rs & 0xff) { // ... 省略代码 case XA_FLAG_ON: { if (c.isAutocommit()) { c.writeErrMessage(ErrorCode.ERR_WRONG_USED, "set xa cmd on can't used in autocommit connection "); return; } c.getSession2().setXATXEnabled(true); c.write(c.writeToBuffer(OkPacket.OK, c.allocate())); break; } case XA_FLAG_OFF: { c.writeErrMessage(ErrorCode.ERR_WRONG_USED, "set xa cmd off not for external use "); return; } // ... 省略代码 }}// NonBlockingSession.javapublic void setXATXEnabled(boolean xaTXEnabled) { if (xaTXEnabled) { if (this.xaTXID == null) { xaTXID = genXATXID(); // ??��获得 XA 事务编号 } } else { this.xaTXID = null; }}private String genXATXID() { return MycatServer.getInstance().getXATXIDGLOBAL();}// MycatServer.javapublic String getXATXIDGLOBAL() { return "'" + getUUID() + "'";}public static String getUUID() { // ??? String s = UUID.randomUUID().toString(); return s.substring(0, 8) + s.substring(9, 13) + s.substring(14, 18) + s.substring(19, 23) + s.substring(24);}复制代码
3.3 MyCAT 接收 SQL
此处 SQL 指的是 insert
、update
、delete
操作。
当向某个数据节点第一次发起 SQL 时,会在 SQL 前面附加 XA START 'xaTranId'
,并设置该数据节点连接事务状态为 TxState.TX_STARTED_STATE
(分布式事务状态,下文会专门整理)。核心代码如下:
// MySQLConnection.javaprivate void synAndDoExecute(String xaTxID, RouteResultsetNode rrn, int clientCharSetIndex, int clientTxIsoLation, boolean clientAutoCommit) { String xaCmd = null; boolean conAutoComit = this.autocommit; String conSchema = this.schema; // never executed modify sql,so auto commit boolean expectAutocommit = !modifiedSQLExecuted || isFromSlaveDB() || clientAutoCommit; if (expectAutocommit == false && xaTxID != null && xaStatus == TxState.TX_INITIALIZE_STATE) { // ??? xaCmd = "XA START " + xaTxID + ';'; this.xaStatus = TxState.TX_STARTED_STATE; } // .... 省略代码 StringBuilder sb = new StringBuilder(); // .... 省略代码 if (xaCmd != null) { sb.append(xaCmd); } // and our query sql to multi command at last sb.append(rrn.getStatement() + ";"); // syn and execute others this.sendQueryCmd(sb.toString());}复制代码
举个 变量sb
的例子:
SET names utf8;SET autocommit=0;XA START '1f2da7353e8846e5833b8d8dd041cfb1','db2';insert into t_user(id, username, password) VALUES (3400, 'b7c5ec1f-11cc-4599-851c-06ad617fec42', 'd2694679-f6a2-4623-a339-48d4a868be90');复制代码
3.4 MySQL 接收 COMMIT
3.4.1 单节点事务 or 多节点事务
COMMIT
执行时,MyCAT 会判断 XA 事务里,涉及到的数据库节点数量。
- 如果节点数量为 1,单节点事务,使用
CommitNodeHandler
处理。 - 如果节点数量 > 1,多节点事务,使用
MultiNodeCoordinator
处理。
CommitNodeHandler
相比 MultiNodeCoordinator
来说,只有一个数据节点,不需要进行多节点协调,逻辑会相对简单,有兴趣的同学可以另外看。我们主要分析 MultiNodeCoordinator
。
3.4.2 协调日志
协调日志,记录协调过程中各数据节点 XA 事务状态,处理MyCAT异常奔溃或者数据节点部分XA COMMIT,另外部分 XA PREPARE下的状态恢复。
XA 事务共有种:
- TX_INITIALIZE_STATE :事务初始化
- TX_STARTED_STATE :事务开始完成
- TX_PREPARED_STATE :事务准备完成
- TX_COMMITED_STATE :事务提交完成
- TX_ROLLBACKED_STATE :事务回滚完成
状态变更流 :TX_INITIALIZE_STATE => TX_STARTED_STATE => TX_PREPARED_STATE => TX_COMMITED_STATE / TX_ROLLBACKED_STATE 。
协调日志包含两个部分:
- CoordinatorLogEntry :协调者日志
- ParticipantLogEntry :参与者日志。此处,数据节点扮演参与者的角色。下文中,可能会出现参与者与数据节点混用的情况,望见谅。
一次 XA 事务,对应一条 CoordinatorLogEntry
。一条CoordinatorLogEntry
包含 N条ParticipantLogEntry
。 核心代码如下:
// CoordinatorLogEntry :协调者日志public class CoordinatorLogEntry implements Serializable { /** * XA 事务编号 */ public final String id; /** * 参与者日志数组 */ public final ParticipantLogEntry[] participants;}// ParticipantLogEntry :参与者日志public class ParticipantLogEntry implements Serializable { /** * XA 事务编号 */ public String coordinatorId; /** * 数据库 uri */ public String uri; /** * 过期描述 */ public long expires; /** * XA 事务状态 */ public int txState; /** * 参与者名字 */ public String resourceName;}复制代码
MyCAT 记录协调日志以 JSON格式 到文件。每行包含一条CoordinatorLogEntry
。举个例子:
{ "id":"'e827b3fe666c4d968961350d19adda31'","participants":[{ "uri":"127.0.0.1","state":"3","expires":0,"resourceName":"db3"},{ "uri":"127.0.0.1","state":"3","expires":0,"resourceName":"db1"}]}{ "id":"'f00b61fa17cb4ec5b8264a6d82f847d0'","participants":[{ "uri":"127.0.0.1","state":"3","expires":0,"resourceName":"db2"},{ "uri":"127.0.0.1","state":"3","expires":0,"resourceName":"db1"}]}复制代码
实现类为:
// XA 协调者日志 存储接口:https://github.com/YunaiV/Mycat-Server/blob/1.6/src/main/java/io/mycat/backend/mysql/xa/recovery/Repository.javapublic interface Repository {}// XA 协调者日志 文件存储:https://github.com/YunaiV/Mycat-Server/blob/1.6/src/main/java/io/mycat/backend/mysql/xa/recovery/impl/FileSystemRepository.javapublic class FileSystemRepository implements Repository {}// XA 协调者日志 文件存储:https://github.com/YunaiV/Mycat-Server/blob/1.6/src/main/java/io/mycat/backend/mysql/xa/recovery/impl/InMemoryRepository.javapublic class InMemoryRepository implements Repository {}复制代码
目前日志文件写入的方式性能较差,这里我们不做分析,在【4. MyCAT 实现缺陷】里一起讲。
3.4.3 MultiNodeCoordinator
敲敲敲,这里是本文的重点之一噢。?
第一阶段:发起 PREPARE。
public void executeBatchNodeCmd(SQLCtrlCommand cmdHandler) { this.cmdHandler = cmdHandler; final int initCount = session.getTargetCount(); runningCount.set(initCount); nodeCount = initCount; failed.set(false); faileCount.set(0); //recovery nodes log ParticipantLogEntry[] participantLogEntry = new ParticipantLogEntry[initCount]; // 执行 int started = 0; for (RouteResultsetNode rrn : session.getTargetKeys()) { if (rrn == null) { continue; } final BackendConnection conn = session.getTarget(rrn); if (conn != null) { conn.setResponseHandler(this); //process the XA_END XA_PREPARE Command MySQLConnection mysqlCon = (MySQLConnection) conn; String xaTxId = null; if (session.getXaTXID() != null) { xaTxId = session.getXaTXID() + ",'" + mysqlCon.getSchema() + "'"; } if (mysqlCon.getXaStatus() == TxState.TX_STARTED_STATE) { // XA 事务 //recovery Log participantLogEntry[started] = new ParticipantLogEntry(xaTxId, conn.getHost(), 0, conn.getSchema(), ((MySQLConnection) conn).getXaStatus()); String[] cmds = new String[]{ "XA END " + xaTxId, // XA END 命令 "XA PREPARE " + xaTxId}; // XA PREPARE 命令 mysqlCon.execBatchCmd(cmds); } else { // 非 XA 事务 // recovery Log participantLogEntry[started] = new ParticipantLogEntry(xaTxId, conn.getHost(), 0, conn.getSchema(), ((MySQLConnection) conn).getXaStatus()); cmdHandler.sendCommand(session, conn); } ++started; } } // xa recovery log if (session.getXaTXID() != null) { CoordinatorLogEntry coordinatorLogEntry = new CoordinatorLogEntry(session.getXaTXID(), false, participantLogEntry); inMemoryRepository.put(session.getXaTXID(), coordinatorLogEntry); fileRepository.writeCheckpoint(inMemoryRepository.getAllCoordinatorLogEntries()); } if (started < nodeCount) { // TODO 疑问:如何触发 runningCount.set(started); LOGGER.warn("some connection failed to execute " + (nodeCount - started)); /** * assumption: only caused by front-end connection close. * Otherwise, packet must be returned to front-end */ failed.set(true); }}复制代码
- 向各数据节点发送
XA END
+XA PREPARE
指令。举个 变量cmds
例子:
XA END '4cbb18214d0b47adbdb0658598666677','db3';XA PREPARE '4cbb18214d0b47adbdb0658598666677','db3';复制代码
- 记录协调日志。每条参与者日志状态为
TxState.TX_STARTED_STATE
。
第二阶段:发起 COMMIT。
@Overridepublic void okResponse(byte[] ok, BackendConnection conn) { // process the XA Transatcion 2pc commit if (conn instanceof MySQLConnection) { MySQLConnection mysqlCon = (MySQLConnection) conn; switch (mysqlCon.getXaStatus()) { case TxState.TX_STARTED_STATE: //if there have many SQL execute wait the okResponse,will come to here one by one //should be wait all nodes ready ,then send xa commit to all nodes. if (mysqlCon.batchCmdFinished()) { String xaTxId = session.getXaTXID(); String cmd = "XA COMMIT " + xaTxId + ",'" + mysqlCon.getSchema() + "'"; if (LOGGER.isDebugEnabled()) { LOGGER.debug("Start execute the cmd :" + cmd + ",current host:" + mysqlCon.getHost() + ":" + mysqlCon.getPort()); } // recovery log CoordinatorLogEntry coordinatorLogEntry = inMemoryRepository.get(xaTxId); for (int i = 0; i < coordinatorLogEntry.participants.length; i++) { LOGGER.debug("[In Memory CoordinatorLogEntry]" + coordinatorLogEntry.participants[i]); if (coordinatorLogEntry.participants[i].resourceName.equals(conn.getSchema())) { coordinatorLogEntry.participants[i].txState = TxState.TX_PREPARED_STATE; } } inMemoryRepository.put(xaTxId, coordinatorLogEntry); fileRepository.writeCheckpoint(inMemoryRepository.getAllCoordinatorLogEntries()); // send commit mysqlCon.setXaStatus(TxState.TX_PREPARED_STATE); mysqlCon.execCmd(cmd); } return; case TxState.TX_PREPARED_STATE: { // recovery log String xaTxId = session.getXaTXID(); CoordinatorLogEntry coordinatorLogEntry = inMemoryRepository.get(xaTxId); for (int i = 0; i < coordinatorLogEntry.participants.length; i++) { if (coordinatorLogEntry.participants[i].resourceName.equals(conn.getSchema())) { coordinatorLogEntry.participants[i].txState = TxState.TX_COMMITED_STATE; } } inMemoryRepository.put(xaTxId, coordinatorLogEntry); fileRepository.writeCheckpoint(inMemoryRepository.getAllCoordinatorLogEntries()); // XA reset status now mysqlCon.setXaStatus(TxState.TX_INITIALIZE_STATE); break; } default: } } // 释放连接 if (this.cmdHandler.relaseConOnOK()) { session.releaseConnection(conn); } else { session.releaseConnectionIfSafe(conn, LOGGER.isDebugEnabled(), false); } // 是否所有节点都完成commit,如果是,则返回Client 成功 if (this.finished()) { cmdHandler.okResponse(session, ok); if (cmdHandler.isAutoClearSessionCons()) { session.clearResources(false); } /* 1. 事务提交后,xa 事务结束 */ if (session.getXaTXID() != null) { session.setXATXEnabled(false); } /* 2. preAcStates 为true,事务结束后,需要设置为true。preAcStates 为ac上一个状态 */ if (session.getSource().isPreAcStates()) { session.getSource().setAutocommit(true); } }}复制代码
mysqlCon.batchCmdFinished()
每个数据节点,第一次返回的是XA END
成功,第二次返回的是XA PREPARE
。在XA PREPARE
成功后,记录该数据节点的参与者日志状态为TxState.TX_PREPARED_STATE
。之后,向该数据节点发起XA COMMIT
命令。XA COMMIT
返回成功后,记录该数据节点的事务参与者日志状态为TxState.TX_COMMITED_STATE
。- 当所有数据节点(参与者)都执行完成
XA COMMIT
返回,即this.finished() == true
,返回 MySQL Client XA 事务提交成功。
[x] XA PREPARE
和 XA COMMIT
,数据节点可能返回失败,目前暂时没模拟出来,对应方法为 #errorResponse(....)
。
3.5 MyCAT 启动回滚 XA事务
MyCAT 启动时,会回滚处于TxState.TX_PREPARED_STATE的 ParticipantLogEntry
对应的数据节点的 XA 事务。代码如下:
// MycatServer.javaprivate void performXARecoveryLog() { // fetch the recovery log CoordinatorLogEntry[] coordinatorLogEntries = getCoordinatorLogEntries(); for (int i = 0; i < coordinatorLogEntries.length; i++) { CoordinatorLogEntry coordinatorLogEntry = coordinatorLogEntries[i]; boolean needRollback = false; for (int j = 0; j < coordinatorLogEntry.participants.length; j++) { ParticipantLogEntry participantLogEntry = coordinatorLogEntry.participants[j]; if (participantLogEntry.txState == TxState.TX_PREPARED_STATE) { needRollback = true; break; } } if (needRollback) { for (int j = 0; j < coordinatorLogEntry.participants.length; j++) { ParticipantLogEntry participantLogEntry = coordinatorLogEntry.participants[j]; //XA rollback String xacmd = "XA ROLLBACK " + coordinatorLogEntry.id + ';'; OneRawSQLQueryResultHandler resultHandler = new OneRawSQLQueryResultHandler(new String[0], new XARollbackCallback()); outloop: for (SchemaConfig schema : MycatServer.getInstance().getConfig().getSchemas().values()) { for (TableConfig table : schema.getTables().values()) { for (String dataNode : table.getDataNodes()) { PhysicalDBNode dn = MycatServer.getInstance().getConfig().getDataNodes().get(dataNode); if (dn.getDbPool().getSource().getConfig().getIp().equals(participantLogEntry.uri) && dn.getDatabase().equals(participantLogEntry.resourceName)) { //XA STATE ROLLBACK participantLogEntry.txState = TxState.TX_ROLLBACKED_STATE; SQLJob sqlJob = new SQLJob(xacmd, dn.getDatabase(), resultHandler, dn.getDbPool().getSource()); sqlJob.run(); break outloop; } } } } } } } // init into in memory cached for (int i = 0; i < coordinatorLogEntries.length; i++) { MultiNodeCoordinator.inMemoryRepository.put(coordinatorLogEntries[i].id, coordinatorLogEntries[i]); } // discard the recovery log MultiNodeCoordinator.fileRepository.writeCheckpoint(MultiNodeCoordinator.inMemoryRepository.getAllCoordinatorLogEntries());}复制代码
4. MyCAT 实现缺陷
MyCAT 1.6.5 版本实现弱XA事务,相对来说,笔者认为距离实际生产使用存在一些差距。下面罗列可能存在的缺陷,如有错误,麻烦指出。?希望 MyCAT 在分布式事务的实现上,能够越来越给力。
4.1 协调日志写入性能
1、CoordinatorLogEntry
、ParticipantLogEntry
在每次写入文件时,是将内存中所有的日志全部重新写入,导致写入性能随着 XA 事务次数的增加,性能会越来越糟糕,导致 XA 事务整体性能会非常差。另外,该方法是同步的,也加大了写入的延迟。
建议:先获得可写入文件的 OFFSET,写入协调日志到文件,内存维护好 XA事务编号 与 OFFSET 的映射关系,从而实现顺序写入 + 并行写入。
2、内存里维护了所有的协调日志,占用内存会越来越大,并且无释放机制。即使重启,协调日志也会重新加载到内存。
建议:已完全回滚或者提交的协调日志不放入内存。另外有文件存储好 XA事务编号 与 OFFSET 的映射关系。
3、协调日志只写入单个文件。
建议:分拆协调日志文件。
PS:有兴趣的同学可以看下 RocketMQ
对 CommitLog
的存储,性能上很赞!
4.2 数据节点未全部 PREPARE 就进行 COMMIT
XA 事务定义,需要等待所有参与者全部 XA PREPARE
成功完成后发起 XA COMMIT
。目前 MyCAT 是某个数据节点 XA PREPARE
完成后立即进行 XA COMMIT
。比如说:第一个数据节点提交了 XA END;XA PREPARE
时,第二个数据节在进行 XA END;XA PREAPRE;
前挂了,第一个节点依然会 XA COMMIT
成功。
建议:按照严格的 XA 事务定义。
4.3 MyCAT 启动回滚 PREPARE 的 XA事务
1、MyCAT 启动时,回滚所有的 PREPARE
的 XA 事务,可能某个 XA 事务,部分 COMMIT
,部分 PREPARE
。此时直接回滚,会导致数据不一致。
建议:当判断到某个 XA 事务存在 PREPARE
的参与者,同时判断该 XA 事务里其他参与者的事务状态以及数据节点里 XA 事务状态,比如参与者为 MySQL
时,可以使用 XA RECOVER
查询处于 PREPARE
所有的 XA 事务。
2、回滚 PREPARE
是异步进行的,在未进行完成时已经设置文件里回滚成功。如果异步过程中失败,会导致 XA 事务状态不一致。
建议:回调成功后,更新该 XA 事务状态。
4.4 单节点事务未记录协调日志
该情况较为极端。发起 XA PREPARE
完后,MyCAT 挂了。重启后,该 XA 事务在 MyCAT 里就“消失“了,参与者的该 XA 事务一直处于 PREPARE
状态。从理论上来说,需要回滚该 XA 事务。
建议:记录协调日志。
4.5 XA COMMIT 部分节点挂了重新恢复后,未进一步处理
当一部分节点 XA COMMIT
完成,另外一部分此时挂了。在管理员重启挂掉的节点,其对应的 XA 事务未进一步处理,导致数据不一致。
建议:?木有建议。也很好奇,如果是这样的情况,如何处理较为合适。如有大大知道,烦请告知下。
5. 彩蛋
例行“彩蛋”?
- 来自 MyCAT Committer 的文章
- 笔者拙作
- 笔者拙作