找回密码
 立即注册
首页 业界区 业界 Seata源码—5.全局事务的创建与返回处理

Seata源码—5.全局事务的创建与返回处理

梢疠 2025-6-2 23:52:39
大纲
1.Seata开启分布式事务的流程总结
2.Seata生成全局事务ID的雪花算法源码
3.生成xid以及对全局事务会话进行持久化的源码
4.全局事务会话数据持久化的实现源码
5.Seata Server创建全局事务与返回xid的源码
6.Client获取Server的响应与处理的源码
7.Seata与Dubbo整合的过滤器源码
 
1.Seata开启分布式事务的流程总结
(1)Seata分布式事务执行流程
(2)开启一个全局事务的流程
 
(1)Seata分布式事务执行流程
Seata Client在执行添加了全局事务注解@GlobalTransactional的方法时,实际执行的是根据全局事务拦截器创建该方法所在Bean的动态代理方法,于是会执行GlobalTransactionalInterceptor的invoke()方法。此时,添加了全局事务注解@GlobalTransactional的方法就会被全局事务拦截器拦截了。
 
GlobalTransactionalInterceptor全局事务拦截器拦截目标方法的调用后,会由事务执行模版TransactionalTemplate的excute()方法来执行目标方法。
 
在事务执行模版TransactionalTemplate的excute()方法中,首先会判断Propagation全局事务传播级别,然后开启一个全局事务(也就是打开一个全局事务),接着才执行具体的业务目标方法。
 
执行具体的业务目标方法时,会通过Dubbo的RPC调用来传递全局事务的xid给其他的Seata Client。其他的Seata Client通过Dubbo过滤器获取到RPC调用中的xid后,会将xid放入线程本地变量副本中。之后执行SQL时就会获取数据库连接代理来对SQL进行拦截,数据库连接代理就可以从线程本地变量副本中获取xid,然后开启分支事务。
 
各个分支事务都执行完毕后,开启全局事务的Seata Client就会提交事务、处理全局锁、资源清理。
 
(2)开启一个全局事务的流程
Seata Server收到Seata Client发送过来的RpcMessage对象消息后,RpcMessage对象消息首先会由ServerOnRequestProcessor的process()方法处理,然后会由DefaultCoordinator的onRequest()方法进行处理,接着会由GlobalBeginRequest的handle()方法进行处理,然后会由DefaultCoordinator的doGlobalBegin()方法来处理,最后给到DefaultCore的begin()方法来进行处理。
 
在DefaultCore的begin()方法中,首先就会创建一个全局事务会话,然后将全局事务会话的xid通过MDC放入线程本地变量副本中,接着对该全局事务会话添加一个全局事务会话的生命周期监听器,最后打开该全局事务会话、发布会话开启事件并返回全局事务会话的xid。
 
在创建一个全局事务会话GlobalSession时,首先会由uuid生成组件UUIDGenerator来生成全局事务id(transactionId),然后根据生成的全局事务id(transactionId)来继续生成xid。
 
2.Seata生成全局事务ID的雪花算法源码
(1)通过UUIDGenerator生成全局事务ID
(2)IdWorker实现的雪花算法生成的ID的组成
(3)IdWorker实现的雪花算法对时钟回拨的处理
 
(1)通过UUIDGenerator生成全局事务ID
Seata在创建全局事务会话时会通过UUIDGenerator来生成全局事务ID,UUIDGenerator在生成ID时是通过Seata自己实现的雪花算法来生成的。
  1. public class GlobalSession implements SessionLifecycle, SessionStorable {
  2.     ...
  3.     //创建全局事务会话
  4.     public static GlobalSession createGlobalSession(String applicationId, String txServiceGroup, String txName, int timeout) {
  5.         GlobalSession session = new GlobalSession(applicationId, txServiceGroup, txName, timeout, false);
  6.         return session;
  7.     }
  8.    
  9.     public GlobalSession(String applicationId, String transactionServiceGroup, String transactionName, int timeout, boolean lazyLoadBranch) {
  10.         //全局事务id是通过UUIDGenerator来生成的
  11.         this.transactionId = UUIDGenerator.generateUUID();
  12.         this.status = GlobalStatus.Begin;
  13.         this.lazyLoadBranch = lazyLoadBranch;
  14.         if (!lazyLoadBranch) {
  15.             this.branchSessions = new ArrayList<>();
  16.         }
  17.         this.applicationId = applicationId;
  18.         this.transactionServiceGroup = transactionServiceGroup;
  19.         this.transactionName = transactionName;
  20.         this.timeout = timeout;
  21.         //根据UUIDGenerator生成的transactionId + XID工具生成最终的xid
  22.         this.xid = XID.generateXID(transactionId);
  23.     }
  24.     ...
  25. }
  26. public class UUIDGenerator {
  27.     private static volatile IdWorker idWorker;
  28.    
  29.     //generate UUID using snowflake algorithm
  30.     public static long generateUUID() {
  31.         //Double Check + volatile,实现并发场景下只创建一次idWorker对象
  32.         if (idWorker == null) {
  33.             synchronized (UUIDGenerator.class) {
  34.                 if (idWorker == null) {
  35.                     init(null);
  36.                 }
  37.             }
  38.         }
  39.         //正常情况下,每次都会通过idWorker生成一个id
  40.         return idWorker.nextId();
  41.     }
  42.    
  43.     //init IdWorker
  44.     public static void init(Long serverNode) {
  45.         idWorker = new IdWorker(serverNode);
  46.     }
  47. }
复制代码
(2)IdWorker实现的雪花算法生成的ID的组成
IdWorker就是Seata自己实现的基于雪花算法的ID生成器。IdWorker的nextId()方法通过雪花算法生成的transactionId一共是64位,用64个bit拼接出一个唯一的ID。
 
一.最高位始终是0,占1个bit
 
二.接着的10个bit是workerId
一台机器就是一个worker,每个worker都会有一个自己的workerId。生成workerId时,是基于本机网络地址里的Mac地址来生成的。
 
三.接着的41个bit是时间戳
表示可以为某台机器的每一毫秒,分配一个自增长的ID。毫秒时间戳有13位数,转换为2进制需要2的41次方。
 
四.最后的12个bit是序列号
如果一台机器在一毫秒内需要为很多线程生成ID,就可以通过自增长的12个bit的Sequence为每个线程分配ID。
 
(3)IdWorker实现的雪花算法对时钟回拨的处理
在执行IdWorker的nextId()方法时,会对包含序列号和时间戳的timestampAndSequence进行累加,也就是对timestampAndSequence的某一个毫秒内的Sequence序列号进行累加。
 
如果出现大量的线程并发获取ID,此时可能会导致timestampAndSequence中某一个毫秒内的Sequence序列号快速累加,并且将代表Sequence序列号的12个bit全部累加完毕,最后便会导致包含序列号和时间戳的timestampAndSequence中的毫秒时间戳也进行累加。
 
