找回密码
 立即注册
首页 业界区 业界 Seata源码—6.Seata AT模式的数据源代理

Seata源码—6.Seata AT模式的数据源代理

庾签 2025-6-2 23:56:34
大纲
1.Seata的Resource资源接口源码
2.Seata数据源连接池代理的实现源码
3.Client向Server发起注册RM的源码
4.Client向Server注册RM时的交互源码
5.数据源连接代理与SQL句柄代理的初始化源码
6.Seata基于SQL句柄代理执行SQL的源码
7.执行SQL语句前取消自动提交事务的源码
8.执行SQL语句前后构建数据镜像的源码
9.构建全局锁的key和UndoLog数据的源码
10.Seata Client发起分支事务注册的源码
11.Seata Server处理分支事务注册请求的源码
12.将UndoLog写入到数据库与提交事务的源码
13.通过全局锁重试策略组件执行事务的提交
14.注册分支事务时获取全局锁的入口源码
15.Seata Server获取全局锁的具体逻辑源码
16.全局锁和分支事务及本地事务总结
17.提交全局事务以及提交各分支事务的源码
18.全局事务回滚的过程源码
 
1.Seata的Resource资源接口源码
数据源代理DataSourceProxy不仅实现了Seata的Resource资源接口,同时还继承了实现了SeataDataSourceProxy接口的抽象类AbstractDataSourceProxy。
 
由于SeataDataSourceProxy接口又继承自JDK提供的DataSource接口,所以通过数据源连接池DataSource接口的方法,可以获取数据源的连接。
 
注意:这里的数据源==数据库。
  1. public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {
  2.     ...
  3. }
  4. public abstract class AbstractDataSourceProxy implements SeataDataSourceProxy {
  5.     ...
  6. }
  7. public interface SeataDataSourceProxy extends DataSource {
  8.     ...
  9. }
  10. public interface DataSource extends CommonDataSource, Wrapper {
  11.     //获取数据源连接
  12.     Connection getConnection() throws SQLException;
  13.     Connection getConnection(String username, String password) throws SQLException;
  14. }
复制代码
Seata的Resource资源接口有三个方法:
 
一.getResourceGroupId()方法用来获取资源分组
比如主从节点同属一个分组。
 
二.getResourceId()方法用来获取数据源ID
比如数据源连接URL可作为数据源ID。
 
三.getBranchType()方法用来获取分支事务类型
比如类型有:AT、TCC、SAGA、XA。
  1. //Resource that can be managed by Resource Manager and involved into global transaction.
  2. //资源是由RM资源管理组件来负责管理的
  3. //RM资源管理器组件会负责把一个个的资源纳入到全局事务里去
  4. //比如RM可以管理数据库资源,把一个数据库本地事务纳入到全局事务里去
  5. public interface Resource {
  6.     //Get the resource group id.
  7.     //e.g. master and slave data-source should be with the same resource group id.
  8.     //获取到资源分组ID
  9.     //主从架构的数据源关联到同一个资源分组ID
  10.     //比如MySQL部署了主从架构,主节点和从节点是两个数据源,但是关联到一个分组ID
  11.     String getResourceGroupId();
  12.     //Get the resource id.
  13.     //e.g. url of a data-source could be the id of the db data-source resource.
  14.     //比如数据源连接URL可以作为数据源的ID
  15.     String getResourceId();
  16.     //get resource type, AT, TCC, SAGA and XA
  17.     //branchType表示分支事务类型:AT、TCC、SAGA、XA
  18.     BranchType getBranchType();
  19. }
复制代码
 
2.Seata数据源连接池代理的实现源码
(1)Seata的数据源连接池代理接口SeataDataSourceProxy
(2)Seata的数据源连接池代理抽象类AbstractDataSourceProxy
(3)Seata的数据源连接池代理DataSourceProxy的变量和初始化
 
(1)Seata的数据源连接池代理接口SeataDataSourceProxy
SeataDataSourceProxy数据源代理在继承DataSource数据源连接池的基础上,增加了两个方法:一个是获取代理的目标数据源连接池的方法,一个是获取代理的目标数据源连接池对应的分支事务类型的方法。
  1. public interface SeataDataSourceProxy extends DataSource {
  2.     //Gets target data source.
  3.     //获取代理的目标数据源连接池
  4.     DataSource getTargetDataSource();
  5.     //Gets branch type.
  6.     //获取代理的目标数据源连接池对应的分支事务类型
  7.     BranchType getBranchType();
  8. }
复制代码
(2)Seata的数据源连接池代理抽象类AbstractDataSourceProxy
AbstractDataSourceProxy抽象类的主要工作是封装代理的目标数据源连接池targetDataSource。
  1. //The type Abstract data source proxy.
  2. //AbstractDataSourceProxy主要的工作就是:
  3. //封装了代理的目标数据源连接池targetDataSource
  4. public abstract class AbstractDataSourceProxy implements SeataDataSourceProxy {
  5.     //The Target data source.
  6.     //代理目标的连接池,可以通过targetDataSource来获取连接
  7.     protected DataSource targetDataSource;
  8.     //Instantiates a new Abstract data source proxy.
  9.     public AbstractDataSourceProxy(){ }
  10.     //Instantiates a new Abstract data source proxy.
  11.     public AbstractDataSourceProxy(DataSource targetDataSource) {
  12.         this.targetDataSource = targetDataSource;
  13.     }
  14.     //Gets target data source.
  15.     @Override
  16.     public DataSource getTargetDataSource() {
  17.         return targetDataSource;
  18.     }
  19.     @Override
  20.     public <T> T unwrap(Class<T> iface) throws SQLException {
  21.         return targetDataSource.unwrap(iface);
  22.     }
  23.     //判断目标连接池targetDataSource是否包装了指定的接口iface
  24.     @Override
  25.     public boolean isWrapperFor(Class<?> iface) throws SQLException {
  26.         return targetDataSource.isWrapperFor(iface);
  27.     }
  28.     @Override
  29.     public PrintWriter getLogWriter() throws SQLException {
  30.         return targetDataSource.getLogWriter();
  31.     }
  32.     @Override
  33.     public void setLogWriter(PrintWriter out) throws SQLException {
  34.         targetDataSource.setLogWriter(out);
  35.     }
  36.     @Override
  37.     public void setLoginTimeout(int seconds) throws SQLException {
  38.         targetDataSource.setLoginTimeout(seconds);
  39.     }
  40.     @Override
  41.     public int getLoginTimeout() throws SQLException {
  42.         return targetDataSource.getLoginTimeout();
  43.     }
  44.     @Override
  45.     public Logger getParentLogger() throws SQLFeatureNotSupportedException {
  46.         return targetDataSource.getParentLogger();
  47.     }
  48. }
复制代码
(3)Seata的数据源连接池代理DataSourceProxy的变量和初始化
初始化数据源连接池代理DataSourceProxy的具体逻辑是:首先从目标数据库连接池dataSource中获取一个数据库连接,然后根据这个数据库连接Connection去初始化jdbcUrl和dbType,接着根据数据库连接地址jdbcUrl初始化resourceId,然后把当前数据库连接池代理DataSourceProxy作为一个资源注册到默认的RM即DefaultResourceManager里去,最后设置RootContext上下文即线程本地变量副本中的分支事务类型。
  1. public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {
  2.     private static final Logger LOGGER = LoggerFactory.getLogger(DataSourceProxy.class);
  3.     //默认资源分组ID
  4.     private static final String DEFAULT_RESOURCE_GROUP_ID = "DEFAULT";
  5.     //Enable the table meta checker,默认是不启用的
  6.     private static boolean ENABLE_TABLE_META_CHECKER_ENABLE = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.CLIENT_TABLE_META_CHECK_ENABLE, DEFAULT_CLIENT_TABLE_META_CHECK_ENABLE);
  7.     //Table meta checker interval,默认是60s
  8.     private static final long TABLE_META_CHECKER_INTERVAL = ConfigurationFactory.getInstance().getLong(ConfigurationKeys.CLIENT_TABLE_META_CHECKER_INTERVAL, DEFAULT_TABLE_META_CHECKER_INTERVAL);
  9.     //资源组ID,比如MySQL部署了主从架构,主节点和从节点是两个数据源,但是关联到一个分组ID
  10.     private String resourceGroupId;
  11.     //代理的目标数据源连接url,这个数据源连接url也可以作为resourceId
  12.     private String jdbcUrl;
  13.     //数据源ID,比如数据库连接url就可以作为一个数据源ID
  14.     private String resourceId;
  15.     //数据源类型
  16.     private String dbType;
  17.     //数据源连接用户名
  18.     private String userName;
  19.     //定时调度的线程池,定时检查表里的元数据
  20.     private final ScheduledExecutorService tableMetaExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("tableMetaChecker", 1, true));
  21.     //Instantiates a new Data source proxy.
  22.     public DataSourceProxy(DataSource targetDataSource) {
  23.         this(targetDataSource, DEFAULT_RESOURCE_GROUP_ID);
  24.     }
  25.     //Instantiates a new Data source proxy.
  26.     //@param targetDataSource the target data source
  27.     //@param resourceGroupId  the resource group id
  28.     public DataSourceProxy(DataSource targetDataSource, String resourceGroupId) {
  29.         if (targetDataSource instanceof SeataDataSourceProxy) {
  30.             LOGGER.info("Unwrap the target data source, because the type is: {}", targetDataSource.getClass().getName());
  31.             targetDataSource = ((SeataDataSourceProxy) targetDataSource).getTargetDataSource();
  32.         }
  33.         this.targetDataSource = targetDataSource;
  34.         init(targetDataSource, resourceGroupId);
  35.     }
  36.     //初始化数据源连接池代理DataSourceProxy
  37.     private void init(DataSource dataSource, String resourceGroupId) {
  38.         //资源分组ID
  39.         this.resourceGroupId = resourceGroupId;
  40.         //从目标数据库连接池dataSource中,获取一个数据库连接
  41.         try (Connection connection = dataSource.getConnection()) {
  42.             //获取数据库连接connection里的元数据的连接url
  43.             jdbcUrl = connection.getMetaData().getURL();
  44.             //根据连接url获取到数据库类型
  45.             dbType = JdbcUtils.getDbType(jdbcUrl);
  46.             if (JdbcConstants.ORACLE.equals(dbType)) {
  47.                 //如果数据库类型等于oracle,则需要获取数据库连接connection的元数据的用户名
  48.                 userName = connection.getMetaData().getUserName();
  49.             } else if (JdbcConstants.MARIADB.equals(dbType)) {
  50.                 //如果数据库类型等于mariadb,则需要对数据库类型进行赋值为MySQL
  51.                 dbType = JdbcConstants.MYSQL;
  52.             }
  53.         } catch (SQLException e) {
  54.             throw new IllegalStateException("can not init dataSource", e);
  55.         }
  56.         //初始化资源ID,也就是获取数据库连接url来初始化resourceID
  57.         initResourceId();
  58.         //把当前数据库连接池代理,作为一个资源,注册到默认的RM里,也就是DefaultResourceManager
  59.         DefaultResourceManager.get().registerResource(this);
  60.         if (ENABLE_TABLE_META_CHECKER_ENABLE) {
  61.             tableMetaExecutor.scheduleAtFixedRate(() -> {
  62.                 try (Connection connection = dataSource.getConnection()) {
  63.                     TableMetaCacheFactory.getTableMetaCache(DataSourceProxy.this.getDbType()).refresh(connection, DataSourceProxy.this.getResourceId());
  64.                 } catch (Exception ignore) {
  65.                      
  66.                 }
  67.             }, 0, TABLE_META_CHECKER_INTERVAL, TimeUnit.MILLISECONDS);
  68.         }
  69.         //Set the default branch type to 'AT' in the RootContext.
  70.         //设置RootContext上下文,即线程本地变量副本中的分支事务类型
  71.         RootContext.setDefaultBranchType(this.getBranchType());
  72.     }
  73.    
  74.     private void initResourceId() {
  75.         if (JdbcConstants.POSTGRESQL.equals(dbType)) {
  76.             initPGResourceId();
  77.         } else if (JdbcConstants.ORACLE.equals(dbType) && userName != null) {
  78.             initDefaultResourceId();
  79.             resourceId = resourceId + "/" + userName;
  80.         } else if (JdbcConstants.MYSQL.equals(dbType)) {
  81.             initMysqlResourceId();
  82.         } else {
  83.             initDefaultResourceId();
  84.         }
  85.     }
  86.    
  87.     private void initMysqlResourceId() {
  88.         String startsWith = "jdbc:mysql:loadbalance://";
  89.         if (jdbcUrl.startsWith(startsWith)) {
  90.             String url;
  91.             if (jdbcUrl.contains("?")) {
  92.                 url = jdbcUrl.substring(0, jdbcUrl.indexOf('?'));
  93.             } else {
  94.                 url = jdbcUrl;
  95.             }
  96.             resourceId = url.replace(",", "|");
  97.         } else {
  98.             initDefaultResourceId();
  99.         }
  100.     }
  101.     ...
  102. }
复制代码
 
3.Client向Server发起注册RM的源码
初始化数据源连接池代理DataSourceProxy时,会将数据库连接池代理作为资源,注册到DefaultResourceManager资源管理器中。
 
而初始化DefaultResourceManager时,会通过SPI机制加载所有的ResourceManager。
 
因此在执行DataSourceProxy的init()方法进行初始化时,由于会调用DefaultResourceManager的registerResource()方法,所以最后会执行到DataSourceManager的registerResource()方法。
 
在DataSourceManager的registerResource()方法中,首先会把数据源连接池代理DataSourceProxy放入一个Map中进行缓存,然后通过RmNettyRemotingClient构造一个注册RM的请求把数据源连接池代理DataSourceProxy作为资源注册到Seata Server中。
1.png
  1. public class DefaultResourceManager implements ResourceManager {
  2.     //all resource managers
  3.     protected static Map<BranchType, ResourceManager> resourceManagers = new ConcurrentHashMap<>();
  4.    
  5.     private static class SingletonHolder {
  6.         private static DefaultResourceManager INSTANCE = new DefaultResourceManager();
  7.     }
  8.    
  9.     //Get resource manager.
  10.     public static DefaultResourceManager get() {
  11.         return SingletonHolder.INSTANCE;
  12.     }
  13.    
  14.     private DefaultResourceManager() {
  15.         initResourceManagers();
  16.     }
  17.    
  18.     protected void initResourceManagers() {
  19.         //init all resource managers
  20.         //通过SPI加载所有的ResourceManager资源管理器
  21.         //比如:DataSourceManager、TCCResourceManager、SagaResourceManager、ResourceManagerXA
  22.         List<ResourceManager> allResourceManagers = EnhancedServiceLoader.loadAll(ResourceManager.class);
  23.         if (CollectionUtils.isNotEmpty(allResourceManagers)) {
  24.             for (ResourceManager rm : allResourceManagers) {
  25.                 resourceManagers.put(rm.getBranchType(), rm);
  26.             }
  27.         }
  28.     }
  29.    
  30.     @Override
  31.     public void registerResource(Resource resource) {
  32.         getResourceManager(resource.getBranchType()).registerResource(resource);
  33.     }
  34.    
  35.     public ResourceManager getResourceManager(BranchType branchType) {
  36.         ResourceManager rm = resourceManagers.get(branchType);
  37.         if (rm == null) {
  38.             throw new FrameworkException("No ResourceManager for BranchType:" + branchType.name());
  39.         }
  40.         return rm;
  41.     }
  42.     ...
  43. }
  44. //The type Data source manager.
  45. //DataSourceManager是AT模式下的资源管理器
  46. public class DataSourceManager extends AbstractResourceManager {
  47.     //异步化worker
  48.     private final AsyncWorker asyncWorker = new AsyncWorker(this);
  49.     //RM负责管理的一些resource资源
  50.     private final Map<String, Resource> dataSourceCache = new ConcurrentHashMap<>();
  51.     ...
  52.    
  53.     @Override
  54.     public void registerResource(Resource resource) {
  55.         DataSourceProxy dataSourceProxy = (DataSourceProxy) resource;
  56.         //根据资源ID和数据源代理,把数据源连接池代理DataSourceProxy放入到map里去
  57.         dataSourceCache.put(dataSourceProxy.getResourceId(), dataSourceProxy);
  58.         super.registerResource(dataSourceProxy);
  59.     }
  60.     ...
  61. }
  62. public abstract class AbstractResourceManager implements ResourceManager {
  63.     ...
  64.     @Override
  65.     public void registerResource(Resource resource) {
  66.         //通过RmNettyRemotingClient把RM注册到Seata Server中
  67.         RmNettyRemotingClient.getInstance().registerResource(resource.getResourceGroupId(), resource.getResourceId());
  68.     }
  69.     ...
  70. }
复制代码
 
4.Client向Server注册RM时的交互源码
(1)Client异步发送注册RM的请求给Server
(2)Server收到注册RM的请求后的处理及异步响应
 
(1)Client异步发送注册RM的请求给Server
  1. public final class RmNettyRemotingClient extends AbstractNettyRemotingClient {
  2.     ...
  3.     //Register new db key.
  4.     public void registerResource(String resourceGroupId, String resourceId) {
  5.         //Resource registration cannot be performed until the RM client is initialized
  6.         if (StringUtils.isBlank(transactionServiceGroup)) {
  7.             return;
  8.         }
  9.         if (getClientChannelManager().getChannels().isEmpty()) {
  10.             getClientChannelManager().reconnect(transactionServiceGroup);
  11.             return;
  12.         }
  13.         synchronized (getClientChannelManager().getChannels()) {
  14.             //向每一个Server发起注册
  15.             for (Map.Entry<String, Channel> entry : getClientChannelManager().getChannels().entrySet()) {
  16.                 String serverAddress = entry.getKey();
  17.                 Channel rmChannel = entry.getValue();
  18.                 if (LOGGER.isInfoEnabled()) {
  19.                     LOGGER.info("will register resourceId:{}", resourceId);
  20.                 }
  21.                 sendRegisterMessage(serverAddress, rmChannel, resourceId);
  22.             }
  23.         }
  24.     }
  25.    
  26.     public void sendRegisterMessage(String serverAddress, Channel channel, String resourceId) {
  27.         RegisterRMRequest message = new RegisterRMRequest(applicationId, transactionServiceGroup);
  28.         message.setResourceIds(resourceId);
  29.         try {
  30.             //异步发送注册RM的请求
  31.             super.sendAsyncRequest(channel, message);
  32.         } catch (FrameworkException e) {
  33.             if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && serverAddress != null) {
  34.                 getClientChannelManager().releaseChannel(channel, serverAddress);
  35.                 if (LOGGER.isInfoEnabled()) {
  36.                     LOGGER.info("remove not writable channel:{}", channel);
  37.                 }
  38.             } else {
  39.                 LOGGER.error("register resource failed, channel:{},resourceId:{}", channel, resourceId, e);
  40.             }
  41.         }
  42.     }
  43.     ...
  44. }
  45. public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {
  46.     ...
  47.     @Override
  48.     public void sendAsyncRequest(Channel channel, Object msg) {
  49.         if (channel == null) {
  50.             LOGGER.warn("sendAsyncRequest nothing, caused by null channel.");
  51.             return;
  52.         }
  53.         RpcMessage rpcMessage = buildRequestMessage(msg, msg instanceof HeartbeatMessage
  54.             ? ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST
  55.             : ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);
  56.         if (rpcMessage.getBody() instanceof MergeMessage) {
  57.             mergeMsgMap.put(rpcMessage.getId(), (MergeMessage) rpcMessage.getBody());
  58.         }
  59.         super.sendAsync(channel, rpcMessage);
  60.     }
  61.     ...
  62. }
  63. public abstract class AbstractNettyRemoting implements Disposable {
  64.     ...
  65.     //rpc async request.
  66.     protected void sendAsync(Channel channel, RpcMessage rpcMessage) {
  67.         channelWritableCheck(channel, rpcMessage.getBody());
  68.         if (LOGGER.isDebugEnabled()) {
  69.             LOGGER.debug("write message:" + rpcMessage.getBody() + ", channel:" + channel + ",active?" + channel.isActive() + ",writable?" + channel.isWritable() + ",isopen?" + channel.isOpen());
  70.         }
  71.         doBeforeRpcHooks(ChannelUtil.getAddressFromChannel(channel), rpcMessage);
  72.         channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
  73.             if (!future.isSuccess()) {
  74.                 destroyChannel(future.channel());
  75.             }
  76.         });
  77.     }
  78.     ...
  79. }
