事务增强流程:
bean实例化之前会调用InfrastructureAdvisorAutoProxyCreator的父类AbstractAutoProxyCreator中实现的方法:postProcessAfterInitialization(@Nullable Object bean, String beanName)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* Create a proxy with the configured interceptors if the bean is
* identified as one to proxy by the subclass.
* @see #getAdvicesAndAdvisorsForBean
*/
@Override
public Object postProcessAfterInitialization(@Nullable Object bean, String beanName) {
if (bean != null) {
Object cacheKey = getCacheKey(bean.getClass(), beanName);
if (this.earlyProxyReferences.remove(cacheKey) != bean) {
return wrapIfNecessary(bean, beanName, cacheKey);
}
}
return bean;
}

之后会走到增强的主要流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* Wrap the given bean if necessary, i.e. if it is eligible for being proxied.
* @param bean the raw bean instance
* @param beanName the name of the bean
* @param cacheKey the cache key for metadata access
* @return a proxy wrapping the bean, or the raw bean instance as-is
*/
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
if (StringUtils.hasLength(beanName) && this.targetSourcedBeans.contains(beanName)) {
return bean;
}
if (Boolean.FALSE.equals(this.advisedBeans.get(cacheKey))) {
return bean;
}
if (isInfrastructureClass(bean.getClass()) || shouldSkip(bean.getClass(), beanName)) {
this.advisedBeans.put(cacheKey, Boolean.FALSE);
return bean;
}

// Create proxy if we have advice.
Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);
if (specificInterceptors != DO_NOT_PROXY) {
this.advisedBeans.put(cacheKey, Boolean.TRUE);
Object proxy = createProxy(
bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));
this.proxyTypes.put(cacheKey, proxy.getClass());
return proxy;
}

this.advisedBeans.put(cacheKey, Boolean.FALSE);
return bean;
}

这些步骤在分析AOP的时候都了解过,所以不在这赘述。
这里获取到BeanFactoryTransactionAttributeSourceAdvisor增强器,之后带入到createProxy方法中创建代理对象:
这里会分为JDK动态代理或者是CGLIB,从JDK动态代理JdkDynamicAopProxy的invoke方法看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

/**
* Implementation of {@code InvocationHandler.invoke}.
* <p>Callers will see exactly the exception thrown by the target,
* unless a hook method throws an exception.
*/
@Override
@Nullable
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
。。。。

// Get the interception chain for this method.
// 1.看这里!!!!!!
List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);

// Check whether we have any advice. If we don't, we can fallback on direct
// reflective invocation of the target, and avoid creating a MethodInvocation.
// 2.看这里!!!!!!
if (chain.isEmpty()) {
// We can skip creating a MethodInvocation: just invoke the target directly
// Note that the final invoker must be an InvokerInterceptor so we know it does
// nothing but a reflective operation on the target, and no hot swapping or fancy proxying.
Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);
}
else {
// We need to create a method invocation...
// 3.看这里!!!!!!
MethodInvocation invocation =
new ReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain);
// Proceed to the joinpoint through the interceptor chain.
// 4.看这里!!!!!!
retVal = invocation.proceed();
}

。。。。。
}

省略了一堆代码,这里会获取有没有拦截器,如果有的话创建ReflectiveMethodInvocation:
类继承关系图:
ReflectiveMethodInvocaion.png

它实现了Joinpoint接口的proceed()方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Override
@Nullable
public Object proceed() throws Throwable {
// We start with an index of -1 and increment early.
// 执行完所有增强后执行切点方法
if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) {
return invokeJoinpoint();
}

// 获取下一个要执行的拦截器
Object interceptorOrInterceptionAdvice =
this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);
if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) {
// Evaluate dynamic method matcher here: static part will already have
// been evaluated and found to match.
InterceptorAndDynamicMethodMatcher dm =
(InterceptorAndDynamicMethodMatcher) interceptorOrInterceptionAdvice;
Class<?> targetClass = (this.targetClass != null ? this.targetClass : this.method.getDeclaringClass());
//匹配
if (dm.methodMatcher.matches(this.method, targetClass, this.arguments)) {
return dm.interceptor.invoke(this);
}
else {
// Dynamic matching failed.
// Skip this interceptor and invoke the next in the chain.
return proceed();
}
}
else {
// It's an interceptor, so we just invoke it: The pointcut will have
// been evaluated statically before this object was constructed.
// 看这里!!!!!!
return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);
}
}