但当前的实际时间其实还是这一毫秒,而timestampAndSequence里的毫秒时间戳已经累加到下一个毫秒去了,出现时钟回拨问题,于是就需要调用waitIfNecessary()方法进行处理。
 
所以,在IdWorker的waitIfNecessary()方法中,如果获取ID的QPS过高,导致当前时间戳对应的Sequence序列号被耗尽,那么就需要阻塞当前线程5毫秒。
  1. //IdWorker就是Seata自己实现的基于雪花算法的ID生成器
  2. public class IdWorker {
  3.     private final long twepoch = 1588435200000L;//Start time cut (2020-05-03)
  4.     private final int workerIdBits = 10;//The number of bits occupied by workerId
  5.     private final int timestampBits = 41;//The number of bits occupied by timestamp
  6.     private final int sequenceBits = 12;//The number of bits occupied by sequence
  7.     private final int maxWorkerId = ~(-1 << workerIdBits);//Maximum supported machine id, the result is 1023
  8.     //business meaning: machine ID (0 ~ 1023)
  9.     //actual layout in memory:
  10.     //highest 1 bit: 0
  11.     //middle 10 bit: workerId
  12.     //lowest 53 bit: all 0
  13.     private long workerId;
  14.     //timestampAndSequence是64位的、支持CAS操作的Long型的、包含了Sequence序列号的时间戳
  15.     //它的最高位是11个bit,没有使用
  16.     //中间有41个bit,是时间戳
  17.     //最低位有12个bit,是序列号
  18.     //timestampAndSequence可以认为是把时间戳和序列号混合在了一个long型数字里
  19.    
  20.     //timestamp and sequence mix in one Long
  21.     //highest 11 bit: not used
  22.     //middle  41 bit: timestamp
  23.     //lowest  12 bit: sequence
  24.     private AtomicLong timestampAndSequence;
  25.     //mask that help to extract timestamp and sequence from a long
  26.     //可以帮忙从一个long数字里提取出一个包含Sequence序列号的时间戳
  27.     private final long timestampAndSequenceMask = ~(-1L << (timestampBits + sequenceBits));
  28.     //instantiate an IdWorker using given workerId
  29.     public IdWorker(Long workerId) {
  30.         //初始化timestampAndSequence
  31.         initTimestampAndSequence();
  32.         //初始化workerId
  33.         initWorkerId(workerId);
  34.     }
  35.     //init first timestamp and sequence immediately
  36.     private void initTimestampAndSequence() {
  37.         //获取相对于twepoch的最新时间戳
  38.         long timestamp = getNewestTimestamp();
  39.         //将最新时间戳和sequenceBits进行位运算(左移),从而得到一个混合了sequence的时间戳
  40.         long timestampWithSequence = timestamp << sequenceBits;
  41.         //把混合了sequence的时间戳,赋值给timestampAndSequence
  42.         this.timestampAndSequence = new AtomicLong(timestampWithSequence);
  43.     }
  44.     //init workerId
  45.     private void initWorkerId(Long workerId) {
  46.         if (workerId == null) {
  47.             workerId = generateWorkerId();
  48.         }
  49.         if (workerId > maxWorkerId || workerId < 0) {
  50.             String message = String.format("worker Id can't be greater than %d or less than 0", maxWorkerId);
  51.             throw new IllegalArgumentException(message);
  52.         }
  53.         //将workerId与timestampBits+sequenceBits的和进行位运算(左移),获取一个workerId
  54.         this.workerId = workerId << (timestampBits + sequenceBits);
  55.     }
  56.     //通过snowflake雪花算法来生成transactionId
  57.     //一共是64位,用64个bit拼接出一个唯一的ID,最高位始终是0,占1个bit
  58.     //接着的10个bit是workerId,一台机器就是一个worker,每个worker都会有一个自己的workerId
  59.     //接着的41个bit是时间戳,表示可以为某台机器的每一毫秒,分配一个自增长的id,毫秒时间戳有13位数,转换为2进制就需要2的41次方,2的20次方是一个7位数的数字
  60.     //最后的12个bit是序列号,如果一台机器在一毫秒内需要为很多线程生成id,就可以通过自增长的12个bit的Sequence为每个线程分配id
  61.    
  62.     //get next UUID(base on snowflake algorithm), which look like:
  63.     //highest 1 bit: always 0
  64.     //next   10 bit: workerId
  65.     //next   41 bit: timestamp
  66.     //lowest 12 bit: sequence
  67.     public long nextId() {
  68.         waitIfNecessary();
  69.         //对包含Sequence序列号的时间戳timestampAndSequence进行累加,也就是对timestampAndSequence的某一个毫秒内的Sequence进行累加
  70.         //如果出现大量的线程并发获取id,此时可能会导致timestampAndSequence的某一个毫秒内的Sequence快速累加,并且将12个bit全部累加完毕
  71.         //最终导致timestampAndSequence的毫秒时间戳也进行累加了
  72.         //但当前的实际时间其实还是这一毫秒,而timestampAndSequence里的毫秒时间戳已经累加到下一个毫秒去了,于是就需要waitIfNecessary()进行处理
  73.         long next = timestampAndSequence.incrementAndGet();
  74.         //把最新的包含Sequence序列号的时间戳next与timestampAndSequenceMask进行位运算,获取真正的包含Sequence序列号的时间戳timestampWithSequence
  75.         long timestampWithSequence = next & timestampAndSequenceMask;
  76.         //对包含Sequence序列号的时间戳与workerId通过位运算拼接在一起
  77.         return workerId | timestampWithSequence;
  78.     }
  79.     //block current thread if the QPS of acquiring UUID is too high that current sequence space is exhausted
  80.     //如果获取UUID的QPS过高,导致当前时间戳对应的Sequence序列号被耗尽了,那么就需要阻塞当前线程5毫秒
  81.     private void waitIfNecessary() {
  82.         //先获取包含Sequence序列号的当前时间戳
  83.         long currentWithSequence = timestampAndSequence.get();
  84.         //将currentWithSequence与sequenceBits进行位运算(右移),获取到当前时间戳
  85.         long current = currentWithSequence >>> sequenceBits;
  86.         //获取相对于twepoch的最新时间戳
  87.         long newest = getNewestTimestamp();
  88.         //如果当前的时间戳大于最新的时间戳,说明获取UUID的QPS过高,导致timestampAndSequence增长太快了(出现时钟回拨问题)
  89.         if (current >= newest) {
  90.             try {
  91.                 //如果获取UUID的QPS过高,导致当前时间戳对应的Sequence序列号被耗尽了,那么就需要阻塞当前线程5毫秒
  92.                 Thread.sleep(5);
  93.             } catch (InterruptedException ignore) {
  94.                 //don't care
  95.             }
  96.         }
  97.     }
  98.     //get newest timestamp relative to twepoch
  99.     private long getNewestTimestamp() {
  100.         //通过当前毫秒单位的时间戳 减去 一个固定的时间twepoch,得到的就是相对于twepoch的最新时间戳
  101.         return System.currentTimeMillis() - twepoch;
  102.     }
  103.     //auto generate workerId, try using mac first, if failed, then randomly generate one
  104.     private long generateWorkerId() {
  105.         try {
  106.             //生成一个workerId,默认是基于网络的Mac地址来生成的
  107.             return generateWorkerIdBaseOnMac();
  108.         } catch (Exception e) {
  109.             return generateRandomWorkerId();
  110.         }
  111.     }
  112.     //use lowest 10 bit of available MAC as workerId
  113.     private long generateWorkerIdBaseOnMac() throws Exception {
  114.         //获取所有的网络接口
  115.         Enumeration<NetworkInterface> all = NetworkInterface.getNetworkInterfaces();
  116.         //遍历每一个网络接口
  117.         while (all.hasMoreElements()) {
  118.             NetworkInterface networkInterface = all.nextElement();
  119.             boolean isLoopback = networkInterface.isLoopback();
  120.             boolean isVirtual = networkInterface.isVirtual();
  121.             //如果是虚拟的、回环的地址,那么这个地址就跳过,不能使用
  122.             if (isLoopback || isVirtual) {
  123.                 continue;
  124.             }
  125.             //获取本机网络地址里的Mac地址,基于Mac地址来生成一个workerid
  126.             byte[] mac = networkInterface.getHardwareAddress();
  127.             return ((mac[4] & 0B11) << 8) | (mac[5] & 0xFF);
  128.         }
  129.         throw new RuntimeException("no available mac found");
  130.     }
  131.     //randomly generate one as workerId
  132.     private long generateRandomWorkerId() {
  133.         return new Random().nextInt(maxWorkerId + 1);
  134.     }
  135. }
