【深入理解SpringCloud微服务】Seata(AT模式)源码解析——全局事务的事务传播机制
本文解析了Seata AT模式中全局事务传播机制的实现原理。核心在于通过xid传递实现事务传播:调用方通过RestTemplate拦截器SeataRestTemplateInterceptor将xid添加到HTTP请求头;被调用方通过SpringMVC拦截器TransactionPropagationInterceptor从请求头获取xid并绑定到ThreadLocal。xid存储于RootCon
Seata(AT模式)源码解析——全局事务的事务传播机制
在微服务架构中,会存在服务间的调用。如果一个服务的一个方法开启了全局事务,该方法如果涉及到远程调用,那么seata需要把全局事务传播到被调用方,使得被调用方也加入到全局事务中。这就是seata的全局事务传播机制,我们本篇文章将对其进行研究。
xid的传递
seata的全局事务传播机制,关键在于xid的传递,如果是http调用,seata将会把xid塞入请求头中。

调用方把xid塞入请求头,被调用方从请求头中接收到xid,然后放入RootContext中。

xid被保存到RootContext时,其实就是保存到ThreadLocal中,那么当前线程就与该xid绑定了。
然后被调用方在执行sql并提交的时候,会检查当前线程是否绑定了xid。如果是,则加入到全局事务中,然后提交sql;如果不是,则直接提交sql。

源码解析
TransactionPropagationInterceptor(通过SpringMVC拦截器从请求头获取xid绑定到ThreadLocal)
Seata通过一个SpringMVC拦截器TransactionPropagationInterceptor从请求头中接收xid,然后将xid绑定到RootContext中。
首先看看TransactionPropagationInterceptor是如何被引入进来的。
seata-spring-boot-starter\src\main\resources\META-INF\spring.factories:
# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
io.seata.spring.boot.autoconfigure.SeataDataSourceAutoConfiguration,\
io.seata.spring.boot.autoconfigure.SeataAutoConfiguration,\
# HttpAutoConfiguration引入的TransactionPropagationInterceptor
io.seata.spring.boot.autoconfigure.HttpAutoConfiguration
在META-INF\spring.factories指定了HttpAutoConfiguration自动配置类,HttpAutoConfiguration中会引入TransactionPropagationInterceptor。
@Configuration
@ConditionalOnWebApplication
public class HttpAutoConfiguration extends WebMvcConfigurerAdapter {
@Override
public void addInterceptors(InterceptorRegistry registry) {
// 注册拦截器TransactionPropagationInterceptor
registry.addInterceptor(new TransactionPropagationInterceptor());
}
}

public class TransactionPropagationInterceptor extends HandlerInterceptorAdapter {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
String xid = RootContext.getXID();
// 从请求头取出xid
String rpcXid = request.getHeader(RootContext.KEY_XID);
if (xid == null && rpcXid != null) {
// 绑定xid到RootContext
RootContext.bind(rpcXid);
}
return true;
}
@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler,
ModelAndView modelAndView) {
if (RootContext.inGlobalTransaction()) {
// 从RootContext中解绑xid
XidResource.cleanXid(request.getHeader(RootContext.KEY_XID));
}
}
}
TransactionPropagationInterceptor继承了HandlerInterceptorAdapter,是一个SpringMVC拦截器。
在prepreHandle方法中从请求头取出xid,绑定到RootContext。而postHandle方法调用XidResource的cleanXid方法,里面会从RootContext中解绑xid。

RootContext.bind(rpcXid) 绑定xid到ThreadLocal
public static void bind(@Nonnull String xid) {
// 调用ContextCore的put方法
CONTEXT_HOLDER.put(KEY_XID, xid);
}

public class ThreadLocalContextCore implements ContextCore {
private ThreadLocal<Map<String, Object>> threadLocal = ThreadLocal.withInitial(HashMap::new);
@Override
public Object put(String key, Object value) {
return threadLocal.get().put(key, value);
}
}
所以RootContext.bind(rpcXid)其实就是把xid绑定到ThreadLocal中。

