在微服务架构中,会存在服务间的调用。如果一个服务的一个方法开启了全局事务,该方法如果涉及到远程调用,那么seata需要把全局事务传播到被调用方,使得被调用方也加入到全局事务中。这就是seata的全局事务传播机制,我们本篇文章将对其进行研究。

xid的传递

seata的全局事务传播机制,关键在于xid的传递,如果是http调用,seata将会把xid塞入请求头中。

在这里插入图片描述

调用方把xid塞入请求头,被调用方从请求头中接收到xid,然后放入RootContext中。

在这里插入图片描述

xid被保存到RootContext时,其实就是保存到ThreadLocal中,那么当前线程就与该xid绑定了。

然后被调用方在执行sql并提交的时候,会检查当前线程是否绑定了xid。如果是,则加入到全局事务中,然后提交sql;如果不是,则直接提交sql。

在这里插入图片描述

源码解析

TransactionPropagationInterceptor(通过SpringMVC拦截器从请求头获取xid绑定到ThreadLocal)

Seata通过一个SpringMVC拦截器TransactionPropagationInterceptor从请求头中接收xid,然后将xid绑定到RootContext中

首先看看TransactionPropagationInterceptor是如何被引入进来的。

seata-spring-boot-starter\src\main\resources\META-INF\spring.factories:

# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
io.seata.spring.boot.autoconfigure.SeataDataSourceAutoConfiguration,\
io.seata.spring.boot.autoconfigure.SeataAutoConfiguration,\
# HttpAutoConfiguration引入的TransactionPropagationInterceptor
io.seata.spring.boot.autoconfigure.HttpAutoConfiguration

在META-INF\spring.factories指定了HttpAutoConfiguration自动配置类,HttpAutoConfiguration中会引入TransactionPropagationInterceptor。

@Configuration
@ConditionalOnWebApplication
public class HttpAutoConfiguration extends WebMvcConfigurerAdapter {

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
    	// 注册拦截器TransactionPropagationInterceptor
        registry.addInterceptor(new TransactionPropagationInterceptor());
    }

}

在这里插入图片描述

public class TransactionPropagationInterceptor extends HandlerInterceptorAdapter {

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
        String xid = RootContext.getXID();
        // 从请求头取出xid
        String rpcXid = request.getHeader(RootContext.KEY_XID);
        
        if (xid == null && rpcXid != null) {
        	// 绑定xid到RootContext
            RootContext.bind(rpcXid);
        }

        return true;
    }

    @Override
    public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler,
        ModelAndView modelAndView) {
        if (RootContext.inGlobalTransaction()) {
        	// 从RootContext中解绑xid
            XidResource.cleanXid(request.getHeader(RootContext.KEY_XID));
        }
    }

}

TransactionPropagationInterceptor继承了HandlerInterceptorAdapter,是一个SpringMVC拦截器。

在prepreHandle方法中从请求头取出xid,绑定到RootContext。而postHandle方法调用XidResource的cleanXid方法,里面会从RootContext中解绑xid。

在这里插入图片描述

RootContext.bind(rpcXid) 绑定xid到ThreadLocal
    public static void bind(@Nonnull String xid) {
    	// 调用ContextCore的put方法
        CONTEXT_HOLDER.put(KEY_XID, xid);
    }

在这里插入图片描述

public class ThreadLocalContextCore implements ContextCore {

    private ThreadLocal<Map<String, Object>> threadLocal = ThreadLocal.withInitial(HashMap::new);

    @Override
    public Object put(String key, Object value) {
        return threadLocal.get().put(key, value);
    }

}    

所以RootContext.bind(rpcXid)其实就是把xid绑定到ThreadLocal中。

在这里插入图片描述

XidResource.cleanXid(request.getHeader(RootContext.KEY_XID)) 从ThreadLocal解绑xid
    public static void cleanXid(String rpcXid) {
        String xid = RootContext.getXID();
        if (xid != null) {
        	// 从ThreadLocal中解绑xid
            String unbindXid = RootContext.unbind();
            // 解绑出来的xid与传进来的xid不一样,塞回去
            if (!StringUtils.equalsIgnoreCase(rpcXid, unbindXid)) {
                RootContext.bind(unbindXid);
            }
        }
    }