复制代码
(3)将全局事务会话持久化到File文件的实现
  1. public class XID {
  2.     private static int port;
  3.     private static String ipAddress;
  4.     ...
  5.     //Generate xid string.
  6.     public static String generateXID(long tranId) {
  7.         //首先获取当前机器的IP地址
  8.         //然后拼接上一个冒号、接着拼接一个端口号、再拼接一个冒号
  9.         //最后再拼接事务id,以此来生成xid
  10.         //所以xid是通过ip:port:transactionId拼接出来的
  11.         return new StringBuilder().append(ipAddress).append(IP_PORT_SPLIT_CHAR).append(port).append(IP_PORT_SPLIT_CHAR).append(tranId).toString();
  12.     }
  13.     ...
  14. }
复制代码
  1. public class DefaultCore implements Core {
  2.     ...
  3.     @Override
  4.     public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException {
  5.         //创建一个全局事务会话
  6.         GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name, timeout);
  7.       
  8.         //通过slf4j的MDC把xid放入线程本地变量副本里去
  9.         MDC.put(RootContext.MDC_KEY_XID, session.getXid());
  10.       
  11.         //添加一个全局事务会话的生命周期监听器
  12.         session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
  13.       
  14.         //打开Session,其中会对全局事务会话进行持久化
  15.         session.begin();
  16.         //transaction start event,发布会话开启事件
  17.         MetricsPublisher.postSessionDoingEvent(session, false);
  18.         //返回全局事务会话的xid
  19.         return session.getXid();
  20.     }
  21.     ...
  22. }
  23. public class GlobalSession implements SessionLifecycle, SessionStorable {
  24.     ...
  25.     @Override
  26.     public void begin() throws TransactionException {
  27.         this.status = GlobalStatus.Begin;
  28.         this.beginTime = System.currentTimeMillis();
  29.         this.active = true;
  30.         for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
  31.             lifecycleListener.onBegin(this);
  32.         }
  33.     }
  34.     ...
  35. }
  36. public abstract class AbstractSessionManager implements SessionManager, SessionLifecycleListener {
  37.     ...
  38.     @Override
  39.     public void onBegin(GlobalSession globalSession) throws TransactionException {
  40.         addGlobalSession(globalSession);
  41.     }
  42.    
  43.     @Override
  44.     public void addGlobalSession(GlobalSession session) throws TransactionException {
  45.         if (LOGGER.isDebugEnabled()) {
  46.             LOGGER.debug("MANAGER[{}] SESSION[{}] {}", name, session, LogOperation.GLOBAL_ADD);
  47.         }
  48.         writeSession(LogOperation.GLOBAL_ADD, session);
  49.     }
  50.    
  51.     private void writeSession(LogOperation logOperation, SessionStorable sessionStorable) throws TransactionException {
  52.         //transactionStoreManager.writeSession()会对全局事务会话进行持久化
  53.         if (!transactionStoreManager.writeSession(logOperation, sessionStorable)) {
  54.             if (LogOperation.GLOBAL_ADD.equals(logOperation)) {
  55.                 throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession, "Fail to store global session");
  56.             } else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {
  57.                 throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession, "Fail to update global session");
  58.             } else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {
  59.                 throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession, "Fail to remove global session");
  60.             } else if (LogOperation.BRANCH_ADD.equals(logOperation)) {
  61.                 throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession, "Fail to store branch session");
  62.             } else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {
  63.                 throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession, "Fail to update branch session");
  64.             } else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {
  65.                 throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession, "Fail to remove branch session");
  66.             } else {
  67.                 throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession, "Unknown LogOperation:" + logOperation.name());
  68.             }
  69.         }
  70.     }
  71.     ...
  72. }