复制代码
(2)Server收到注册RM的请求后的处理及异步响应
2.png
  1. public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {
  2.     ...
  3.     @ChannelHandler.Sharable
  4.     class ServerHandler extends ChannelDuplexHandler {
  5.         @Override
  6.         public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
  7.             if (!(msg instanceof RpcMessage)) {
  8.                 return;
  9.             }
  10.             //接下来调用processMessage()方法对解码完毕的RpcMessage对象进行处理
  11.             processMessage(ctx, (RpcMessage) msg);
  12.         }
  13.         ...
  14.     }
  15.     ...
  16. }
  17. public abstract class AbstractNettyRemoting implements Disposable {
  18.     ...
  19.     //Rpc message processing.
  20.     protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
  21.         if (LOGGER.isDebugEnabled()) {
  22.             LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
  23.         }
  24.         Object body = rpcMessage.getBody();
  25.         if (body instanceof MessageTypeAware) {
  26.             MessageTypeAware messageTypeAware = (MessageTypeAware) body;
  27.             //根据消息类型获取到一个Pair对象,该Pair对象是由请求处理组件和请求处理线程池组成的
  28.             //processorTable里的内容,是NettyRemotingServer在初始化时,通过调用registerProcessor()方法put进去的
  29.             final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
  30.             if (pair != null) {
  31.                 if (pair.getSecond() != null) {
  32.                     try {
  33.                         pair.getSecond().execute(() -> {
  34.                             try {
  35.                                 pair.getFirst().process(ctx, rpcMessage);
  36.                             } catch (Throwable th) {
  37.                                 LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
  38.                             } finally {
  39.                                 MDC.clear();
  40.                             }
  41.                         });
  42.                     } catch (RejectedExecutionException e) {
  43.                         ...
  44.                     }
  45.                 } else {
  46.                     try {
  47.                         pair.getFirst().process(ctx, rpcMessage);
  48.                     } catch (Throwable th) {
  49.                         LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
  50.                     }
  51.                 }
  52.             } else {
  53.                 LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
  54.             }
  55.         } else {
  56.             LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
  57.         }
  58.     }
  59.     ...
  60. }
  61. public class RegRmProcessor implements RemotingProcessor {
  62.     ...
  63.     @Override
  64.     public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
  65.         onRegRmMessage(ctx, rpcMessage);
  66.     }
  67.     private void onRegRmMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
  68.         RegisterRMRequest message = (RegisterRMRequest) rpcMessage.getBody();
  69.         //获取请求的发送地址
  70.         String ipAndPort = NetUtil.toStringAddress(ctx.channel().remoteAddress());
  71.         boolean isSuccess = false;
  72.         String errorInfo = StringUtils.EMPTY;
  73.         try {
  74.             if (null == checkAuthHandler || checkAuthHandler.regResourceManagerCheckAuth(message)) {
  75.                 //通过Channel管理组件ChannelManager,注册RM网络连接
  76.                 ChannelManager.registerRMChannel(message, ctx.channel());
  77.                 Version.putChannelVersion(ctx.channel(), message.getVersion());
  78.                 isSuccess = true;
  79.                 if (LOGGER.isDebugEnabled()) {
  80.                     LOGGER.debug("RM checkAuth for client:{},vgroup:{},applicationId:{} is OK", ipAndPort, message.getTransactionServiceGroup(), message.getApplicationId());
  81.                 }
  82.             } else {
  83.                 if (LOGGER.isWarnEnabled()) {
  84.                     LOGGER.warn("RM checkAuth for client:{},vgroup:{},applicationId:{} is FAIL", ipAndPort, message.getTransactionServiceGroup(), message.getApplicationId());
  85.                 }
  86.             }
  87.         } catch (Exception exx) {
  88.             isSuccess = false;
  89.             errorInfo = exx.getMessage();
  90.             LOGGER.error("RM register fail, error message:{}", errorInfo);
  91.         }
  92.         RegisterRMResponse response = new RegisterRMResponse(isSuccess);
  93.         if (StringUtils.isNotEmpty(errorInfo)) {
  94.             response.setMsg(errorInfo);
  95.         }
  96.         //返回响应给客户端
  97.         remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), response);
  98.         if (isSuccess && LOGGER.isInfoEnabled()) {
  99.             LOGGER.info("RM register success,message:{},channel:{},client version:{}", message, ctx.channel(), message.getVersion());
  100.         }
  101.     }
  102.     ...
  103. }
  104. public class ChannelManager {
  105.     ...
  106.     public static void registerRMChannel(RegisterRMRequest resourceManagerRequest, Channel channel) throws IncompatibleVersionException {
  107.         Version.checkVersion(resourceManagerRequest.getVersion());
  108.         Set<String> dbkeySet = dbKeytoSet(resourceManagerRequest.getResourceIds());
  109.         RpcContext rpcContext;
  110.         if (!IDENTIFIED_CHANNELS.containsKey(channel)) {
  111.             rpcContext = buildChannelHolder(NettyPoolKey.TransactionRole.RMROLE, resourceManagerRequest.getVersion(),
  112.                 resourceManagerRequest.getApplicationId(), resourceManagerRequest.getTransactionServiceGroup(),
  113.                 resourceManagerRequest.getResourceIds(), channel);
  114.             rpcContext.holdInIdentifiedChannels(IDENTIFIED_CHANNELS);
  115.         } else {
  116.             rpcContext = IDENTIFIED_CHANNELS.get(channel);
  117.             rpcContext.addResources(dbkeySet);
  118.         }
  119.         if (dbkeySet == null || dbkeySet.isEmpty()) {
  120.             return;
  121.         }
  122.         for (String resourceId : dbkeySet) {
  123.             String clientIp;
  124.             ConcurrentMap<Integer, RpcContext> portMap =
  125.                 CollectionUtils.computeIfAbsent(RM_CHANNELS, resourceId, key -> new ConcurrentHashMap<>())
  126.                 .computeIfAbsent(resourceManagerRequest.getApplicationId(), key -> new ConcurrentHashMap<>())
  127.                 .computeIfAbsent(clientIp = ChannelUtil.getClientIpFromChannel(channel), key -> new ConcurrentHashMap<>());
  128.             rpcContext.holdInResourceManagerChannels(resourceId, portMap);
  129.             updateChannelsResource(resourceId, clientIp, resourceManagerRequest.getApplicationId());
  130.         }
  131.     }
  132.     ...
  133. }
  134. public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {
  135.     ...
  136.     @Override
  137.     public void sendAsyncResponse(RpcMessage rpcMessage, Channel channel, Object msg) {
  138.         Channel clientChannel = channel;
  139.         if (!(msg instanceof HeartbeatMessage)) {
  140.             clientChannel = ChannelManager.getSameClientChannel(channel);
  141.         }
  142.         if (clientChannel != null) {
  143.             RpcMessage rpcMsg = buildResponseMessage(rpcMessage, msg, msg instanceof HeartbeatMessage
  144.                 ? ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE : ProtocolConstants.MSGTYPE_RESPONSE);
  145.             super.sendAsync(clientChannel, rpcMsg);
  146.         } else {
  147.             throw new RuntimeException("channel is error.");
  148.         }
  149.     }
  150.     ...
  151. }
  152. public abstract class AbstractNettyRemoting implements Disposable {
  153.     ...
  154.     //rpc async request.
  155.     protected void sendAsync(Channel channel, RpcMessage rpcMessage) {
  156.         channelWritableCheck(channel, rpcMessage.getBody());
  157.         if (LOGGER.isDebugEnabled()) {
  158.             LOGGER.debug("write message:" + rpcMessage.getBody() + ", channel:" + channel + ",active?" + channel.isActive() + ",writable?" + channel.isWritable() + ",isopen?" + channel.isOpen());
  159.         }
  160.         doBeforeRpcHooks(ChannelUtil.getAddressFromChannel(channel), rpcMessage);
  161.         channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
  162.             if (!future.isSuccess()) {
  163.                 destroyChannel(future.channel());
  164.             }
  165.         });
  166.     }
  167.     ...
  168. }
复制代码
 
5.数据源连接代理与SQL句柄代理的初始化源码
(1)数据库操作的三剑客之连接、句柄和结果
(2)数据源连接代理的初始化
(3)数据源连接代理对SQL进行预编译
(4)SQL句柄代理的初始化
(5)SQL句柄代理执行SQL
 
(1)数据库操作的三剑客之连接、句柄和结果
Seata Client或者Seata Server进行数据库操作的大致流程如下所示:
3.png
  1. public class LogStoreDataBaseDAO implements LogStore {
  2.     //The Log store data source. 数据源连接池
  3.     protected DataSource logStoreDataSource = null;
  4.     ...
  5.     @Override
  6.     public GlobalTransactionDO queryGlobalTransactionDO(long transactionId) {
  7.         String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getQueryGlobalTransactionSQLByTransactionId(globalTable);
  8.         Connection conn = null;//连接
  9.         PreparedStatement ps = null;//句柄
  10.         ResultSet rs = null;//结果
  11.         try {
  12.             //1.从数据源连接池中获取数据源连接
  13.             conn = logStoreDataSource.getConnection();
  14.             conn.setAutoCommit(true);
  15.             //2.对sql语句进行预编译
  16.             ps = conn.prepareStatement(sql);
  17.             ps.setLong(1, transactionId);
  18.             //3.执行sql语句
  19.             rs = ps.executeQuery();
  20.             if (rs.next()) {
  21.                 return convertGlobalTransactionDO(rs);
  22.             } else {
  23.                 return null;
  24.             }
  25.         } catch (SQLException e) {
  26.             throw new DataAccessException(e);
  27.         } finally {
  28.             IOUtil.close(rs, ps, conn);
  29.         }
  30.     }
  31.     ...
  32. }
复制代码
(2)数据源连接代理的初始化
Seata Client或者Seata Server进行数据库操作时,首先会通过数据库连接池代理DataSourceProxy获取数据库连接,也就是会通过DataSourceProxy的getConnection()方法获取数据源连接代理ConnectionProxy,其中就会根据获取到的一个数据源连接Connection初始化一个数据源连接代理ConnectionProxy。
  1. public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {
  2.     ...
  3.     @Override
  4.     public ConnectionProxy getConnection() throws SQLException {
  5.         //从目标数据源连接池中获取一个数据库连接,然后封装到ConnectionProxy数据源连接代理中,并进行返回
  6.         Connection targetConnection = targetDataSource.getConnection();
  7.         return new ConnectionProxy(this, targetConnection);
  8.     }
  9.     @Override
  10.     public ConnectionProxy getConnection(String username, String password) throws SQLException {
  11.         //从目标数据源连接池中获取一个数据库连接,然后封装到ConnectionProxy数据源连接代理中,并进行返回
  12.         Connection targetConnection = targetDataSource.getConnection(username, password);
  13.         return new ConnectionProxy(this, targetConnection);
  14.     }
  15.     ...
  16. }
  17. public class ConnectionProxy extends AbstractConnectionProxy {
  18.     //Instantiates a new Connection proxy.
  19.     public ConnectionProxy(DataSourceProxy dataSourceProxy, Connection targetConnection) {
  20.         super(dataSourceProxy, targetConnection);
  21.     }
  22.     ...
  23. }
  24. public abstract class AbstractConnectionProxy implements Connection {
  25.     //The Data source proxy. 数据源连接池代理
  26.     protected DataSourceProxy dataSourceProxy;
  27.     //The Target connection. 目标数据源连接
  28.     protected Connection targetConnection;
  29.     //Instantiates a new Abstract connection proxy.
  30.     public AbstractConnectionProxy(DataSourceProxy dataSourceProxy, Connection targetConnection) {
  31.         this.dataSourceProxy = dataSourceProxy;
  32.         this.targetConnection = targetConnection;
  33.     }
  34.     ...
  35. }
复制代码
(3)数据源连接代理对SQL进行预编译
数据源连接代理ConnectionProxy在进行数据库操作时,获取到数据库连接Connection之后,就需要对要执行的SQL进行预编译,也就是会调用AbstractConnectionProxy的prepareStatement()方法。
  1. public abstract class AbstractConnectionProxy implements Connection {
  2.     ...
  3.     //对SQL进行预编译
  4.     @Override
  5.     public PreparedStatement prepareStatement(String sql) throws SQLException {
  6.         String dbType = getDbType();
  7.         //support oracle 10.2+
  8.         PreparedStatement targetPreparedStatement = null;
  9.         //如果是AT模式
  10.         if (BranchType.AT == RootContext.getBranchType()) {
  11.             List<SQLRecognizer> sqlRecognizers = SQLVisitorFactory.get(sql, dbType);
  12.             if (sqlRecognizers != null && sqlRecognizers.size() == 1) {
  13.                 SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
  14.                 if (sqlRecognizer != null && sqlRecognizer.getSQLType() == SQLType.INSERT) {
  15.                     TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dbType).getTableMeta(
  16.                         getTargetConnection(),
  17.                         sqlRecognizer.getTableName(), getDataSourceProxy().getResourceId()
  18.                     );
  19.                     String[] pkNameArray = new String[tableMeta.getPrimaryKeyOnlyName().size()];
  20.                     tableMeta.getPrimaryKeyOnlyName().toArray(pkNameArray);
  21.                     targetPreparedStatement = getTargetConnection().prepareStatement(sql, pkNameArray);
  22.                 }
  23.             }
  24.         }
  25.         if (targetPreparedStatement == null) {
  26.             targetPreparedStatement = getTargetConnection().prepareStatement(sql);
  27.         }
  28.         //返回一个SQL句柄代理
  29.         return new PreparedStatementProxy(this, targetPreparedStatement, sql);
  30.     }
  31.     ...
  32. }
复制代码
(4)SQL句柄代理的初始化
SQL句柄代理PreparedStatementProxy的初始化主要是设置目标SQL、目标句柄和数据源连接代理。
  1. public class PreparedStatementProxy extends AbstractPreparedStatementProxy implements PreparedStatement, ParametersHolder {
  2.     //Instantiates a new Prepared statement proxy.
  3.     public PreparedStatementProxy(AbstractConnectionProxy connectionProxy, PreparedStatement targetStatement, String targetSQL) throws SQLException {
  4.         super(connectionProxy, targetStatement, targetSQL);
  5.     }
  6.     ...
  7. }
  8. public abstract class AbstractPreparedStatementProxy extends StatementProxy<PreparedStatement> implements PreparedStatement {
  9.     protected Map<Integer, ArrayList<Object>> parameters;
  10.    
  11.     private void initParameterHolder() {
  12.         this.parameters = new HashMap<>();
  13.     }
  14.    
  15.     //Instantiates a new Abstract prepared statement proxy.
  16.     public AbstractPreparedStatementProxy(AbstractConnectionProxy connectionProxy, PreparedStatement targetStatement, String targetSQL) throws SQLException {
  17.         super(connectionProxy, targetStatement, targetSQL);
  18.         initParameterHolder();
  19.     }
  20.     ...
  21. }
  22. public class StatementProxy<T extends Statement> extends AbstractStatementProxy<T> {
  23.     //Instantiates a new Statement proxy.
  24.     public StatementProxy(AbstractConnectionProxy connectionWrapper, T targetStatement, String targetSQL) throws SQLException {
  25.         super(connectionWrapper, targetStatement, targetSQL);
  26.     }
  27.     ...
  28. }
  29. public abstract class AbstractStatementProxy<T extends Statement> implements Statement {
  30.     //The Connection proxy.
  31.     protected AbstractConnectionProxy connectionProxy;
  32.     //The Target statement.
  33.     protected T targetStatement;
  34.     //The Target sql.
  35.     protected String targetSQL;
  36.     ...
  37.     //Instantiates a new Abstract statement proxy.
  38.     public AbstractStatementProxy(AbstractConnectionProxy connectionProxy, T targetStatement, String targetSQL) throws SQLException {
  39.         this.connectionProxy = connectionProxy;
  40.         this.targetStatement = targetStatement;
  41.         this.targetSQL = targetSQL;
  42.     }
  43.     ...
  44. }
复制代码
(5)SQL句柄代理执行SQL
从数据源连接池中获取到数据源连接,以及对SQL语句进行预编译后,就可以调用SQL句柄代理PreparedStatementProxy的executeQuery()等方法执行SQL语句。
 
6.Seata基于SQL句柄代理执行SQL的源码
(1)Spring的JdbcTemplate操作数据库的三剑客
(2)基于SQL句柄代理执行SQL的流程
 
