[Source Code Analysis] Spring Transactions: How @Transactional Works

Source code walkthrough

Auto-configuration

In spring-boot-autoconfigure, the spring.factories file registers TransactionAutoConfiguration:

    org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
    org.springframework.boot.autoconfigure.transaction.TransactionAutoConfiguration,\

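The TransactionAutoConfiguration source contains nested configuration classes annotated with @EnableTransactionManagement. Which one applies depends on the spring.aop.proxy-target-class property; CGLIB proxying is used when the property is missing.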

    @Configuration(proxyBeanMethods = false)
    @ConditionalOnBean({TransactionManager.class})
    @ConditionalOnMissingBean({AbstractTransactionManagementConfiguration.class})
    public static class EnableTransactionManagementConfiguration {
        public EnableTransactionManagementConfiguration() {
        }

        @Configuration(proxyBeanMethods = false)
        @EnableTransactionManagement(proxyTargetClass = true)
        @ConditionalOnProperty(prefix = "spring.aop", name = {"proxy-target-class"}, havingValue = "true", matchIfMissing = true)
        public static class CglibAutoProxyConfiguration {
            public CglibAutoProxyConfiguration() {
            }
        }

        @Configuration(proxyBeanMethods = false)
        @EnableTransactionManagement(proxyTargetClass = false)
        @ConditionalOnProperty(prefix = "spring.aop", name = {"proxy-target-class"}, havingValue = "false", matchIfMissing = false)
        public static class JdkDynamicAutoProxyConfiguration {
            public JdkDynamicAutoProxyConfiguration() {
            }
        }
    }

The @EnableTransactionManagement source imports TransactionManagementConfigurationSelector:

    @Target({ElementType.TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Import({TransactionManagementConfigurationSelector.class})
    public @interface EnableTransactionManagement {
        boolean proxyTargetClass() default false;

        AdviceMode mode() default AdviceMode.PROXY;

        int order() default 2147483647;
    }

In the default PROXY mode, TransactionManagementConfigurationSelector imports AutoProxyRegistrar and ProxyTransactionManagementConfiguration:

    public class TransactionManagementConfigurationSelector extends AdviceModeImportSelector<EnableTransactionManagement> {
        public TransactionManagementConfigurationSelector() {
        }

        protected String[] selectImports(AdviceMode adviceMode) {
            switch(adviceMode) {
            case PROXY:
                return new String[]{AutoProxyRegistrar.class.getName(), ProxyTransactionManagementConfiguration.class.getName()};
            case ASPECTJ:
                return new String[]{this.determineTransactionAspectClass()};
            default:
                return null;
            }
        }

        private String determineTransactionAspectClass() {
            return ClassUtils.isPresent("javax.transaction.Transactional", this.getClass().getClassLoader())
                    ? "org.springframework.transaction.aspectj.AspectJJtaTransactionManagementConfiguration"
                    : "org.springframework.transaction.aspectj.AspectJTransactionManagementConfiguration";
        }
    }

Advisor and interceptor

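ProxyTransactionManagementConfiguration registers a BeanFactoryTransactionAttributeSourceAdvisor in the container. The advisor decides which methods to match through the TransactionAttributeSource, and its advice (the interceptor) is TransactionInterceptor.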

    @Configuration(proxyBeanMethods = false)
    @Role(2)
    public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {
        public ProxyTransactionManagementConfiguration() {
        }

        @Bean(name = {"org.springframework.transaction.config.internalTransactionAdvisor"})
        @Role(2)
        public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor(TransactionAttributeSource transactionAttributeSource, TransactionInterceptor transactionInterceptor) {
            BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
            advisor.setTransactionAttributeSource(transactionAttributeSource);
            advisor.setAdvice(transactionInterceptor);
            if (this.enableTx != null) {
                advisor.setOrder((Integer)this.enableTx.getNumber("order"));
            }
            return advisor;
        }

        @Bean
        @Role(2)
        public TransactionAttributeSource transactionAttributeSource() {
            return new AnnotationTransactionAttributeSource();
        }

        @Bean
        @Role(2)
        public TransactionInterceptor transactionInterceptor(TransactionAttributeSource transactionAttributeSource) {
            TransactionInterceptor interceptor = new TransactionInterceptor();
            interceptor.setTransactionAttributeSource(transactionAttributeSource);
            if (this.txManager != null) {
                interceptor.setTransactionManager(this.txManager);
            }
            return interceptor;
        }
    }

Deciding whether a class needs transactional interception

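AnnotationTransactionAttributeSource#isCandidateClass decides whether a class is a candidate. It returns true as soon as any registered annotation parser accepts the class.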

    public boolean isCandidateClass(Class<?> targetClass) {
        Iterator var2 = this.annotationParsers.iterator();
        TransactionAnnotationParser parser;
        do {
            if (!var2.hasNext()) {
                return false;
            }
            parser = (TransactionAnnotationParser)var2.next();
        } while(!parser.isCandidateClass(targetClass));
        return true;
    }

SpringTransactionAnnotationParser#isCandidateClass

    public boolean isCandidateClass(Class<?> targetClass) {
        return AnnotationUtils.isCandidateClass(targetClass, Transactional.class);
    }

Interceptor logic

When a proxied method is called, the interceptor's TransactionInterceptor#invoke method runs:

    @Nullable
    public Object invoke(MethodInvocation invocation) throws Throwable {
        Class<?> targetClass = invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null;
        Method var10001 = invocation.getMethod();
        invocation.getClass();
        return this.invokeWithinTransaction(var10001, targetClass, invocation::proceed);
    }
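To make the interception step concrete, here is a minimal JDK-only sketch of the same idea: a dynamic proxy receives the call and surrounds the target method with extra logic, much as `invoke` hands `invocation::proceed` to `invokeWithinTransaction`. The names `InterceptorSketch`, `AccountService`, `AccountServiceImpl`, and `proxyFor` are hypothetical; Spring builds its proxies through its own AOP infrastructure, not through this code.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class InterceptorSketch {
    /** Hypothetical business interface. */
    interface AccountService {
        String transfer(int amount);
    }

    /** Hypothetical target object that the proxy delegates to. */
    static class AccountServiceImpl implements AccountService {
        public String transfer(int amount) {
            if (amount < 0) {
                throw new IllegalArgumentException("negative amount");
            }
            return "transferred " + amount;
        }
    }

    /** Returns a proxy that records a marker before and after every call to the target. */
    static AccountService proxyFor(AccountService target, List<String> log) {
        InvocationHandler handler = (proxy, method, args) -> {
            log.add("before " + method.getName());
            try {
                return method.invoke(target, args); // plays the role of invocation.proceed()
            } catch (InvocationTargetException e) {
                throw e.getCause();                 // surface the target's own exception
            } finally {
                log.add("after " + method.getName());
            }
        };
        return (AccountService) Proxy.newProxyInstance(
                AccountService.class.getClassLoader(),
                new Class<?>[] {AccountService.class},
                handler);
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        AccountService service = proxyFor(new AccountServiceImpl(), log);
        System.out.println(service.transfer(10));
        try {
            service.transfer(-1);
        } catch (IllegalArgumentException expected) {
            System.out.println("caught: " + expected.getMessage());
        }
        System.out.println(log);
    }
}
```

The caller only ever sees the `AccountService` interface, which is why the surrounding logic stays invisible to business code.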

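TransactionAspectSupport#invokeWithinTransaction contains the actual transaction-handling logic:

  1. createTransactionIfNecessary: creates the transaction, which carries the database connection, the transaction status, and related information;

  2. completeTransactionAfterThrowing: the target method threw an exception, so the transaction is rolled back (or committed, if the exception does not match the rollback rules);

  3. commitTransactionAfterReturning: the target method succeeded, so the transaction is committed (some checks inside may still lead to a rollback).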

    @Nullable
    protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, TransactionAspectSupport.InvocationCallback invocation) throws Throwable {
        TransactionAttributeSource tas = this.getTransactionAttributeSource();
        TransactionAttribute txAttr = tas != null ? tas.getTransactionAttribute(method, targetClass) : null;
        TransactionManager tm = this.determineTransactionManager(txAttr);
        if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
            // ...
        } else {
            PlatformTransactionManager ptm = this.asPlatformTransactionManager(tm);
            String joinpointIdentification = this.methodIdentification(method, targetClass, txAttr);
            Object retVal;
            // ... (an if-branch, including its condition, is elided here)
            } else {
                TransactionAspectSupport.TransactionInfo txInfo = this.createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
                try {
                    retVal = invocation.proceedWithInvocation();
                } catch (Throwable var18) {
                    this.completeTransactionAfterThrowing(txInfo, var18);
                    throw var18;
                } finally {
                    this.cleanupTransactionInfo(txInfo);
                }
                if (vavrPresent && TransactionAspectSupport.VavrDelegate.isVavrTry(retVal)) {
                    TransactionStatus status = txInfo.getTransactionStatus();
                    if (status != null && txAttr != null) {
                        retVal = TransactionAspectSupport.VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
                    }
                }
                this.commitTransactionAfterReturning(txInfo);
                return retVal;
            }
        }
    }
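Stripped of Spring specifics, the begin / proceed / rollback-or-commit flow can be sketched as below. This is a simplified illustration, not Spring's implementation: `RecordingTxManager` and `TransactionFlowSketch` are hypothetical names, the callback is a plain `Supplier`, and the sketch always rolls back on an unchecked exception instead of consulting rollback rules.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class TransactionFlowSketch {
    /** Hypothetical stand-in for a transaction manager; it only records what was asked of it. */
    static class RecordingTxManager {
        final List<String> events = new ArrayList<>();

        void begin() { events.add("begin"); }
        void commit() { events.add("commit"); }
        void rollback() { events.add("rollback"); }
    }

    /** Runs the body between begin and commit, rolling back if the body throws. */
    static <T> T invokeWithinTransaction(RecordingTxManager tm, Supplier<T> body) {
        tm.begin();                      // step 1: create the transaction
        T result;
        try {
            result = body.get();         // call the target method
        } catch (RuntimeException | Error ex) {
            tm.rollback();               // step 2: complete after throwing (simplified)
            throw ex;
        }
        tm.commit();                     // step 3: commit after a normal return
        return result;
    }

    public static void main(String[] args) {
        RecordingTxManager tm = new RecordingTxManager();
        System.out.println(invokeWithinTransaction(tm, () -> "ok"));
        try {
            invokeWithinTransaction(tm, () -> {
                throw new IllegalStateException("boom");
            });
        } catch (IllegalStateException expected) {
            System.out.println("caught: " + expected.getMessage());
        }
        System.out.println(tm.events);
    }
}
```

Note that the commit sits outside the `try` block, mirroring the original: an exception thrown by the commit itself does not trigger the rollback branch.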

Obtaining the transaction

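TransactionAspectSupport#createTransactionIfNecessary creates the transaction. If the attribute has no name, the joinpoint identification is used as the transaction name.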

    protected TransactionAspectSupport.TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
        if (txAttr != null && ((TransactionAttribute)txAttr).getName() == null) {
            txAttr = new DelegatingTransactionAttribute((TransactionAttribute)txAttr) {
                public String getName() {
                    return joinpointIdentification;
                }
            };
        }
        TransactionStatus status = null;
        if (txAttr != null) {
            if (tm != null) {
                status = tm.getTransaction((TransactionDefinition)txAttr);
            } else if (this.logger.isDebugEnabled()) {
                this.logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured");
            }
        }
        return this.prepareTransactionInfo(tm, (TransactionAttribute)txAttr, joinpointIdentification, status);
    }

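AbstractPlatformTransactionManager#getTransaction obtains the transaction. If one already exists it is handled according to the propagation behavior; otherwise the propagation behavior decides whether to throw, run without a transaction, or start a new one.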

    public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
        TransactionDefinition def = definition != null ? definition : TransactionDefinition.withDefaults();
        Object transaction = this.doGetTransaction();
        boolean debugEnabled = this.logger.isDebugEnabled();
        if (this.isExistingTransaction(transaction)) {
            return this.handleExistingTransaction(def, transaction, debugEnabled);
        } else if (def.getTimeout() < -1) {
            throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
        } else if (def.getPropagationBehavior() == 2) { // 2 = PROPAGATION_MANDATORY
            throw new IllegalTransactionStateException("No existing transaction found for transaction marked with propagation 'mandatory'");
        } else if (def.getPropagationBehavior() != 0 && def.getPropagationBehavior() != 3 && def.getPropagationBehavior() != 6) {
            // Not REQUIRED (0), REQUIRES_NEW (3), or NESTED (6): proceed without an actual transaction
            if (def.getIsolationLevel() != -1 && this.logger.isWarnEnabled()) {
                this.logger.warn("Custom isolation level specified but no actual transaction initiated; isolation level will effectively be ignored: " + def);
            }
            boolean newSynchronization = this.getTransactionSynchronization() == 0;
            return this.prepareTransactionStatus(def, (Object)null, true, newSynchronization, debugEnabled, (Object)null);
        } else {
            AbstractPlatformTransactionManager.SuspendedResourcesHolder suspendedResources = this.suspend((Object)null);
            if (debugEnabled) {
                this.logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
            }
            try {
                return this.startTransaction(def, transaction, debugEnabled, suspendedResources);
            } catch (Error | RuntimeException var7) {
                this.resume((Object)null, suspendedResources);
                throw var7;
            }
        }
    }

    private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled, @Nullable AbstractPlatformTransactionManager.SuspendedResourcesHolder suspendedResources) {
        boolean newSynchronization = this.getTransactionSynchronization() != 2;
        DefaultTransactionStatus status = this.newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
        this.doBegin(transaction, definition);
        this.prepareSynchronization(status, definition);
        return status;
    }
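The decompiled code compares propagation behavior against raw integers, which hides the decision being made. The sketch below restates the "no existing transaction" branch with named values. `PropagationSketch`, `Propagation`, and `Outcome` are hypothetical names for illustration; the enum order is chosen so that each ordinal matches the integer constant in `TransactionDefinition` (REQUIRED = 0 through NESTED = 6).

```java
class PropagationSketch {
    /** Declared in the same order as the integer constants 0..6 in TransactionDefinition. */
    enum Propagation { REQUIRED, SUPPORTS, MANDATORY, REQUIRES_NEW, NOT_SUPPORTED, NEVER, NESTED }

    enum Outcome { START_NEW_TRANSACTION, RUN_WITHOUT_TRANSACTION, THROW_ILLEGAL_STATE }

    /** What getTransaction does when no transaction is active on the current thread. */
    static Outcome whenNoTransactionExists(Propagation propagation) {
        switch (propagation) {
            case MANDATORY:
                return Outcome.THROW_ILLEGAL_STATE;       // == 2 in the decompiled code
            case REQUIRED:
            case REQUIRES_NEW:
            case NESTED:
                return Outcome.START_NEW_TRANSACTION;     // 0, 3, 6: the startTransaction branch
            default:
                return Outcome.RUN_WITHOUT_TRANSACTION;   // SUPPORTS, NOT_SUPPORTED, NEVER
        }
    }

    public static void main(String[] args) {
        for (Propagation p : Propagation.values()) {
            System.out.println(p.ordinal() + " " + p + " -> " + whenNoTransactionExists(p));
        }
    }
}
```

The case where a transaction already exists goes through `handleExistingTransaction`, which is not shown in this article and is therefore left out of the sketch.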

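DataSourceTransactionManager#doBegin: the transaction manager acquires a connection and switches it to manual commit (auto-commit off). If the ConnectionHolder is new, it is bound to TransactionSynchronizationManager.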

    protected void doBegin(Object transaction, TransactionDefinition definition) {
        DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)transaction;
        Connection con = null;
        try {
            if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
                Connection newCon = this.obtainDataSource().getConnection();
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
                }
                txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
            }
            txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
            con = txObject.getConnectionHolder().getConnection();
            Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
            txObject.setPreviousIsolationLevel(previousIsolationLevel);
            txObject.setReadOnly(definition.isReadOnly());
            if (con.getAutoCommit()) {
                txObject.setMustRestoreAutoCommit(true);
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
                }
                con.setAutoCommit(false);
            }
            this.prepareTransactionalConnection(con, definition);
            txObject.getConnectionHolder().setTransactionActive(true);
            int timeout = this.determineTimeout(definition);
            if (timeout != -1) {
                txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
            }
            if (txObject.isNewConnectionHolder()) {
                TransactionSynchronizationManager.bindResource(this.obtainDataSource(), txObject.getConnectionHolder());
            }
        } catch (Throwable var7) {
            if (txObject.isNewConnectionHolder()) {
                DataSourceUtils.releaseConnection(con, this.obtainDataSource());
                txObject.setConnectionHolder((ConnectionHolder)null, false);
            }
            throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", var7);
        }
    }
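The manual-commit switch in `doBegin` can be tried without a database. The sketch below builds a fake `java.sql.Connection` with a JDK dynamic proxy that only records calls, then runs the pattern: remember the auto-commit state, turn it off, commit or roll back, and restore it. `ManualCommitSketch`, `recordingConnection`, and `runInTransaction` are hypothetical names. Restoring auto-commit in a `finally` block is an assumption made for illustration; the source above only shows the `setMustRestoreAutoCommit(true)` flag being set.

```java
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

class ManualCommitSketch {
    /** Builds a fake Connection that records calls and talks to no database. */
    static Connection recordingConnection(List<String> calls) {
        boolean[] autoCommit = {true};
        return (Connection) Proxy.newProxyInstance(
                ManualCommitSketch.class.getClassLoader(),
                new Class<?>[] {Connection.class},
                (proxy, method, args) -> {
                    switch (method.getName()) {
                        case "getAutoCommit":
                            return autoCommit[0];
                        case "setAutoCommit":
                            autoCommit[0] = (Boolean) args[0];
                            calls.add("setAutoCommit(" + args[0] + ")");
                            return null;
                        case "commit":
                        case "rollback":
                            calls.add(method.getName());
                            return null;
                        default:
                            throw new UnsupportedOperationException(method.getName());
                    }
                });
    }

    /** Runs a unit of work in manual-commit mode; 'fail' simulates the work throwing. */
    static void runInTransaction(Connection con, boolean fail) throws SQLException {
        boolean mustRestoreAutoCommit = con.getAutoCommit();
        if (mustRestoreAutoCommit) {
            con.setAutoCommit(false);            // switch to manual commit, as doBegin does
        }
        try {
            if (fail) {
                throw new IllegalStateException("simulated failure");
            }
            con.commit();
        } catch (RuntimeException ex) {
            con.rollback();
            throw ex;
        } finally {
            if (mustRestoreAutoCommit) {
                con.setAutoCommit(true);         // assumption: restore the previous state afterwards
            }
        }
    }

    public static void main(String[] args) throws SQLException {
        List<String> calls = new ArrayList<>();
        Connection con = recordingConnection(calls);
        runInTransaction(con, false);
        try {
            runInTransaction(con, true);
        } catch (IllegalStateException expected) {
            System.out.println("caught: " + expected.getMessage());
        }
        System.out.println(calls);
    }
}
```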

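The transaction's database connection is stored in TransactionSynchronizationManager, in a thread-local map keyed by the resource (here, the DataSource):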

    private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal("Transactional resources");

    public static void bindResource(Object key, Object value) throws IllegalStateException {
        Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
        Assert.notNull(value, "Value must not be null");
        Map<Object, Object> map = (Map)resources.get();
        if (map == null) {
            map = new HashMap();
            resources.set(map);
        }
        Object oldValue = ((Map)map).put(actualKey, value);
        if (oldValue instanceof ResourceHolder && ((ResourceHolder)oldValue).isVoid()) {
            oldValue = null;
        }
        if (oldValue != null) {
            throw new IllegalStateException("Already value [" + oldValue + "] for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
        } else {
            if (logger.isTraceEnabled()) {
                logger.trace("Bound value [" + value + "] for key [" + actualKey + "] to thread [" + Thread.currentThread().getName() + "]");
            }
        }
    }
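Because the map lives in a `ThreadLocal`, a bound resource is visible only to the thread that bound it. The following sketch is a simplified, hypothetical re-creation of that idea (`ThreadBoundResources` and its methods are not Spring classes, and it uses `putIfAbsent` rather than the put-then-check logic above). It shows that a second bind for the same key fails and that another thread sees nothing.

```java
import java.util.HashMap;
import java.util.Map;

class ThreadBoundResources {
    private static final ThreadLocal<Map<Object, Object>> RESOURCES = new ThreadLocal<>();

    /** Binds a value for the current thread; a second bind for the same key is rejected. */
    static void bind(Object key, Object value) {
        Map<Object, Object> map = RESOURCES.get();
        if (map == null) {
            map = new HashMap<>();
            RESOURCES.set(map);
        }
        if (map.putIfAbsent(key, value) != null) {
            throw new IllegalStateException("Already bound: " + key);
        }
    }

    /** Returns the value bound on the current thread, or null if there is none. */
    static Object get(Object key) {
        Map<Object, Object> map = RESOURCES.get();
        return map != null ? map.get(key) : null;
    }

    /** Removes and returns the value bound on the current thread. */
    static Object unbind(Object key) {
        Map<Object, Object> map = RESOURCES.get();
        if (map == null) {
            return null;
        }
        Object value = map.remove(key);
        if (map.isEmpty()) {
            RESOURCES.remove();
        }
        return value;
    }

    /** Looks the key up from a freshly started thread to show the isolation. */
    static Object getFromOtherThread(Object key) {
        Object[] seen = new Object[1];
        Thread worker = new Thread(() -> seen[0] = get(key));
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        bind("dataSource", "connection-1");
        System.out.println("same thread:  " + get("dataSource"));
        System.out.println("other thread: " + getFromOtherThread("dataSource"));
        unbind("dataSource");
    }
}
```

This thread confinement is also why work handed to another thread does not automatically take part in the caller's transaction.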

Rolling back the transaction

TransactionAspectSupport#completeTransactionAfterThrowing

    protected void completeTransactionAfterThrowing(@Nullable TransactionAspectSupport.TransactionInfo txInfo, Throwable ex) {
        if (txInfo != null && txInfo.getTransactionStatus() != null) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "] after exception: " + ex);
            }
            if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
                try {
                    txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
                } catch (TransactionSystemException var6) {
                    this.logger.error("Application exception overridden by rollback exception", ex);
                    var6.initApplicationException(ex);
                    throw var6;
                } catch (Error | RuntimeException var7) {
                    this.logger.error("Application exception overridden by rollback exception", ex);
                    throw var7;
                }
            } else {
                try {
                    txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
                } catch (TransactionSystemException var4) {
                    this.logger.error("Application exception overridden by commit exception", ex);
                    var4.initApplicationException(ex);
                    throw var4;
                } catch (Error | RuntimeException var5) {
                    this.logger.error("Application exception overridden by commit exception", ex);
                    throw var5;
                }
            }
        }
    }
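The branch on `rollbackOn(ex)` means an exception does not always cause a rollback. The sketch below illustrates the default rule as I understand it from Spring's `DefaultTransactionAttribute`: roll back on `RuntimeException` and `Error`, commit otherwise. `RollbackRuleSketch` and `completionFor` are hypothetical names, and custom rules such as `rollbackFor` on the annotation are deliberately left out.

```java
import java.io.IOException;

class RollbackRuleSketch {
    /** Default rule (assumed here): only unchecked exceptions and errors trigger a rollback. */
    static boolean rollbackOn(Throwable ex) {
        return ex instanceof RuntimeException || ex instanceof Error;
    }

    /** Mirrors the if/else in completeTransactionAfterThrowing: roll back or commit. */
    static String completionFor(Throwable ex) {
        return rollbackOn(ex) ? "rollback" : "commit";
    }

    public static void main(String[] args) {
        System.out.println("IllegalStateException -> " + completionFor(new IllegalStateException()));
        System.out.println("OutOfMemoryError      -> " + completionFor(new OutOfMemoryError()));
        System.out.println("IOException           -> " + completionFor(new IOException()));
    }
}
```

Under this default, a checked exception such as `IOException` leaves the transaction to be committed, which is a common source of surprise.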

AbstractPlatformTransactionManager#rollback

    public final void rollback(TransactionStatus status) throws TransactionException {
        if (status.isCompleted()) {
            throw new IllegalTransactionStateException("Transaction is already completed - do not call commit or rollback more than once per transaction");
        } else {
            DefaultTransactionStatus defStatus = (DefaultTransactionStatus)status;
            this.processRollback(defStatus, false);
        }
    }

DataSourceTransactionManager#doRollback ultimately calls rollback on the JDBC Connection:

    protected void doRollback(DefaultTransactionStatus status) {
        DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)status.getTransaction();
        Connection con = txObject.getConnectionHolder().getConnection();
        if (status.isDebug()) {
            this.logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
        }
        try {
            con.rollback();
        } catch (SQLException var5) {
            throw new TransactionSystemException("Could not roll back JDBC transaction", var5);
        }
    }

Committing the transaction

TransactionAspectSupport#commitTransactionAfterReturning commits the transaction:

    protected void commitTransactionAfterReturning(@Nullable TransactionAspectSupport.TransactionInfo txInfo) {
        if (txInfo != null && txInfo.getTransactionStatus() != null) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
            }
            txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
        }
    }

Obtaining the connection

SpringManagedTransaction#getConnection (from MyBatis-Spring)

    public Connection getConnection() throws SQLException {
        if (this.connection == null) {
            this.openConnection();
        }
        return this.connection;
    }

    private void openConnection() throws SQLException {
        this.connection = DataSourceUtils.getConnection(this.dataSource);
        this.autoCommit = this.connection.getAutoCommit();
        this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.connection, this.dataSource);
        LOGGER.debug(() -> {
            return "JDBC Connection [" + this.connection + "] will" + (this.isConnectionTransactional ? " " : " not ") + "be managed by Spring";
        });
    }

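DataSourceUtils#getConnection obtains the connection. It first looks in TransactionSynchronizationManager, which is backed by a thread-local variable, and fetches a new connection from the DataSource only when no usable one is bound to the current thread.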

    public static Connection getConnection(DataSource dataSource) throws CannotGetJdbcConnectionException {
        try {
            return doGetConnection(dataSource);
        } catch (SQLException var2) {
            throw new CannotGetJdbcConnectionException("Failed to obtain JDBC Connection", var2);
        } catch (IllegalStateException var3) {
            throw new CannotGetJdbcConnectionException("Failed to obtain JDBC Connection: " + var3.getMessage());
        }
    }

    public static Connection doGetConnection(DataSource dataSource) throws SQLException {
        Assert.notNull(dataSource, "No DataSource specified");
        ConnectionHolder conHolder = (ConnectionHolder)TransactionSynchronizationManager.getResource(dataSource);
        if (conHolder == null || !conHolder.hasConnection() && !conHolder.isSynchronizedWithTransaction()) {
            logger.debug("Fetching JDBC Connection from DataSource");
            Connection con = fetchConnection(dataSource);
            if (TransactionSynchronizationManager.isSynchronizationActive()) {
                try {
                    ConnectionHolder holderToUse = conHolder;
                    if (conHolder == null) {
                        holderToUse = new ConnectionHolder(con);
                    } else {
                        conHolder.setConnection(con);
                    }
                    holderToUse.requested();
                    TransactionSynchronizationManager.registerSynchronization(new DataSourceUtils.ConnectionSynchronization(holderToUse, dataSource));
                    holderToUse.setSynchronizedWithTransaction(true);
                    if (holderToUse != conHolder) {
                        TransactionSynchronizationManager.bindResource(dataSource, holderToUse);
                    }
                } catch (RuntimeException var4) {
                    releaseConnection(con, dataSource);
                    throw var4;
                }
            }
            return con;
        } else {
            conHolder.requested();
            if (!conHolder.hasConnection()) {
                logger.debug("Fetching resumed JDBC Connection from DataSource");
                conHolder.setConnection(fetchConnection(dataSource));
            }
            return conHolder.getConnection();
        }
    }
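The practical effect of this lookup is that every data-access call inside one transaction gets the same connection, while calls outside a transaction each get a fresh one. Below is a minimal sketch of that behavior under simplifying assumptions: connections are plain strings, the "data source" is a `Supplier<String>`, and `ConnectionLookupSketch`, `beginTransaction`, and `endTransaction` are hypothetical names rather than Spring API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class ConnectionLookupSketch {
    /** Per-thread map from data source to the connection bound for the current transaction. */
    private static final ThreadLocal<Map<Object, String>> BOUND = ThreadLocal.withInitial(HashMap::new);

    /** Fetches one connection and binds it to the current thread, as a transaction start would. */
    static void beginTransaction(Supplier<String> dataSource) {
        BOUND.get().put(dataSource, dataSource.get());
    }

    /** Removes the binding for the current thread. */
    static void endTransaction(Supplier<String> dataSource) {
        BOUND.get().remove(dataSource);
    }

    /** Returns the thread-bound connection if there is one, otherwise a new one. */
    static String getConnection(Supplier<String> dataSource) {
        String bound = BOUND.get().get(dataSource);
        return bound != null ? bound : dataSource.get();
    }

    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger();
        Supplier<String> dataSource = () -> "conn-" + counter.incrementAndGet();

        System.out.println("outside tx: " + getConnection(dataSource) + ", " + getConnection(dataSource));
        beginTransaction(dataSource);
        System.out.println("inside tx:  " + getConnection(dataSource) + ", " + getConnection(dataSource));
        endTransaction(dataSource);
        System.out.println("after tx:   " + getConnection(dataSource));
    }
}
```

Sharing one connection is what lets a single commit or rollback cover all statements issued during the transactional method.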