复制代码
  1. //The type Database transaction store manager.
  2. public class DataBaseTransactionStoreManager extends AbstractTransactionStoreManager implements TransactionStoreManager {
  3.     private static volatile DataBaseTransactionStoreManager instance;
  4.     protected LogStore logStore;
  5.     ...
  6.     //Get the instance.
  7.     public static DataBaseTransactionStoreManager getInstance() {
  8.         if (instance == null) {
  9.             synchronized (DataBaseTransactionStoreManager.class) {
  10.                 if (instance == null) {
  11.                     instance = new DataBaseTransactionStoreManager();
  12.                 }
  13.             }
  14.         }
  15.         return instance;
  16.     }
  17.    
  18.     //Instantiates a new Database transaction store manager.
  19.     private DataBaseTransactionStoreManager() {
  20.         logQueryLimit = CONFIG.getInt(ConfigurationKeys.STORE_DB_LOG_QUERY_LIMIT, DEFAULT_LOG_QUERY_LIMIT);
  21.         String datasourceType = CONFIG.getConfig(ConfigurationKeys.STORE_DB_DATASOURCE_TYPE);
  22.         //init dataSource,通过SPI机制加载DataSourceProvider
  23.         DataSource logStoreDataSource = EnhancedServiceLoader.load(DataSourceProvider.class, datasourceType).provide();
  24.         logStore = new LogStoreDataBaseDAO(logStoreDataSource);
  25.     }
  26.    
  27.     @Override
  28.     public boolean writeSession(LogOperation logOperation, SessionStorable session) {
  29.         if (LogOperation.GLOBAL_ADD.equals(logOperation)) {
  30.             return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
  31.         } else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {
  32.             return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
  33.         } else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {
  34.             return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
  35.         } else if (LogOperation.BRANCH_ADD.equals(logOperation)) {
  36.             return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
  37.         } else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {
  38.             return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
  39.         } else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {
  40.             return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
  41.         } else {
  42.             throw new StoreException("Unknown LogOperation:" + logOperation.name());
  43.         }
  44.     }
  45.     ...
  46. }
  47. public class LogStoreDataBaseDAO implements LogStore {
  48.     protected DataSource logStoreDataSource = null;
  49.     protected String globalTable;
  50.     protected String branchTable;
  51.     private String dbType;
  52.     ...
  53.     public LogStoreDataBaseDAO(DataSource logStoreDataSource) {
  54.         this.logStoreDataSource = logStoreDataSource;
  55.         globalTable = CONFIG.getConfig(ConfigurationKeys.STORE_DB_GLOBAL_TABLE, DEFAULT_STORE_DB_GLOBAL_TABLE);
  56.         branchTable = CONFIG.getConfig(ConfigurationKeys.STORE_DB_BRANCH_TABLE, DEFAULT_STORE_DB_BRANCH_TABLE);
  57.         dbType = CONFIG.getConfig(ConfigurationKeys.STORE_DB_TYPE);
  58.         if (StringUtils.isBlank(dbType)) {
  59.             throw new StoreException("there must be db type.");
  60.         }
  61.         if (logStoreDataSource == null) {
  62.             throw new StoreException("there must be logStoreDataSource.");
  63.         }
  64.         //init transaction_name size
  65.         initTransactionNameSize();
  66.     }
  67.    
  68.     @Override
  69.     public boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) {
  70.         String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertGlobalTransactionSQL(globalTable);
  71.         Connection conn = null;
  72.         PreparedStatement ps = null;
  73.         try {
  74.             int index = 1;
  75.             conn = logStoreDataSource.getConnection();
  76.             conn.setAutoCommit(true);
  77.             ps = conn.prepareStatement(sql);
  78.             ps.setString(index++, globalTransactionDO.getXid());
  79.             ps.setLong(index++, globalTransactionDO.getTransactionId());
  80.             ps.setInt(index++, globalTransactionDO.getStatus());
  81.             ps.setString(index++, globalTransactionDO.getApplicationId());
  82.             ps.setString(index++, globalTransactionDO.getTransactionServiceGroup());
  83.             String transactionName = globalTransactionDO.getTransactionName();
  84.             transactionName = transactionName.length() > transactionNameColumnSize ? transactionName.substring(0, transactionNameColumnSize) : transactionName;
  85.             ps.setString(index++, transactionName);
  86.             ps.setInt(index++, globalTransactionDO.getTimeout());
  87.             ps.setLong(index++, globalTransactionDO.getBeginTime());
  88.             ps.setString(index++, globalTransactionDO.getApplicationData());
  89.             return ps.executeUpdate() > 0;
  90.         } catch (SQLException e) {
  91.             throw new StoreException(e);
  92.         } finally {
  93.             IOUtil.close(ps, conn);
  94.         }
  95.     }
  96.     ...
  97. }
复制代码
  1. public class FileTransactionStoreManager extends AbstractTransactionStoreManager implements TransactionStoreManager, ReloadableStore {
  2.     private ReentrantLock writeSessionLock = new ReentrantLock();
  3.     ...
  4.     @Override
  5.     public boolean writeSession(LogOperation logOperation, SessionStorable session) {
  6.         long curFileTrxNum;
  7.         writeSessionLock.lock();
  8.         try {
  9.             if (!writeDataFile(new TransactionWriteStore(session, logOperation).encode())) {
  10.                 return false;
  11.             }
  12.             lastModifiedTime = System.currentTimeMillis();
  13.             curFileTrxNum = FILE_TRX_NUM.incrementAndGet();
  14.             if (curFileTrxNum % PER_FILE_BLOCK_SIZE == 0 && (System.currentTimeMillis() - trxStartTimeMills) > MAX_TRX_TIMEOUT_MILLS) {
  15.                 return saveHistory();
  16.             }
  17.         } catch (Exception exx) {
  18.             LOGGER.error("writeSession error, {}", exx.getMessage(), exx);
  19.             return false;
  20.         } finally {
  21.             writeSessionLock.unlock();
  22.         }
  23.         flushDisk(curFileTrxNum, currFileChannel);
  24.         return true;
  25.     }
  26.    
  27.     private boolean writeDataFile(byte[] bs) {
  28.         if (bs == null || bs.length >= Integer.MAX_VALUE - 3) {
  29.             return false;
  30.         }
  31.         if (!writeDataFrame(bs)) {
  32.             return false;
  33.         }
  34.         return flushWriteBuffer(writeBuffer);
  35.     }
  36.    
  37.     private boolean writeDataFrame(byte[] data) {
  38.         if (data == null || data.length <= 0) {
  39.             return true;
  40.         }
  41.         int dataLength = data.length;
  42.         int bufferRemainingSize = writeBuffer.remaining();
  43.         if (bufferRemainingSize <= INT_BYTE_SIZE) {
  44.             if (!flushWriteBuffer(writeBuffer)) {
  45.                 return false;
  46.             }
  47.         }
  48.         bufferRemainingSize = writeBuffer.remaining();
  49.         if (bufferRemainingSize <= INT_BYTE_SIZE) {
  50.             throw new IllegalStateException(String.format("Write buffer remaining size %d was too small", bufferRemainingSize));
  51.         }
  52.         writeBuffer.putInt(dataLength);
  53.         bufferRemainingSize = writeBuffer.remaining();
  54.         int dataPos = 0;
  55.         while (dataPos < dataLength) {
  56.             int dataLengthToWrite = dataLength - dataPos;
  57.             dataLengthToWrite = Math.min(dataLengthToWrite, bufferRemainingSize);
  58.             writeBuffer.put(data, dataPos, dataLengthToWrite);
  59.             bufferRemainingSize = writeBuffer.remaining();
  60.             if (bufferRemainingSize == 0) {
  61.                 if (!flushWriteBuffer(writeBuffer)) {
  62.                     return false;
  63.                 }
  64.                 bufferRemainingSize = writeBuffer.remaining();
  65.             }
  66.             dataPos += dataLengthToWrite;
  67.         }
  68.         return true;
  69.     }
  70.    
  71.     private boolean flushWriteBuffer(ByteBuffer writeBuffer) {
  72.         writeBuffer.flip();
  73.         if (!writeDataFileByBuffer(writeBuffer)) {
  74.             return false;
  75.         }
  76.         writeBuffer.clear();
  77.         return true;
  78.     }
  79.    
  80.     private void flushDisk(long curFileNum, FileChannel currFileChannel) {
  81.         if (FLUSH_DISK_MODE == FlushDiskMode.SYNC_MODEL) {
  82.             SyncFlushRequest syncFlushRequest = new SyncFlushRequest(curFileNum, currFileChannel);
  83.             writeDataFileRunnable.putRequest(syncFlushRequest);
  84.             syncFlushRequest.waitForFlush(MAX_WAIT_FOR_FLUSH_TIME_MILLS);
  85.         } else {
  86.             writeDataFileRunnable.putRequest(new AsyncFlushRequest(curFileNum, currFileChannel));
  87.         }
  88.     }
  89.     ...
  90. }
  91. public class TransactionWriteStore implements SessionStorable {
  92.     private SessionStorable sessionRequest;
  93.     private LogOperation operate;
  94.    
  95.     public TransactionWriteStore(SessionStorable sessionRequest, LogOperation operate) {
  96.         this.sessionRequest = sessionRequest;
  97.         this.operate = operate;
  98.     }
  99.    
  100.     @Override
  101.     public byte[] encode() {
  102.         byte[] bySessionRequest = this.sessionRequest.encode();
  103.         byte byOpCode = this.getOperate().getCode();
  104.         int len = bySessionRequest.length + 1;
  105.         byte[] byResult = new byte[len];
  106.         ByteBuffer byteBuffer = ByteBuffer.wrap(byResult);
  107.         byteBuffer.put(bySessionRequest);
  108.         byteBuffer.put(byOpCode);
  109.         return byResult;
  110.     }
  111.     ...
  112. }