(1)Spring的JdbcTemplate操作数据库的三剑客
连接、句柄和结果。
  1. @Disabled
  2. public class LocalTransactionWithGlobalLockDataSourceBasicTest {
  3.     private static ClassPathXmlApplicationContext context;
  4.     private static JdbcTemplate jdbcTemplate;
  5.    
  6.     @BeforeAll
  7.     public static void before() {
  8.         context = new ClassPathXmlApplicationContext("basic-test-context.xml");
  9.         jdbcTemplate = (JdbcTemplate) context.getBean("jdbcTemplate");
  10.     }
  11.    
  12.     @Test
  13.     public void testInsert() {
  14.         RootContext.bindGlobalLockFlag();
  15.         jdbcTemplate.update("insert into user0 (id, name, gmt) values (?, ?, ?)", new Object[]{2, "xxx", new Date()});
  16.     }
  17.     ...
  18. }
  19. public class JdbcTemplate extends JdbcAccessor implements JdbcOperations {
  20.     ...
  21.     @Override
  22.     public int update(String sql, @Nullable Object... args) throws DataAccessException {
  23.         return update(sql, newArgPreparedStatementSetter(args));
  24.     }
  25.    
  26.     @Override
  27.     public int update(String sql, @Nullable PreparedStatementSetter pss) throws DataAccessException {
  28.         return update(new SimplePreparedStatementCreator(sql), pss);
  29.     }
  30.    
  31.     protected int update(final PreparedStatementCreator psc, @Nullable final PreparedStatementSetter pss) throws DataAccessException {
  32.         logger.debug("Executing prepared SQL update");
  33.         return updateCount(execute(psc, ps -> {
  34.             try {
  35.                 if (pss != null) {
  36.                     pss.setValues(ps);
  37.                 }
  38.                 //PreparedStatement执行SQL
  39.                 int rows = ps.executeUpdate();
  40.                 if (logger.isTraceEnabled()) {
  41.                     logger.trace("SQL update affected " + rows + " rows");
  42.                 }
  43.                 return rows;
  44.             } finally {
  45.                 if (pss instanceof ParameterDisposer) {
  46.                     ((ParameterDisposer) pss).cleanupParameters();
  47.                 }
  48.             }
  49.         }, true));
  50.     }
  51.    
  52.     @Nullable
  53.     private <T> T execute(PreparedStatementCreator psc, PreparedStatementCallback<T> action, boolean closeResources) throws DataAccessException {
  54.         Assert.notNull(psc, "PreparedStatementCreator must not be null");
  55.         Assert.notNull(action, "Callback object must not be null");
  56.         if (logger.isDebugEnabled()) {
  57.             String sql = getSql(psc);
  58.             logger.debug("Executing prepared SQL statement" + (sql != null ? " [" + sql + "]" : ""));
  59.         }
  60.         //1.获取连接
  61.         Connection con = DataSourceUtils.getConnection(obtainDataSource());
  62.         PreparedStatement ps = null;
  63.         try {
  64.             //2.创建句柄
  65.             ps = psc.createPreparedStatement(con);
  66.             applyStatementSettings(ps);
  67.             //3.执行SQL的结果
  68.             T result = action.doInPreparedStatement(ps);
  69.             handleWarnings(ps);
  70.             return result;
  71.         } catch (SQLException ex) {
  72.             if (psc instanceof ParameterDisposer) {
  73.                 ((ParameterDisposer) psc).cleanupParameters();
  74.             }
  75.             String sql = getSql(psc);
  76.             psc = null;
  77.             JdbcUtils.closeStatement(ps);
  78.             ps = null;
  79.             DataSourceUtils.releaseConnection(con, getDataSource());
  80.             con = null;
  81.             throw translateException("PreparedStatementCallback", sql, ex);
  82.         } finally {
  83.             if (closeResources) {
  84.                 if (psc instanceof ParameterDisposer) {
  85.                     ((ParameterDisposer) psc).cleanupParameters();
  86.                 }
  87.                 JdbcUtils.closeStatement(ps);
  88.                 DataSourceUtils.releaseConnection(con, getDataSource());
  89.             }
  90.         }
  91.     }
  92.     ...
  93. }
复制代码
(2)基于SQL句柄代理执行SQL的流程
SQL句柄代理PreparedStatementProxy在调用execute()方法执行SQL时,就会调用到ExecuteTemplate执行模版的execute()方法。
 
而ExecuteTemplate执行模版的execute()方法,如果发现不需要全局锁 + 没有开启全局事务,那么就普通执行本地事务。否则,最终就会调用到BaseTransactionalExecutor的excute()方法。
 
在BaseTransactionalExecutor的excute()方法中,首先会从线程本地变量副本中获取xid,然后再执行SQL。
4.png
  1. public class PreparedStatementProxy extends AbstractPreparedStatementProxy implements PreparedStatement, ParametersHolder {
  2.     ...
  3.     @Override
  4.     public boolean execute() throws SQLException {
  5.         return ExecuteTemplate.execute(this, (statement, args) -> statement.execute());
  6.     }
  7.    
  8.     @Override
  9.     public ResultSet executeQuery() throws SQLException {
  10.         return ExecuteTemplate.execute(this, (statement, args) -> statement.executeQuery());
  11.     }
  12.    
  13.     @Override
  14.     public int executeUpdate() throws SQLException {
  15.         return ExecuteTemplate.execute(this, (statement, args) -> statement.executeUpdate());
  16.     }
  17.     ...
  18. }
  19. public class ExecuteTemplate {
  20.     ...
  21.     public static <T, S extends Statement> T execute(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, Object... args) throws SQLException {
  22.         return execute(null, statementProxy, statementCallback, args);
  23.     }
  24.    
  25.     public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers, StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, Object... args) throws SQLException {
  26.         //如果发现不需要全局锁,而且没有开启AT模式下的全局事务,那么就普通执行本地事务
  27.         if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {
  28.             //Just work as original statement
  29.             return statementCallback.execute(statementProxy.getTargetStatement(), args);
  30.         }
  31.         //获取到DB的类型
  32.         String dbType = statementProxy.getConnectionProxy().getDbType();
  33.         if (CollectionUtils.isEmpty(sqlRecognizers)) {
  34.             sqlRecognizers = SQLVisitorFactory.get(statementProxy.getTargetSQL(), dbType);
  35.         }
  36.         Executor<T> executor;
  37.         if (CollectionUtils.isEmpty(sqlRecognizers)) {
  38.             executor = new PlainExecutor<>(statementProxy, statementCallback);
  39.         } else {
  40.             if (sqlRecognizers.size() == 1) {
  41.                 SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
  42.                 switch (sqlRecognizer.getSQLType()) {
  43.                     case INSERT:
  44.                         //通过SPI机制加载InsertExecutor
  45.                         executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType, new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class}, new Object[]{statementProxy, statementCallback, sqlRecognizer});
  46.                         break;
  47.                     case UPDATE:
  48.                         executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
  49.                         break;
  50.                     case DELETE:
  51.                         executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
  52.                         break;
  53.                     case SELECT_FOR_UPDATE:
  54.                         executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
  55.                         break;
  56.                     case INSERT_ON_DUPLICATE_UPDATE:
  57.                         switch (dbType) {
  58.                             case JdbcConstants.MYSQL:
  59.                             case JdbcConstants.MARIADB:
  60.                                 executor = new MySQLInsertOrUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
  61.                                 break;
  62.                             default:
  63.                                 throw new NotSupportYetException(dbType + " not support to INSERT_ON_DUPLICATE_UPDATE");
  64.                         }
  65.                         break;
  66.                     default:
  67.                         executor = new PlainExecutor<>(statementProxy, statementCallback);
  68.                         break;
  69.                 }
  70.             } else {
  71.                 executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
  72.             }
  73.         }
  74.         T rs;
  75.         try {
  76.             //比如下面最终会调用BaseTransactionalExecutor.excute()方法
  77.             rs = executor.execute(args);
  78.         } catch (Throwable ex) {
  79.             if (!(ex instanceof SQLException)) {
  80.                 // Turn other exception into SQLException
  81.                 ex = new SQLException(ex);
  82.             }
  83.             throw (SQLException) ex;
  84.         }
  85.         return rs;
  86.     }
  87.     ...
  88. }
  89. @LoadLevel(name = JdbcConstants.MYSQL, scope = Scope.PROTOTYPE)
  90. public class MySQLInsertExecutor extends BaseInsertExecutor implements Defaultable {
  91.     ...
  92.     //Instantiates a new Abstract dml base executor.
  93.     public MySQLInsertExecutor(StatementProxy statementProxy, StatementCallback statementCallback, SQLRecognizer sqlRecognizer) {
  94.         super(statementProxy, statementCallback, sqlRecognizer);
  95.     }
  96.     ...
  97. }
  98. public abstract class BaseInsertExecutor<T, S extends Statement> extends AbstractDMLBaseExecutor<T, S> implements InsertExecutor<T> {
  99.     ...
  100.     public BaseInsertExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, SQLRecognizer sqlRecognizer) {
  101.         super(statementProxy, statementCallback, sqlRecognizer);
  102.     }
  103.     ...
  104. }
  105. public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {
  106.     ...
  107.     public AbstractDMLBaseExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, SQLRecognizer sqlRecognizer) {
  108.         super(statementProxy, statementCallback, sqlRecognizer);
  109.     }
  110.    
  111.     @Override
  112.     public T doExecute(Object... args) throws Throwable {
  113.         AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
  114.         //判断是否是自动提交本地事务,默认情况本地事务都是自动提交的,此时需要阻止自动提交
  115.         if (connectionProxy.getAutoCommit()) {
  116.             return executeAutoCommitTrue(args);
  117.         } else {
  118.             return executeAutoCommitFalse(args);
  119.         }
  120.     }
  121.     ...
  122. }
  123. public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor<T> {
  124.     //The Statement proxy.
  125.     protected StatementProxy<S> statementProxy;
  126.     //The Statement callback.
  127.     protected StatementCallback<T, S> statementCallback;
  128.     //The Sql recognizer.
  129.     protected SQLRecognizer sqlRecognizer;
  130.     ...
  131.     public BaseTransactionalExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback,
  132.         SQLRecognizer sqlRecognizer) {
  133.         this.statementProxy = statementProxy;
  134.         this.statementCallback = statementCallback;
  135.         this.sqlRecognizer = sqlRecognizer;
  136.     }
  137.     ...
  138.     @Override
  139.     public T execute(Object... args) throws Throwable {
  140.         //获取xid
  141.         String xid = RootContext.getXID();
  142.         if (xid != null) {
  143.             statementProxy.getConnectionProxy().bind(xid);
  144.         }
  145.         statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
  146.         return doExecute(args);
  147.     }
  148.    
  149.     //Do execute object.
  150.     protected abstract T doExecute(Object... args) throws Throwable;
  151.     ...
  152. }
复制代码
 
7.执行SQL语句前取消自动提交事务的源码
执行ExecuteTemplate执行模版的execute()方法时,最终会调用到BaseTransactionalExecutor基础事务执行器的excute()方法。
 
执行BaseTransactionalExecutor的execute()方法时,又会执行到AbstractDMLBaseExecutor的doExecute()方法。该方法会判断目标数据库连接是否会自动提交本地事务,默认情况下本地事务都是自动提交的。如果是,则取消自动提交本地事务。
5.png
  1. public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor<T> {
  2.     //The Statement proxy.
  3.     protected StatementProxy<S> statementProxy;
  4.     //The Statement callback.
  5.     protected StatementCallback<T, S> statementCallback;
  6.     //The Sql recognizer.
  7.     protected SQLRecognizer sqlRecognizer;
  8.     ...
  9.     public BaseTransactionalExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback,
  10.         SQLRecognizer sqlRecognizer) {
  11.         this.statementProxy = statementProxy;
  12.         this.statementCallback = statementCallback;
  13.         this.sqlRecognizer = sqlRecognizer;
  14.     }
  15.     ...
  16.     @Override
  17.     public T execute(Object... args) throws Throwable {
  18.         //获取xid
  19.         String xid = RootContext.getXID();
  20.         if (xid != null) {
  21.             statementProxy.getConnectionProxy().bind(xid);
  22.         }
  23.         statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
  24.         return doExecute(args);
  25.     }
  26.    
  27.     //Do execute object.
  28.     protected abstract T doExecute(Object... args) throws Throwable;
  29.     ...
  30. }
  31. public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {
  32.     ...
  33.     public AbstractDMLBaseExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, SQLRecognizer sqlRecognizer) {
  34.         super(statementProxy, statementCallback, sqlRecognizer);
  35.     }
  36.    
  37.     @Override
  38.     public T doExecute(Object... args) throws Throwable {
  39.         AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
  40.         //判断是否是自动提交本地事务,默认情况本地事务都是自动提交的,此时需要阻止自动提交
  41.         if (connectionProxy.getAutoCommit()) {
  42.             return executeAutoCommitTrue(args);
  43.         } else {
  44.             return executeAutoCommitFalse(args);
  45.         }
  46.     }
  47.     ...
  48. }
  49. public abstract class AbstractConnectionProxy implements Connection {
  50.     ...
  51.     @Override
  52.     public boolean getAutoCommit() throws SQLException {
  53.         //判断目标数据库连接是否是自动提交,默认情况是都是自动提交的
  54.         return targetConnection.getAutoCommit();
  55.     }
  56.     ...
  57. }
  58. public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {
  59.     ...
  60.     //Execute auto commit true t.
  61.     protected T executeAutoCommitTrue(Object[] args) throws Throwable {
  62.         ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
  63.         try {
  64.             //修改自动提交事务的设置,此时需要阻止自动提交事务
  65.             connectionProxy.changeAutoCommit();
  66.             return new LockRetryPolicy(connectionProxy).execute(() -> {
  67.                 T result = executeAutoCommitFalse(args);//执行SQL语句
  68.                 connectionProxy.commit();//手动提交本地事务
  69.                 return result;
  70.             });
  71.         } catch (Exception e) {
  72.             //when exception occur in finally,this exception will lost, so just print it here
  73.             LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
  74.             if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
  75.                 connectionProxy.getTargetConnection().rollback();
  76.             }
  77.             throw e;
  78.         } finally {
  79.             connectionProxy.getContext().reset();
  80.             connectionProxy.setAutoCommit(true);
  81.         }
  82.     }
  83.     ...
  84. }
  85. public class ConnectionProxy extends AbstractConnectionProxy {
  86.     private final ConnectionContext context = new ConnectionContext();
  87.     ...
  88.     //change connection autoCommit to false by seata
  89.     public void changeAutoCommit() throws SQLException {
  90.         getContext().setAutoCommitChanged(true);
  91.         setAutoCommit(false);
  92.     }
  93.    
  94.     //Gets context.
  95.     public ConnectionContext getContext() {
  96.         return context;
  97.     }
  98.    
  99.     @Override
  100.     public void setAutoCommit(boolean autoCommit) throws SQLException {
  101.         if ((context.inGlobalTransaction() || context.isGlobalLockRequire()) && autoCommit && !getAutoCommit()) {
  102.             //change autocommit from false to true, we should commit() first according to JDBC spec.
  103.             doCommit();
  104.         }
  105.         //把目标数据源连接的自动提交事务设置为false
  106.         targetConnection.setAutoCommit(autoCommit);
  107.     }
  108.     ...
  109. }
复制代码
 
8.执行SQL语句前后构建数据镜像的源码
(1)AbstractDMLBaseExecutor的doExecute()方法的执行流程
(2)以UpdateExecuto为例构建前后镜像
 
(1)AbstractDMLBaseExecutor的doExecute()方法的执行流程
一.首先设置数据源连接阻止其自动提交事务
二.根据目标SQL语句构建beforeImage前镜像
三.执行目标SQL语句(但还没提交其对应的事务)
四.根据beforeImage前镜像构建afterImage后镜像
五.根据前镜像和后镜像构建UndoLog数据
六.手动提交数据源连接代理的事务
6.png
  1. public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {
  2.     ...
  3.     //Execute auto commit true t.
  4.     protected T executeAutoCommitTrue(Object[] args) throws Throwable {
  5.         ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
  6.         try {
  7.             //修改数据源连接的自动提交事务的设置,此时需要阻止自动提交事务
  8.             connectionProxy.changeAutoCommit();
  9.             return new LockRetryPolicy(connectionProxy).execute(() -> {
  10.                 T result = executeAutoCommitFalse(args);//执行SQL语句
  11.                 connectionProxy.commit();//手动提交本地事务
  12.                 return result;
  13.             });
  14.         } catch (Exception e) {
  15.             // when exception occur in finally,this exception will lost, so just print it here
  16.             LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
  17.             if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
  18.                 connectionProxy.getTargetConnection().rollback();
  19.             }
  20.             throw e;
  21.         } finally {
  22.             connectionProxy.getContext().reset();
  23.             connectionProxy.setAutoCommit(true);
  24.         }
  25.     }
  26.    
  27.     //Execute auto commit false t.
  28.     protected T executeAutoCommitFalse(Object[] args) throws Exception {
  29.         if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
  30.             throw new NotSupportYetException("multi pk only support mysql!");
  31.         }
  32.         //根据目标SQL语句构建beforeImage,表示目标SQL执行前的数据镜像
  33.         TableRecords beforeImage = beforeImage();
  34.         //接下来真正去执行这条SQL语句,但是此时本地事务还不会提交
  35.         T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
  36.         int updateCount = statementProxy.getUpdateCount();
  37.         if (updateCount > 0) {
  38.             //根据beforeImage构建afterImage,表示目标SQL执行后的数据镜像
  39.             TableRecords afterImage = afterImage(beforeImage);
  40.             //根据beforeImage和afterImage准备undoLog数据到数据源连接代理中
  41.             prepareUndoLog(beforeImage, afterImage);
  42.         }
  43.         return result;
  44.     }
  45.     ...
  46. }