这个方法以前也分析过,因为这里的interceptorOrInterceptionAdvice是TransactionInterceptor,会执行它的invoke方法:

1
2
3
4
5
6
7
8
9
10
11
@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
// Work out the target class: may be {@code null}.
// The TransactionAttributeSource should be passed the target class
// as well as the method, which may be from an interface.
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

// Adapt to TransactionAspectSupport's invokeWithinTransaction...
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}

会调用其父类TransactionAspectSupport中的invokeWithinTransaction方法,这就是本文的重点,在方法之前先贴一个PlatformTransactionManager事务管理器的继承关系图:
PlateformTransaction.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
/**
* 基于循环建议的子类的常规委托,委托给该类上的其他几个模板方法。
* 能够处理{@link CallbackPreferringPlatformTransactionManager}以及常规的{@link PlatformTransactionManager}实现。
* @param method 被调用的方法
* @param targetClass 我们调用方法的目标类
* @param invocation 用于处理目标调用的回调
* @return 方法的返回值(如果有的话)
* @throws Throwable 从目标调用传播
*/
@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {

// 如果事务属性为null,则该方法是非事务性的。
// 返回事务属性source。
TransactionAttributeSource tas = getTransactionAttributeSource();
// 获取@Transactional定义的事务属性
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
// 获取事务管理器,如果事务属性为null,则不尝试获取
// 这里假设获取的是org.springframework.jdbc.datasource.DataSourceTransactionManager
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

// 声明式事务
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
// 使用getTransaction和commit/rollback调用的标准事务界定。
// 创建链接并绑定线程,重点
// 事务开启操作
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);

Object retVal;
try {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
// 执行下一个拦截器或者调用目标类目标方法
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// target invocation exception
//事务回滚操作
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
//使用堆栈恢复旧事务TransactionInfo。如果没有设置,则为空。
cleanupTransactionInfo(txInfo);
}
//事务提交操作
commitTransactionAfterReturning(txInfo);
return retVal;
}

// 编程式事务
else {
final ThrowableHolder throwableHolder = new ThrowableHolder();

// It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
try {
Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, status -> {
//开启事务
TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
try {
//执行目标方法
return invocation.proceedWithInvocation();
}
catch (Throwable ex) {
//回滚操作
if (txAttr.rollbackOn(ex)) {
// A RuntimeException: will lead to a rollback.
if (ex instanceof RuntimeException) {
throw (RuntimeException) ex;
}
else {
throw new ThrowableHolderException(ex);
}
}
else {
// A normal return value: will lead to a commit.
throwableHolder.throwable = ex;
return null;
}
}
finally {
cleanupTransactionInfo(txInfo);
}
});

// Check result state: It might indicate a Throwable to rethrow.
if (throwableHolder.throwable != null) {
throw throwableHolder.throwable;
}
return result;
}
catch (ThrowableHolderException ex) {
throw ex.getCause();
}
catch (TransactionSystemException ex2) {
if (throwableHolder.throwable != null) {
logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
ex2.initApplicationException(throwableHolder.throwable);
}
throw ex2;
}
catch (Throwable ex2) {
if (throwableHolder.throwable != null) {
logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
}
throw ex2;
}
}
}

在这主要看声明式事务的三个重要流程方法:

  1. createTransactionIfNecessary
  2. completeTransactionAfterThrowing
  3. commitTransactionAfterReturning