复制代码
  1. public class RedisTransactionStoreManager extends AbstractTransactionStoreManager implements TransactionStoreManager {
  2.     private static volatile RedisTransactionStoreManager instance;
  3.     //Map for LogOperation Global Operation
  4.     public static volatile ImmutableMap<LogOperation, Function<GlobalTransactionDO, Boolean>> globalMap;
  5.     //Map for LogOperation Branch Operation
  6.     public static volatile ImmutableMap<LogOperation, Function<BranchTransactionDO, Boolean>> branchMap;
  7.     ...
  8.     public static RedisTransactionStoreManager getInstance() {
  9.         if (instance == null) {
  10.             synchronized (RedisTransactionStoreManager.class) {
  11.                 if (instance == null) {
  12.                     instance = new RedisTransactionStoreManager();
  13.                 }
  14.             }
  15.         }
  16.         return instance;
  17.     }
  18.    
  19.     public RedisTransactionStoreManager() {
  20.         super();
  21.         initGlobalMap();
  22.         initBranchMap();
  23.         logQueryLimit = CONFIG.getInt(STORE_REDIS_QUERY_LIMIT, DEFAULT_LOG_QUERY_LIMIT);
  24.         if (logQueryLimit > DEFAULT_LOG_QUERY_LIMIT) {
  25.             logQueryLimit = DEFAULT_LOG_QUERY_LIMIT;
  26.         }
  27.     }
  28.    
  29.     public void initGlobalMap() {
  30.         if (CollectionUtils.isEmpty(branchMap)) {
  31.             globalMap = ImmutableMap.<LogOperation, Function<GlobalTransactionDO, Boolean>>builder()
  32.                 .put(LogOperation.GLOBAL_ADD, this::insertGlobalTransactionDO)
  33.                 .put(LogOperation.GLOBAL_UPDATE, this::updateGlobalTransactionDO)
  34.                 .put(LogOperation.GLOBAL_REMOVE, this::deleteGlobalTransactionDO)
  35.                 .build();
  36.         }
  37.     }
  38.    
  39.     public void initBranchMap() {
  40.         if (CollectionUtils.isEmpty(branchMap)) {
  41.             branchMap = ImmutableMap.<LogOperation, Function<BranchTransactionDO, Boolean>>builder()
  42.                 .put(LogOperation.BRANCH_ADD, this::insertBranchTransactionDO)
  43.                 .put(LogOperation.BRANCH_UPDATE, this::updateBranchTransactionDO)
  44.                 .put(LogOperation.BRANCH_REMOVE, this::deleteBranchTransactionDO)
  45.                 .build();
  46.         }
  47.     }
  48.    
  49.     //Insert the global transaction.
  50.     private boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) {
  51.         String globalKey = buildGlobalKeyByTransactionId(globalTransactionDO.getTransactionId());
  52.         try (Jedis jedis = JedisPooledFactory.getJedisInstance(); Pipeline pipelined = jedis.pipelined()) {
  53.             Date now = new Date();
  54.             globalTransactionDO.setGmtCreate(now);
  55.             globalTransactionDO.setGmtModified(now);
  56.             pipelined.hmset(globalKey, BeanUtils.objectToMap(globalTransactionDO));
  57.             pipelined.rpush(buildGlobalStatus(globalTransactionDO.getStatus()), globalTransactionDO.getXid());
  58.             pipelined.sync();
  59.             return true;
  60.         } catch (Exception ex) {
  61.             throw new RedisException(ex);
  62.         }
  63.     }
  64.    
  65.     //Insert branch transaction
  66.     private boolean insertBranchTransactionDO(BranchTransactionDO branchTransactionDO) {
  67.         String branchKey = buildBranchKey(branchTransactionDO.getBranchId());
  68.         String branchListKey = buildBranchListKeyByXid(branchTransactionDO.getXid());
  69.         try (Jedis jedis = JedisPooledFactory.getJedisInstance(); Pipeline pipelined = jedis.pipelined()) {
  70.             Date now = new Date();
  71.             branchTransactionDO.setGmtCreate(now);
  72.             branchTransactionDO.setGmtModified(now);
  73.             pipelined.hmset(branchKey, BeanUtils.objectToMap(branchTransactionDO));
  74.             pipelined.rpush(branchListKey, branchKey);  
  75.             pipelined.sync();
  76.             return true;
  77.         } catch (Exception ex) {
  78.             throw new RedisException(ex);
  79.         }
  80.     }
  81.    
  82.     @Override
  83.     public boolean writeSession(LogOperation logOperation, SessionStorable session) {
  84.         if (globalMap.containsKey(logOperation) || branchMap.containsKey(logOperation)) {
  85.             return globalMap.containsKey(logOperation) ?
  86.                 globalMap.get(logOperation).apply(SessionConverter.convertGlobalTransactionDO(session)) :
  87.                 branchMap.get(logOperation).apply(SessionConverter.convertBranchTransactionDO(session));
  88.         } else {
  89.             throw new StoreException("Unknown LogOperation:" + logOperation.name());
  90.         }
  91.     }
  92.     ...
  93. }
  94. public class SessionConverter {
  95.     ...
  96.     public static GlobalTransactionDO convertGlobalTransactionDO(SessionStorable session) {
  97.         if (session == null || !(session instanceof GlobalSession)) {
  98.             throw new IllegalArgumentException("The parameter of SessionStorable is not available, SessionStorable:" + StringUtils.toString(session));
  99.         }
  100.         GlobalSession globalSession = (GlobalSession)session;
  101.         GlobalTransactionDO globalTransactionDO = new GlobalTransactionDO();
  102.         globalTransactionDO.setXid(globalSession.getXid());
  103.         globalTransactionDO.setStatus(globalSession.getStatus().getCode());
  104.         globalTransactionDO.setApplicationId(globalSession.getApplicationId());
  105.         globalTransactionDO.setBeginTime(globalSession.getBeginTime());
  106.         globalTransactionDO.setTimeout(globalSession.getTimeout());
  107.         globalTransactionDO.setTransactionId(globalSession.getTransactionId());
  108.         globalTransactionDO.setTransactionName(globalSession.getTransactionName());
  109.         globalTransactionDO.setTransactionServiceGroup(globalSession.getTransactionServiceGroup());
  110.         globalTransactionDO.setApplicationData(globalSession.getApplicationData());
  111.         return globalTransactionDO;
  112.     }
  113.    
  114.     public static BranchTransactionDO convertBranchTransactionDO(SessionStorable session) {
  115.         if (session == null || !(session instanceof BranchSession)) {
  116.             throw new IllegalArgumentException("The parameter of SessionStorable is not available, SessionStorable:" + StringUtils.toString(session));
  117.         }
  118.         BranchSession branchSession = (BranchSession)session;
  119.         BranchTransactionDO branchTransactionDO = new BranchTransactionDO();
  120.         branchTransactionDO.setXid(branchSession.getXid());
  121.         branchTransactionDO.setBranchId(branchSession.getBranchId());
  122.         branchTransactionDO.setBranchType(branchSession.getBranchType().name());
  123.         branchTransactionDO.setClientId(branchSession.getClientId());
  124.         branchTransactionDO.setResourceGroupId(branchSession.getResourceGroupId());
  125.         branchTransactionDO.setTransactionId(branchSession.getTransactionId());
  126.         branchTransactionDO.setApplicationData(branchSession.getApplicationData());
  127.         branchTransactionDO.setResourceId(branchSession.getResourceId());
  128.         branchTransactionDO.setStatus(branchSession.getStatus().getCode());
  129.         return branchTransactionDO;
  130.     }
  131.     ...
  132. }