XidResource.cleanXid(request.getHeader(RootContext.KEY_XID)) 从ThreadLocal解绑xid
public static void cleanXid(String rpcXid) {
String xid = RootContext.getXID();
if (xid != null) {
// 从ThreadLocal中解绑xid
String unbindXid = RootContext.unbind();
// 解绑出来的xid与传进来的xid不一样,塞回去
if (!StringUtils.equalsIgnoreCase(rpcXid, unbindXid)) {
RootContext.bind(unbindXid);
}
}
}
RootContext#unbind()
public static String unbind() {
String xid = (String) CONTEXT_HOLDER.remove(KEY_XID);
return xid;
}
ThreadLocalContextCore#remove(String key)
@Override
public Object remove(String key) {
return threadLocal.get().remove(key);
}
XidResource的cleanXid就是调用RootContext的unbind方法从ThreadLocal中解绑xid。

SeataRestTemplateInterceptor 通过RestTemplate拦截器添加xid到请求头
Seata在调用方使用RestTemplate发起http请求时,会把xid添加到请求头中,这个工作是由SeataRestTemplateInterceptor完成的,SeataRestTemplateInterceptor是一个RestTemplate拦截器。属性Ribbon的都知道,Ribbon就是使用了RestTemplate拦截器触发客户端负载均衡。
spring-cloud-starter-alibaba-seata\src\main\resources\META-INF\spring.factories
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
# 这里引入了SeataRestTemplateInterceptor
com.alibaba.cloud.seata.rest.SeataRestTemplateAutoConfiguration,\
com.alibaba.cloud.seata.web.SeataHandlerInterceptorConfiguration,\
com.alibaba.cloud.seata.feign.SeataFeignClientAutoConfiguration,\
com.alibaba.cloud.seata.feign.hystrix.SeataHystrixAutoConfiguration
@Configuration(proxyBeanMethods = false)
public class SeataRestTemplateAutoConfiguration {
@Bean
public SeataRestTemplateInterceptor seataRestTemplateInterceptor() {
return new SeataRestTemplateInterceptor();
}
@Autowired(required = false)
private Collection<RestTemplate> restTemplates;
@Autowired
private SeataRestTemplateInterceptor seataRestTemplateInterceptor;
@PostConstruct
public void init() {
if (this.restTemplates != null) {
for (RestTemplate restTemplate : restTemplates) {
// 取得RestTemplate的拦截器链
List<ClientHttpRequestInterceptor> interceptors = new ArrayList<ClientHttpRequestInterceptor>(
restTemplate.getInterceptors());
// 往拦截器链中添加seataRestTemplateInterceptor
interceptors.add(this.seataRestTemplateInterceptor);
// 拦截器链重新保存到RestTemplate中
restTemplate.setInterceptors(interceptors);
}
}
}
}

@Override
public ClientHttpResponse intercept(HttpRequest httpRequest, byte[] bytes,
ClientHttpRequestExecution clientHttpRequestExecution) throws IOException {
HttpRequestWrapper requestWrapper = new HttpRequestWrapper(httpRequest);
// 从RootContext中取得xid
String xid = RootContext.getXID();
if (!StringUtils.isEmpty(xid)) {
// xid放入请求头中
requestWrapper.getHeaders().add(RootContext.KEY_XID, xid);
}
return clientHttpRequestExecution.execute(requestWrapper, bytes);
}

目前整体流程图:

