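Outline
1. Summary of how Seata begins a distributed transaction
2. Source code of the snowflake algorithm Seata uses to generate global transaction IDs
3. Source code for generating the xid and persisting the global transaction session
4. Source code implementing persistence of global transaction session data
5. Source code for Seata Server creating a global transaction and returning the xid
6. Source code for the Client receiving and handling the Server's response
7. Source code of the filter that integrates Seata with Dubbo
1. Summary of how Seata begins a distributed transaction
(1) Execution flow of a Seata distributed transaction
(2) Flow of beginning a global transaction
(1) Execution flow of a Seata distributed transaction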
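When a Seata Client invokes a method annotated with @GlobalTransactional, what actually runs is the dynamic proxy that the global transaction interceptor created for the bean owning that method, so the call goes through GlobalTransactionalInterceptor.invoke(). In other words, every method annotated with @GlobalTransactional is intercepted by the global transaction interceptor.
After intercepting the call, GlobalTransactionalInterceptor hands the target method to the execute() method of the transaction template TransactionalTemplate.
TransactionalTemplate.execute() first checks the global transaction propagation level (Propagation), then begins a global transaction, and only then runs the business method.
While the business method runs, the xid of the global transaction is passed to other Seata Clients through Dubbo RPC calls. Each of those Seata Clients reads the xid from the RPC call in a Dubbo filter and stores it in a thread-local variable. When SQL is executed afterwards, it goes through the database connection proxy, which intercepts the SQL, reads the xid from the thread-local variable, and starts a branch transaction.
Once all branch transactions have finished, the Seata Client that began the global transaction commits the transaction, handles the global lock, and cleans up resources.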
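The xid hand-off described above can be sketched in plain Java, without Dubbo or Seata on the classpath. This is only an illustration of the idea: `XidContext`, `XidPropagationSketch`, `handleIncomingCall()` and the attachment key `"xid"` are hypothetical names chosen for the example, not Seata's actual classes or constants, and a real RPC would cross process boundaries rather than stay on one thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for a thread-local transaction context.
final class XidContext {
    private static final ThreadLocal<String> XID = new ThreadLocal<>();

    static void bind(String xid) { XID.set(xid); }
    static String get() { return XID.get(); }
    static void unbind() { XID.remove(); }
}

final class XidPropagationSketch {
    // Illustrative attachment key (an assumption, not Seata's constant).
    static final String XID_KEY = "xid";

    // Caller side: copy the current xid, if any, into the RPC attachments.
    static Map<String, String> buildAttachments() {
        Map<String, String> attachments = new HashMap<>();
        String xid = XidContext.get();
        if (xid != null) {
            attachments.put(XID_KEY, xid);
        }
        return attachments;
    }

    // Provider side: bind the xid for the duration of the call, then clean up.
    static <T> T handleIncomingCall(Map<String, String> attachments, Supplier<T> businessLogic) {
        String xid = attachments.get(XID_KEY);
        if (xid == null) {
            return businessLogic.get();
        }
        XidContext.bind(xid);
        try {
            return businessLogic.get();
        } finally {
            XidContext.unbind();
        }
    }

    public static void main(String[] args) {
        XidContext.bind("192.168.0.1:8091:1234567890");
        Map<String, String> attachments = buildAttachments();
        XidContext.unbind();

        // The business logic simply reads the xid that the "filter" bound for it.
        String seen = handleIncomingCall(attachments, XidContext::get);
        System.out.println("xid seen by provider: " + seen);
        System.out.println("xid after the call: " + XidContext.get());
    }
}
```

Unbinding in a `finally` block matters because RPC worker threads are usually pooled; a leftover xid would otherwise leak into the next unrelated call handled by the same thread.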
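(2) Flow of beginning a global transaction
After Seata Server receives the RpcMessage sent by a Seata Client, the message is first handled by ServerOnRequestProcessor.process(), then by DefaultCoordinator.onRequest(), then by GlobalBeginRequest.handle(), then by DefaultCoordinator.doGlobalBegin(), and finally by DefaultCore.begin().
DefaultCore.begin() first creates a global transaction session, then puts the session's xid into the thread-local context through MDC, then adds a lifecycle listener to the session, and finally begins the session, publishes the session-begin event, and returns the session's xid.
When a GlobalSession is created, the ID generator UUIDGenerator first produces the global transaction id (transactionId), and the xid is then derived from that transactionId.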
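2. Source code of the snowflake algorithm Seata uses to generate global transaction IDs
(1) Generating the global transaction ID through UUIDGenerator
(2) Composition of the ID generated by IdWorker's snowflake algorithm
(3) How IdWorker's snowflake algorithm handles clock rollback
(1) Generating the global transaction ID through UUIDGenerator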
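When Seata creates a global transaction session, it generates the global transaction ID through UUIDGenerator, which in turn relies on Seata's own implementation of the snowflake algorithm.
- public class GlobalSession implements SessionLifecycle, SessionStorable {
- ...
- //Create a global transaction session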
- public static GlobalSession createGlobalSession(String applicationId, String txServiceGroup, String txName, int timeout) {
- GlobalSession session = new GlobalSession(applicationId, txServiceGroup, txName, timeout, false);
- return session;
- }
-
- public GlobalSession(String applicationId, String transactionServiceGroup, String transactionName, int timeout, boolean lazyLoadBranch) {
- //The global transaction id is generated by UUIDGenerator
- this.transactionId = UUIDGenerator.generateUUID();
- this.status = GlobalStatus.Begin;
- this.lazyLoadBranch = lazyLoadBranch;
- if (!lazyLoadBranch) {
- this.branchSessions = new ArrayList<>();
- }
- this.applicationId = applicationId;
- this.transactionServiceGroup = transactionServiceGroup;
- this.transactionName = transactionName;
- this.timeout = timeout;
- //Build the final xid from the transactionId generated by UUIDGenerator, using the XID utility
- this.xid = XID.generateXID(transactionId);
- }
- ...
- }
- public class UUIDGenerator {
- private static volatile IdWorker idWorker;
-
- //generate UUID using snowflake algorithm
- public static long generateUUID() {
- //Double-checked locking + volatile: idWorker is created only once under concurrency
- if (idWorker == null) {
- synchronized (UUIDGenerator.class) {
- if (idWorker == null) {
- init(null);
- }
- }
- }
- //In the normal case, every call generates an id through idWorker
- return idWorker.nextId();
- }
-
- //init IdWorker
- public static void init(Long serverNode) {
- idWorker = new IdWorker(serverNode);
- }
- }
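The "double check + volatile" idiom used by UUIDGenerator can be tried in isolation. The sketch below is a generic illustration: `LazyHolder` and its `CREATED` counter are hypothetical names for this example, and the guarded object is a plain `Object` rather than Seata's IdWorker.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class LazyHolder {
    // Counts how many times the guarded object was constructed.
    static final AtomicInteger CREATED = new AtomicInteger();

    // volatile guarantees that a fully constructed instance is visible to other threads.
    private static volatile Object instance;

    static Object get() {
        Object local = instance;              // first check, without locking
        if (local == null) {
            synchronized (LazyHolder.class) {
                local = instance;             // second check, under the lock
                if (local == null) {
                    CREATED.incrementAndGet();
                    local = new Object();
                    instance = local;
                }
            }
        }
        return local;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(LazyHolder::get);
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println("instances created: " + CREATED.get());
    }
}
```

The first check keeps the common path lock-free; the second check prevents two threads that both saw `null` from each creating an instance.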
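(2) Composition of the ID generated by IdWorker's snowflake algorithm
IdWorker is Seata's own snowflake-based ID generator. The transactionId produced by IdWorker.nextId() is 64 bits long: the 64 bits are concatenated into one unique ID.
a. The highest bit is always 0 and occupies 1 bit
b. The next 10 bits are the workerId
Each machine is a worker, and every worker has its own workerId. The workerId is derived from the MAC address of the machine's network interface.
c. The next 41 bits are the timestamp
This lets each machine hand out auto-incrementing IDs within every millisecond. A millisecond timestamp is a 13-digit decimal number, which fits in 41 bits (2^41 is roughly 2.2 × 10^12).
d. The last 12 bits are the sequence number
If one machine has to generate IDs for many threads within the same millisecond, the auto-incrementing 12-bit sequence gives each thread its own ID.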
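The bit layout listed above (1 sign bit, 10 bits of workerId, 41 bits of timestamp, 12 bits of sequence) can be expressed with a few shifts and masks. This is a minimal sketch of the layout only; `SnowflakeLayout` and its method names are made up for the example, and any epoch offset that a real generator may apply to the timestamp is left out.

```java
final class SnowflakeLayout {
    static final int WORKER_ID_BITS = 10;
    static final int TIMESTAMP_BITS = 41;
    static final int SEQUENCE_BITS = 12;

    static final long MAX_WORKER_ID = (1L << WORKER_ID_BITS) - 1;  // 1023
    static final long MAX_TIMESTAMP = (1L << TIMESTAMP_BITS) - 1;
    static final long MAX_SEQUENCE = (1L << SEQUENCE_BITS) - 1;    // 4095

    // Packs the three fields as: 0 | workerId (10) | timestamp (41) | sequence (12).
    static long compose(long workerId, long timestamp, long sequence) {
        return ((workerId & MAX_WORKER_ID) << (TIMESTAMP_BITS + SEQUENCE_BITS))
                | ((timestamp & MAX_TIMESTAMP) << SEQUENCE_BITS)
                | (sequence & MAX_SEQUENCE);
    }

    static long workerIdOf(long id) {
        return (id >>> (TIMESTAMP_BITS + SEQUENCE_BITS)) & MAX_WORKER_ID;
    }

    static long timestampOf(long id) {
        return (id >>> SEQUENCE_BITS) & MAX_TIMESTAMP;
    }

    static long sequenceOf(long id) {
        return id & MAX_SEQUENCE;
    }

    public static void main(String[] args) {
        long id = compose(7, System.currentTimeMillis(), 42);
        System.out.println("id=" + id
                + " workerId=" + workerIdOf(id)
                + " timestamp=" + timestampOf(id)
                + " sequence=" + sequenceOf(id));
    }
}
```

Because the three fields use 63 bits in total, the top bit stays 0 and the resulting `long` is never negative.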
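(3) How IdWorker's snowflake algorithm handles clock rollback
IdWorker.nextId() increments timestampAndSequence, a single value that holds both the timestamp and the sequence number. In effect it increments the sequence number within the current millisecond.
When many threads request IDs concurrently, the sequence number for one millisecond can climb quickly until all 12 sequence bits are used up. The next increment then carries into the timestamp bits, so the millisecond timestamp stored in timestampAndSequence advances as well.
The real time, however, is still within the same millisecond, while the timestamp in timestampAndSequence has already moved on to the next one. The stored timestamp is now ahead of the system clock, which looks the same as a clock rollback, so waitIfNecessary() has to be called to deal with it.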
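The carry from the sequence bits into the timestamp bits can be reproduced with a single AtomicLong. The following is a simplified sketch of the idea, not Seata's IdWorker source: `BorrowingIdSketch`, the injected `clock`, and `embeddedTimestamp()` are assumptions made so the behaviour can be observed with a fake clock.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

final class BorrowingIdSketch {
    static final int TIMESTAMP_BITS = 41;
    static final int SEQUENCE_BITS = 12;
    static final long TIMESTAMP_AND_SEQUENCE_MASK = (1L << (TIMESTAMP_BITS + SEQUENCE_BITS)) - 1;

    private final long workerIdShifted;
    private final LongSupplier clock;               // millisecond clock, injectable for the demo
    private final AtomicLong timestampAndSequence;  // timestamp in the high bits, sequence in the low 12

    BorrowingIdSketch(long workerId, LongSupplier clock) {
        this.workerIdShifted = workerId << (TIMESTAMP_BITS + SEQUENCE_BITS);
        this.clock = clock;
        this.timestampAndSequence = new AtomicLong(clock.getAsLong() << SEQUENCE_BITS);
    }

    long nextId() {
        waitIfNecessary();
        // One atomic increment: when the 12 sequence bits overflow, the carry bumps the timestamp.
        long next = timestampAndSequence.incrementAndGet();
        return workerIdShifted | (next & TIMESTAMP_AND_SEQUENCE_MASK);
    }

    long embeddedTimestamp() {
        return timestampAndSequence.get() >>> SEQUENCE_BITS;
    }

    // If the embedded timestamp has caught up with or passed the clock, pause briefly.
    private void waitIfNecessary() {
        long embedded = timestampAndSequence.get() >>> SEQUENCE_BITS;
        if (embedded >= clock.getAsLong()) {
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        AtomicLong fakeClock = new AtomicLong(1_000);
        BorrowingIdSketch generator = new BorrowingIdSketch(1, fakeClock::get);
        fakeClock.set(2_000); // the clock has moved on, so no waiting is needed
        for (int i = 0; i < 4096; i++) {
            generator.nextId();
        }
        // 4096 increments exhaust the 12-bit sequence and carry into the timestamp.
        System.out.println("embedded timestamp: " + generator.embeddedTimestamp());
    }
}
```

A design consequence of this scheme is that IDs stay strictly increasing per generator even under bursts, at the cost of the embedded timestamp temporarily drifting ahead of real time.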
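Therefore, in IdWorker.waitIfNecessary(), if the QPS of ID requests is so high that the sequence numbers for the current timestamp are exhausted, the current thread is blocked for 5 milliseconds.
(3) Implementation of persisting the global transaction session to a File
- public class XID {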
- private static int port;
- private static String ipAddress;
- ...
- //Generate xid string.
- public static String generateXID(long tranId) {
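- //First take the IP address of the current machine,
- //then append a colon, the port, and another colon,
- //and finally append the transaction id to form the xid.
- //So the xid has the form ip:port:transactionId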
- return new StringBuilder().append(ipAddress).append(IP_PORT_SPLIT_CHAR).append(port).append(IP_PORT_SPLIT_CHAR).append(tranId).toString();
- }
- ...
- }
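The ip:port:transactionId format produced by generateXID() is easy to round-trip. The helper below is a small sketch for illustration; `XidSketch` and its methods are hypothetical and only mirror the string format shown in the XID class above.

```java
final class XidSketch {
    private static final String SEPARATOR = ":";

    // Builds an xid of the form ip:port:transactionId.
    static String generate(String ipAddress, int port, long transactionId) {
        return ipAddress + SEPARATOR + port + SEPARATOR + transactionId;
    }

    // Recovers the transactionId, which is everything after the last separator.
    static long transactionIdOf(String xid) {
        int idx = xid.lastIndexOf(SEPARATOR);
        if (idx < 0) {
            throw new IllegalArgumentException("not an xid: " + xid);
        }
        return Long.parseLong(xid.substring(idx + 1));
    }

    public static void main(String[] args) {
        String xid = generate("192.168.0.1", 8091, 1234567890L);
        System.out.println(xid + " -> transactionId " + transactionIdOf(xid));
    }
}
```

Since the xid embeds the address of the server that created it, any participant that sees the xid can tell which Seata Server instance owns the global transaction.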
- public class DefaultCore implements Core {
- ...
- @Override
- public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException {
- //Create a global transaction session
- GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name, timeout);
-
- //Put the xid into the thread-local context through slf4j's MDC
- MDC.put(RootContext.MDC_KEY_XID, session.getXid());
-
- //Add a lifecycle listener for the global transaction session
- session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
-
- //Begin the session; this is where the global transaction session gets persisted
- session.begin();
- //transaction start event: publish the session-begin event
- MetricsPublisher.postSessionDoingEvent(session, false);
- //Return the xid of the global transaction session
- return session.getXid();
- }
- ...
- }
- public class GlobalSession implements SessionLifecycle, SessionStorable {
- ...
- @Override
- public void begin() throws TransactionException {
- this.status = GlobalStatus.Begin;
- this.beginTime = System.currentTimeMillis();
- this.active = true;
- for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
- lifecycleListener.onBegin(this);
- }
- }
- ...
- }
- public abstract class AbstractSessionManager implements SessionManager, SessionLifecycleListener {
- ...
- @Override
- public void onBegin(GlobalSession globalSession) throws TransactionException {
- addGlobalSession(globalSession);
- }
-
- @Override
- public void addGlobalSession(GlobalSession session) throws TransactionException {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("MANAGER[{}] SESSION[{}] {}", name, session, LogOperation.GLOBAL_ADD);
- }
- writeSession(LogOperation.GLOBAL_ADD, session);
- }
-
- private void writeSession(LogOperation logOperation, SessionStorable sessionStorable) throws TransactionException {
- //transactionStoreManager.writeSession() persists the global transaction session
- if (!transactionStoreManager.writeSession(logOperation, sessionStorable)) {
- if (LogOperation.GLOBAL_ADD.equals(logOperation)) {
- throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession, "Fail to store global session");
- } else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {
- throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession, "Fail to update global session");
- } else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {
- throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession, "Fail to remove global session");
- } else if (LogOperation.BRANCH_ADD.equals(logOperation)) {
- throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession, "Fail to store branch session");
- } else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {
- throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession, "Fail to update branch session");
- } else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {
- throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession, "Fail to remove branch session");
- } else {
- throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession, "Unknown LogOperation:" + logOperation.name());
- }
- }
- }
- ...
- }
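The way GlobalSession.begin() triggers persistence through a registered listener can be shown with a stripped-down observer. This is a sketch under simplifying assumptions: `SessionSketch`, `BeginListener` and `InMemoryStore` are hypothetical names, checked exceptions are omitted, and the "store" is just an in-memory list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical listener interface, standing in for a session lifecycle listener.
interface BeginListener {
    void onBegin(SessionSketch session);
}

final class SessionSketch {
    final String xid;
    boolean active;
    long beginTime;
    private final List<BeginListener> listeners = new ArrayList<>();

    SessionSketch(String xid) {
        this.xid = xid;
    }

    void addListener(BeginListener listener) {
        listeners.add(listener);
    }

    // Marks the session as started and notifies every listener.
    void begin() {
        beginTime = System.currentTimeMillis();
        active = true;
        for (BeginListener listener : listeners) {
            listener.onBegin(this);
        }
    }
}

// A listener that "persists" the session by remembering its xid.
final class InMemoryStore implements BeginListener {
    final List<String> persisted = new ArrayList<>();

    @Override
    public void onBegin(SessionSketch session) {
        persisted.add(session.xid);
    }
}

final class LifecycleDemo {
    public static void main(String[] args) {
        InMemoryStore store = new InMemoryStore();
        SessionSketch session = new SessionSketch("192.168.0.1:8091:42");
        session.addListener(store);
        session.begin();
        System.out.println("persisted sessions: " + store.persisted);
    }
}
```

Keeping persistence behind a listener means the session object does not need to know whether it is being stored in a database, a file, or Redis.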
- //The type Database transaction store manager.
- public class DataBaseTransactionStoreManager extends AbstractTransactionStoreManager implements TransactionStoreManager {
- private static volatile DataBaseTransactionStoreManager instance;
- protected LogStore logStore;
- ...
- //Get the instance.
- public static DataBaseTransactionStoreManager getInstance() {
- if (instance == null) {
- synchronized (DataBaseTransactionStoreManager.class) {
- if (instance == null) {
- instance = new DataBaseTransactionStoreManager();
- }
- }
- }
- return instance;
- }
-
- //Instantiates a new Database transaction store manager.
- private DataBaseTransactionStoreManager() {
- logQueryLimit = CONFIG.getInt(ConfigurationKeys.STORE_DB_LOG_QUERY_LIMIT, DEFAULT_LOG_QUERY_LIMIT);
- String datasourceType = CONFIG.getConfig(ConfigurationKeys.STORE_DB_DATASOURCE_TYPE);
- //init dataSource: load the DataSourceProvider through the SPI mechanism
- DataSource logStoreDataSource = EnhancedServiceLoader.load(DataSourceProvider.class, datasourceType).provide();
- logStore = new LogStoreDataBaseDAO(logStoreDataSource);
- }
-
- @Override
- public boolean writeSession(LogOperation logOperation, SessionStorable session) {
- if (LogOperation.GLOBAL_ADD.equals(logOperation)) {
- return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
- } else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {
- return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
- } else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {
- return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
- } else if (LogOperation.BRANCH_ADD.equals(logOperation)) {
- return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
- } else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {
- return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
- } else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {
- return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
- } else {
- throw new StoreException("Unknown LogOperation:" + logOperation.name());
- }
- }
- ...
- }
- public class LogStoreDataBaseDAO implements LogStore {
- protected DataSource logStoreDataSource = null;
- protected String globalTable;
- protected String branchTable;
- private String dbType;
- ...
- public LogStoreDataBaseDAO(DataSource logStoreDataSource) {
- this.logStoreDataSource = logStoreDataSource;
- globalTable = CONFIG.getConfig(ConfigurationKeys.STORE_DB_GLOBAL_TABLE, DEFAULT_STORE_DB_GLOBAL_TABLE);
- branchTable = CONFIG.getConfig(ConfigurationKeys.STORE_DB_BRANCH_TABLE, DEFAULT_STORE_DB_BRANCH_TABLE);
- dbType = CONFIG.getConfig(ConfigurationKeys.STORE_DB_TYPE);
- if (StringUtils.isBlank(dbType)) {
- throw new StoreException("there must be db type.");
- }
- if (logStoreDataSource == null) {
- throw new StoreException("there must be logStoreDataSource.");
- }
- //init transaction_name size
- initTransactionNameSize();
- }
-
- @Override
- public boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) {
- String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertGlobalTransactionSQL(globalTable);
- Connection conn = null;
- PreparedStatement ps = null;
- try {
- int index = 1;
- conn = logStoreDataSource.getConnection();
- conn.setAutoCommit(true);
- ps = conn.prepareStatement(sql);
- ps.setString(index++, globalTransactionDO.getXid());
- ps.setLong(index++, globalTransactionDO.getTransactionId());
- ps.setInt(index++, globalTransactionDO.getStatus());
- ps.setString(index++, globalTransactionDO.getApplicationId());
- ps.setString(index++, globalTransactionDO.getTransactionServiceGroup());
- String transactionName = globalTransactionDO.getTransactionName();
- transactionName = transactionName.length() > transactionNameColumnSize ? transactionName.substring(0, transactionNameColumnSize) : transactionName;
- ps.setString(index++, transactionName);
- ps.setInt(index++, globalTransactionDO.getTimeout());
- ps.setLong(index++, globalTransactionDO.getBeginTime());
- ps.setString(index++, globalTransactionDO.getApplicationData());
- return ps.executeUpdate() > 0;
- } catch (SQLException e) {
- throw new StoreException(e);
- } finally {
- IOUtil.close(ps, conn);
- }
- }
- ...
- }
- public class FileTransactionStoreManager extends AbstractTransactionStoreManager implements TransactionStoreManager, ReloadableStore {
- private ReentrantLock writeSessionLock = new ReentrantLock();
- ...
- @Override
- public boolean writeSession(LogOperation logOperation, SessionStorable session) {
- long curFileTrxNum;
- writeSessionLock.lock();
- try {
- if (!writeDataFile(new TransactionWriteStore(session, logOperation).encode())) {
- return false;
- }
- lastModifiedTime = System.currentTimeMillis();
- curFileTrxNum = FILE_TRX_NUM.incrementAndGet();
- if (curFileTrxNum % PER_FILE_BLOCK_SIZE == 0 && (System.currentTimeMillis() - trxStartTimeMills) > MAX_TRX_TIMEOUT_MILLS) {
- return saveHistory();
- }
- } catch (Exception exx) {
- LOGGER.error("writeSession error, {}", exx.getMessage(), exx);
- return false;
- } finally {
- writeSessionLock.unlock();
- }
- flushDisk(curFileTrxNum, currFileChannel);
- return true;
- }
-
- private boolean writeDataFile(byte[] bs) {
- if (bs == null || bs.length >= Integer.MAX_VALUE - 3) {
- return false;
- }
- if (!writeDataFrame(bs)) {
- return false;
- }
- return flushWriteBuffer(writeBuffer);
- }
-
- private boolean writeDataFrame(byte[] data) {
- if (data == null || data.length <= 0) {
- return true;
- }
- int dataLength = data.length;
- int bufferRemainingSize = writeBuffer.remaining();
- if (bufferRemainingSize <= INT_BYTE_SIZE) {
- if (!flushWriteBuffer(writeBuffer)) {
- return false;
- }
- }
- bufferRemainingSize = writeBuffer.remaining();
- if (bufferRemainingSize <= INT_BYTE_SIZE) {
- throw new IllegalStateException(String.format("Write buffer remaining size %d was too small", bufferRemainingSize));
- }
- writeBuffer.putInt(dataLength);
- bufferRemainingSize = writeBuffer.remaining();
- int dataPos = 0;
- while (dataPos < dataLength) {
- int dataLengthToWrite = dataLength - dataPos;
- dataLengthToWrite = Math.min(dataLengthToWrite, bufferRemainingSize);
- writeBuffer.put(data, dataPos, dataLengthToWrite);
- bufferRemainingSize = writeBuffer.remaining();
- if (bufferRemainingSize == 0) {
- if (!flushWriteBuffer(writeBuffer)) {
- return false;
- }
- bufferRemainingSize = writeBuffer.remaining();
- }
- dataPos += dataLengthToWrite;
- }
- return true;
- }
-
- private boolean flushWriteBuffer(ByteBuffer writeBuffer) {
- writeBuffer.flip();
- if (!writeDataFileByBuffer(writeBuffer)) {
- return false;
- }
- writeBuffer.clear();
- return true;
- }
-
- private void flushDisk(long curFileNum, FileChannel currFileChannel) {
- if (FLUSH_DISK_MODE == FlushDiskMode.SYNC_MODEL) {
- SyncFlushRequest syncFlushRequest = new SyncFlushRequest(curFileNum, currFileChannel);
- writeDataFileRunnable.putRequest(syncFlushRequest);
- syncFlushRequest.waitForFlush(MAX_WAIT_FOR_FLUSH_TIME_MILLS);
- } else {
- writeDataFileRunnable.putRequest(new AsyncFlushRequest(curFileNum, currFileChannel));
- }
- }
- ...
- }
- public class TransactionWriteStore implements SessionStorable {
- private SessionStorable sessionRequest;
- private LogOperation operate;
-
- public TransactionWriteStore(SessionStorable sessionRequest, LogOperation operate) {
- this.sessionRequest = sessionRequest;
- this.operate = operate;
- }
-
- @Override
- public byte[] encode() {
- byte[] bySessionRequest = this.sessionRequest.encode();
- byte byOpCode = this.getOperate().getCode();
- int len = bySessionRequest.length + 1;
- byte[] byResult = new byte[len];
- ByteBuffer byteBuffer = ByteBuffer.wrap(byResult);
- byteBuffer.put(bySessionRequest);
- byteBuffer.put(byOpCode);
- return byResult;
- }
- ...
- }
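The file store code above combines two small encodings: TransactionWriteStore.encode() appends a one-byte operation code to the session bytes, and writeDataFrame() prefixes each record with its length as an int. The sketch below reproduces just that framing in memory; `FrameSketch` and its method names are hypothetical, and no file or channel is involved.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class FrameSketch {
    // Record body: the session bytes followed by a single operation-code byte.
    static byte[] encodeRecord(byte[] sessionBytes, byte opCode) {
        ByteBuffer buffer = ByteBuffer.allocate(sessionBytes.length + 1);
        buffer.put(sessionBytes).put(opCode);
        return buffer.array();
    }

    // Frame: a 4-byte length prefix followed by the record itself.
    static byte[] frame(byte[] record) {
        ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + record.length);
        buffer.putInt(record.length).put(record);
        return buffer.array();
    }

    // Reads one frame from the buffer's current position and returns the record.
    static byte[] unframe(ByteBuffer in) {
        int length = in.getInt();
        byte[] record = new byte[length];
        in.get(record);
        return record;
    }

    public static void main(String[] args) {
        byte[] session = "session-bytes".getBytes(StandardCharsets.UTF_8);
        byte[] framed = frame(encodeRecord(session, (byte) 1));
        byte[] record = unframe(ByteBuffer.wrap(framed));
        System.out.println("frame size: " + framed.length
                + ", record size: " + record.length
                + ", opCode: " + record[record.length - 1]);
    }
}
```

The length prefix is what allows a reader to split a continuous append-only file back into individual records when the sessions are reloaded.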
- public class RedisTransactionStoreManager extends AbstractTransactionStoreManager implements TransactionStoreManager {
- private static volatile RedisTransactionStoreManager instance;
- //Map for LogOperation Global Operation
- public static volatile ImmutableMap<LogOperation, Function<GlobalTransactionDO, Boolean>> globalMap;
- //Map for LogOperation Branch Operation
- public static volatile ImmutableMap<LogOperation, Function<BranchTransactionDO, Boolean>> branchMap;
- ...
- public static RedisTransactionStoreManager getInstance() {
- if (instance == null) {
- synchronized (RedisTransactionStoreManager.class) {
- if (instance == null) {
- instance = new RedisTransactionStoreManager();
- }
- }
- }
- return instance;
- }
-
- public RedisTransactionStoreManager() {
- super();
- initGlobalMap();
- initBranchMap();
- logQueryLimit = CONFIG.getInt(STORE_REDIS_QUERY_LIMIT, DEFAULT_LOG_QUERY_LIMIT);
- if (logQueryLimit > DEFAULT_LOG_QUERY_LIMIT) {
- logQueryLimit = DEFAULT_LOG_QUERY_LIMIT;
- }
- }
-
- public void initGlobalMap() {
- if (CollectionUtils.isEmpty(branchMap)) {
- globalMap = ImmutableMap.<LogOperation, Function<GlobalTransactionDO, Boolean>>builder()
- .put(LogOperation.GLOBAL_ADD, this::insertGlobalTransactionDO)
- .put(LogOperation.GLOBAL_UPDATE, this::updateGlobalTransactionDO)
- .put(LogOperation.GLOBAL_REMOVE, this::deleteGlobalTransactionDO)
- .build();
- }
- }
-
- public void initBranchMap() {
- if (CollectionUtils.isEmpty(branchMap)) {
- branchMap = ImmutableMap.<LogOperation, Function<BranchTransactionDO, Boolean>>builder()
- .put(LogOperation.BRANCH_ADD, this::insertBranchTransactionDO)
- .put(LogOperation.BRANCH_UPDATE, this::updateBranchTransactionDO)
- .put(LogOperation.BRANCH_REMOVE, this::deleteBranchTransactionDO)
- .build();
- }
- }
-
- //Insert the global transaction.
- private boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) {
- String globalKey = buildGlobalKeyByTransactionId(globalTransactionDO.getTransactionId());
- try (Jedis jedis = JedisPooledFactory.getJedisInstance(); Pipeline pipelined = jedis.pipelined()) {
- Date now = new Date();
- globalTransactionDO.setGmtCreate(now);
- globalTransactionDO.setGmtModified(now);
- pipelined.hmset(globalKey, BeanUtils.objectToMap(globalTransactionDO));
- pipelined.rpush(buildGlobalStatus(globalTransactionDO.getStatus()), globalTransactionDO.getXid());
- pipelined.sync();
- return true;
- } catch (Exception ex) {
- throw new RedisException(ex);
- }
- }
-
- //Insert branch transaction
- private boolean insertBranchTransactionDO(BranchTransactionDO branchTransactionDO) {
- String branchKey = buildBranchKey(branchTransactionDO.getBranchId());
- String branchListKey = buildBranchListKeyByXid(branchTransactionDO.getXid());
- try (Jedis jedis = JedisPooledFactory.getJedisInstance(); Pipeline pipelined = jedis.pipelined()) {
- Date now = new Date();
- branchTransactionDO.setGmtCreate(now);
- branchTransactionDO.setGmtModified(now);
- pipelined.hmset(branchKey, BeanUtils.objectToMap(branchTransactionDO));
- pipelined.rpush(branchListKey, branchKey);
- pipelined.sync();
- return true;
- } catch (Exception ex) {
- throw new RedisException(ex);
- }
- }
-
- @Override
- public boolean writeSession(LogOperation logOperation, SessionStorable session) {
- if (globalMap.containsKey(logOperation) || branchMap.containsKey(logOperation)) {
- return globalMap.containsKey(logOperation) ?
- globalMap.get(logOperation).apply(SessionConverter.convertGlobalTransactionDO(session)) :
- branchMap.get(logOperation).apply(SessionConverter.convertBranchTransactionDO(session));
- } else {
- throw new StoreException("Unknown LogOperation:" + logOperation.name());
- }
- }
- ...
- }
- public class SessionConverter {
- ...
- public static GlobalTransactionDO convertGlobalTransactionDO(SessionStorable session) {
- if (session == null || !(session instanceof GlobalSession)) {
- throw new IllegalArgumentException("The parameter of SessionStorable is not available, SessionStorable:" + StringUtils.toString(session));
- }
- GlobalSession globalSession = (GlobalSession)session;
- GlobalTransactionDO globalTransactionDO = new GlobalTransactionDO();
- globalTransactionDO.setXid(globalSession.getXid());
- globalTransactionDO.setStatus(globalSession.getStatus().getCode());
- globalTransactionDO.setApplicationId(globalSession.getApplicationId());
- globalTransactionDO.setBeginTime(globalSession.getBeginTime());
- globalTransactionDO.setTimeout(globalSession.getTimeout());
- globalTransactionDO.setTransactionId(globalSession.getTransactionId());
- globalTransactionDO.setTransactionName(globalSession.getTransactionName());
- globalTransactionDO.setTransactionServiceGroup(globalSession.getTransactionServiceGroup());
- globalTransactionDO.setApplicationData(globalSession.getApplicationData());
- return globalTransactionDO;
- }
-
- public static BranchTransactionDO convertBranchTransactionDO(SessionStorable session) {
- if (session == null || !(session instanceof BranchSession)) {
- throw new IllegalArgumentException("The parameter of SessionStorable is not available, SessionStorable:" + StringUtils.toString(session));
- }
- BranchSession branchSession = (BranchSession)session;
- BranchTransactionDO branchTransactionDO = new BranchTransactionDO();
- branchTransactionDO.setXid(branchSession.getXid());
- branchTransactionDO.setBranchId(branchSession.getBranchId());
- branchTransactionDO.setBranchType(branchSession.getBranchType().name());
- branchTransactionDO.setClientId(branchSession.getClientId());
- branchTransactionDO.setResourceGroupId(branchSession.getResourceGroupId());
- branchTransactionDO.setTransactionId(branchSession.getTransactionId());
- branchTransactionDO.setApplicationData(branchSession.getApplicationData());
- branchTransactionDO.setResourceId(branchSession.getResourceId());
- branchTransactionDO.setStatus(branchSession.getStatus().getCode());
- return branchTransactionDO;
- }
- ...
- }
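RedisTransactionStoreManager replaces the if/else chain of the database store with lookup tables (globalMap and branchMap) from operation to handler function. A minimal version of that table-driven dispatch is sketched below; the `Op` enum, `DispatchSketch`, and the string-based "handlers" are hypothetical simplifications, and an EnumMap is used here instead of Guava's ImmutableMap.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical operation codes for the example.
enum Op { GLOBAL_ADD, GLOBAL_UPDATE, GLOBAL_REMOVE, BRANCH_ADD }

final class DispatchSketch {
    final List<String> log = new ArrayList<>();
    private final Map<Op, Function<String, Boolean>> handlers = new EnumMap<>(Op.class);

    DispatchSketch() {
        // BRANCH_ADD is deliberately left unregistered to show the "unknown operation" path.
        handlers.put(Op.GLOBAL_ADD, xid -> record("insert", xid));
        handlers.put(Op.GLOBAL_UPDATE, xid -> record("update", xid));
        handlers.put(Op.GLOBAL_REMOVE, xid -> record("delete", xid));
    }

    private boolean record(String action, String xid) {
        log.add(action + ":" + xid);
        return true;
    }

    // Looks up the handler for the operation and applies it.
    boolean write(Op op, String xid) {
        Function<String, Boolean> handler = handlers.get(op);
        if (handler == null) {
            throw new IllegalArgumentException("Unknown operation: " + op);
        }
        return handler.apply(xid);
    }

    public static void main(String[] args) {
        DispatchSketch store = new DispatchSketch();
        store.write(Op.GLOBAL_ADD, "192.168.0.1:8091:42");
        store.write(Op.GLOBAL_REMOVE, "192.168.0.1:8091:42");
        System.out.println(store.log);
    }
}
```

Adding a new operation then means registering one more map entry rather than extending a conditional chain.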
- -> ServerHandler.channelRead() receives the request sent by the Seata Client;
- -> AbstractNettyRemoting.processMessage() processes the RpcMessage;
- -> ServerOnRequestProcessor.process() processes the RpcMessage;
- -> TransactionMessageHandler.onRequest() processes the RpcMessage;
- -> RemotingServer.sendAsyncResponse() returns the response containing the xid to the client;
6. Source code for the Client receiving and handling the Server's response
- public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {
- ...
- @ChannelHandler.Sharable
- class ServerHandler extends ChannelDuplexHandler {
- @Override
- public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
- if (!(msg instanceof RpcMessage)) {
- return;
- }
- //Next, call processMessage() to handle the decoded RpcMessage object
- processMessage(ctx, (RpcMessage) msg);
- }
- }
- }
- public abstract class AbstractNettyRemoting implements Disposable {
- ...
- protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
- }
- Object body = rpcMessage.getBody();
- if (body instanceof MessageTypeAware) {
- MessageTypeAware messageTypeAware = (MessageTypeAware) body;
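- //Look up a Pair by message type; the Pair consists of a request processor and its thread pool
- //The entries in processorTable are put there by NettyRemotingServer during initialization via registerProcessor()
- //So the code below ends up being handled by ServerOnRequestProcessor.process()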
- final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
- if (pair != null) {
- if (pair.getSecond() != null) {
- try {
- pair.getSecond().execute(() -> {
- try {
- pair.getFirst().process(ctx, rpcMessage);
- } catch (Throwable th) {
- LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
- } finally {
- MDC.clear();
- }
- });
- } catch (RejectedExecutionException e) {
- ...
- }
- } else {
- try {
- pair.getFirst().process(ctx, rpcMessage);
- } catch (Throwable th) {
- LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
- }
- }
- } else {
- LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
- }
- } else {
- LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
- }
- }
- ...
- }
- public class ServerOnRequestProcessor implements RemotingProcessor, Disposable {
- private final RemotingServer remotingServer;
- ...
- @Override
- public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
- if (ChannelManager.isRegistered(ctx.channel())) {
- onRequestMessage(ctx, rpcMessage);
- } else {
- try {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("closeChannelHandlerContext channel:" + ctx.channel());
- }
- ctx.disconnect();
- ctx.close();
- } catch (Exception exx) {
- LOGGER.error(exx.getMessage());
- }
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info(String.format("close a unhandled connection! [%s]", ctx.channel().toString()));
- }
- }
- }
-
- private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
- Object message = rpcMessage.getBody();
- //RpcContext looked up for this channel
- RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("server received:{},clientIp:{},vgroup:{}", message, NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());
- } else {
- try {
- BatchLogHandler.INSTANCE.getLogQueue().put(message + ",clientIp:" + NetUtil.toIpAddress(ctx.channel().remoteAddress()) + ",vgroup:" + rpcContext.getTransactionServiceGroup());
- } catch (InterruptedException e) {
- LOGGER.error("put message to logQueue error: {}", e.getMessage(), e);
- }
- }
- if (!(message instanceof AbstractMessage)) {
- return;
- }
- // the batch send request message
- if (message instanceof MergedWarpMessage) {
- ...
- } else {
- // the single send request message
- final AbstractMessage msg = (AbstractMessage) message;
- //This ends up calling DefaultCoordinator.onRequest() to handle the RpcMessage
- AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);
- //Return the response to the client
- remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);
- }
- }
- ...
- }
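The processorTable lookup in processMessage() pairs each message type code with a processor and an optional thread pool. The sketch below imitates that dispatch with JDK types only; `ProcessorTableSketch`, its `Entry` class, and the use of `Consumer<String>` as the processor are assumptions made for the example, not Seata's Pair or RemotingProcessor types.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class ProcessorTableSketch {
    // One table entry: the processor plus the executor it should run on (may be null).
    static final class Entry {
        final Consumer<String> processor;
        final ExecutorService executor;

        Entry(Consumer<String> processor, ExecutorService executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    private final Map<Integer, Entry> table = new HashMap<>();

    void register(int typeCode, Consumer<String> processor, ExecutorService executor) {
        table.put(typeCode, new Entry(processor, executor));
    }

    // Returns false when no processor is registered for the type code.
    boolean dispatch(int typeCode, String body) {
        Entry entry = table.get(typeCode);
        if (entry == null) {
            return false;
        }
        if (entry.executor != null) {
            entry.executor.execute(() -> entry.processor.accept(body)); // hand off to the pool
        } else {
            entry.processor.accept(body);                               // run on the caller thread
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        ProcessorTableSketch table = new ProcessorTableSketch();
        table.register(1, body -> System.out.println("processing " + body
                + " on " + Thread.currentThread().getName()), pool);
        System.out.println("dispatched: " + table.dispatch(1, "GlobalBeginRequest"));
        System.out.println("dispatched: " + table.dispatch(99, "unknown"));
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```

Handing work to a separate pool keeps the network I/O thread free to read the next message while business logic such as persistence is running.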
- -> TransactionMessageHandler.onRequest() processes the RpcMessage;
- -> DefaultCoordinator.onRequest() processes the RpcMessage;
- -> GlobalBeginRequest.handle() handles the begin-global-transaction request;
- -> AbstractTCInboundHandler.handle() begins the global transaction and returns the response;
- -> DefaultCoordinator.doGlobalBegin() begins the global transaction;
- -> DefaultCore.begin() creates the global transaction session and begins it;
- -> GlobalSession.createGlobalSession() creates the global transaction session;
- -> GlobalSession.begin() begins the global transaction session;
- -> AbstractSessionManager.onBegin()
- -> AbstractSessionManager.addGlobalSession()
- -> AbstractSessionManager.writeSession()
- -> TransactionStoreManager.writeSession() persists the global transaction session;
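When the Seata Client sends the begin-global-transaction request to Seata Server, it waits synchronously for the response through MessageFuture.get(). So once the Seata Client receives the response and marks the MessageFuture as completed through complete(), the thread that was waiting for the response continues its work.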
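The synchronous wait on an asynchronous response can be sketched with a map of pending futures keyed by message id. This is an illustration only: `PendingRequestsSketch`, `register()` and `onResponse()` are hypothetical names, and the response body is a plain String instead of a response message object.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

final class PendingRequestsSketch {
    private final Map<Integer, CompletableFuture<String>> futures = new ConcurrentHashMap<>();

    // Called by the sending thread before the request goes out.
    CompletableFuture<String> register(int messageId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        futures.put(messageId, future);
        return future;
    }

    // Called by the I/O thread when a response with the same id arrives.
    boolean onResponse(int messageId, String body) {
        CompletableFuture<String> future = futures.remove(messageId);
        return future != null && future.complete(body);
    }

    public static void main(String[] args) throws Exception {
        PendingRequestsSketch pending = new PendingRequestsSketch();
        CompletableFuture<String> future = pending.register(1);

        // Simulate the network thread delivering the response a little later.
        Thread ioThread = new Thread(() -> pending.onResponse(1, "xid=192.168.0.1:8091:42"));
        ioThread.start();

        // The caller blocks here until complete() is invoked or the timeout expires.
        String response = future.get(1, TimeUnit.SECONDS);
        System.out.println("response: " + response);
    }
}
```

Matching by message id is what lets many requests share one connection while each caller still receives its own response.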
In other words, once some thread calls CompletableFuture.complete(), the thread blocked in CompletableFuture.get() is woken up and stops blocking.
- public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {
- ...
- @Override
- public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
- if (!(request instanceof AbstractTransactionRequestToTC)) {
- throw new IllegalArgumentException();
- }
- AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request;
- transactionRequest.setTCInboundHandler(this);
- return transactionRequest.handle(context);
- }
- ...
- }
- public class GlobalBeginRequest extends AbstractTransactionRequestToTC {
- ...
- @Override
- public AbstractTransactionResponse handle(RpcContext rpcContext) {
- return handler.handle(this, rpcContext);
- }
- ...
- }
- public abstract class AbstractTCInboundHandler extends AbstractExceptionHandler implements TCInboundHandler {
- private static final Logger LOGGER = LoggerFactory.getLogger(AbstractTCInboundHandler.class);
-
- @Override
- public GlobalBeginResponse handle(GlobalBeginRequest request, final RpcContext rpcContext) {
- GlobalBeginResponse response = new GlobalBeginResponse();
- exceptionHandleTemplate(new AbstractCallback<GlobalBeginRequest, GlobalBeginResponse>() {
- @Override
- public void execute(GlobalBeginRequest request, GlobalBeginResponse response) throws TransactionException {
- try {
- //Begin the global transaction
- doGlobalBegin(request, response, rpcContext);
- } catch (StoreException e) {
- throw new TransactionException(TransactionExceptionCode.FailedStore, String.format("begin global request failed. xid=%s, msg=%s", response.getXid(), e.getMessage()), e);
- }
- }
- }, request, response);
- return response;
- }
- ...
- }
- public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {
- private final DefaultCore core;
- ...
- @Override
- protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext) throws TransactionException {
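- //This is where the begin-global-transaction logic is actually handled
- //It calls DefaultCore to really begin a global transaction, i.e. obtains the xid and sets it on the response
- response.setXid(core.begin(
- rpcContext.getApplicationId(),//application id
- rpcContext.getTransactionServiceGroup(),//transaction service group
- request.getTransactionName(),//transaction name
- request.getTimeout())//timeout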
- );
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}", rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());
- }
- }
- ...
- }
- public class DefaultCore implements Core {
- ...
- @Override
- public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException {
- //Create a global transaction session
- GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name, timeout);
- //Put the xid into the thread-local context through slf4j's MDC
- MDC.put(RootContext.MDC_KEY_XID, session.getXid());
- //Add a lifecycle listener for the global transaction session
- session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
- //Begin the session; this is where the global transaction session gets persisted
- session.begin();
- //transaction start event: publish the session-begin event
- MetricsPublisher.postSessionDoingEvent(session, false);
- //Return the xid of the global transaction session
- return session.getXid();
- }
- ...
- }
- public class GlobalSession implements SessionLifecycle, SessionStorable {
- ...
- public static GlobalSession createGlobalSession(String applicationId, String txServiceGroup, String txName, int timeout) {
- GlobalSession session = new GlobalSession(applicationId, txServiceGroup, txName, timeout, false);
- return session;
- }
-
- public GlobalSession(String applicationId, String transactionServiceGroup, String transactionName, int timeout, boolean lazyLoadBranch) {
- //The global transaction id is generated by UUIDGenerator
- this.transactionId = UUIDGenerator.generateUUID();
- this.status = GlobalStatus.Begin;
- this.lazyLoadBranch = lazyLoadBranch;
- if (!lazyLoadBranch) {
- this.branchSessions = new ArrayList<>();
- }
- this.applicationId = applicationId;
- this.transactionServiceGroup = transactionServiceGroup;
- this.transactionName = transactionName;
- this.timeout = timeout;
- //Build the final xid from the transactionId generated by UUIDGenerator, using the XID utility
- this.xid = XID.generateXID(transactionId);
- }
-
- @Override
- public void begin() throws TransactionException {
- this.status = GlobalStatus.Begin;
- this.beginTime = System.currentTimeMillis();
- this.active = true;
- for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
- lifecycleListener.onBegin(this);
- }
- }
- ...
- }
- public abstract class AbstractSessionManager implements SessionManager, SessionLifecycleListener {
- ...
- @Override
- public void onBegin(GlobalSession globalSession) throws TransactionException {
- addGlobalSession(globalSession);
- }
-
- @Override
- public void addGlobalSession(GlobalSession session) throws TransactionException {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("MANAGER[{}] SESSION[{}] {}", name, session, LogOperation.GLOBAL_ADD);
- }
- writeSession(LogOperation.GLOBAL_ADD, session);
- }
-
- private void writeSession(LogOperation logOperation, SessionStorable sessionStorable) throws TransactionException {
- //transactionStoreManager.writeSession() persists the global transaction session
- if (!transactionStoreManager.writeSession(logOperation, sessionStorable)) {
- ...
- }
- }
- ...
- }
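The hop from DefaultCoordinator.onRequest() to GlobalBeginRequest.handle() and back into the coordinator's typed handle() method is a form of double dispatch: the request object chooses which overload of the handler to call. A reduced sketch follows; `TxHandler`, `TxRequest`, `BeginReq`, `CommitReq` and `CoordinatorSketch` are hypothetical names, and the handler is passed as a parameter rather than set on the request first.

```java
// Hypothetical handler interface with one overload per request type.
interface TxHandler {
    String handle(BeginReq request);
    String handle(CommitReq request);
}

abstract class TxRequest {
    // Each concrete request calls back the overload that matches its own type.
    abstract String handle(TxHandler handler);
}

final class BeginReq extends TxRequest {
    final String transactionName;

    BeginReq(String transactionName) {
        this.transactionName = transactionName;
    }

    @Override
    String handle(TxHandler handler) {
        return handler.handle(this);
    }
}

final class CommitReq extends TxRequest {
    final String xid;

    CommitReq(String xid) {
        this.xid = xid;
    }

    @Override
    String handle(TxHandler handler) {
        return handler.handle(this);
    }
}

final class CoordinatorSketch implements TxHandler {
    // Single entry point: no instanceof chain is needed to pick the right logic.
    String onRequest(TxRequest request) {
        return request.handle(this);
    }

    @Override
    public String handle(BeginReq request) {
        return "begin:" + request.transactionName;
    }

    @Override
    public String handle(CommitReq request) {
        return "commit:" + request.xid;
    }

    public static void main(String[] args) {
        CoordinatorSketch coordinator = new CoordinatorSketch();
        System.out.println(coordinator.onRequest(new BeginReq("createOrder")));
        System.out.println(coordinator.onRequest(new CommitReq("192.168.0.1:8091:42")));
    }
}
```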
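- -> RemotingServer.sendAsyncResponse() returns the response containing the xid to the client;
- -> AbstractNettyRemotingServer.sendAsyncResponse() sends the response asynchronously;
- -> AbstractNettyRemoting.buildResponseMessage() builds the response message containing the xid;
- -> AbstractNettyRemoting.sendAsync() sends the response asynchronously;
- -> Netty's Channel.writeAndFlush() writes the response to the client;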
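7.Source code of the filter that integrates Seata with Dubbo
(1)Entry point that invokes the Dubbo filter
(2)Filter that integrates Seata with Dubbo
(1)Entry point that invokes the Dubbo filter
- public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {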
- ...
- @Override
- public void sendAsyncResponse(RpcMessage rpcMessage, Channel channel, Object msg) {
- Channel clientChannel = channel;
- if (!(msg instanceof HeartbeatMessage)) {
- clientChannel = ChannelManager.getSameClientChannel(channel);
- }
- if (clientChannel != null) {
- RpcMessage rpcMsg = buildResponseMessage(rpcMessage, msg, msg instanceof HeartbeatMessage
- ? ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE
- : ProtocolConstants.MSGTYPE_RESPONSE);
- super.sendAsync(clientChannel, rpcMsg);
- } else {
- throw new RuntimeException("channel is error.");
- }
- }
- ...
- }
- public abstract class AbstractNettyRemoting implements Disposable {
- ...
- protected RpcMessage buildResponseMessage(RpcMessage rpcMessage, Object msg, byte messageType) {
- RpcMessage rpcMsg = new RpcMessage();
- rpcMsg.setMessageType(messageType);
- rpcMsg.setCodec(rpcMessage.getCodec()); // same with request
- rpcMsg.setCompressor(rpcMessage.getCompressor());
- rpcMsg.setBody(msg);
- rpcMsg.setId(rpcMessage.getId());
- return rpcMsg;
- }
-
- //rpc async request.
- protected void sendAsync(Channel channel, RpcMessage rpcMessage) {
- channelWritableCheck(channel, rpcMessage.getBody());
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("write message:" + rpcMessage.getBody() + ", channel:" + channel + ",active?" + channel.isActive() + ",writable?" + channel.isWritable() + ",isopen?" + channel.isOpen());
- }
- doBeforeRpcHooks(ChannelUtil.getAddressFromChannel(channel), rpcMessage);
- channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
- if (!future.isSuccess()) {
- destroyChannel(future.channel());
- }
- });
- }
- ...
- }
- -> ClientHandler.channelRead() receives the response returned by Seata Server;
- -> AbstractNettyRemoting.processMessage() processes the RpcMessage;
- -> ClientOnResponseProcessor.process() sets the MessageFuture result;
- -> MessageFuture.setResultMessage() sets the MessageFuture result;
- -> CompletableFuture.complete() wakes up the blocked thread;
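The chain above works because the response carries the same message id as the request, so the client can look up the future that the sending thread is blocked on. A minimal sketch of that correlation follows, assuming a plain map of pending futures keyed by message id. PendingRequests and its methods are hypothetical names for this illustration, and no Netty or Seata types are used.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified client: pending requests are tracked by message id.
class PendingRequests {
    private final Map<Integer, CompletableFuture<Object>> futures = new ConcurrentHashMap<>();

    // Called by the sending thread before the request goes out.
    CompletableFuture<Object> register(int msgId) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        futures.put(msgId, future);
        return future;
    }

    // Called by the I/O thread when a response with the same id arrives.
    // Returns false when nobody is waiting for that id.
    boolean onResponse(int msgId, Object body) {
        CompletableFuture<Object> future = futures.remove(msgId);
        if (future == null) {
            return false;
        }
        return future.complete(body);
    }
}

class ResponseCorrelationSketch {
    public static void main(String[] args) throws Exception {
        PendingRequests pending = new PendingRequests();
        CompletableFuture<Object> future = pending.register(7);

        // Simulate the response arriving on another thread.
        Thread ioThread = new Thread(() -> pending.onResponse(7, "xid-from-server"));
        ioThread.start();

        // The caller blocks here until the future is completed or the timeout expires.
        Object result = future.get(1, TimeUnit.SECONDS);
        System.out.println(result);
    }
}
```

Removing the entry from the map when the response arrives keeps the table from growing and makes a duplicate response for the same id a no-op.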
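(2)Filter that integrates Seata with Dubbo
If the xid in the thread-local variable is not null, this corresponds to the case of initiating an RPC call. If the xid in the thread-local variable is null, this corresponds to the case of receiving an RPC call.
When the xid in RootContext is not null, the xid must be set into RpcContext. When the xid in RootContext is null and the xid in RpcContext is not null, the xid must be set into RootContext.
- public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {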
- ...
- @Sharable
- class ClientHandler extends ChannelDuplexHandler {
- @Override
- public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
- if (!(msg instanceof RpcMessage)) {
- return;
- }
- processMessage(ctx, (RpcMessage) msg);
- }
- ...
- }
- ...
- }
- public abstract class AbstractNettyRemoting implements Disposable {
- ...
- protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
- }
- Object body = rpcMessage.getBody();
- if (body instanceof MessageTypeAware) {
- MessageTypeAware messageTypeAware = (MessageTypeAware) body;
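- //Look up a Pair by message type; the Pair holds the request processor and the thread pool that runs it
- //The entries in processorTable are put there through registerProcessor() during initialization (by NettyRemotingServer on the server side)
- //On the server side the code below therefore reaches ServerOnRequestProcessor.process(); for a response received by the client it reaches ClientOnResponseProcessor.process()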
- final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
- if (pair != null) {
- if (pair.getSecond() != null) {
- try {
- pair.getSecond().execute(() -> {
- try {
- pair.getFirst().process(ctx, rpcMessage);
- } catch (Throwable th) {
- LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
- } finally {
- MDC.clear();
- }
- });
- } catch (RejectedExecutionException e) {
- ...
- }
- } else {
- try {
- pair.getFirst().process(ctx, rpcMessage);
- } catch (Throwable th) {
- LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
- }
- }
- } else {
- LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
- }
- } else {
- LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
- }
- }
- ...
- }
- public class ClientOnResponseProcessor implements RemotingProcessor {
- ...
- @Override
- public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
- if (rpcMessage.getBody() instanceof MergeResultMessage) {
- ...
- } else if (rpcMessage.getBody() instanceof BatchResultMessage) {
- ...
- } else {
- //Handling of ordinary messages
- MessageFuture messageFuture = futures.remove(rpcMessage.getId());
- if (messageFuture != null) {
- messageFuture.setResultMessage(rpcMessage.getBody());
- } else {
- if (rpcMessage.getBody() instanceof AbstractResultMessage) {
- if (transactionMessageHandler != null) {
- transactionMessageHandler.onResponse((AbstractResultMessage) rpcMessage.getBody(), null);
- }
- }
- }
- }
- }
- ...
- }
- public class MessageFuture {
- private transient CompletableFuture<Object> origin = new CompletableFuture<>();
- ...
- //Sets result message.
- public void setResultMessage(Object obj) {
- origin.complete(obj);
- }
- ...
- }
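The xid propagation rule stated for the Seata/Dubbo filter in (2) can be sketched without Dubbo itself. In the sketch below, XidHolder is a hypothetical stand-in for RootContext, a plain Map plays the role of the RPC attachments carried by RpcContext, and the key name is chosen for this illustration. The cleanup in the finally block is an assumption of the sketch, not something stated in the text above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for Seata's RootContext: holds the xid per thread.
class XidHolder {
    private static final ThreadLocal<String> XID = new ThreadLocal<>();

    static String get() {
        return XID.get();
    }

    static void bind(String xid) {
        XID.set(xid);
    }

    static void unbind() {
        XID.remove();
    }
}

class XidPropagationSketch {
    static final String KEY = "TX_XID"; // attachment key chosen for this sketch

    // attachments plays the role of the RPC context carried with one call.
    static <T> T invoke(Map<String, String> attachments, Supplier<T> next) {
        String localXid = XidHolder.get();
        String rpcXid = attachments.get(KEY);
        boolean bound = false;
        if (localXid != null) {
            // Caller side: pass the xid along with the RPC call.
            attachments.put(KEY, localXid);
        } else if (rpcXid != null) {
            // Callee side: bind the received xid to the current thread.
            XidHolder.bind(rpcXid);
            bound = true;
        }
        try {
            return next.get();
        } finally {
            // Assumption of this sketch: undo only what this call bound.
            if (bound) {
                XidHolder.unbind();
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> attachments = new HashMap<>();
        XidHolder.bind("demo-xid");
        invoke(attachments, () -> "consumer call"); // copies the xid into attachments
        XidHolder.unbind();
        String seen = invoke(attachments, XidHolder::get); // callee side sees the xid
        System.out.println(seen);
    }
}
```

In a real deployment the two invoke() calls would run in different processes. Here they run in one thread only to show both branches of the rule.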