复制代码
  1. -> ServerHandler.channelRead()接收Seata Client发送过来的请求;
  2. -> AbstractNettyRemoting.processMessage()处理RpcMessage消息;
  3. -> ServerOnRequestProcessor.process()处理RpcMessage消息;
  4. -> TransactionMessageHandler.onRequest()处理RpcMessage消息;
  5. -> RemotingServer.sendAsyncResponse()返回包含xid的响应给客户端;
复制代码
 
6.Client获取Server的响应与处理的源码
  1. public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {
  2.     ...
  3.     @ChannelHandler.Sharable
  4.     class ServerHandler extends ChannelDuplexHandler {
  5.         @Override
  6.         public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
  7.             if (!(msg instanceof RpcMessage)) {
  8.                 return;
  9.             }
  10.             //接下来调用processMessage()方法对解码完毕的RpcMessage对象进行处理
  11.             processMessage(ctx, (RpcMessage) msg);
  12.         }
  13.     }
  14. }
  15. public abstract class AbstractNettyRemoting implements Disposable {
  16.     ...
  17.     protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
  18.         if (LOGGER.isDebugEnabled()) {
  19.             LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
  20.         }
  21.         Object body = rpcMessage.getBody();
  22.         if (body instanceof MessageTypeAware) {
  23.             MessageTypeAware messageTypeAware = (MessageTypeAware) body;
  24.             //根据消息类型获取到一个Pair对象,该Pair对象是由请求处理组件和请求处理线程池组成的
  25.             //processorTable里的内容,是NettyRemotingServer在初始化时,通过调用registerProcessor()方法put进去的
  26.             //所以下面的代码实际上会由ServerOnRequestProcessor的process()方法进行处理
  27.             final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
  28.             if (pair != null) {
  29.                 if (pair.getSecond() != null) {
  30.                     try {
  31.                         pair.getSecond().execute(() -> {
  32.                             try {
  33.                                 pair.getFirst().process(ctx, rpcMessage);
  34.                             } catch (Throwable th) {
  35.                                 LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
  36.                             } finally {
  37.                                 MDC.clear();
  38.                             }
  39.                         });
  40.                     } catch (RejectedExecutionException e) {
  41.                         ...
  42.                     }
  43.                 } else {
  44.                     try {
  45.                         pair.getFirst().process(ctx, rpcMessage);
  46.                     } catch (Throwable th) {
  47.                         LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
  48.                     }
  49.                 }
  50.             } else {
  51.                 LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
  52.             }
  53.         } else {
  54.             LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
  55.         }
  56.     }
  57.     ...
  58. }
  59. public class ServerOnRequestProcessor implements RemotingProcessor, Disposable {
  60.     private final RemotingServer remotingServer;
  61.     ...
  62.     @Override
  63.     public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
  64.         if (ChannelManager.isRegistered(ctx.channel())) {
  65.             onRequestMessage(ctx, rpcMessage);
  66.         } else {
  67.             try {
  68.                 if (LOGGER.isInfoEnabled()) {
  69.                     LOGGER.info("closeChannelHandlerContext channel:" + ctx.channel());
  70.                 }
  71.                 ctx.disconnect();
  72.                 ctx.close();
  73.             } catch (Exception exx) {
  74.                 LOGGER.error(exx.getMessage());
  75.             }
  76.             if (LOGGER.isInfoEnabled()) {
  77.                 LOGGER.info(String.format("close a unhandled connection! [%s]", ctx.channel().toString()));
  78.             }
  79.         }
  80.     }
  81.    
  82.     private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
  83.         Object message = rpcMessage.getBody();
  84.         //RpcContext线程本地变量副本
  85.         RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
  86.         if (LOGGER.isDebugEnabled()) {
  87.             LOGGER.debug("server received:{},clientIp:{},vgroup:{}", message, NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());
  88.         } else {
  89.             try {
  90.                 BatchLogHandler.INSTANCE.getLogQueue().put(message + ",clientIp:" + NetUtil.toIpAddress(ctx.channel().remoteAddress()) + ",vgroup:" + rpcContext.getTransactionServiceGroup());
  91.             } catch (InterruptedException e) {
  92.                 LOGGER.error("put message to logQueue error: {}", e.getMessage(), e);
  93.             }
  94.         }
  95.         if (!(message instanceof AbstractMessage)) {
  96.             return;
  97.         }
  98.         // the batch send request message
  99.         if (message instanceof MergedWarpMessage) {
  100.             ...
  101.         } else {
  102.             // the single send request message
  103.             final AbstractMessage msg = (AbstractMessage) message;
  104.             //最终调用到DefaultCoordinator的onRequest()方法来处理RpcMessage
  105.             AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);
  106.             //返回响应给客户端
  107.             remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);
  108.         }
  109.     }
  110.     ...
  111. }