复制代码
(2)以UpdateExecutor为例构建前后镜像
  1. public class TableRecords implements java.io.Serializable {
  2.     //表的元数据
  3.     private transient TableMeta tableMeta;
  4.     //表的名称
  5.     private String tableName;
  6.     //表的多行数据
  7.     private List<Row> rows = new ArrayList<Row>();
  8.     ...
  9. }
  10. public class UpdateExecutor<T, S extends Statement> extends AbstractDMLBaseExecutor<T, S> {
  11.     private static final Configuration CONFIG = ConfigurationFactory.getInstance();
  12.     private static final boolean ONLY_CARE_UPDATE_COLUMNS = CONFIG.getBoolean(ConfigurationKeys.TRANSACTION_UNDO_ONLY_CARE_UPDATE_COLUMNS, DefaultValues.DEFAULT_ONLY_CARE_UPDATE_COLUMNS);
  13.    
  14.     //Instantiates a new Update executor.
  15.     public UpdateExecutor(StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, SQLRecognizer sqlRecognizer) {
  16.         super(statementProxy, statementCallback, sqlRecognizer);
  17.     }
  18.     @Override
  19.     protected TableRecords beforeImage() throws SQLException {
  20.         ArrayList<List<Object>> paramAppenderList = new ArrayList<>();
  21.         TableMeta tmeta = getTableMeta();
  22.         //根据主键ID值拼接一个SQL语句,查询这条数据更新前的镜像
  23.         String selectSQL = buildBeforeImageSQL(tmeta, paramAppenderList);
  24.         return buildTableRecords(tmeta, selectSQL, paramAppenderList);
  25.     }
  26.     private String buildBeforeImageSQL(TableMeta tableMeta, ArrayList<List<Object>> paramAppenderList) {
  27.         SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer) sqlRecognizer;
  28.         List<String> updateColumns = recognizer.getUpdateColumns();
  29.         StringBuilder prefix = new StringBuilder("SELECT ");
  30.         StringBuilder suffix = new StringBuilder(" FROM ").append(getFromTableInSQL());
  31.         String whereCondition = buildWhereCondition(recognizer, paramAppenderList);
  32.         String orderByCondition = buildOrderCondition(recognizer, paramAppenderList);
  33.         String limitCondition = buildLimitCondition(recognizer, paramAppenderList);
  34.         if (StringUtils.isNotBlank(whereCondition)) {
  35.             suffix.append(WHERE).append(whereCondition);
  36.         }
  37.         if (StringUtils.isNotBlank(orderByCondition)) {
  38.             suffix.append(" ").append(orderByCondition);
  39.         }
  40.         if (StringUtils.isNotBlank(limitCondition)) {
  41.             suffix.append(" ").append(limitCondition);
  42.         }
  43.         suffix.append(" FOR UPDATE");
  44.         StringJoiner selectSQLJoin = new StringJoiner(", ", prefix.toString(), suffix.toString());
  45.         if (ONLY_CARE_UPDATE_COLUMNS) {
  46.             if (!containsPK(updateColumns)) {
  47.                 selectSQLJoin.add(getColumnNamesInSQL(tableMeta.getEscapePkNameList(getDbType())));
  48.             }
  49.             for (String columnName : updateColumns) {
  50.                 selectSQLJoin.add(columnName);
  51.             }
  52.             //The on update xxx columns will be auto update by db, so it's also the actually updated columns
  53.             List<String> onUpdateColumns = tableMeta.getOnUpdateColumnsOnlyName();
  54.             onUpdateColumns.removeAll(updateColumns);
  55.             for (String onUpdateColumn : onUpdateColumns) {
  56.                 selectSQLJoin.add(ColumnUtils.addEscape(onUpdateColumn, getDbType()));
  57.             }
  58.         } else {
  59.             for (String columnName : tableMeta.getAllColumns().keySet()) {
  60.                 selectSQLJoin.add(ColumnUtils.addEscape(columnName, getDbType()));
  61.             }
  62.         }
  63.         return selectSQLJoin.toString();
  64.     }
  65.     @Override
  66.     protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {
  67.         TableMeta tmeta = getTableMeta();
  68.         if (beforeImage == null || beforeImage.size() == 0) {
  69.             return TableRecords.empty(getTableMeta());
  70.         }
  71.         String selectSQL = buildAfterImageSQL(tmeta, beforeImage);
  72.         ResultSet rs = null;
  73.         try (PreparedStatement pst = statementProxy.getConnection().prepareStatement(selectSQL)) {
  74.             SqlGenerateUtils.setParamForPk(beforeImage.pkRows(), getTableMeta().getPrimaryKeyOnlyName(), pst);
  75.             rs = pst.executeQuery();
  76.             return TableRecords.buildRecords(tmeta, rs);
  77.         } finally {
  78.             IOUtil.close(rs);
  79.         }
  80.     }
  81.     private String buildAfterImageSQL(TableMeta tableMeta, TableRecords beforeImage) throws SQLException {
  82.         StringBuilder prefix = new StringBuilder("SELECT ");
  83.         String whereSql = SqlGenerateUtils.buildWhereConditionByPKs(tableMeta.getPrimaryKeyOnlyName(), beforeImage.pkRows().size(), getDbType());
  84.         String suffix = " FROM " + getFromTableInSQL() + " WHERE " + whereSql;
  85.         StringJoiner selectSQLJoiner = new StringJoiner(", ", prefix.toString(), suffix);
  86.         if (ONLY_CARE_UPDATE_COLUMNS) {
  87.             SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer) sqlRecognizer;
  88.             List<String> updateColumns = recognizer.getUpdateColumns();
  89.             if (!containsPK(updateColumns)) {
  90.                 selectSQLJoiner.add(getColumnNamesInSQL(tableMeta.getEscapePkNameList(getDbType())));
  91.             }
  92.             for (String columnName : updateColumns) {
  93.                 selectSQLJoiner.add(columnName);
  94.             }
  95.             //The on update xxx columns will be auto update by db, so it's also the actually updated columns
  96.             List<String> onUpdateColumns = tableMeta.getOnUpdateColumnsOnlyName();
  97.             onUpdateColumns.removeAll(updateColumns);
  98.             for (String onUpdateColumn : onUpdateColumns) {
  99.                 selectSQLJoiner.add(ColumnUtils.addEscape(onUpdateColumn, getDbType()));
  100.             }
  101.         } else {
  102.             for (String columnName : tableMeta.getAllColumns().keySet()) {
  103.                 selectSQLJoiner.add(ColumnUtils.addEscape(columnName, getDbType()));
  104.             }
  105.         }
  106.         return selectSQLJoiner.toString();
  107.     }
  108. }
复制代码
 
9.构建全局锁的key和UndoLog数据的源码
(1)prepareUndoLog()方法会构建全局锁的key和UndoLog数据
(2)构建全局锁的key的源码
(3)构建UndoLog数据的源码
 
(1)prepareUndoLog()方法会构建全局锁的key和UndoLog数据
在基础事务执行器BaseTransactionalExecutor的prepareUndoLog()方法中,会构建全局锁的key和构建UndoLog数据,并把它们设置到数据源连接代理ConnectionProxy中。
7.png
  1. public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor<T> {
  2.     ...
  3.     //prepare undo log.
  4.     //@param beforeImage the before image
  5.     //@param afterImage  the after image
  6.     protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
  7.         if (beforeImage.getRows().isEmpty() && afterImage.getRows().isEmpty()) {
  8.             return;
  9.         }
  10.         if (SQLType.UPDATE == sqlRecognizer.getSQLType()) {
  11.             if (beforeImage.getRows().size() != afterImage.getRows().size()) {
  12.                 throw new ShouldNeverHappenException("Before image size is not equaled to after image size, probably because you updated the primary keys.");
  13.             }
  14.         }
  15.         ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
  16.         TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
  17.         //构建全局锁的key
  18.         //比如更新了一批数据,那么需要针对这批数据的主键ID,来构建这批数据的全局锁的key
  19.         String lockKeys = buildLockKey(lockKeyRecords);
  20.         if (null != lockKeys) {
  21.             //将全局锁key设置到数据源连接代理ConnectionProxy中
  22.             connectionProxy.appendLockKey(lockKeys);
  23.             //构建UndoLog
  24.             SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
  25.             //将UndoLog设置到数据源连接代理ConnectionProxy中
  26.             connectionProxy.appendUndoLog(sqlUndoLog);
  27.         }
  28.     }
  29.     ...
  30. }
复制代码
(2)构建全局锁的key的源码
  1. public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor<T> {
  2.     ...
  3.     //build lockKey
  4.     //@param rowsIncludingPK the records
  5.     //@return the string as local key. the local key example(multi pk): "t_user:1_a,2_b"
  6.     protected String buildLockKey(TableRecords rowsIncludingPK) {
  7.         if (rowsIncludingPK.size() == 0) {
  8.             return null;
  9.         }
  10.         //构建出来的全局锁的key形式为:table_name:id_11001
  11.         StringBuilder sb = new StringBuilder();
  12.         sb.append(rowsIncludingPK.getTableMeta().getTableName());
  13.         sb.append(":");
  14.         int filedSequence = 0;
  15.         //pksRows指的是,更新的每一行数据主键字段和主键的值
  16.         List<Map<String, Field>> pksRows = rowsIncludingPK.pkRows();
  17.         //获取到主键字段名称,主键可能是联合主键,主键字段的名称可能有多个
  18.         List<String> primaryKeysOnlyName = getTableMeta().getPrimaryKeyOnlyName();
  19.         //rowMap就是一行数据,rowMap中的key是字段名称,value是字段值
  20.         for (Map<String, Field> rowMap : pksRows) {
  21.             int pkSplitIndex = 0;
  22.             //遍历和提取这行数据里多个主键字段的名称
  23.             for (String pkName : primaryKeysOnlyName) {
  24.                 if (pkSplitIndex > 0) {
  25.                     sb.append("_");
  26.                 }
  27.                 //获取到多个主键字段的value,然后拼接在一起
  28.                 sb.append(rowMap.get(pkName).getValue());
  29.                 pkSplitIndex++;
  30.             }
  31.             filedSequence++;
  32.             if (filedSequence < pksRows.size()) {
  33.                 sb.append(",");
  34.             }
  35.         }
  36.         //最终拼成的key形如:table_name:1101_aadd,table_name:xxxx_xxx
  37.         return sb.toString();
  38.     }
  39.     ...
  40. }
复制代码
(3)构建UndoLog数据的源码
  1. public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor<T> {
  2.     ...
  3.     //build a SQLUndoLog
  4.     //@param beforeImage the before image
  5.     //@param afterImage  the after image
  6.     protected SQLUndoLog buildUndoItem(TableRecords beforeImage, TableRecords afterImage) {
  7.         SQLType sqlType = sqlRecognizer.getSQLType();
  8.         String tableName = sqlRecognizer.getTableName();
  9.         SQLUndoLog sqlUndoLog = new SQLUndoLog();
  10.         sqlUndoLog.setSqlType(sqlType);//SQL的类型可能为insert、update、delete
  11.         sqlUndoLog.setTableName(tableName);//表的名称
  12.         sqlUndoLog.setBeforeImage(beforeImage);//SQL执行前的数据镜像
  13.         sqlUndoLog.setAfterImage(afterImage);//SQL执行后的数据镜像
  14.         return sqlUndoLog;
  15.     }
  16.     ...
  17. }
  18. public class SQLUndoLog implements java.io.Serializable {
  19.     private SQLType sqlType;
  20.     private String tableName;
  21.     private TableRecords beforeImage;
  22.     private TableRecords afterImage;
  23.     ...
  24. }
复制代码
 
10.Seata Client发起分支事务注册的源码
(1)ConnectionProxy.commit()提交事务
(2)ConnectionProxy.register()注册分支事务
 
(1)ConnectionProxy.commit()提交事务
执行数据源连接代理ConnectionProxy的commit()方法提交事务的时候,首先会先调用数据源连接代理ConnectionProxy的register()方法注册分支事务。
8.png
  1. public class ConnectionProxy extends AbstractConnectionProxy {
  2.     private final ConnectionContext context = new ConnectionContext();
  3.     ...
  4.     @Override
  5.     public void commit() throws SQLException {
  6.         try {
  7.             //通过全局锁重试策略组件来执行本地事务的提交
  8.             lockRetryPolicy.execute(() -> {
  9.                 doCommit();
  10.                 return null;
  11.             });
  12.         } catch (SQLException e) {
  13.             if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
  14.                 rollback();
  15.             }
  16.             throw e;
  17.         } catch (Exception e) {
  18.             throw new SQLException(e);
  19.         }
  20.     }
  21.    
  22.     private void doCommit() throws SQLException {
  23.         if (context.inGlobalTransaction()) {
  24.             processGlobalTransactionCommit();
  25.         } else if (context.isGlobalLockRequire()) {
  26.             processLocalCommitWithGlobalLocks();
  27.         } else {
  28.             targetConnection.commit();
  29.         }
  30.     }
  31.     private void processLocalCommitWithGlobalLocks() throws SQLException {
  32.         //检查全局锁keys
  33.         checkLock(context.buildLockKeys());
  34.         try {
  35.             //目标数据源连接提交事务
  36.             targetConnection.commit();
  37.         } catch (Throwable ex) {
  38.             throw new SQLException(ex);
  39.         }
  40.         context.reset();
  41.     }
  42.     private void processGlobalTransactionCommit() throws SQLException {
  43.         try {
  44.             //注册分支事务
  45.             register();
  46.         } catch (TransactionException e) {
  47.             recognizeLockKeyConflictException(e, context.buildLockKeys());
  48.         }
  49.         try {
  50.             UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
  51.             //目标数据源连接提交事务
  52.             targetConnection.commit();
  53.         } catch (Throwable ex) {
  54.             LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
  55.             report(false);
  56.             throw new SQLException(ex);
  57.         }
  58.         if (IS_REPORT_SUCCESS_ENABLE) {
  59.             report(true);
  60.         }
  61.         context.reset();
  62.     }
  63.     ...
  64. }
复制代码
(2)ConnectionProxy.register()注册分支事务
执行数据源连接代理ConnectionProxy的register()方法注册分支事务的时候,会调用资源管理器DefaultResourceManager的branchRegister()方法,然后会继续调用AbstractResourceManager的branchRegister()方法来注册分支事务。
 
在AbstractResourceManager的branchRegister()方法中,首先会构造分支事务注册请求,然后通过RmNettyRemotingClient将分支事务注册请求发送给Seata Server。
  1. //The type Connection proxy.
  2. //数据源连接代理
  3. public class ConnectionProxy extends AbstractConnectionProxy {
  4.     private final ConnectionContext context = new ConnectionContext();
  5.     ...
  6.     private void register() throws TransactionException {
  7.         if (!context.hasUndoLog() || !context.hasLockKey()) {
  8.             return;
  9.         }
  10.         //分支事务注册
  11.         Long branchId = DefaultResourceManager.get().branchRegister(
  12.             BranchType.AT,//事务类型
  13.             getDataSourceProxy().getResourceId(),//资源id,资源是已经注册过了的
  14.             null,
  15.             context.getXid(),
  16.             context.getApplicationData(),
  17.             context.buildLockKeys()//注册分支事物时带上全局锁keys
  18.         );
  19.         context.setBranchId(branchId);
  20.     }
  21.     ...
  22. }
  23. public class DefaultResourceManager implements ResourceManager {
  24.     protected static Map<BranchType, ResourceManager> resourceManagers = new ConcurrentHashMap<>();
  25.    
  26.     private static class SingletonHolder {
  27.         private static DefaultResourceManager INSTANCE = new DefaultResourceManager();
  28.     }
  29.    
  30.     public static DefaultResourceManager get() {
  31.         return SingletonHolder.INSTANCE;
  32.     }
  33.    
  34.     private DefaultResourceManager() {
  35.         initResourceManagers();
  36.     }
  37.    
  38.     protected void initResourceManagers() {
  39.         //通过SPI加载所有的ResourceManager资源管理器
  40.         //比如:DataSourceManager、TCCResourceManager、SagaResourceManager、ResourceManagerXA
  41.         List<ResourceManager> allResourceManagers = EnhancedServiceLoader.loadAll(ResourceManager.class);
  42.         if (CollectionUtils.isNotEmpty(allResourceManagers)) {
  43.             for (ResourceManager rm : allResourceManagers) {
  44.                 resourceManagers.put(rm.getBranchType(), rm);
  45.             }
  46.         }
  47.     }
  48.    
  49.     //注册分支事务
  50.     @Override
  51.     public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {
  52.         return getResourceManager(branchType).branchRegister(branchType, resourceId, clientId, xid, applicationData, lockKeys);
  53.     }
  54.    
  55.     public ResourceManager getResourceManager(BranchType branchType) {
  56.         ResourceManager rm = resourceManagers.get(branchType);
  57.         if (rm == null) {
  58.             throw new FrameworkException("No ResourceManager for BranchType:" + branchType.name());
  59.         }
  60.         return rm;
  61.     }
  62.     ...
  63. }
  64. public abstract class AbstractResourceManager implements ResourceManager {
  65.     ...
  66.     @Override
  67.     public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {
  68.         try {
  69.             BranchRegisterRequest request = new BranchRegisterRequest();
  70.             request.setXid(xid);//xid是全局事务id
  71.             request.setLockKey(lockKeys);//这次分支事务要更新数据全局锁key
  72.             request.setResourceId(resourceId);//分支事务对应的资源id
  73.             request.setBranchType(branchType);//分支事务类型
  74.             request.setApplicationData(applicationData);//应用数据
  75.             BranchRegisterResponse response = (BranchRegisterResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);
  76.             if (response.getResultCode() == ResultCode.Failed) {
  77.                 throw new RmTransactionException(response.getTransactionExceptionCode(), String.format("Response[ %s ]", response.getMsg()));
  78.             }
  79.             return response.getBranchId();
  80.         } catch (TimeoutException toe) {
  81.             throw new RmTransactionException(TransactionExceptionCode.IO, "RPC Timeout", toe);
  82.         } catch (RuntimeException rex) {
  83.             throw new RmTransactionException(TransactionExceptionCode.BranchRegisterFailed, "Runtime", rex);
  84.         }
  85.     }
  86.     ...
  87. }
复制代码
 
11.Seata Server处理分支事务注册请求的源码
(1)Seata Server收到分支事务注册请求后的处理
(2)BranchRegisterRequest.handle()的处理
(3)DefaultCore.branchRegister()的处理
 
(1)Seata Server收到分支事务注册请求后的处理
Seata Server收到Seata Client发送过来的分支事务注册请求后,首先会将分支事务注册请求交给ServerOnRequestProcessor的process()方法进行处理,然后再将请求交给DefaultCoordinator的onRequest()方法进行处理。
  1. public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {
  2.     ...
  3.     @ChannelHandler.Sharable
  4.     class ServerHandler extends ChannelDuplexHandler {
  5.         @Override
  6.         public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
  7.             if (!(msg instanceof RpcMessage)) {
  8.                 return;
  9.             }
  10.             //接下来调用processMessage()方法对解码完毕的RpcMessage对象进行处理
  11.             processMessage(ctx, (RpcMessage) msg);
  12.         }
  13.     }
  14. }
  15. public abstract class AbstractNettyRemoting implements Disposable {
  16.     ...
  17.     protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
  18.         if (LOGGER.isDebugEnabled()) {
  19.             LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
  20.         }
  21.         Object body = rpcMessage.getBody();
  22.         if (body instanceof MessageTypeAware) {
  23.             MessageTypeAware messageTypeAware = (MessageTypeAware) body;
  24.             //根据消息类型获取到一个Pair对象,该Pair对象是由请求处理组件和请求处理线程池组成的
  25.             //processorTable里的内容,是NettyRemotingServer在初始化时,通过调用registerProcessor()方法put进去的
  26.             //所以下面的代码实际上会由ServerOnRequestProcessor的process()方法进行处理
  27.             final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
  28.             if (pair != null) {
  29.                 if (pair.getSecond() != null) {
  30.                     try {
  31.                         pair.getSecond().execute(() -> {
  32.                             try {
  33.                                 pair.getFirst().process(ctx, rpcMessage);
  34.                             } catch (Throwable th) {
  35.                                 LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
  36.                             } finally {
  37.                                 MDC.clear();
  38.                             }
  39.                         });
  40.                     } catch (RejectedExecutionException e) {
  41.                         ...
  42.                     }
  43.                 } else {
  44.                     try {
  45.                         pair.getFirst().process(ctx, rpcMessage);
  46.                     } catch (Throwable th) {
  47.                         LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
  48.                     }
  49.                 }
  50.             } else {
  51.                 LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
  52.             }
  53.         } else {
  54.             LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
  55.         }
  56.     }
  57.     ...
  58. }
  59. public class ServerOnRequestProcessor implements RemotingProcessor, Disposable {
  60.     private final RemotingServer remotingServer;
  61.     ...
  62.     @Override
  63.     public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
  64.         if (ChannelManager.isRegistered(ctx.channel())) {
  65.             onRequestMessage(ctx, rpcMessage);
  66.         } else {
  67.             try {
  68.                 if (LOGGER.isInfoEnabled()) {
  69.                     LOGGER.info("closeChannelHandlerContext channel:" + ctx.channel());
  70.                 }
  71.                 ctx.disconnect();
  72.                 ctx.close();
  73.             } catch (Exception exx) {
  74.                 LOGGER.error(exx.getMessage());
  75.             }
  76.             if (LOGGER.isInfoEnabled()) {
  77.                 LOGGER.info(String.format("close a unhandled connection! [%s]", ctx.channel().toString()));
  78.             }
  79.         }
  80.     }
  81.    
  82.     private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
  83.         Object message = rpcMessage.getBody();
  84.         //RpcContext线程本地变量副本
  85.         RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
  86.         if (LOGGER.isDebugEnabled()) {
  87.             LOGGER.debug("server received:{},clientIp:{},vgroup:{}", message, NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());
  88.         } else {
  89.             try {
  90.                 BatchLogHandler.INSTANCE.getLogQueue().put(message + ",clientIp:" + NetUtil.toIpAddress(ctx.channel().remoteAddress()) + ",vgroup:" + rpcContext.getTransactionServiceGroup());
  91.             } catch (InterruptedException e) {
  92.                 LOGGER.error("put message to logQueue error: {}", e.getMessage(), e);
  93.             }
  94.         }
  95.         if (!(message instanceof AbstractMessage)) {
  96.             return;
  97.         }
  98.         //the batch send request message
  99.         if (message instanceof MergedWarpMessage) {
  100.             ...
  101.         } else {
  102.             //the single send request message
  103.             final AbstractMessage msg = (AbstractMessage) message;
  104.             //最终调用到DefaultCoordinator的onRequest()方法来处理RpcMessage
  105.             //此时传入的msg其实就是客户端发送请求时的BranchRegisterRequest对象
  106.             AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);
  107.             //返回响应给客户端
  108.             remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);
  109.         }
  110.     }
  111.     ...
  112. }
  113. public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {
  114.     ...
  115.     @Override
  116.     public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
  117.         if (!(request instanceof AbstractTransactionRequestToTC)) {
  118.             throw new IllegalArgumentException();
  119.         }
  120.         //此时传入的request其实就是客户端发送请求时的BranchRegisterRequest对象
  121.         AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request;
  122.         transactionRequest.setTCInboundHandler(this);
  123.         return transactionRequest.handle(context);
  124.     }
  125.     ...
  126. }