createTransactionIfNecessary 事务开启

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
/**
* 如果需要,根据给定的TransactionAttribute创建事务。
* 允许调用者通过TransactionAttributeSource执行定制的TransactionAttribute查找。
* @param txAttr TransactionAttribute(可能是{@code null})
* @param joinpointIdentification 完全限定的方法名称(用于监视和日志记录)
* @return 一个TransactionInfo对象,不管是否创建了事务。
* TransactionInfo上的{@code hasTransaction()}方法可用于判断是否创建了事务。
* @see #getTransactionAttributeSource()
*/
@SuppressWarnings("serial")
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {

// 如果没有指定名称,则应用方法标识作为事务名称。
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}

TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
// 根据指定的传播行为返回当前活动的事务或创建新的事务。
// 这里会涉及获取数据源和spring的事务传播行为,后面单独分析
status = tm.getTransaction(txAttr);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
"] because no transaction manager has been configured");
}
}
}
// 为给定的属性和状态对象准备一个TransactionInfo。
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
/**
* 为给定的属性和状态对象准备一个TransactionInfo。
* @param txAttr the TransactionAttribute (may be {@code null})
* @param joinpointIdentification the fully qualified method name
* (used for monitoring and logging purposes)
* @param status the TransactionStatus for the current transaction
* @return the prepared TransactionInfo object
*/
protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, String joinpointIdentification,
@Nullable TransactionStatus status) {

TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
if (txAttr != null) {
// We need a transaction for this method...
if (logger.isTraceEnabled()) {
logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
// 如果已经存在不兼容的tx,事务管理器将标记错误。
txInfo.newTransactionStatus(status);
}
else {
// The TransactionInfo.hasTransaction() method will return false. We created it only
// to preserve the integrity of the ThreadLocal stack maintained in this class.
if (logger.isTraceEnabled()) {
logger.trace("No need to create transaction for [" + joinpointIdentification +
"]: This method is not transactional.");
}
}

// 我们总是将TransactionInfo绑定到线程,即使我们没有在这里创建一个新的事务。
// 这保证了即使这个方面没有创建任何事务,也能正确地管理TransactionInfo堆栈。
txInfo.bindToThread();
return txInfo;
}
private void bindToThread() {
// 公开当前TransactionStatus,保存任何现有的TransactionStatus,
// 以便在此事务完成后恢复。
this.oldTransactionInfo = transactionInfoHolder.get();
transactionInfoHolder.set(this);
}

这里面有个复杂但是又很重要的方法:tm.getTransaction(txAttr),打算用一篇文章单独分析。

completeTransactionAfterThrowing 回滚事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* 处理一个throwable,完成事务。
* 我们可以提交或回滚,这取决于配置。
*
* @param txInfo 关于当前事务的信息
* @param ex 异常
*/
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
"] after exception: " + ex);
}
//应该回滚到给定的异常吗?
if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
try {
//事务管理器的回滚操作
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
} catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by rollback exception", ex);
ex2.initApplicationException(ex);
throw ex2;
} catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by rollback exception", ex);
throw ex2;
}
} else {
// 我们不回滚这个异常。
// Will still roll back if TransactionStatus.isRollbackOnly() is true.
//如果TransactionStatus.isRollbackOnly()为true,则仍然回滚。
try {
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
} catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by commit exception", ex);
ex2.initApplicationException(ex);
throw ex2;
} catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by commit exception", ex);
throw ex2;
}
}
}
}

这里面也涉及到事务管理器的回滚和提交方法,后期分析。

commitTransactionAfterReturning 提交事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 在成功完成调用后执行,而不是在处理异常之后执行。
* 如果不创建事务,则什么也不做。
*
* @param txInfo information about the current transaction
*/
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
//调用事务管理器的提交操作
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}

这里会调用事务管理器的提交操作。

以上将整个spring的TransactionInterceptor事务的增强操作流程简单的分析了一下,下面看一看事务管理器PlatformTransactionManager的接口定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79