复制代码
  1. -> TransactionMessageHandler.onRequest()处理RpcMessage消息;
  2. -> DefaultCoordinator.onRequest()处理RpcMessage消息;
  3. -> GlobalBeginRequest.handle()处理开启全局事务请求;
  4. -> AbstractTCInboundHandler.handle()开启全局事务返回全局事务;
  5. -> DefaultCoordinator.doGlobalBegin()开启全局事务;
  6. -> DefaultCore.begin()创建全局事务会话并开启;
  7. -> GlobalSession.createGlobalSession()创建全局事务会话;
  8. -> GlobalSession.begin()开启全局事务会话;
  9. -> AbstractSessionManager.onBegin()
  10. -> AbstractSessionManager.addGlobalSession()
  11. -> AbstractSessionManager.writeSession()
  12. -> TransactionStoreManager.writeSession()持久化全局事务会话;
复制代码
由于Seata Client发送开启全局事务的请求给Seata Server时,会通过MessageFuture的get()方法同步等待Seata Server返回响应。所以当Seata Client获取Seata Server的响应并通过complete()方法设置MessageFuture已经完成后,原来同步等待Seata Server响应的线程便会继续往下处理。
 
即某线程执行CompletableFuture.complete()方法后,执行CompletableFuture.get()方法的线程就不会被阻塞而会被唤醒。
  1. public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {
  2.     ...
  3.     @Override
  4.     public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
  5.         if (!(request instanceof AbstractTransactionRequestToTC)) {
  6.             throw new IllegalArgumentException();
  7.         }
  8.         AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request;
  9.         transactionRequest.setTCInboundHandler(this);
  10.         return transactionRequest.handle(context);
  11.     }
  12.     ...
  13. }
  14. public class GlobalBeginRequest extends AbstractTransactionRequestToTC {
  15.     ...
  16.     @Override
  17.     public AbstractTransactionResponse handle(RpcContext rpcContext) {
  18.         return handler.handle(this, rpcContext);
  19.     }
  20.     ...
  21. }
  22. public abstract class AbstractTCInboundHandler extends AbstractExceptionHandler implements TCInboundHandler {
  23.     private static final Logger LOGGER = LoggerFactory.getLogger(AbstractTCInboundHandler.class);
  24.    
  25.     @Override
  26.     public GlobalBeginResponse handle(GlobalBeginRequest request, final RpcContext rpcContext) {
  27.         GlobalBeginResponse response = new GlobalBeginResponse();
  28.         exceptionHandleTemplate(new AbstractCallback<GlobalBeginRequest, GlobalBeginResponse>() {
  29.             @Override
  30.             public void execute(GlobalBeginRequest request, GlobalBeginResponse response) throws TransactionException {
  31.                 try {
  32.                     //开启全局事务
  33.                     doGlobalBegin(request, response, rpcContext);
  34.                 } catch (StoreException e) {
  35.                     throw new TransactionException(TransactionExceptionCode.FailedStore, String.format("begin global request failed. xid=%s, msg=%s", response.getXid(), e.getMessage()), e);
  36.                 }
  37.             }
  38.         }, request, response);
  39.         return response;
  40.     }
  41.     ...
  42. }
  43. public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {
  44.     private final DefaultCore core;
  45.     ...
  46.     @Override
  47.     protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext) throws TransactionException {
  48.         //接下来才真正处理开启全局事务的业务逻辑
  49.         //其中会调用DefaultCore来真正开启一个全局事务,即拿到xid并设置到响应里去
  50.         response.setXid(core.begin(
  51.             rpcContext.getApplicationId(),//应用程序id
  52.             rpcContext.getTransactionServiceGroup(),//事务服务分组
  53.             request.getTransactionName(),//事务名称
  54.             request.getTimeout())//超时时间
  55.         );
  56.         if (LOGGER.isInfoEnabled()) {
  57.             LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}", rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());
  58.         }
  59.     }
  60.     ...
  61. }
  62. public class DefaultCore implements Core {
  63.     ...
  64.     @Override
  65.     public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException {
  66.         //创建一个全局事务会话
  67.         GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name, timeout);
  68.         //通过slf4j的MDC把xid放入线程本地变量副本里去
  69.         MDC.put(RootContext.MDC_KEY_XID, session.getXid());
  70.         //添加一个全局事务会话的生命周期监听器
  71.         session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
  72.         //打开Session,其中会对全局事务会话进行持久化
  73.         session.begin();
  74.         //transaction start event,发布会话开启事件
  75.         MetricsPublisher.postSessionDoingEvent(session, false);
  76.         //返回全局事务会话的xid
  77.         return session.getXid();
  78.     }
  79.     ...
  80. }
  81. public class GlobalSession implements SessionLifecycle, SessionStorable {
  82.     ...
  83.     public static GlobalSession createGlobalSession(String applicationId, String txServiceGroup, String txName, int timeout) {
  84.         GlobalSession session = new GlobalSession(applicationId, txServiceGroup, txName, timeout, false);
  85.         return session;
  86.     }
  87.    
  88.     public GlobalSession(String applicationId, String transactionServiceGroup, String transactionName, int timeout, boolean lazyLoadBranch) {
  89.         //全局事务id是通过UUIDGenerator来生成的
  90.         this.transactionId = UUIDGenerator.generateUUID();
  91.         this.status = GlobalStatus.Begin;
  92.         this.lazyLoadBranch = lazyLoadBranch;
  93.         if (!lazyLoadBranch) {
  94.             this.branchSessions = new ArrayList<>();
  95.         }
  96.         this.applicationId = applicationId;
  97.         this.transactionServiceGroup = transactionServiceGroup;
  98.         this.transactionName = transactionName;
  99.         this.timeout = timeout;
  100.         //根据UUIDGenerator生成的transactionId + XID工具生成最终的xid
  101.         this.xid = XID.generateXID(transactionId);
  102.     }
  103.    
  104.     @Override
  105.     public void begin() throws TransactionException {
  106.         this.status = GlobalStatus.Begin;
  107.         this.beginTime = System.currentTimeMillis();
  108.         this.active = true;
  109.         for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
  110.             lifecycleListener.onBegin(this);
  111.         }
  112.     }
  113.     ...
  114. }
  115. public abstract class AbstractSessionManager implements SessionManager, SessionLifecycleListener {
  116.     ...
  117.     @Override
  118.     public void onBegin(GlobalSession globalSession) throws TransactionException {
  119.         addGlobalSession(globalSession);
  120.     }
  121.    
  122.     @Override
  123.     public void addGlobalSession(GlobalSession session) throws TransactionException {
  124.         if (LOGGER.isDebugEnabled()) {
  125.             LOGGER.debug("MANAGER[{}] SESSION[{}] {}", name, session, LogOperation.GLOBAL_ADD);
  126.         }
  127.         writeSession(LogOperation.GLOBAL_ADD, session);
  128.     }
  129.    
  130.     private void writeSession(LogOperation logOperation, SessionStorable sessionStorable) throws TransactionException {
  131.         //transactionStoreManager.writeSession()会对全局事务会话进行持久化
  132.         if (!transactionStoreManager.writeSession(logOperation, sessionStorable)) {
  133.             ...
  134.         }
  135.     }
  136.     ...
  137. }