复制代码
(2)BranchRegisterRequest.handle()的处理
在DefaultCoordinator的onRequest()方法中,会调用BranchRegisterRequest的handle()方法来处理分支事务注册请求,该handle()方法又会调用DefaultCoordinator的doBranchRegister()方法,所以最后会调用DefaultCore的branchRegister()方法来具体处理分支事务注册请求。
  1. public class BranchRegisterRequest extends AbstractTransactionRequestToTC  {
  2.     ...
  3.     @Override
  4.     public AbstractTransactionResponse handle(RpcContext rpcContext) {
  5.         return handler.handle(this, rpcContext);
  6.     }
  7.     ...
  8. }
  9. public interface TCInboundHandler {
  10.     ...
  11.     //Handle branch register response.
  12.     BranchRegisterResponse handle(BranchRegisterRequest branchRegister, RpcContext rpcContext);
  13. }
  14. public abstract class AbstractTCInboundHandler extends AbstractExceptionHandler implements TCInboundHandler {
  15.     ...
  16.     @Override
  17.     public BranchRegisterResponse handle(BranchRegisterRequest request, final RpcContext rpcContext) {
  18.         BranchRegisterResponse response = new BranchRegisterResponse();
  19.         exceptionHandleTemplate(new AbstractCallback<BranchRegisterRequest, BranchRegisterResponse>() {
  20.             @Override
  21.             public void execute(BranchRegisterRequest request, BranchRegisterResponse response) throws TransactionException {
  22.                 try {
  23.                     doBranchRegister(request, response, rpcContext);
  24.                 } catch (StoreException e) {
  25.                     throw new TransactionException(TransactionExceptionCode.FailedStore, String.format("branch register request failed. xid=%s, msg=%s", request.getXid(), e.getMessage()), e);
  26.                 }
  27.             }
  28.         }, request, response);
  29.         return response;
  30.     }
  31.    
  32.     //Do branch register.
  33.     protected abstract void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response, RpcContext rpcContext) throws TransactionException;
  34.     ...
  35. }
  36. public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {
  37.     private final DefaultCore core;
  38.     ...
  39.     @Override
  40.     protected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response, RpcContext rpcContext) throws TransactionException {
  41.         MDC.put(RootContext.MDC_KEY_XID, request.getXid());
  42.         //调用DefaultCore的branchRegister()方法处理分支事务注册请求
  43.         response.setBranchId(core.branchRegister(request.getBranchType(), request.getResourceId(), rpcContext.getClientId(), request.getXid(), request.getApplicationData(), request.getLockKey()));
  44.     }
  45.     ...
  46. }
复制代码
(3)DefaultCore.branchRegister()的处理
DefaultCore的branchRegister()方法其实会继续调用其抽象父类AbstractCore的branchRegister()方法来处理注册分支事务请求,具体的过程如下:
 
一.根据xid获取全局事务会话
二.根据全局事务会话创建分支事务会话
三.通过MDC将分支事务ID存到线程本地变量副本
四.注册分支事务需要先获取全局锁
五.把分支事务会话加入到全局事务会话中并持久化
9.png
  1. public class DefaultCore implements Core {
  2.     private static Map<BranchType, AbstractCore> coreMap = new ConcurrentHashMap<>();
  3.    
  4.     public DefaultCore(RemotingServer remotingServer) {
  5.         List allCore = EnhancedServiceLoader.loadAll(AbstractCore.class, new Class[] {RemotingServer.class}, new Object[] {remotingServer});
  6.         if (CollectionUtils.isNotEmpty(allCore)) {
  7.             for (AbstractCore core : allCore) {
  8.                 coreMap.put(core.getHandleBranchType(), core);
  9.             }
  10.         }
  11.     }
  12.    
  13.     @Override
  14.     public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {
  15.         return getCore(branchType).branchRegister(branchType, resourceId, clientId, xid, applicationData, lockKeys);
  16.     }
  17.    
  18.     public AbstractCore getCore(BranchType branchType) {
  19.         AbstractCore core = coreMap.get(branchType);
  20.         if (core == null) {
  21.             throw new NotSupportYetException("unsupported type:" + branchType.name());
  22.         }
  23.         return core;
  24.     }
  25.     ...
  26. }
  27. public abstract class AbstractCore implements Core {
  28.     protected RemotingServer remotingServer;
  29.    
  30.     public AbstractCore(RemotingServer remotingServer) {
  31.         if (remotingServer == null) {
  32.             throw new IllegalArgumentException("remotingServer must be not null");
  33.         }
  34.         this.remotingServer = remotingServer;
  35.     }
  36.    
  37.     //注册分支事务
  38.     @Override
  39.     public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {
  40.         //1.根据xid获取全局事务会话GlobalSession
  41.         GlobalSession globalSession = assertGlobalSessionNotNull(xid, false);
  42.         return SessionHolder.lockAndExecute(globalSession, () -> {
  43.             globalSessionStatusCheck(globalSession);
  44.             globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
  45.             //2.创建分支事务会话BranchSession,根据全局事务开启一个分支事务
  46.             //传入的参数依次是:全局事务会话、事务类型、资源ID、应用数据、全局锁keys、客户端ID
  47.             BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId, applicationData, lockKeys, clientId);
  48.             //3.把分支事务的ID存放到线程本地变量副本中,也就是MDC中
  49.             MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));
  50.             //4.注册分支事务时会获取全局锁
  51.             //分支事务会话branchSession尝试获取一个全局锁,获取失败会抛异常,说明分支事务注册失败
  52.             branchSessionLock(globalSession, branchSession);
  53.             try {
  54.                 //5.把分支事务会话加入到全局事务会话中
  55.                 globalSession.addBranch(branchSession);
  56.             } catch (RuntimeException ex) {
  57.                 branchSessionUnlock(branchSession);
  58.                 throw new BranchTransactionException(FailedToAddBranch, String.format("Failed to store branch xid = %s branchId = %s", globalSession.getXid(), branchSession.getBranchId()), ex);
  59.             }
  60.             if (LOGGER.isInfoEnabled()) {
  61.                 LOGGER.info("Register branch successfully, xid = {}, branchId = {}, resourceId = {} ,lockKeys = {}", globalSession.getXid(), branchSession.getBranchId(), resourceId, lockKeys);
  62.             }
  63.             return branchSession.getBranchId();
  64.         });
  65.     }
  66.    
  67.     private GlobalSession assertGlobalSessionNotNull(String xid, boolean withBranchSessions) throws TransactionException {
  68.         //根据xid寻找全局事务会话GlobalSession
  69.         GlobalSession globalSession = SessionHolder.findGlobalSession(xid, withBranchSessions);
  70.         if (globalSession == null) {
  71.             throw new GlobalTransactionException(TransactionExceptionCode.GlobalTransactionNotExist, String.format("Could not found global transaction xid = %s, may be has finished.", xid));
  72.         }
  73.         return globalSession;
  74.     }
  75.    
  76.     //获取全局锁,获取全局锁失败则抛异常
  77.     protected void branchSessionLock(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {
  78.     }
  79.     ...
  80. }
  81. public class SessionHolder {
  82.     ...
  83.     //根据xid获取全局事务会话GlobalSession
  84.     public static GlobalSession findGlobalSession(String xid, boolean withBranchSessions) {
  85.         return getRootSessionManager().findGlobalSession(xid, withBranchSessions);
  86.     }
  87.     ...
  88. }
  89. @LoadLevel(name = "db", scope = Scope.PROTOTYPE)
  90. public class DataBaseSessionManager extends AbstractSessionManager implements Initialize {
  91.     ...
  92.     //根据xid获取全局事务会话GlobalSession
  93.     @Override
  94.     public GlobalSession findGlobalSession(String xid, boolean withBranchSessions) {
  95.         return transactionStoreManager.readSession(xid, withBranchSessions);
  96.     }
  97.     ...
  98. }
  99. public class DataBaseTransactionStoreManager extends AbstractTransactionStoreManager implements TransactionStoreManager {
  100.     ...
  101.     //根据xid获取全局事务会话GlobalSession
  102.     @Override
  103.     public GlobalSession readSession(String xid, boolean withBranchSessions) {
  104.         //global transaction
  105.         GlobalTransactionDO globalTransactionDO = logStore.queryGlobalTransactionDO(xid);
  106.         if (globalTransactionDO == null) {
  107.             return null;
  108.         }
  109.         //branch transactions
  110.         List<BranchTransactionDO> branchTransactionDOs = null;
  111.         //reduce rpc with db when branchRegister and getGlobalStatus
  112.         if (withBranchSessions) {
  113.             branchTransactionDOs = logStore.queryBranchTransactionDO(globalTransactionDO.getXid());
  114.         }
  115.         return getGlobalSession(globalTransactionDO, branchTransactionDOs);
  116.     }
  117.     ...
  118. }
  119. public class SessionHelper {
  120.     ...
  121.     //创建分支事务会话
  122.     public static BranchSession newBranchByGlobal(GlobalSession globalSession, BranchType branchType, String resourceId, String applicationData, String lockKeys, String clientId) {
  123.         BranchSession branchSession = new BranchSession();
  124.         branchSession.setXid(globalSession.getXid());
  125.         branchSession.setTransactionId(globalSession.getTransactionId());
  126.         branchSession.setBranchId(UUIDGenerator.generateUUID());
  127.         branchSession.setBranchType(branchType);
  128.         branchSession.setResourceId(resourceId);
  129.         branchSession.setLockKey(lockKeys);
  130.         branchSession.setClientId(clientId);
  131.         branchSession.setApplicationData(applicationData);
  132.         return branchSession;
  133.     }
  134.     ...
  135. }
  136. public class GlobalSession implements SessionLifecycle, SessionStorable {
  137.     private List<BranchSession> branchSessions;
  138.     ...
  139.     //把分支事务会话加入到全局事务会话中
  140.     @Override
  141.     public void addBranch(BranchSession branchSession) throws TransactionException {
  142.         for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
  143.             lifecycleListener.onAddBranch(this, branchSession);
  144.         }
  145.         branchSession.setStatus(BranchStatus.Registered);
  146.         add(branchSession);
  147.     }
  148.    
  149.     //把分支事务会话加入到全局事务会话中
  150.     public boolean add(BranchSession branchSession) {
  151.         if (null != branchSessions) {
  152.             return branchSessions.add(branchSession);
  153.         } else {
  154.             //db and redis no need to deal with
  155.             return true;
  156.         }
  157.     }
  158.     ...
  159. }
  160. public abstract class AbstractSessionManager implements SessionManager, SessionLifecycleListener {
  161.     ...
  162.     @Override
  163.     public void onAddBranch(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {
  164.         addBranchSession(globalSession, branchSession);
  165.     }
  166.    
  167.     @Override
  168.     public void addBranchSession(GlobalSession session, BranchSession branchSession) throws TransactionException {
  169.         if (LOGGER.isDebugEnabled()) {
  170.             LOGGER.debug("MANAGER[{}] SESSION[{}] {}", name, branchSession, LogOperation.BRANCH_ADD);
  171.         }
  172.         writeSession(LogOperation.BRANCH_ADD, branchSession);
  173.     }
  174.    
  175.     //持久化全局事务会话
  176.     private void writeSession(LogOperation logOperation, SessionStorable sessionStorable) throws TransactionException {
  177.         //transactionStoreManager.writeSession()会对全局事务会话进行持久化
  178.         if (!transactionStoreManager.writeSession(logOperation, sessionStorable)) {
  179.             ...
  180.         }
  181.     }
  182.     ...
  183. }
复制代码
 
12.将UndoLog写入到数据库与提交事务的源码
在数据源连接代理ConnectionProxy的processGlobalTransactionCommit()方法中:
一.首先会注册完分支事务
二.然后会将UndoLog写入到数据库
三.最后才提交目标数据源连接的事务
10.png
  1. //数据源连接代理
  2. public class ConnectionProxy extends AbstractConnectionProxy {
  3.     private final LockRetryPolicy lockRetryPolicy = new LockRetryPolicy(this);
  4.     ...
  5.     @Override
  6.     public void commit() throws SQLException {
  7.         try {
  8.             //通过全局锁重试策略组件LockRetryPolicy来执行本地事务的提交
  9.             lockRetryPolicy.execute(() -> {
  10.                 doCommit();
  11.                 return null;
  12.             });
  13.         } catch (SQLException e) {
  14.             if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
  15.                 rollback();
  16.             }
  17.             throw e;
  18.         } catch (Exception e) {
  19.             throw new SQLException(e);
  20.         }
  21.     }
  22.    
  23.     private void doCommit() throws SQLException {
  24.         if (context.inGlobalTransaction()) {
  25.             processGlobalTransactionCommit();
  26.         } else if (context.isGlobalLockRequire()) {
  27.             processLocalCommitWithGlobalLocks();
  28.         } else {
  29.             targetConnection.commit();
  30.         }
  31.     }
  32.    
  33.     private void processLocalCommitWithGlobalLocks() throws SQLException {
  34.         //检查全局锁keys
  35.         checkLock(context.buildLockKeys());
  36.         try {
  37.             //目标数据源连接提交事务
  38.             targetConnection.commit();
  39.         } catch (Throwable ex) {
  40.             throw new SQLException(ex);
  41.         }
  42.         context.reset();
  43.     }
  44.    
  45.     private void processGlobalTransactionCommit() throws SQLException {
  46.         try {
  47.             //1.注册分支事务
  48.             register();
  49.         } catch (TransactionException e) {
  50.             recognizeLockKeyConflictException(e, context.buildLockKeys());
  51.         }
  52.         try {
  53.             //2.将UndoLog写入到数据库
  54.             UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
  55.             //3.目标数据源连接提交事务
  56.             targetConnection.commit();
  57.         } catch (Throwable ex) {
  58.             LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
  59.             report(false);
  60.             throw new SQLException(ex);
  61.         }
  62.         if (IS_REPORT_SUCCESS_ENABLE) {
  63.             report(true);
  64.         }
  65.         context.reset();
  66.     }
  67.     ...
  68. }
  69. public class UndoLogManagerFactory {
  70.     private static final Map<String, UndoLogManager> UNDO_LOG_MANAGER_MAP = new ConcurrentHashMap<>();
  71.     //获取UndoLog管理器
  72.     public static UndoLogManager getUndoLogManager(String dbType) {
  73.         return CollectionUtils.computeIfAbsent(UNDO_LOG_MANAGER_MAP, dbType,
  74.             key -> EnhancedServiceLoader.load(UndoLogManager.class, dbType));
  75.     }
  76. }
  77. public abstract class AbstractUndoLogManager implements UndoLogManager {
  78.     ...
  79.     @Override
  80.     public void flushUndoLogs(ConnectionProxy cp) throws SQLException {
  81.         ConnectionContext connectionContext = cp.getContext();
  82.         if (!connectionContext.hasUndoLog()) {
  83.             return;
  84.         }
  85.         String xid = connectionContext.getXid();
  86.         long branchId = connectionContext.getBranchId();
  87.         BranchUndoLog branchUndoLog = new BranchUndoLog();
  88.         branchUndoLog.setXid(xid);
  89.         branchUndoLog.setBranchId(branchId);
  90.         branchUndoLog.setSqlUndoLogs(connectionContext.getUndoItems());
  91.         UndoLogParser parser = UndoLogParserFactory.getInstance();
  92.         byte[] undoLogContent = parser.encode(branchUndoLog);
  93.         if (LOGGER.isDebugEnabled()) {
  94.             LOGGER.debug("Flushing UNDO LOG: {}", new String(undoLogContent, Constants.DEFAULT_CHARSET));
  95.         }
  96.         CompressorType compressorType = CompressorType.NONE;
  97.         if (needCompress(undoLogContent)) {
  98.             compressorType = ROLLBACK_INFO_COMPRESS_TYPE;
  99.             undoLogContent = CompressorFactory.getCompressor(compressorType.getCode()).compress(undoLogContent);
  100.         }
  101.         //插入UndoLog到数据库
  102.         insertUndoLogWithNormal(xid, branchId, buildContext(parser.getName(), compressorType), undoLogContent, cp.getTargetConnection());
  103.     }
  104.    
  105.     //insert uodo log when normal
  106.     protected abstract void insertUndoLogWithNormal(String xid, long branchId, String rollbackCtx, byte[] undoLogContent, Connection conn) throws SQLException;
  107.     ...
  108. }
  109. @LoadLevel(name = JdbcConstants.MYSQL)
  110. public class MySQLUndoLogManager extends AbstractUndoLogManager {
  111.     ...
  112.     @Override
  113.     protected void insertUndoLogWithNormal(String xid, long branchId, String rollbackCtx, byte[] undoLogContent, Connection conn) throws SQLException {
  114.         insertUndoLog(xid, branchId, rollbackCtx, undoLogContent, State.Normal, conn);
  115.     }
  116.    
  117.     private void insertUndoLog(String xid, long branchId, String rollbackCtx, byte[] undoLogContent, State state, Connection conn) throws SQLException {
  118.         try (PreparedStatement pst = conn.prepareStatement(INSERT_UNDO_LOG_SQL)) {
  119.             pst.setLong(1, branchId);
  120.             pst.setString(2, xid);
  121.             pst.setString(3, rollbackCtx);
  122.             pst.setBytes(4, undoLogContent);
  123.             pst.setInt(5, state.getValue());
  124.             pst.executeUpdate();
  125.         } catch (Exception e) {
  126.             if (!(e instanceof SQLException)) {
  127.                 e = new SQLException(e);
  128.             }
  129.             throw (SQLException) e;
  130.         }
  131.     }
  132.     ...
  133. }
复制代码
 
13.通过全局锁重试策略组件执行事务的提交
当设置完禁止自动提交事务、构建前镜像、执行SQL、构建后镜像,执行到数据源连接代理ConnectionProxy的commit()方法提交本地事务时,便会通过全局锁重试策略LockRetryPolicy来执行本地事务的提交。
 