/**
* 这是Spring事务基础结构中的中心接口。
* 通常:可以使用TransactionTemplate来实现编程式事务,
* 或者直接用AOP的能力实现声明式事务。
* <p>
* 对于实现者,建议从提供的{@link org.springframework.transaction.support.AbstractPlatformTransactionManager}类继承,
* 它预先实现定义的传播行为,并负责事务同步处理。
* 子类必须为基础事务的特定状态实现模板方法,
* 例如:begin、suspend、resume、commit。
* <p>
* 这个策略接口的默认实现是{@link org.springframework.transaction.jta.JtaTransactionManager}
* 和{@link org.springframework.jdbc.datasource.DataSourceTransactionManager},
* 它可以作为其他事务策略的实现指南。
*
* @author Rod Johnson
* @author Juergen Hoeller
* @see org.springframework.transaction.support.TransactionTemplate
* @see org.springframework.transaction.interceptor.TransactionInterceptor
* @since 16.05.2003
*/
public interface PlatformTransactionManager {

/**
* 根据指定的传播行为返回当前活动的事务或创建新的事务。
* 注意,隔离级别或超时等参数只应用于新事务,因此在参与活动事务时将被忽略。
* 此外,并非每个事务管理器都支持所有事务定义设置:当遇到不支持的设置时,正确的事务管理器实现应该抛出异常。
* 上面规则的一个例外是只读标志,如果不支持显式只读模式,应该忽略它。本质上,只读标志只是潜在优化的一个提示。
*
* @param definition TransactionDefinition实例(默认值可以是{@code null}),描述传播行为、隔离级别、超时等。
* 这些设置来源于@Transactional注解的属性
* @return 表示新事务或当前事务的事务状态对象
* @throws TransactionException 在查找、创建或系统错误的情况下
* @throws IllegalTransactionStateException 如果无法执行给定的事务定义(例如,如果当前活动的事务与指定的传播行为冲突)
* @see TransactionDefinition#getPropagationBehavior
* @see TransactionDefinition#getIsolationLevel
* @see TransactionDefinition#getTimeout
* @see TransactionDefinition#isReadOnly
*/
TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException;

/**
* 提交给定事务的状态。如果事务仅以编程方式标记回滚,则执行回滚。
* 如果事务不是新事务,则忽略提交以正确地参与周围的事务。
* 如果前一个事务已被挂起,以便能够创建一个新事务,则在提交新事务之后恢复前一个事务。
* <p>
* 注意,当提交调用完成时,无论是否正常或抛出异常,事务都必须完全完成并清除。
* 在这种情况下不应该期望回滚调用。
* <p>
* 如果此方法抛出的异常不是TransactionException,则提交前的一些错误导致提交尝试失败。
* 例如,O/R映射工具可能在提交之前尝试刷新对数据库的更改,结果是DataAccessException导致事务失败。
* 在这种情况下,原始异常将传播到此提交方法的调用者。
*
* @param status 对象,该对象由{@code getTransaction}方法返回
* @throws UnexpectedRollbackException 如果事务协调器发起了意外回滚
* @throws HeuristicCompletionException 在事务协调器一方的启发式决策导致事务失败的情况下
* @throws TransactionSystemException 在提交或系统错误(通常由基本资源故障引起)的情况下
* @throws IllegalTransactionStateException 如果给定的事务已经完成(即提交或回滚)
* @see TransactionStatus#setRollbackOnly
*/
void commit(TransactionStatus status) throws TransactionException;

/**
* 执行给定事务的回滚。
* <p>如果事务不是新事务,只需将其回滚设置为只用于正确参与周围事务。
* 如果前一个事务已被挂起,以便能够创建一个新事务,则在回滚新事务之后恢复前一个事务。
* <p>如果提交引发异常,则不要调用事务的回滚。
* 即使在提交异常的情况下,事务也将在提交返回时完成并清理。
* 因此,commit失败后的回滚调用将导致IllegalTransactionStateException.
*
* @param status 对象,该对象由{@code getTransaction}方法返回
* @throws TransactionSystemException 方法返回的对象,以防回滚或系统错误(通常由基本资源故障引起)
* @throws IllegalTransactionStateException 如果给定的事务已经完成(即提交或回滚)
*/
void rollback(TransactionStatus status) throws TransactionException;

}

即Spring的事务都交给相应的事务管理器去处理了,spring只起到调节作用。

tencent.jpg