复制代码
  1. -> RemotingServer.sendAsyncResponse()返回包含xid的响应给客户端;
  2. -> AbstractNettyRemotingServer.sendAsyncResponse()异步发送响应;
  3. -> AbstractNettyRemoting.buildResponseMessage()构造包含xid响应;
  4. -> AbstractNettyRemoting.sendAsync()异步发送响应;
  5. -> Netty的Channel.writeAndFlush()发送响应给客户端;
复制代码
 
7.Seata与Dubbo整合的过滤器源码
(1)调用Dubbo过滤器的入口
(2)Seata与Dubbo整合的过滤器
 
(1)调用Dubbo过滤器的入口
  1. public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {
  2.     ...
  3.     @Override
  4.     public void sendAsyncResponse(RpcMessage rpcMessage, Channel channel, Object msg) {
  5.         Channel clientChannel = channel;
  6.         if (!(msg instanceof HeartbeatMessage)) {
  7.             clientChannel = ChannelManager.getSameClientChannel(channel);
  8.         }
  9.         if (clientChannel != null) {
  10.             RpcMessage rpcMsg = buildResponseMessage(rpcMessage, msg, msg instanceof HeartbeatMessage
  11.                 ? ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE
  12.                 : ProtocolConstants.MSGTYPE_RESPONSE);
  13.             super.sendAsync(clientChannel, rpcMsg);
  14.         } else {
  15.             throw new RuntimeException("channel is error.");
  16.         }
  17.     }
  18.     ...
  19. }
  20. public abstract class AbstractNettyRemoting implements Disposable {
  21.     ...
  22.     protected RpcMessage buildResponseMessage(RpcMessage rpcMessage, Object msg, byte messageType) {
  23.         RpcMessage rpcMsg = new RpcMessage();
  24.         rpcMsg.setMessageType(messageType);
  25.         rpcMsg.setCodec(rpcMessage.getCodec()); // same with request
  26.         rpcMsg.setCompressor(rpcMessage.getCompressor());
  27.         rpcMsg.setBody(msg);
  28.         rpcMsg.setId(rpcMessage.getId());
  29.         return rpcMsg;
  30.     }
  31.    
  32.     //rpc async request.
  33.     protected void sendAsync(Channel channel, RpcMessage rpcMessage) {
  34.         channelWritableCheck(channel, rpcMessage.getBody());
  35.         if (LOGGER.isDebugEnabled()) {
  36.             LOGGER.debug("write message:" + rpcMessage.getBody() + ", channel:" + channel + ",active?" + channel.isActive() + ",writable?" + channel.isWritable() + ",isopen?" + channel.isOpen());
  37.         }
  38.         doBeforeRpcHooks(ChannelUtil.getAddressFromChannel(channel), rpcMessage);
  39.         channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
  40.             if (!future.isSuccess()) {
  41.                 destroyChannel(future.channel());
  42.             }
  43.         });
  44.     }
  45.     ...
  46. }
复制代码
  1. -> ClientHandler.channelRead()接收Seata Server返回的响应;
  2. -> AbstractNettyRemoting.processMessage()处理RpcMessage消息;
  3. -> ClientOnResponseProcessor.process()会设置MessageFuture结果;
  4. -> MessageFuture.setResultMessage()设置MessageFuture结果;
  5. -> CompletableFuture.complete()唤醒阻塞的线程;
复制代码
(2)Seata与Dubbo整合的过滤器
如果线程本地变量副本里的xid不为null,对应于发起RPC调用的情形。如果线程本地变量副本里的xid为null,则对应于接收RPC调用的情形。
 
当RootContext的xid不为null时,需要设置RpcContext的xid。当RootContext的xid为null + RpcContext的xid不为null时,需要设置RootContext的xid。
  1. public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {
  2.     ...
  3.     @Sharable
  4.     class ClientHandler extends ChannelDuplexHandler {
  5.         @Override
  6.         public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
  7.             if (!(msg instanceof RpcMessage)) {
  8.                 return;
  9.             }
  10.             processMessage(ctx, (RpcMessage) msg);
  11.         }
  12.         ...
  13.     }
  14.     ...
  15. }
  16. public abstract class AbstractNettyRemoting implements Disposable {
  17.     ...
  18.     protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
  19.         if (LOGGER.isDebugEnabled()) {
  20.             LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
  21.         }
  22.         Object body = rpcMessage.getBody();
  23.         if (body instanceof MessageTypeAware) {
  24.             MessageTypeAware messageTypeAware = (MessageTypeAware) body;
  25.             //根据消息类型获取到一个Pair对象,该Pair对象是由请求处理组件和请求处理线程池组成的
  26.             //processorTable里的内容,是NettyRemotingServer在初始化时,通过调用registerProcessor()方法put进去的
  27.             //所以下面的代码实际上会由ServerOnRequestProcessor的process()方法进行处理
  28.             final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
  29.             if (pair != null) {
  30.                 if (pair.getSecond() != null) {
  31.                     try {
  32.                         pair.getSecond().execute(() -> {
  33.                             try {
  34.                                 pair.getFirst().process(ctx, rpcMessage);
  35.                             } catch (Throwable th) {
  36.                                 LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
  37.                             } finally {
  38.                                 MDC.clear();
  39.                             }
  40.                         });
  41.                     } catch (RejectedExecutionException e) {
  42.                         ...
  43.                     }
  44.                 } else {
  45.                     try {
  46.                         pair.getFirst().process(ctx, rpcMessage);
  47.                     } catch (Throwable th) {
  48.                         LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
  49.                     }
  50.                 }
  51.             } else {
  52.                 LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
  53.             }
  54.         } else {
  55.             LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
  56.         }
  57.     }
  58.     ...
  59. }
  60. public class ClientOnResponseProcessor implements RemotingProcessor {
  61.     ...
  62.     @Override
  63.     public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
  64.         if (rpcMessage.getBody() instanceof MergeResultMessage) {
  65.             ...
  66.         } else if (rpcMessage.getBody() instanceof BatchResultMessage) {
  67.             ...
  68.         } else {
  69.             //这里是对普通消息的处理
  70.             MessageFuture messageFuture = futures.remove(rpcMessage.getId());
  71.             if (messageFuture != null) {
  72.                 messageFuture.setResultMessage(rpcMessage.getBody());
  73.             } else {
  74.                 if (rpcMessage.getBody() instanceof AbstractResultMessage) {
  75.                     if (transactionMessageHandler != null) {
  76.                         transactionMessageHandler.onResponse((AbstractResultMessage) rpcMessage.getBody(), null);
  77.                     }
  78.                 }
  79.             }
  80.         }
  81.     }
  82.     ...
  83. }
  84. public class MessageFuture {
  85.     private transient CompletableFuture<Object> origin = new CompletableFuture<>();
  86.     ...
  87.     //Sets result message.
  88.     public void setResultMessage(Object obj) {
  89.         origin.complete(obj);
  90.     }
  91.     ...
  92. }
复制代码
 

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册