全局锁重试策略LockRetryPolicy,会确保先获取到全局锁才提交本地事务。也就是如果获取不到全局锁,则重试获取。此外,注册分支事务时,获取到全局锁才能注册成功。
  1. public class ConnectionProxy extends AbstractConnectionProxy {
  2.     private final LockRetryPolicy lockRetryPolicy = new LockRetryPolicy(this);
  3.     ...
  4.     @Override
  5.     public void commit() throws SQLException {
  6.         try {
  7.             //通过全局锁重试策略组件LockRetryPolicy来执行本地事务的提交
  8.             lockRetryPolicy.execute(() -> {
  9.                 doCommit();
  10.                 return null;
  11.             });
  12.         } catch (SQLException e) {
  13.             if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
  14.                 rollback();
  15.             }
  16.             throw e;
  17.         } catch (Exception e) {
  18.             throw new SQLException(e);
  19.         }
  20.     }
  21.     ...
  22.     public static class LockRetryPolicy {
  23.         protected static final boolean LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT = ConfigurationFactory.getInstance().
  24.             getBoolean(ConfigurationKeys.CLIENT_LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT, DEFAULT_CLIENT_LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT);
  25.         protected final ConnectionProxy connection;
  26.         public LockRetryPolicy(ConnectionProxy connection) {
  27.             this.connection = connection;
  28.         }
  29.         public <T> T execute(Callable<T> callable) throws Exception {
  30.             //the only case that not need to retry acquire lock hear is
  31.             //LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT == true && connection#autoCommit == true
  32.             //because it has retry acquire lock when AbstractDMLBaseExecutor#executeAutoCommitTrue
  33.             if (LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT && connection.getContext().isAutoCommitChanged()) {
  34.                 //不需要重试
  35.                 return callable.call();
  36.             } else {
  37.                 //LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT == false
  38.                 //or LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT == true && autoCommit == false
  39.                 return doRetryOnLockConflict(callable);
  40.             }
  41.         }
  42.         protected <T> T doRetryOnLockConflict(Callable<T> callable) throws Exception {
  43.             LockRetryController lockRetryController = new LockRetryController();
  44.             while (true) {
  45.                 try {
  46.                     return callable.call();
  47.                 } catch (LockConflictException lockConflict) {
  48.                     onException(lockConflict);
  49.                     //AbstractDMLBaseExecutor#executeAutoCommitTrue the local lock is released
  50.                     if (connection.getContext().isAutoCommitChanged() && lockConflict.getCode() == TransactionExceptionCode.LockKeyConflictFailFast) {
  51.                         lockConflict.setCode(TransactionExceptionCode.LockKeyConflict);
  52.                     }
  53.                     //休眠一会再去重试
  54.                     lockRetryController.sleep(lockConflict);
  55.                 } catch (Exception e) {
  56.                     onException(e);
  57.                     throw e;
  58.                 }
  59.             }
  60.         }
  61.      
  62.         //Callback on exception in doLockRetryOnConflict.
  63.         protected void onException(Exception e) throws Exception {
  64.         }
  65.     }
  66.     ...
  67. }
  68. public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {
  69.     ...
  70.     private static class LockRetryPolicy extends ConnectionProxy.LockRetryPolicy {
  71.         LockRetryPolicy(final ConnectionProxy connection) {
  72.             super(connection);
  73.         }
  74.         @Override
  75.         public <T> T execute(Callable<T> callable) throws Exception {
  76.             if (LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT) {
  77.                 return doRetryOnLockConflict(callable);
  78.             } else {
  79.                 return callable.call();
  80.             }
  81.         }
  82.         @Override
  83.         protected void onException(Exception e) throws Exception {
  84.             ConnectionContext context = connection.getContext();
  85.             //UndoItems can't use the Set collection class to prevent ABA
  86.             context.removeSavepoint(null);
  87.             //回滚目标数据源连接对SQL的执行
  88.             connection.getTargetConnection().rollback();
  89.         }
  90.         public static boolean isLockRetryPolicyBranchRollbackOnConflict() {
  91.             return LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT;
  92.         }
  93.     }
  94.     ...
  95. }
复制代码
 
14.注册分支事务时获取全局锁的入口源码
在Seata Server中,只有当全局锁获取成功后,分支事务才能注册成功。AbstractCore的branchRegister()方法会通过调用ATCore的branchSessionLock()方法来获取全局锁,而ATCore的branchSessionLock()方法最终则是靠调用AbstractLockManager的acquireLock()方法来尝试获取全局锁的。获取全局锁失败会抛出异常,说明注册分支事务失败。
  1. public abstract class AbstractCore implements Core {
  2.     ...
  3.     //注册分支事务
  4.     @Override
  5.     public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {
  6.         //1.根据xid获取全局事务会话GlobalSession
  7.         GlobalSession globalSession = assertGlobalSessionNotNull(xid, false);
  8.         return SessionHolder.lockAndExecute(globalSession, () -> {
  9.             globalSessionStatusCheck(globalSession);
  10.             globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
  11.             //2.创建分支事务会话,根据全局事务开启一个分支事务
  12.             //传入的参数依次是:全局事务会话、事务类型、资源ID、应用数据、全局锁keys、客户端ID
  13.             BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId, applicationData, lockKeys, clientId);
  14.             //3.把分支事务的ID存放到线程本地变量副本中,也就是MDC中
  15.             MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));
  16.             //4.注册分支事务时会加全局锁
  17.             //分支事务会话branchSession尝试获取一个全局锁,获取失败会抛异常,说明分支事务注册失败
  18.             branchSessionLock(globalSession, branchSession);
  19.             try {
  20.                 //5.把分支事务会话加入到全局事务会话中
  21.                 globalSession.addBranch(branchSession);
  22.             } catch (RuntimeException ex) {
  23.                 branchSessionUnlock(branchSession);
  24.                 throw new BranchTransactionException(FailedToAddBranch, String.format("Failed to store branch xid = %s branchId = %s", globalSession.getXid(), branchSession.getBranchId()), ex);
  25.             }
  26.             if (LOGGER.isInfoEnabled()) {
  27.                 LOGGER.info("Register branch successfully, xid = {}, branchId = {}, resourceId = {} ,lockKeys = {}", globalSession.getXid(), branchSession.getBranchId(), resourceId, lockKeys);
  28.             }
  29.             return branchSession.getBranchId();
  30.         });
  31.     }
  32.     ...
  33. }
  34. public class ATCore extends AbstractCore {
  35.     ...
  36.     @Override
  37.     protected void branchSessionLock(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {
  38.         //从应用程序数据里提取出一些属性进行属性赋值
  39.         String applicationData = branchSession.getApplicationData();
  40.         boolean autoCommit = true;
  41.         boolean skipCheckLock = false;
  42.         if (StringUtils.isNotBlank(applicationData)) {
  43.             if (objectMapper == null) {
  44.                 objectMapper = new ObjectMapper();
  45.             }
  46.             try {
  47.                 //ObjectMapper是一个对象映射框架,它可以把ApplicationData对象里的属性值读取出来,然后写入到HashMap里
  48.                 Map<String, Object> data = objectMapper.readValue(applicationData, HashMap.class);
  49.                 Object clientAutoCommit = data.get(AUTO_COMMIT);
  50.                 if (clientAutoCommit != null && !(boolean)clientAutoCommit) {
  51.                     autoCommit = (boolean)clientAutoCommit;
  52.                 }
  53.                 Object clientSkipCheckLock = data.get(SKIP_CHECK_LOCK);
  54.                 if (clientSkipCheckLock instanceof Boolean) {
  55.                     skipCheckLock = (boolean)clientSkipCheckLock;
  56.                 }
  57.             } catch (IOException e) {
  58.                 LOGGER.error("failed to get application data: {}", e.getMessage(), e);
  59.             }
  60.         }
  61.         try {
  62.             //分支事务会话branchSession尝试获取一个全局锁,获取失败会抛异常,说明分支事务注册失败
  63.             if (!branchSession.lock(autoCommit, skipCheckLock)) {
  64.                 throw new BranchTransactionException(LockKeyConflict, String.format("Global lock acquire failed xid = %s branchId = %s", globalSession.getXid(), branchSession.getBranchId()));
  65.             }
  66.         } catch (StoreException e) {
  67.             if (e.getCause() instanceof BranchTransactionException) {
  68.                 throw new BranchTransactionException(((BranchTransactionException)e.getCause()).getCode(), String.format("Global lock acquire failed xid = %s branchId = %s", globalSession.getXid(), branchSession.getBranchId()));
  69.             }
  70.             throw e;
  71.         }
  72.     }
  73.     ...
  74. }
  75. public class BranchSession implements Lockable, Comparable<BranchSession>, SessionStorable {
  76.     ...
  77.     public boolean lock(boolean autoCommit, boolean skipCheckLock) throws TransactionException {
  78.         if (this.getBranchType().equals(BranchType.AT)) {
  79.             //尝试获取全局锁
  80.             return LockerManagerFactory.getLockManager().acquireLock(this, autoCommit, skipCheckLock);
  81.         }
  82.         return true;
  83.     }
  84.     ...
  85. }
  86. public class LockerManagerFactory {
  87.     private static final Configuration CONFIG = ConfigurationFactory.getInstance();
  88.     private static volatile LockManager LOCK_MANAGER;
  89.    
  90.     public static LockManager getLockManager() {
  91.         if (LOCK_MANAGER == null) {
  92.             init();
  93.         }
  94.         return LOCK_MANAGER;
  95.     }
  96.    
  97.     public static void init() {
  98.         init(null);
  99.     }
  100.    
  101.     public static void init(String lockMode) {
  102.         if (LOCK_MANAGER == null) {
  103.             synchronized (LockerManagerFactory.class) {
  104.                 if (LOCK_MANAGER == null) {
  105.                     if (StringUtils.isBlank(lockMode)) {
  106.                         lockMode = CONFIG.getConfig(ConfigurationKeys.STORE_LOCK_MODE, CONFIG.getConfig(ConfigurationKeys.STORE_MODE, SERVER_DEFAULT_STORE_MODE));
  107.                     }
  108.                     if (StoreMode.contains(lockMode)) {
  109.                         LOCK_MANAGER = EnhancedServiceLoader.load(LockManager.class, lockMode);
  110.                     }
  111.                 }
  112.             }
  113.         }
  114.     }
  115. }
  116. public abstract class AbstractLockManager implements LockManager {
  117.     ...
  118.     @Override
  119.     public boolean acquireLock(BranchSession branchSession, boolean autoCommit, boolean skipCheckLock) throws TransactionException {
  120.         if (branchSession == null) {
  121.             throw new IllegalArgumentException("branchSession can't be null for memory/file locker.");
  122.         }
  123.         String lockKey = branchSession.getLockKey();
  124.         if (StringUtils.isNullOrEmpty(lockKey)) {
  125.             //no lock
  126.             return true;
  127.         }
  128.         //get locks of branch
  129.         //获取到分支事务里需要的所有行锁
  130.         List<RowLock> locks = collectRowLocks(branchSession);
  131.         if (CollectionUtils.isEmpty(locks)) {
  132.             //no lock
  133.             return true;
  134.         }
  135.         //具体进行获取锁
  136.         return getLocker(branchSession).acquireLock(locks, autoCommit, skipCheckLock);
  137.     }
  138.    
  139.     @Override
  140.     public List<RowLock> collectRowLocks(BranchSession branchSession) {
  141.         if (branchSession == null || StringUtils.isBlank(branchSession.getLockKey())) {
  142.             return Collections.emptyList();
  143.         }
  144.         String lockKey = branchSession.getLockKey();
  145.         String resourceId = branchSession.getResourceId();
  146.         String xid = branchSession.getXid();
  147.         long transactionId = branchSession.getTransactionId();
  148.         long branchId = branchSession.getBranchId();
  149.         return collectRowLocks(lockKey, resourceId, xid, transactionId, branchId);
  150.     }
  151.    
  152.     protected List<RowLock> collectRowLocks(String lockKey, String resourceId, String xid, Long transactionId, Long branchID) {
  153.         List<RowLock> locks = new ArrayList<>();
  154.         String[] tableGroupedLockKeys = lockKey.split(";");
  155.         for (String tableGroupedLockKey : tableGroupedLockKeys) {
  156.             int idx = tableGroupedLockKey.indexOf(":");
  157.             if (idx < 0) {
  158.                 return locks;
  159.             }
  160.             String tableName = tableGroupedLockKey.substring(0, idx);
  161.             String mergedPKs = tableGroupedLockKey.substring(idx + 1);
  162.             if (StringUtils.isBlank(mergedPKs)) {
  163.                 return locks;
  164.             }
  165.             String[] pks = mergedPKs.split(",");
  166.             if (pks == null || pks.length == 0) {
  167.                 return locks;
  168.             }
  169.             for (String pk : pks) {
  170.                 if (StringUtils.isNotBlank(pk)) {
  171.                     RowLock rowLock = new RowLock();
  172.                     rowLock.setXid(xid);
  173.                     rowLock.setTransactionId(transactionId);
  174.                     rowLock.setBranchId(branchID);
  175.                     rowLock.setTableName(tableName);
  176.                     rowLock.setPk(pk);
  177.                     rowLock.setResourceId(resourceId);
  178.                     locks.add(rowLock);
  179.                 }
  180.             }
  181.         }
  182.         return locks;
  183.     }
  184.     ...
  185. }
  186. public class RowLock {
  187.     private String xid;//全局事务xid
  188.     private Long transactionId;//全局事务ID   
  189.     private Long branchId;//分支事务ID   
  190.     private String resourceId;//资源ID
  191.     private String tableName;//表名称
  192.     private String pk;//主键   
  193.     private String rowKey;//行键
  194.     private String feature;//功能特性
  195.     ...
  196. }
复制代码
 
15.Seata Server获取全局锁的具体逻辑源码
调用AbstractLockManager的acquireLock()方法获取全局锁时,其实调用的是DataBaseLocker的acquireLock()方法 -> LockStoreDataBaseDAO的acquireLock()方法。
 
在LockStoreDataBaseDAO的acquireLock()方法中,首先会查询数据库中是否存在要申请的全局锁的记录,然后根据这些锁记录 + xid判断是否由当前全局事务获取的(这是核心)。
 
如果不是,则说明其他全局事务先获取到了要申请的全局锁,此时当前事务获取全局锁失败。
 
如果是,则把当前事务已经获取过的全局锁过滤出来,然后尝试写入当前分支事务还需获取的全局锁记录到数据库。如果写入成功,则表示当前分支事务成功获取到全局锁。如果写入失败,则表示其他分支事务已经获取到全局锁。
  1. @LoadLevel(name = "db")
  2. public class DataBaseLockManager extends AbstractLockManager implements Initialize {
  3.     private Locker locker;
  4.    
  5.     @Override
  6.     public void init() {
  7.         //init dataSource
  8.         String datasourceType = ConfigurationFactory.getInstance().getConfig(ConfigurationKeys.STORE_DB_DATASOURCE_TYPE);
  9.         DataSource lockStoreDataSource = EnhancedServiceLoader.load(DataSourceProvider.class, datasourceType).provide();
  10.         locker = new DataBaseLocker(lockStoreDataSource);
  11.     }
  12.     @Override
  13.     public Locker getLocker(BranchSession branchSession) {
  14.         return locker;
  15.     }
  16.     ...
  17. }
  18. public class DataBaseLocker extends AbstractLocker {
  19.     private LockStore lockStore;
  20.    
  21.     public DataBaseLocker(DataSource logStoreDataSource) {
  22.         lockStore = new LockStoreDataBaseDAO(logStoreDataSource);
  23.     }
  24.     ...
  25.    
  26.     @Override
  27.     public boolean acquireLock(List<RowLock> locks, boolean autoCommit, boolean skipCheckLock) {
  28.         if (CollectionUtils.isEmpty(locks)) {
  29.             //no lock
  30.             return true;
  31.         }
  32.         try {
  33.             //通过执行MySQL来获取全局锁
  34.             return lockStore.acquireLock(convertToLockDO(locks), autoCommit, skipCheckLock);
  35.         } catch (StoreException e) {
  36.             throw e;
  37.         } catch (Exception t) {
  38.             LOGGER.error("AcquireLock error, locks:{}", CollectionUtils.toString(locks), t);
  39.             return false;
  40.         }
  41.     }
  42.     ...
  43. }
  44. public class LockStoreDataBaseDAO implements LockStore {
  45.     ...
  46.     @Override
  47.     public boolean acquireLock(List<LockDO> lockDOs, boolean autoCommit, boolean skipCheckLock) {
  48.         //数据库操作三剑客:连接、句柄、结果
  49.         Connection conn = null;
  50.         PreparedStatement ps = null;
  51.         ResultSet rs = null;
  52.         Set<String> dbExistedRowKeys = new HashSet<>();
  53.         boolean originalAutoCommit = true;
  54.         if (lockDOs.size() > 1) {
  55.             lockDOs = lockDOs.stream().filter(LambdaUtils.distinctByKey(LockDO::getRowKey)).collect(Collectors.toList());
  56.         }
  57.         try {
  58.             //从全局锁数据源里获取到一个连接
  59.             conn = lockStoreDataSource.getConnection();
  60.             //关闭自动提交事务
  61.             if (originalAutoCommit = conn.getAutoCommit()) {
  62.                 conn.setAutoCommit(false);
  63.             }
  64.             //需要获取的锁,有可能多个
  65.             List<LockDO> unrepeatedLockDOs = lockDOs;
  66.             //check lock
  67.             if (!skipCheckLock) {
  68.                 boolean canLock = true;
  69.                 //query,针对全局锁表查询某个数据加了全局锁的全局事务xid
  70.                 //LockStoreSqlFactory是全局锁存储的SQL工厂
  71.                 String checkLockSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getCheckLockableSql(lockTable, lockDOs.size());
  72.                 ps = conn.prepareStatement(checkLockSQL);
  73.                 for (int i = 0; i < lockDOs.size(); i++) {
  74.                     ps.setString(i + 1, lockDOs.get(i).getRowKey());
  75.                 }
  76.                 //执行查询
  77.                 rs = ps.executeQuery();
  78.                 //获取到当前要加全局锁的事务xid
  79.                 String currentXID = lockDOs.get(0).getXid();
  80.                 boolean failFast = false;
  81.                 //如果查询到的结果rs是空,则表示当前全局锁没有被事务获取占用
  82.                 while (rs.next()) {
  83.                     String dbXID = rs.getString(ServerTableColumnsName.LOCK_TABLE_XID);
  84.                     //如果获取到全局锁的是别的全局事务xid,那么获取全局锁失败,设置canLock为false
  85.                     if (!StringUtils.equals(dbXID, currentXID)) {
  86.                         if (LOGGER.isInfoEnabled()) {
  87.                             String dbPk = rs.getString(ServerTableColumnsName.LOCK_TABLE_PK);
  88.                             String dbTableName = rs.getString(ServerTableColumnsName.LOCK_TABLE_TABLE_NAME);
  89.                             long dbBranchId = rs.getLong(ServerTableColumnsName.LOCK_TABLE_BRANCH_ID);
  90.                             LOGGER.info("Global lock on [{}:{}] is holding by xid {} branchId {}", dbTableName, dbPk, dbXID, dbBranchId);
  91.                         }
  92.                         if (!autoCommit) {
  93.                             int status = rs.getInt(ServerTableColumnsName.LOCK_TABLE_STATUS);
  94.                             if (status == LockStatus.Rollbacking.getCode()) {
  95.                                 failFast = true;
  96.                             }
  97.                         }
  98.                         canLock = false;
  99.                         break;
  100.                     }
  101.                     dbExistedRowKeys.add(rs.getString(ServerTableColumnsName.LOCK_TABLE_ROW_KEY));
  102.                 }
  103.                 if (!canLock) {
  104.                     conn.rollback();
  105.                     if (failFast) {
  106.                         throw new StoreException(new BranchTransactionException(LockKeyConflictFailFast));
  107.                     }
  108.                     return false;
  109.                 }
  110.                 //If the lock has been exists in db, remove it from the lockDOs
  111.                 if (CollectionUtils.isNotEmpty(dbExistedRowKeys)) {
  112.                     //过滤当前事务已经获取过的全局锁
  113.                     unrepeatedLockDOs = lockDOs.stream().filter(lockDO -> !dbExistedRowKeys.contains(lockDO.getRowKey())).collect(Collectors.toList());
  114.                 }
  115.                 if (CollectionUtils.isEmpty(unrepeatedLockDOs)) {
  116.                     conn.rollback();
  117.                     return true;
  118.                 }
  119.             }
  120.             //lock
  121.             if (unrepeatedLockDOs.size() == 1) {
  122.                 LockDO lockDO = unrepeatedLockDOs.get(0);
  123.                 //尝试加锁,表示全局锁被当前的分支事务获取了
  124.                 if (!doAcquireLock(conn, lockDO)) {
  125.                     if (LOGGER.isInfoEnabled()) {
  126.                         LOGGER.info("Global lock acquire failed, xid {} branchId {} pk {}", lockDO.getXid(), lockDO.getBranchId(), lockDO.getPk());
  127.                     }
  128.                     conn.rollback();
  129.                     return false;
  130.                 }
  131.             } else {
  132.                 if (!doAcquireLocks(conn, unrepeatedLockDOs)) {
  133.                     if (LOGGER.isInfoEnabled()) {
  134.                         LOGGER.info("Global lock batch acquire failed, xid {} branchId {} pks {}", unrepeatedLockDOs.get(0).getXid(), unrepeatedLockDOs.get(0).getBranchId(), unrepeatedLockDOs.stream().map(lockDO -> lockDO.getPk()).collect(Collectors.toList()));
  135.                     }
  136.                     conn.rollback();
  137.                     return false;
  138.                 }
  139.             }
  140.             conn.commit();
  141.             return true;
  142.         } catch (SQLException e) {
  143.             throw new StoreException(e);
  144.         } finally {
  145.             IOUtil.close(rs, ps);
  146.             if (conn != null) {
  147.                 try {
  148.                     if (originalAutoCommit) {
  149.                         conn.setAutoCommit(true);
  150.                     }
  151.                     conn.close();
  152.                 } catch (SQLException e) {
  153.                 }
  154.             }
  155.         }
  156.     }
  157.    
  158.     protected boolean doAcquireLock(Connection conn, LockDO lockDO) {
  159.         PreparedStatement ps = null;
  160.         try {
  161.             //insert
  162.             String insertLockSQL = LockStoreSqlFactory.getLogStoreSql(dbType).getInsertLockSQL(lockTable);
  163.             ps = conn.prepareStatement(insertLockSQL);
  164.             ps.setString(1, lockDO.getXid());//全局事务xid
  165.             ps.setLong(2, lockDO.getTransactionId());//全局事务id
  166.             ps.setLong(3, lockDO.getBranchId());//分支事务id
  167.             ps.setString(4, lockDO.getResourceId());//资源id
  168.             ps.setString(5, lockDO.getTableName());//表名称
  169.             ps.setString(6, lockDO.getPk());//主键
  170.             ps.setString(7, lockDO.getRowKey());//rowkey
  171.             ps.setInt(8, LockStatus.Locked.getCode());//locked
  172.             return ps.executeUpdate() > 0;
  173.         } catch (SQLException e) {
  174.             throw new StoreException(e);
  175.         } finally {
  176.             IOUtil.close(ps);
  177.         }
  178.     }
  179.     ...
  180. }
复制代码
 
16.全局锁和分支事务及本地事务总结
获取到全局锁,才能注册分支事务成功,否则LockRetryPolicy重试。获取到全局锁,才能提交本地事务成功,否则LockRetryPolicy重试。
 
全局锁没有被其他事务(xid)获取,则当前事务(xid)才能获取全局锁成功。获取全局锁,会将当前分支事务申请全局锁的记录写入到数据库中。
 
17.提交全局事务以及提交各分支事务的源码
(1)Seata Client发起提交全局事务的请求
(2)Server向Client发送提交分支事务的请求
(3)Seata Client处理提交分支事务的请求
(4)全局事务的提交主要就是让各个分支事务把本地的UndoLog删除
 
(1)Seata Client发起提交全局事务的请求
  1. -> TransactionalTemplate.execute()发起全局事务的提交
  2. -> TransactionalTemplate.commitTransaction()
  3. -> DefaultGlobalTransaction.commit()
  4. -> DefaultTransactionManager.commit()
  5. -> DefaultTransactionManager.syncCall()
  6. -> TmNettyRemotingClient.sendSyncRequest()
  7. 把全局事务提交请求GlobalCommitRequest发送给Seata Server进行处理
复制代码
  1. //Template of executing business logic with a global transaction. 全局事务执行模版
  2. public class TransactionalTemplate {
  3.     private static final Logger LOGGER = LoggerFactory.getLogger(TransactionalTemplate.class);
  4.    
  5.     public Object execute(TransactionalExecutor business) throws Throwable {
  6.         //1.Get transactionInfo
  7.         TransactionInfo txInfo = business.getTransactionInfo();
  8.         if (txInfo == null) {
  9.             throw new ShouldNeverHappenException("transactionInfo does not exist");
  10.         }
  11.         //1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
  12.         //根据线程本地变量副本,获取当前线程本地变量副本里是否存在xid,如果存在则创建一个全局事务
  13.         //刚开始在开启一个全局事务的时候,是没有全局事务的
  14.         GlobalTransaction tx = GlobalTransactionContext.getCurrent();
  15.         //1.2 Handle the transaction propagation.
  16.         //从全局事务配置里,可以获取到全局事务的传播级别,默认是REQUIRED
  17.         //也就是如果存在一个全局事务,就直接执行业务;如果不存在一个全局事务,就开启一个新的全局事务;
  18.         Propagation propagation = txInfo.getPropagation();
  19.         //不同的全局事务传播级别,会采取不同的处理方式
  20.         //比如挂起当前事务 + 开启新的事务,或者是直接不使用事务执行业务,挂起其实就是解绑当前线程的xid
  21.         //可以通过@GlobalTransactional注解,定制业务方法的全局事务,比如指定业务方法全局事务的传播级别
  22.         SuspendedResourcesHolder suspendedResourcesHolder = null;
  23.         try {
  24.             switch (propagation) {
  25.                 ...
  26.             }
  27.             //1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
  28.             if (tx == null) {
  29.                 tx = GlobalTransactionContext.createNew();
  30.             }
  31.             //set current tx config to holder
  32.             GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
  33.             try {
  34.                 //2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
  35.                 //else do nothing. Of course, the hooks will still be triggered.
  36.                 //开启一个全局事务
  37.                 beginTransaction(txInfo, tx);
  38.   
  39.                 Object rs;
  40.                 try {
  41.                     //Do Your Business
  42.                     //执行业务方法,把全局事务xid通过Dubbo RPC传递下去,开启并提交一个一个分支事务
  43.                     rs = business.execute();
  44.                 } catch (Throwable ex) {
  45.                     //3. The needed business exception to rollback.
  46.                     //发生异常时需要完成的事务
  47.                     completeTransactionAfterThrowing(txInfo, tx, ex);
  48.                     throw ex;
  49.                 }
  50.                 //4. everything is fine, commit.
  51.                 //如果一切执行正常就会在这里提交全局事务
  52.                 commitTransaction(tx);
  53.                 return rs;
  54.             } finally {
  55.                 //5. clear
  56.                 //执行一些全局事务完成后的回调,比如清理等工作
  57.                 resumeGlobalLockConfig(previousConfig);
  58.                 triggerAfterCompletion();
  59.                 cleanUp();
  60.             }
  61.         } finally {
  62.             //If the transaction is suspended, resume it.
  63.             if (suspendedResourcesHolder != null) {
  64.                 //如果之前挂起了一个全局事务,此时可以恢复这个全局事务
  65.                 tx.resume(suspendedResourcesHolder);
  66.             }
  67.         }
  68.     }
  69.    
  70.     //提交事务
  71.     private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
  72.         try {
  73.             triggerBeforeCommit();
  74.             tx.commit();
  75.             triggerAfterCommit();
  76.         } catch (TransactionException txe) {
  77.             // 4.1 Failed to commit
  78.             throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.CommitFailure);
  79.         }
  80.     }
  81.     ...
  82. }
  83. //The type Default global transaction. 默认的全局事务
  84. public class DefaultGlobalTransaction implements GlobalTransaction {
  85.     private TransactionManager transactionManager;
  86.    
  87.     DefaultGlobalTransaction(String xid, GlobalStatus status, GlobalTransactionRole role) {
  88.         this.transactionManager = TransactionManagerHolder.get();//全局事务管理者
  89.         this.xid = xid;
  90.         this.status = status;
  91.         this.role = role;
  92.     }
  93.     ...
  94.     @Override
  95.     public void commit() throws TransactionException {
  96.         if (role == GlobalTransactionRole.Participant) {
  97.             //Participant has no responsibility of committing
  98.             if (LOGGER.isDebugEnabled()) {
  99.                 LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", xid);
  100.             }
  101.             return;
  102.         }
  103.         assertXIDNotNull();
  104.         int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
  105.         try {
  106.             while (retry > 0) {
  107.                 try {
  108.                     retry--;
  109.                     status = transactionManager.commit(xid);
  110.                     break;
  111.                 } catch (Throwable ex) {
  112.                     LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
  113.                     if (retry == 0) {
  114.                         throw new TransactionException("Failed to report global commit", ex);
  115.                     }
  116.                 }
  117.             }
  118.         } finally {
  119.             if (xid.equals(RootContext.getXID())) {
  120.                 suspend();
  121.             }
  122.         }
  123.         if (LOGGER.isInfoEnabled()) {
  124.             LOGGER.info("[{}] commit status: {}", xid, status);
  125.         }
  126.     }
  127.     ...
  128. }
  129. public class TransactionManagerHolder {
  130.     ...
  131.     private TransactionManagerHolder() {
  132.     }
  133.    
  134.     private static class SingletonHolder {
  135.         private static TransactionManager INSTANCE = null;
  136.         static {
  137.             try {
  138.                 INSTANCE = EnhancedServiceLoader.load(TransactionManager.class);
  139.                 LOGGER.info("TransactionManager Singleton {}", INSTANCE);
  140.             } catch (Throwable anyEx) {
  141.                 LOGGER.error("Failed to load TransactionManager Singleton! ", anyEx);
  142.             }
  143.         }
  144.     }
  145.    
  146.     //Get transaction manager.
  147.     public static TransactionManager get() {
  148.         if (SingletonHolder.INSTANCE == null) {
  149.             throw new ShouldNeverHappenException("TransactionManager is NOT ready!");
  150.         }
  151.         return SingletonHolder.INSTANCE;
  152.     }
  153.     ...
  154. }
  155. public class DefaultTransactionManager implements TransactionManager {
  156.     ...
  157.     @Override
  158.     public GlobalStatus commit(String xid) throws TransactionException {
  159.         GlobalCommitRequest globalCommit = new GlobalCommitRequest();
  160.         globalCommit.setXid(xid);
  161.         GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
  162.         return response.getGlobalStatus();
  163.     }
  164.    
  165.     private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
  166.         try {
  167.             //TMNettyRemotingClient会和Seata Server基于Netty建立长连接
  168.             return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);
  169.         } catch (TimeoutException toe) {
  170.             throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);
  171.         }
  172.     }
  173.     ...
  174. }