RootContext#unbind()

    public static String unbind() {
        String xid = (String) CONTEXT_HOLDER.remove(KEY_XID);
        return xid;
    }

ThreadLocalContextCore#remove(String key)

    @Override
    public Object remove(String key) {
        return threadLocal.get().remove(key);
    }

XidResource的cleanXid就是调用RootContext的unbind方法从ThreadLocal中解绑xid。

在这里插入图片描述

SeataRestTemplateInterceptor 通过RestTemplate拦截器添加xid到请求头

Seata在调用方使用RestTemplate发起http请求时,会把xid添加到请求头中,这个工作是由SeataRestTemplateInterceptor完成的,SeataRestTemplateInterceptor是一个RestTemplate拦截器。属性Ribbon的都知道,Ribbon就是使用了RestTemplate拦截器触发客户端负载均衡。

spring-cloud-starter-alibaba-seata\src\main\resources\META-INF\spring.factories

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
# 这里引入了SeataRestTemplateInterceptor
com.alibaba.cloud.seata.rest.SeataRestTemplateAutoConfiguration,\
com.alibaba.cloud.seata.web.SeataHandlerInterceptorConfiguration,\
com.alibaba.cloud.seata.feign.SeataFeignClientAutoConfiguration,\
com.alibaba.cloud.seata.feign.hystrix.SeataHystrixAutoConfiguration
@Configuration(proxyBeanMethods = false)
public class SeataRestTemplateAutoConfiguration {

	@Bean
	public SeataRestTemplateInterceptor seataRestTemplateInterceptor() {
		return new SeataRestTemplateInterceptor();
	}

	@Autowired(required = false)
	private Collection<RestTemplate> restTemplates;

	@Autowired
	private SeataRestTemplateInterceptor seataRestTemplateInterceptor;

	@PostConstruct
	public void init() {
		if (this.restTemplates != null) {
			for (RestTemplate restTemplate : restTemplates) {
				// 取得RestTemplate的拦截器链
				List<ClientHttpRequestInterceptor> interceptors = new ArrayList<ClientHttpRequestInterceptor>(
						restTemplate.getInterceptors());
				// 往拦截器链中添加seataRestTemplateInterceptor
				interceptors.add(this.seataRestTemplateInterceptor);
				// 拦截器链重新保存到RestTemplate中
				restTemplate.setInterceptors(interceptors);
			}
		}
	}

}

在这里插入图片描述

	@Override
	public ClientHttpResponse intercept(HttpRequest httpRequest, byte[] bytes,
			ClientHttpRequestExecution clientHttpRequestExecution) throws IOException {
		HttpRequestWrapper requestWrapper = new HttpRequestWrapper(httpRequest);
		
		// 从RootContext中取得xid
		String xid = RootContext.getXID();

		if (!StringUtils.isEmpty(xid)) {
			// xid放入请求头中
			requestWrapper.getHeaders().add(RootContext.KEY_XID, xid);
		}
		return clientHttpRequestExecution.execute(requestWrapper, bytes);
	}

在这里插入图片描述

目前整体流程图:

在这里插入图片描述

StatementProxy#execute(String sql)

当我们的工程引入了seata,我们的数据源就会被代理。上面已经通过拦截器从请求头中取得了xid,并保存到RootContext中。StatementProxy会从RootContext中取得xid绑定到ConnectionProxy

    @Override
    public boolean execute(String sql) throws SQLException {
        this.targetSQL = sql;
        return ExecuteTemplate.execute(this, (statement, args) -> statement.execute((String) args[0]), sql);
    }

ExecuteTemplate.execute(…)

    public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
                                                     StatementProxy<S> statementProxy,
                                                     StatementCallback<T, S> statementCallback,
                                                     Object... args) throws SQLException {
        T rs;
        try {
            rs = executor.execute(args);
        } catch () {}
        return rs;
    }
    @Override
    public T execute(Object... args) throws Throwable {
    	// 从RootContext中获取xid
        String xid = RootContext.getXID();
        if (xid != null) {
        	// 绑定xid到ConnectionProxy中
            statementProxy.getConnectionProxy().bind(xid);
        }
        return doExecute(args);
    }

