在我的上一篇文章中已经比较详细的对 Spring 的 AOP 实现原理进行了说明,而 Spring 的事务实现原理呢又跟 Spring 的 AOP 实现有着较大的关系,所以这里选择成热打铁,对 Spring 的事务实现机制进行一下说明。上一篇文章在对 AOP 实现进行解释时,只分析了它的 cglib 代理实现,因为提供的例子中被代理类没有实现相应的接口,导致不能通过 jdk 原生方式进行代理,而本文将要解释的 Spring 事务实现中,例子是实现了用户服务接口的,所以代理方式是采用的 jdk 原生方式,正好能够对上文进行补充。同样的示例代码我已经提交到我的 github 上,欢迎自取。在阅读本文之前,十分的建议先看我的上一篇 Spring AOP 实现原理文章,因为分析 Spring 事务实现原理的方式和分析 Spring AOP 实现原理的方式十分相似,如果对上一篇文章理解得好,Spring 的事务实现完全可以按照同样的方式来进行分析。
示例程序
实例程序很简单,已经同步到上文 github 链接上了,在 spring-example 模块的 top.aprilyolies.example.tx 包之下,执行前还需要先在数据库中创建数据表,sql 文件在当前模块下的 db 文件夹下。
try { if (!this.equalsDefined && AopUtils.isEqualsMethod(method)) { // 对于 equals 方法的处理 // The target does not implement the equals(Object) method itself. return equals(args[0]); } elseif (!this.hashCodeDefined && AopUtils.isHashCodeMethod(method)) { // 对于 hashcode 方法的处理 // The target does not implement the hashCode() method itself. return hashCode(); } elseif (method.getDeclaringClass() == DecoratingProxy.class) { // 对于 DecoratingProxy 接口方法的处理 // There is only getDecoratedClass() declared -> dispatch to proxy config. return AopProxyUtils.ultimateTargetClass(this.advised); } elseif (!this.advised.opaque && method.getDeclaringClass().isInterface() && // 对于 Advised 接口实现类方法的处理 method.getDeclaringClass().isAssignableFrom(Advised.class)) { // Service invocations on ProxyConfig with the proxy config... return AopUtils.invokeJoinpointUsingReflection(this.advised, method, args); }
Object retVal;
if (this.advised.exposeProxy) { // 看是否需要暴露代理类 // Make invocation available if necessary. oldProxy = AopContext.setCurrentProxy(proxy); setProxyContext = true; }
// Get as late as possible to minimize the time we "own" the target, // in case it comes from a pool. target = targetSource.getTarget(); // 获取目标实例 Class<?> targetClass = (target != null ? target.getClass() : null); // 获取目标实例类 // 尝试从缓存中获取 interceptorList,没有的话就从当前实例中获取(获取 Advised 的全部 advisors,看 advisor 是否适配当前方法,适配的话从 advisor 中获取到 Advice,然后将其添加到 interceptorList 集合返回) // Get the interception chain for this method. List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
// Check whether we have any advice. If we don't, we can fallback on direct // reflective invocation of the target, and avoid creating a MethodInvocation. if (chain.isEmpty()) { // We can skip creating a MethodInvocation: just invoke the target directly // Note that the final invoker must be an InvokerInterceptor so we know it does // nothing but a reflective operation on the target, and no hot swapping or fancy proxying. Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args); retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse); } else { // We need to create a method invocation... MethodInvocationinvocation=// 构建 ReflectiveMethodInvocation 来控制 interceptor 链的调用 newReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain); // Proceed to the joinpoint through the interceptor chain. retVal = invocation.proceed(); }
// Massage return value if necessary. Class<?> returnType = method.getReturnType(); if (retVal != null && retVal == target && returnType != Object.class && returnType.isInstance(proxy) && !RawTargetAccess.class.isAssignableFrom(method.getDeclaringClass())) { // Special case: it returned "this" and the return type of the method // is type-compatible. Note that we can't help if the target sets // a reference to itself in another returned object. retVal = proxy; } elseif (retVal == null && returnType != Void.TYPE && returnType.isPrimitive()) { thrownewAopInvocationException( "Null return value from advice does not match primitive return type for: " + method); } return retVal; } finally { if (target != null && !targetSource.isStatic()) { // Must have come from TargetSource. targetSource.releaseTarget(target); } if (setProxyContext) { // Restore old proxy. AopContext.setCurrentProxy(oldProxy); } } }
// We need to create a method invocation... MethodInvocationinvocation=// 构建 ReflectiveMethodInvocation 来控制 interceptor 链的调用 newReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain); // Proceed to the joinpoint through the interceptor chain. retVal = invocation.proceed();
@Override @Nullable public Object proceed()throws Throwable { // We start with an index of -1 and increment early. if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) { return invokeJoinpoint(); // 这里就是触发连接点方法 }
ObjectinterceptorOrInterceptionAdvice=// 逐个获取 interceptorOrInterceptionAdvice this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex); if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) { // Evaluate dynamic method matcher here: static part will already have // been evaluated and found to match. InterceptorAndDynamicMethodMatcherdm= (InterceptorAndDynamicMethodMatcher) interceptorOrInterceptionAdvice; Class<?> targetClass = (this.targetClass != null ? this.targetClass : this.method.getDeclaringClass()); if (dm.methodMatcher.matches(this.method, targetClass, this.arguments)) { return dm.interceptor.invoke(this); } else { // Dynamic matching failed. // Skip this interceptor and invoke the next in the chain. return proceed(); } } else { // It's an interceptor, so we just invoke it: The pointcut will have // been evaluated statically before this object was constructed. // 对逐个获取的 interceptorOrInterceptionAdvice 进行 invoke 调用 return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this); // 参数为通过代理各项参数构建的 CglibMethodInvocation } }
实现如下,常规操作,只有这里采用了一个特殊的传参方式,传入的是一个方法引用,了解就好,继续往下跟。
1 2 3 4 5 6 7 8 9
public Object invoke(MethodInvocation invocation)throws Throwable { // Work out the target class: may be {@code null}. // The TransactionAttributeSource should be passed the target class // as well as the method, which may be from an interface. 获取目标类的类型 Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// Adapt to TransactionAspectSupport's invokeWithinTransaction... return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed); }
终于到了我们期待已久的事务实现逻辑代码了,赶紧的贴出源码开心一下,代码主干分为两个分支,就本例而言,我们只使用到了 if 分支,所以就只截取这一部分展示。而没用到的那一部分,我就没有做深入了解了。
if (this.reactiveAdapterRegistry != null) { // 暂不做了解 ReactiveAdapteradapter=this.reactiveAdapterRegistry.getAdapter(method.getReturnType()); if (adapter != null) { returnnewReactiveTransactionSupport(adapter).invokeWithinTransaction(method, targetClass, invocation); } }
// If the transaction attribute is null, the method is non-transactional. TransactionAttributeSourcetas= getTransactionAttributeSource(); finalTransactionAttributetxAttr= (tas != null ? tas.getTransactionAttribute(method, targetClass) : null); finalPlatformTransactionManagertm= determineTransactionManager(txAttr); // 获取事务管理器,尝试从缓存中获取 PlatformTransactionManager,没有的话就从 BeanFactory 中获取,缓存后进行返回 finalStringjoinpointIdentification= methodIdentification(method, targetClass, txAttr); // 获取方法的全限定名,没有的话通过 TransactionAttribute 的描述符代替
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) { // Standard transaction demarcation with getTransaction and commit/rollback calls. // 将 TransactionAttribute 封装为 DelegatingTransactionAttribute,从 PlatformTransactionManager 中获取 TransactionStatus TransactionInfotxInfo= createTransactionIfNecessary(tm, txAttr, joinpointIdentification); // 最后构建 TransactionInfo(通过参数构建 TransactionInfo,设置了状态信息, 将事务信心保存到线程本地变量中,同时保留了原先的事务信息,返回)
Object retVal; try { // This is an around advice: Invoke the next interceptor in the chain. // This will normally result in a target object being invoked. retVal = invocation.proceedWithInvocation(); // 继续下一个 advice 的调用,如果没有后续的 advice,直接进行目标方法的的调用 } catch (Throwable ex) { // target invocation exception completeTransactionAfterThrowing(txInfo, ex); // 对于异常发生后,事物的处理方式,提交或者回滚 throw ex; } finally { // 恢复线程本地变量中的 TransactionInfo cleanupTransactionInfo(txInfo); }
if (vavrPresent && VavrDelegate.isVavrTry(retVal)) { // Set rollback-only in case of Vavr failure matching our rollback rules... TransactionStatusstatus= txInfo.getTransactionStatus(); if (status != null && txAttr != null) { retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status); } } // 正常情况下,进行事务的提交 commitTransactionAfterReturning(txInfo); return retVal; } }
再回到我们 if 分支那里,因为我们在业务代码中抛出的异常是运行时异常,所以 if 判断的第二个条件也是成立的,也就是说我们将要进行事务的回滚操作。但是如果我们将业务代码中的异常修改为非运行异常,那么此时事务就会正常提交。这样带来的结果就是数据入库了。还是看看事务回滚的代码实现吧,根本的就是调用的下边的代码,更加细节的部分我已经没有去深究了,因为到这里对于理解事务机制的实现已经足够了,我这里就仅仅是将代码段的作用给进行标注。
if (status.hasSavepoint()) { // 看事务状态是否有保存点 if (status.isDebug()) { logger.debug("Rolling back transaction to savepoint"); } status.rollbackToHeldSavepoint(); // 回滚到保存点 } elseif (status.isNewTransaction()) { // 是否是新事务呢? if (status.isDebug()) { logger.debug("Initiating transaction rollback"); } // 直接进行回滚 doRollback(status); // 进行事务的回滚操作 } else { // 不是新事务的处理方式,存在现有的事务处理方式 // Participating in larger transaction if (status.hasTransaction()) { // 检查回滚条件 if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) { // 如果局部的事务失败,是否设置全局的事务回滚状态 if (status.isDebug()) { logger.debug("Participating transaction failed - marking existing transaction as rollback-only"); } doSetRollbackOnly(status); // 仅仅是设置回滚的标志 } else { if (status.isDebug()) { logger.debug("Participating transaction failed - letting transaction originator decide on rollback"); } } } else { logger.debug("Should roll back transaction but cannot - no transaction available"); } // Unexpected rollback only matters here if we're asked to fail early if (!isFailEarlyOnGlobalRollbackOnly()) { unexpectedRollback = false; } } } catch (RuntimeException | Error ex) { // 触发 triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); throw ex; } // 逐个触发同步器链每个元素的 afterCompletion 方法 triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
// Raise UnexpectedRollbackException if we had a global rollback-only marker if (unexpectedRollback) { thrownewUnexpectedRollbackException( "Transaction rolled back because it has been marked as rollback-only"); } } finally { // 事务回滚后的扫尾工作,设置事务的状态, 线程本地变量中移除资源信息(连接信息),设置连接的自动提交,恢复隔离级别,释放连接,恢复 connection holder 的状态信息 cleanupAfterCompletion(status); } }
只要注意一点,就是真正的事务回滚是在上边代码中的 doRollback(status); 语句中完成的,而其他部分就只是一些事务回滚生命周期方法的处理,或者是对于事务回滚后的一些善后工作。关于 doRollback(status); 的实现,我们不妨看看,其实就是跟我们自己进行事务回滚的操作一样,根本还是调用的 java.sql.Connection#rollback() 看到这里我想小伙伴已经对 spring 的事务实现有比较清晰的理解了,至于一些其他的细节部分,如果有精力就可以去看。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
@Override// 进行事务的回滚操作 protectedvoiddoRollback(DefaultTransactionStatus status) { DataSourceTransactionObjecttxObject= (DataSourceTransactionObject) status.getTransaction(); // 数据源事务对象 Connectioncon= txObject.getConnectionHolder().getConnection(); // 也就是说和数据库交互相关的信息都保存在 DataSourceTransactionObject 实例中 if (status.isDebug()) { logger.debug("Rolling back JDBC transaction on Connection [" + con + "]"); } try { con.rollback(); // 调用真正的回滚操作 } catch (SQLException ex) { thrownewTransactionSystemException("Could not roll back JDBC transaction", ex); } }