复制代码
(2)Server向Client发送提交分支事务的请求
ServerHandler的channelRead()方法会将收到的请求进行层层传递:首先交给DefaultCoordinator的onRequest()方法来进行处理,然后交给GlobalCommitRequest的handle()方法来进行处理,接着交给AbstractTCInboundHandler的handle()方法来进行处理,最后交给DefaultCoordinator的doGlobalCommit()方法来进行处理,也就是调用DefaultCore的commit()方法来提交全局事务。
  1. public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {
  2.     ...
  3.     @ChannelHandler.Sharable
  4.     class ServerHandler extends ChannelDuplexHandler {
  5.         @Override
  6.         public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
  7.             if (!(msg instanceof RpcMessage)) {
  8.                 return;
  9.             }
  10.             //接下来调用processMessage()方法对解码完毕的RpcMessage对象进行处理
  11.             processMessage(ctx, (RpcMessage) msg);
  12.         }
  13.         ...
  14.     }
  15.     ...
  16. }
  17. public abstract class AbstractNettyRemoting implements Disposable {
  18.     ...
  19.     protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
  20.         if (LOGGER.isDebugEnabled()) {
  21.             LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
  22.         }
  23.         Object body = rpcMessage.getBody();
  24.         if (body instanceof MessageTypeAware) {
  25.             MessageTypeAware messageTypeAware = (MessageTypeAware) body;
  26.             //根据消息类型获取到一个Pair对象,该Pair对象是由请求处理组件和请求处理线程池组成的
  27.             //processorTable里的内容,是NettyRemotingServer在初始化时,通过调用registerProcessor()方法put进去的
  28.             //所以下面的代码实际上会由ServerOnRequestProcessor的process()方法进行处理
  29.             final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
  30.             if (pair != null) {
  31.                 if (pair.getSecond() != null) {
  32.                     try {
  33.                         pair.getSecond().execute(() -> {
  34.                             try {
  35.                                 pair.getFirst().process(ctx, rpcMessage);
  36.                             } catch (Throwable th) {
  37.                                 LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
  38.                             } finally {
  39.                                 MDC.clear();
  40.                             }
  41.                         });
  42.                     } catch (RejectedExecutionException e) {
  43.                         ...
  44.                     }
  45.                 } else {
  46.                     try {
  47.                         pair.getFirst().process(ctx, rpcMessage);
  48.                     } catch (Throwable th) {
  49.                         LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
  50.                     }
  51.                 }
  52.             } else {
  53.                 LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
  54.             }
  55.         } else {
  56.             LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
  57.         }
  58.     }
  59.     ...
  60. }
  61. public class ServerOnRequestProcessor implements RemotingProcessor, Disposable {
  62.     private final RemotingServer remotingServer;
  63.     ...
  64.     @Override
  65.     public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
  66.         if (ChannelManager.isRegistered(ctx.channel())) {
  67.             onRequestMessage(ctx, rpcMessage);
  68.         } else {
  69.             try {
  70.                 if (LOGGER.isInfoEnabled()) {
  71.                     LOGGER.info("closeChannelHandlerContext channel:" + ctx.channel());
  72.                 }
  73.                 ctx.disconnect();
  74.                 ctx.close();
  75.             } catch (Exception exx) {
  76.                 LOGGER.error(exx.getMessage());
  77.             }
  78.             if (LOGGER.isInfoEnabled()) {
  79.                 LOGGER.info(String.format("close a unhandled connection! [%s]", ctx.channel().toString()));
  80.             }
  81.         }
  82.     }
  83.    
  84.     private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
  85.         Object message = rpcMessage.getBody();
  86.         //RpcContext线程本地变量副本
  87.         RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
  88.         if (LOGGER.isDebugEnabled()) {
  89.             LOGGER.debug("server received:{},clientIp:{},vgroup:{}", message, NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());
  90.         } else {
  91.             try {
  92.                 BatchLogHandler.INSTANCE.getLogQueue().put(message + ",clientIp:" + NetUtil.toIpAddress(ctx.channel().remoteAddress()) + ",vgroup:" + rpcContext.getTransactionServiceGroup());
  93.             } catch (InterruptedException e) {
  94.                 LOGGER.error("put message to logQueue error: {}", e.getMessage(), e);
  95.             }
  96.         }
  97.         if (!(message instanceof AbstractMessage)) {
  98.             return;
  99.         }
  100.         //the batch send request message
  101.         if (message instanceof MergedWarpMessage) {
  102.             ...
  103.         } else {
  104.             //the single send request message
  105.             final AbstractMessage msg = (AbstractMessage) message;
  106.             //最终调用到DefaultCoordinator的onRequest()方法来处理RpcMessage
  107.             AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);
  108.             //返回响应给客户端
  109.             remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);
  110.         }
  111.     }
  112.     ...
  113. }
  114. public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {
  115.     ...
  116.     @Override
  117.     public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
  118.         if (!(request instanceof AbstractTransactionRequestToTC)) {
  119.             throw new IllegalArgumentException();
  120.         }
  121.         //传入的request其实就是客户端发送请求时的GlobalCommitRequest
  122.         AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request;
  123.         transactionRequest.setTCInboundHandler(this);
  124.         return transactionRequest.handle(context);
  125.     }
  126.     ...
  127. }
  128. public class GlobalCommitRequest extends AbstractGlobalEndRequest {
  129.     @Override
  130.     public short getTypeCode() {
  131.         return MessageType.TYPE_GLOBAL_COMMIT;
  132.     }
  133.    
  134.     @Override
  135.     public AbstractTransactionResponse handle(RpcContext rpcContext) {
  136.         return handler.handle(this, rpcContext);
  137.     }
  138. }
  139. public abstract class AbstractTCInboundHandler extends AbstractExceptionHandler implements TCInboundHandler {
  140.     ...
  141.     @Override
  142.     public GlobalCommitResponse handle(GlobalCommitRequest request, final RpcContext rpcContext) {
  143.         GlobalCommitResponse response = new GlobalCommitResponse();
  144.         response.setGlobalStatus(GlobalStatus.Committing);
  145.         exceptionHandleTemplate(new AbstractCallback<GlobalCommitRequest, GlobalCommitResponse>() {
  146.             @Override
  147.             public void execute(GlobalCommitRequest request, GlobalCommitResponse response) throws TransactionException {
  148.                 try {
  149.                     doGlobalCommit(request, response, rpcContext);
  150.                 } catch (StoreException e) {
  151.                     throw new TransactionException(TransactionExceptionCode.FailedStore, String.format("global commit request failed. xid=%s, msg=%s", request.getXid(), e.getMessage()), e);
  152.                 }
  153.             }
  154.            
  155.             @Override
  156.             public void onTransactionException(GlobalCommitRequest request, GlobalCommitResponse response, TransactionException tex) {
  157.                 super.onTransactionException(request, response, tex);
  158.                 checkTransactionStatus(request, response);
  159.             }
  160.             @Override
  161.             public void onException(GlobalCommitRequest request, GlobalCommitResponse response, Exception rex) {
  162.                 super.onException(request, response, rex);
  163.                 checkTransactionStatus(request, response);
  164.             }
  165.         }, request, response);
  166.         return response;
  167.     }
  168.    
  169.     protected abstract void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext) throws TransactionException;
  170.     ...
  171. }
  172. public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {
  173.     private final DefaultCore core;
  174.     ...
  175.     @Override
  176.     protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext) throws TransactionException {
  177.         MDC.put(RootContext.MDC_KEY_XID, request.getXid());
  178.         //调用DefaultCore.commit()方法提交全局事务
  179.         response.setGlobalStatus(core.commit(request.getXid()));
  180.     }
  181.     ...
  182. }