StatementProxy#execute(String sql)
当我们的工程引入了seata,我们的数据源就会被代理。上面已经通过拦截器从请求头中取得了xid,并保存到RootContext中。StatementProxy会从RootContext中取得xid绑定到ConnectionProxy。
@Override
public boolean execute(String sql) throws SQLException {
this.targetSQL = sql;
return ExecuteTemplate.execute(this, (statement, args) -> statement.execute((String) args[0]), sql);
}
ExecuteTemplate.execute(…)
public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
StatementProxy<S> statementProxy,
StatementCallback<T, S> statementCallback,
Object... args) throws SQLException {
T rs;
try {
rs = executor.execute(args);
} catch () {}
return rs;
}
@Override
public T execute(Object... args) throws Throwable {
// 从RootContext中获取xid
String xid = RootContext.getXID();
if (xid != null) {
// 绑定xid到ConnectionProxy中
statementProxy.getConnectionProxy().bind(xid);
}
return doExecute(args);
}
可以看到从RootContext中获取xid,然后绑定到ConnectionProxy中。

ConnectionProxy#bind(String xid)
public void bind(String xid) {
// 保存到ConnectionContext中
context.bind(xid);
}

ConnectionProxy#commit()
由于Connection对象也被代理了,因此当提交本地事务的时候,执行的其实是ConnectionProxy的commit方法。
commit方法会执行doCommit()方法。
private void doCommit() throws SQLException {
if (context.inGlobalTransaction()) {
// ConnectionProxy绑定了xid,会进入这里
processGlobalTransactionCommit();
} else if (context.isGlobalLockRequire()) {
processLocalCommitWithGlobalLocks();
} else {
targetConnection.commit();
}
}
ConnectionContext#inGlobalTransaction()
public boolean inGlobalTransaction() {
return xid != null;
}
context.inGlobalTransaction()其实就是判断xid是否不为空,由于上面已经绑定到了ConnectionContext中,因此这里的xid肯定不为空。

processGlobalTransactionCommit()里面就会向TC发起分支事务注册,一旦注册成功,就等于是加入了全局事务。
整体流程图:

如果被调用方也有@GlobalTransaction注解修饰,会重复开启全局事务吗?
首先说结论:不会,Seata有它的判断机制,不会重复开启全局事务。
Seata在开启全局事务的地方DefaultGlobalTransaction#begin(int timeout, String name)
@Override
public void begin(int timeout, String name) throws TransactionException {
if (role != GlobalTransactionRole.Launcher) {
// 如果role不是Launcher就return,不会开启全局事务
return;
}
// 开启全局事务的处理
}
如果DefaultGlobalTransaction中的role字段不是Launcher,就不会开启全局事务。

什么情况下role是Launcher,什么情况下不是呢?
我们看回TransactionalTemplate的execute方法,有一行这样的代码:
GlobalTransaction tx = GlobalTransactionContext.getCurrent();
GlobalTransactionContext#getCurrent()
public static GlobalTransaction getCurrent() {
String xid = RootContext.getXID();
if (xid == null) {
return null;
}
// RootContext存在xid,创建一个DefaultGlobalTransaction,但是role字段指定为Participant(非Launcher)
return new DefaultGlobalTransaction(xid, GlobalStatus.Begin, GlobalTransactionRole.Participant);
}
因为TransactionPropagationInterceptor做了拦截,把请求头中的xid放入到了RootContext。所以这里检查到xid不为空,就创建了一个role字段非Launcher的DefaultGlobalTransaction。
那么这个DefaultGlobalTransaction就不会开启全局事务。

如果xid为空,这里就会返回空。然后TransactionalTemplate的execute方法中在上面这行代码的后面,还有一行这样的代码:
if (tx == null) {
tx = GlobalTransactionContext.createNew();
}
如果上面返回了null,那么这里就调用GlobalTransactionContext.createNew()创建DefaultGlobalTransaction。
public static GlobalTransaction createNew() {
// 调用无参构造方法创建
return new DefaultGlobalTransaction();
}
DefaultGlobalTransaction() {
// 指定了role为Launcher
this(null, GlobalStatus.UnKnown, GlobalTransactionRole.Launcher);
}
此时的DefaultGlobalTransaction就指定了role为Launcher,那么它就会去开启全局事务。

更多推荐
所有评论(0)