可以看到从RootContext中获取xid,然后绑定到ConnectionProxy中

在这里插入图片描述

ConnectionProxy#bind(String xid)

    public void bind(String xid) {
    	// 保存到ConnectionContext中
        context.bind(xid);
    }

在这里插入图片描述

ConnectionProxy#commit()

由于Connection对象也被代理了,因此当提交本地事务的时候,执行的其实是ConnectionProxy的commit方法。

commit方法会执行doCommit()方法。

    private void doCommit() throws SQLException {
        if (context.inGlobalTransaction()) {
        	// ConnectionProxy绑定了xid,会进入这里
            processGlobalTransactionCommit();
        } else if (context.isGlobalLockRequire()) {
            processLocalCommitWithGlobalLocks();
        } else {
            targetConnection.commit();
        }
    }

ConnectionContext#inGlobalTransaction()

    public boolean inGlobalTransaction() {
        return xid != null;
    }

context.inGlobalTransaction()其实就是判断xid是否不为空,由于上面已经绑定到了ConnectionContext中,因此这里的xid肯定不为空。

在这里插入图片描述

processGlobalTransactionCommit()里面就会向TC发起分支事务注册,一旦注册成功,就等于是加入了全局事务

整体流程图:

在这里插入图片描述

如果被调用方也有@GlobalTransaction注解修饰,会重复开启全局事务吗?

首先说结论:不会,Seata有它的判断机制,不会重复开启全局事务

Seata在开启全局事务的地方DefaultGlobalTransaction#begin(int timeout, String name)

    @Override
    public void begin(int timeout, String name) throws TransactionException {
        if (role != GlobalTransactionRole.Launcher) {
        	// 如果role不是Launcher就return,不会开启全局事务         
            return;
        }
        // 开启全局事务的处理
    }

如果DefaultGlobalTransaction中的role字段不是Launcher,就不会开启全局事务。

在这里插入图片描述

什么情况下role是Launcher,什么情况下不是呢?

我们看回TransactionalTemplate的execute方法,有一行这样的代码:

GlobalTransaction tx = GlobalTransactionContext.getCurrent();

GlobalTransactionContext#getCurrent()

    public static GlobalTransaction getCurrent() {
        String xid = RootContext.getXID();
        if (xid == null) {
            return null;
        }
        // RootContext存在xid,创建一个DefaultGlobalTransaction,但是role字段指定为Participant(非Launcher)
        return new DefaultGlobalTransaction(xid, GlobalStatus.Begin, GlobalTransactionRole.Participant);
    }

因为TransactionPropagationInterceptor做了拦截,把请求头中的xid放入到了RootContext。所以这里检查到xid不为空,就创建了一个role字段非Launcher的DefaultGlobalTransaction。

那么这个DefaultGlobalTransaction就不会开启全局事务

在这里插入图片描述

如果xid为空,这里就会返回空。然后TransactionalTemplate的execute方法中在上面这行代码的后面,还有一行这样的代码:

            if (tx == null) {
                tx = GlobalTransactionContext.createNew();
            }

如果上面返回了null,那么这里就调用GlobalTransactionContext.createNew()创建DefaultGlobalTransaction

    public static GlobalTransaction createNew() {
    	// 调用无参构造方法创建
        return new DefaultGlobalTransaction();
    }
    DefaultGlobalTransaction() {
    	// 指定了role为Launcher
        this(null, GlobalStatus.UnKnown, GlobalTransactionRole.Launcher);
    }

此时的DefaultGlobalTransaction就指定了role为Launcher,那么它就会去开启全局事务。

在这里插入图片描述

Logo

中国智能体开发者社区,聚焦智能体与大模型开发,提供前沿资讯、实用工具链、开源项目及行业案例。通过技术沙龙、开发者大赛等活动,促进经验交流与协作,助力开发者快速构建创新智能应用。

更多推荐