复制代码
DefaultCore的commit()方法会调用DefaultCore的doGlobalCommit()方法,而doGlobalCommit()方法会获取全局事务的所有分支事务并进行遍历,然后把提交分支事务的请求BranchCommitRequest发送到Seata Client中。
  1. public class DefaultCore implements Core {
  2.     ...
  3.     @Override
  4.     public GlobalStatus commit(String xid) throws TransactionException {
  5.         GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
  6.         if (globalSession == null) {
  7.             return GlobalStatus.Finished;
  8.         }
  9.         globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
  10.         //just lock changeStatus
  11.         boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
  12.             if (globalSession.getStatus() == GlobalStatus.Begin) {
  13.                 //Highlight: Firstly, close the session, then no more branch can be registered.
  14.                 globalSession.closeAndClean();
  15.                 if (globalSession.canBeCommittedAsync()) {
  16.                     globalSession.asyncCommit();
  17.                     MetricsPublisher.postSessionDoneEvent(globalSession, GlobalStatus.Committed, false, false);
  18.                     return false;
  19.                 } else {
  20.                     globalSession.changeGlobalStatus(GlobalStatus.Committing);
  21.                     return true;
  22.                 }
  23.             }
  24.             return false;
  25.         });
  26.         if (shouldCommit) {
  27.             boolean success = doGlobalCommit(globalSession, false);
  28.             //If successful and all remaining branches can be committed asynchronously, do async commit.
  29.             if (success && globalSession.hasBranch() && globalSession.canBeCommittedAsync()) {
  30.                 globalSession.asyncCommit();
  31.                 return GlobalStatus.Committed;
  32.             } else {
  33.                 return globalSession.getStatus();
  34.             }
  35.         } else {
  36.             return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();
  37.         }
  38.     }
  39.     @Override
  40.     public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
  41.         boolean success = true;
  42.         //start committing event
  43.         MetricsPublisher.postSessionDoingEvent(globalSession, retrying);
  44.         if (globalSession.isSaga()) {
  45.             success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
  46.         } else {
  47.             //获取到全局事务的所有分支事务,并进行遍历提交
  48.             Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {
  49.                 //if not retrying, skip the canBeCommittedAsync branches
  50.                 if (!retrying && branchSession.canBeCommittedAsync()) {
  51.                     return CONTINUE;
  52.                 }
  53.                 BranchStatus currentStatus = branchSession.getStatus();
  54.                 if (currentStatus == BranchStatus.PhaseOne_Failed) {
  55.                     SessionHelper.removeBranch(globalSession, branchSession, !retrying);
  56.                     return CONTINUE;
  57.                 }
  58.                 try {
  59.                     //发送请求给Seata Client提交分支事务
  60.                     BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);
  61.                     if (isXaerNotaTimeout(globalSession,branchStatus)) {
  62.                         LOGGER.info("Commit branch XAER_NOTA retry timeout, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
  63.                         branchStatus = BranchStatus.PhaseTwo_Committed;
  64.                     }
  65.                     switch (branchStatus) {
  66.                         case PhaseTwo_Committed:
  67.                             SessionHelper.removeBranch(globalSession, branchSession, !retrying);
  68.                             return CONTINUE;
  69.                         case PhaseTwo_CommitFailed_Unretryable:
  70.                             //not at branch
  71.                             SessionHelper.endCommitFailed(globalSession, retrying);
  72.                             LOGGER.error("Committing global transaction[{}] finally failed, caused by branch transaction[{}] commit failed.", globalSession.getXid(), branchSession.getBranchId());
  73.                             return false;
  74.                         default:
  75.                             if (!retrying) {
  76.                                 globalSession.queueToRetryCommit();
  77.                                 return false;
  78.                             }
  79.                             if (globalSession.canBeCommittedAsync()) {
  80.                                 LOGGER.error("Committing branch transaction[{}], status:{} and will retry later", branchSession.getBranchId(), branchStatus);
  81.                                 return CONTINUE;
  82.                             } else {
  83.                                 LOGGER.error("Committing global transaction[{}] failed, caused by branch transaction[{}] commit failed, will retry later.", globalSession.getXid(), branchSession.getBranchId());
  84.                                 return false;
  85.                             }
  86.                     }
  87.                 } catch (Exception ex) {
  88.                     StackTraceLogger.error(LOGGER, ex, "Committing branch transaction exception: {}", new String[] {branchSession.toString()});
  89.                     if (!retrying) {
  90.                         globalSession.queueToRetryCommit();
  91.                         throw new TransactionException(ex);
  92.                     }
  93.                 }
  94.                 return CONTINUE;
  95.             });
  96.             //Return if the result is not null
  97.             if (result != null) {
  98.                 return result;
  99.             }
  100.             //If has branch and not all remaining branches can be committed asynchronously,
  101.             //do print log and return false
  102.             if (globalSession.hasBranch() && !globalSession.canBeCommittedAsync()) {
  103.                 LOGGER.info("Committing global transaction is NOT done, xid = {}.", globalSession.getXid());
  104.                 return false;
  105.             }
  106.             if (!retrying) {
  107.                 //contains not AT branch
  108.                 globalSession.setStatus(GlobalStatus.Committed);
  109.             }
  110.         }
  111.         //if it succeeds and there is no branch, retrying=true is the asynchronous state when retrying. EndCommitted is
  112.         //executed to improve concurrency performance, and the global transaction ends..
  113.         if (success && globalSession.getBranchSessions().isEmpty()) {
  114.             SessionHelper.endCommitted(globalSession, retrying);
  115.             LOGGER.info("Committing global transaction is successfully done, xid = {}.", globalSession.getXid());
  116.         }
  117.         return success;
  118.     }
  119.     ...
  120. }
  121. public abstract class AbstractCore implements Core {
  122.     protected RemotingServer remotingServer;
  123.     ...
  124.     @Override
  125.     public BranchStatus branchCommit(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {
  126.         try {
  127.             BranchCommitRequest request = new BranchCommitRequest();
  128.             request.setXid(branchSession.getXid());
  129.             request.setBranchId(branchSession.getBranchId());
  130.             request.setResourceId(branchSession.getResourceId());
  131.             request.setApplicationData(branchSession.getApplicationData());
  132.             request.setBranchType(branchSession.getBranchType());
  133.             return branchCommitSend(request, globalSession, branchSession);
  134.         } catch (IOException | TimeoutException e) {
  135.             throw new BranchTransactionException(FailedToSendBranchCommitRequest, String.format("Send branch commit failed, xid = %s branchId = %s", branchSession.getXid(), branchSession.getBranchId()), e);
  136.         }
  137.     }
  138.    
  139.     protected BranchStatus branchCommitSend(BranchCommitRequest request, GlobalSession globalSession, BranchSession branchSession) throws IOException, TimeoutException {
  140.         BranchCommitResponse response = (BranchCommitResponse) remotingServer.sendSyncRequest(branchSession.getResourceId(), branchSession.getClientId(), request);
  141.         return response.getBranchStatus();
  142.     }
  143.     ...
  144. }
  145. public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {
  146.     ...
  147.     @Override
  148.     public Object sendSyncRequest(Channel channel, Object msg) throws TimeoutException {
  149.         if (channel == null) {
  150.             throw new RuntimeException("client is not connected");
  151.         }
  152.         RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
  153.         return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout());
  154.     }
  155.     ...
  156. }
  157. public abstract class AbstractNettyRemoting implements Disposable {
  158.     ...
  159.     protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {
  160.         if (timeoutMillis <= 0) {
  161.             throw new FrameworkException("timeout should more than 0ms");
  162.         }
  163.         if (channel == null) {
  164.             LOGGER.warn("sendSync nothing, caused by null channel.");
  165.             return null;
  166.         }
  167.         //把发送出去的请求封装到MessageFuture中,然后存放到futures这个Map里
  168.         MessageFuture messageFuture = new MessageFuture();
  169.         messageFuture.setRequestMessage(rpcMessage);
  170.         messageFuture.setTimeout(timeoutMillis);
  171.         futures.put(rpcMessage.getId(), messageFuture);
  172.         channelWritableCheck(channel, rpcMessage.getBody());
  173.         //获取远程地址
  174.         String remoteAddr = ChannelUtil.getAddressFromChannel(channel);
  175.         doBeforeRpcHooks(remoteAddr, rpcMessage);
  176.         //异步化发送数据,同时对发送结果添加监听器
  177.         //如果发送失败,则会对网络连接Channel进行销毁处理
  178.         channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
  179.             if (!future.isSuccess()) {
  180.                 MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());
  181.                 if (messageFuture1 != null) {
  182.                     messageFuture1.setResultMessage(future.cause());
  183.                 }
  184.                 destroyChannel(future.channel());
  185.             }
  186.         });
  187.         try {
  188.             //然后通过请求响应组件MessageFuture同步等待Seata Server返回该请求的响应
  189.             Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
  190.             doAfterRpcHooks(remoteAddr, rpcMessage, result);
  191.             return result;
  192.         } catch (Exception exx) {
  193.             LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), channel.remoteAddress(), rpcMessage.getBody());
  194.             if (exx instanceof TimeoutException) {
  195.                 throw (TimeoutException) exx;
  196.             } else {
  197.                 throw new RuntimeException(exx);
  198.             }
  199.         }
  200.     }
  201.     ...
  202. }
复制代码
(3)Seata Client处理提交分支事务的请求
ClientHandler的channelRead()方法收到提交分支事务的请求后,会由RmBranchCommitProcessor的handleBranchCommit()方法进行处理。
  1. -> AbstractRMHandler.onRequest()
  2. -> BranchCommitRequest.handle()
  3. -> AbstractRMHandler.handle()
  4. -> AbstractRMHandler.doBranchCommit()
  5. -> DataSourceManager.branchCommit()
  6. -> AsyncWorker.branchCommit()异步化提交分支事务
复制代码
  1. public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {
  2.     ...
  3.     @Sharable
  4.     class ClientHandler extends ChannelDuplexHandler {
  5.         @Override
  6.         public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
  7.             if (!(msg instanceof RpcMessage)) {
  8.                 return;
  9.             }
  10.             processMessage(ctx, (RpcMessage) msg);
  11.         }
  12.         ...
  13.     }
  14.     ...
  15. }
  16. public abstract class AbstractNettyRemoting implements Disposable {
  17.     ...
  18.     protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
  19.         if (LOGGER.isDebugEnabled()) {
  20.             LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
  21.         }
  22.         Object body = rpcMessage.getBody();
  23.         if (body instanceof MessageTypeAware) {
  24.             MessageTypeAware messageTypeAware = (MessageTypeAware) body;
  25.             //根据消息类型获取到一个Pair对象,该Pair对象是由请求处理组件和请求处理线程池组成的
  26.             final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
  27.             if (pair != null) {
  28.                 if (pair.getSecond() != null) {
  29.                     try {
  30.                         pair.getSecond().execute(() -> {
  31.                             try {
  32.                                 pair.getFirst().process(ctx, rpcMessage);
  33.                             } catch (Throwable th) {
  34.                                 LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
  35.                             } finally {
  36.                                 MDC.clear();
  37.                             }
  38.                         });
  39.                     } catch (RejectedExecutionException e) {
  40.                         ...
  41.                     }
  42.                 } else {
  43.                     try {
  44.                         pair.getFirst().process(ctx, rpcMessage);
  45.                     } catch (Throwable th) {
  46.                         LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
  47.                     }
  48.                 }
  49.             } else {
  50.                 LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
  51.             }
  52.         } else {
  53.             LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
  54.         }
  55.     }
  56.     ...
  57. }
  58. public class RmBranchCommitProcessor implements RemotingProcessor {
  59.     ...
  60.     @Override
  61.     public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
  62.         String remoteAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());
  63.         Object msg = rpcMessage.getBody();
  64.         if (LOGGER.isInfoEnabled()) {
  65.             LOGGER.info("rm client handle branch commit process:" + msg);
  66.         }
  67.         handleBranchCommit(rpcMessage, remoteAddress, (BranchCommitRequest) msg);
  68.     }
  69.     private void handleBranchCommit(RpcMessage request, String serverAddress, BranchCommitRequest branchCommitRequest) {
  70.         BranchCommitResponse resultMessage;
  71.         resultMessage = (BranchCommitResponse) handler.onRequest(branchCommitRequest, null);
  72.         if (LOGGER.isDebugEnabled()) {
  73.             LOGGER.debug("branch commit result:" + resultMessage);
  74.         }
  75.         try {
  76.             this.remotingClient.sendAsyncResponse(serverAddress, request, resultMessage);
  77.         } catch (Throwable throwable) {
  78.             LOGGER.error("branch commit error: {}", throwable.getMessage(), throwable);
  79.         }
  80.     }
  81.     ...
  82. }
  83. public abstract class AbstractRMHandler extends AbstractExceptionHandler implements RMInboundHandler, TransactionMessageHandler {
  84.     ...
  85.     @Override
  86.     public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
  87.         if (!(request instanceof AbstractTransactionRequestToRM)) {
  88.             throw new IllegalArgumentException();
  89.         }
  90.         AbstractTransactionRequestToRM transactionRequest = (AbstractTransactionRequestToRM)request;
  91.         transactionRequest.setRMInboundMessageHandler(this);
  92.         return transactionRequest.handle(context);
  93.     }
  94.     ...
  95. }
  96. public class BranchCommitRequest extends AbstractBranchEndRequest {
  97.     @Override
  98.     public short getTypeCode() {
  99.         return MessageType.TYPE_BRANCH_COMMIT;
  100.     }
  101.    
  102.     @Override
  103.     public AbstractTransactionResponse handle(RpcContext rpcContext) {
  104.         return handler.handle(this);
  105.     }
  106. }
  107. public abstract class AbstractRMHandler extends AbstractExceptionHandler implements RMInboundHandler, TransactionMessageHandler {
  108.     @Override
  109.     public BranchCommitResponse handle(BranchCommitRequest request) {
  110.         BranchCommitResponse response = new BranchCommitResponse();
  111.         exceptionHandleTemplate(new AbstractCallback<BranchCommitRequest, BranchCommitResponse>() {
  112.             @Override
  113.             public void execute(BranchCommitRequest request, BranchCommitResponse response) throws TransactionException {
  114.                 doBranchCommit(request, response);
  115.             }
  116.         }, request, response);
  117.         return response;
  118.     }
  119.    
  120.     protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response) throws TransactionException {
  121.         String xid = request.getXid();
  122.         long branchId = request.getBranchId();
  123.         String resourceId = request.getResourceId();
  124.         String applicationData = request.getApplicationData();
  125.         if (LOGGER.isInfoEnabled()) {
  126.             LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);
  127.         }
  128.         BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId, applicationData);
  129.         response.setXid(xid);
  130.         response.setBranchId(branchId);
  131.         response.setBranchStatus(status);
  132.         if (LOGGER.isInfoEnabled()) {
  133.             LOGGER.info("Branch commit result: " + status);
  134.         }
  135.     }
  136.     ...
  137. }
  138. //The type Data source manager. DataSourceManager是AT模式下的资源管理器
  139. public class DataSourceManager extends AbstractResourceManager {
  140.     //异步化worker
  141.     private final AsyncWorker asyncWorker = new AsyncWorker(this);
  142.     ...
  143.     @Override
  144.     public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
  145.         //通过asyncWorker,异步化提交分支事务
  146.         return asyncWorker.branchCommit(xid, branchId, resourceId);
  147.     }
  148.     ...
  149. }
复制代码
(4)全局事务的提交主要就是让各个分支事务把本地的UndoLog删除
  1. public class AsyncWorker {
  2.     ...
  3.     public BranchStatus branchCommit(String xid, long branchId, String resourceId) {
  4.         Phase2Context context = new Phase2Context(xid, branchId, resourceId);
  5.         addToCommitQueue(context);
  6.         return BranchStatus.PhaseTwo_Committed;
  7.     }
  8.    
  9.     private void addToCommitQueue(Phase2Context context) {
  10.         if (commitQueue.offer(context)) {
  11.             return;
  12.         }
  13.         CompletableFuture.runAsync(this::doBranchCommitSafely, scheduledExecutor).thenRun(() -> addToCommitQueue(context));
  14.     }
  15.    
  16.     void doBranchCommitSafely() {
  17.         try {
  18.             doBranchCommit();
  19.         } catch (Throwable e) {
  20.             LOGGER.error("Exception occur when doing branch commit", e);
  21.         }
  22.     }
  23.    
  24.     private void doBranchCommit() {
  25.         if (commitQueue.isEmpty()) {
  26.             return;
  27.         }
  28.         //transfer all context currently received to this list
  29.         List<Phase2Context> allContexts = new LinkedList<>();
  30.         commitQueue.drainTo(allContexts);
  31.         //group context by their resourceId
  32.         Map<String, List<Phase2Context>> groupedContexts = groupedByResourceId(allContexts);
  33.         groupedContexts.forEach(this::dealWithGroupedContexts);
  34.     }
  35.    
  36.     private void dealWithGroupedContexts(String resourceId, List<Phase2Context> contexts) {
  37.         DataSourceProxy dataSourceProxy = dataSourceManager.get(resourceId);
  38.         if (dataSourceProxy == null) {
  39.             LOGGER.warn("failed to find resource for {} and requeue", resourceId);
  40.             addAllToCommitQueue(contexts);
  41.             return;
  42.         }
  43.         Connection conn = null;
  44.         try {
  45.             conn = dataSourceProxy.getPlainConnection();
  46.             UndoLogManager undoLogManager = UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType());
  47.             //split contexts into several lists, with each list contain no more element than limit size
  48.             List<List<Phase2Context>> splitByLimit = Lists.partition(contexts, UNDOLOG_DELETE_LIMIT_SIZE);
  49.             //全局事务的提交,就是让各个分支事务把本地的undo logs删除掉即可
  50.             for (List<Phase2Context> partition : splitByLimit) {
  51.                 deleteUndoLog(conn, undoLogManager, partition);
  52.             }
  53.         } catch (SQLException sqlExx) {
  54.             addAllToCommitQueue(contexts);
  55.             LOGGER.error("failed to get connection for async committing on {} and requeue", resourceId, sqlExx);
  56.         } finally {
  57.             IOUtil.close(conn);
  58.         }
  59.     }
  60.     ...
  61. }
复制代码
 
18.全局事务回滚的过程源码
全局事务的回滚流程和提交流程几乎一样:
一.Seata Client发起全局事务回滚请求
二.Server向Client发送分支事务回滚请求
三.Seata Client处理分支事务回滚的请求
 

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册