Studying the Source Code of Spring's @Transactional Execution Flow

Date: 2023-06-30
How @Transactional Works: A Brief Overview

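Spring defines the @Transactional annotation and enhances methods annotated with it through an AOP programming model built on AbstractBeanFactoryPointcutAdvisor, StaticMethodMatcherPointcut, and MethodInterceptor. It also abstracts transactional behavior into PlatformTransactionManager (the transaction manager), TransactionStatus (the transaction status), and TransactionDefinition (the transaction definition). The logic for beginning, committing, and rolling back a transaction is then woven in before and after the enhanced method, which gives a unified transaction management model.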
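The idea of weaving begin, commit, and rollback around a method can be illustrated without Spring at all. The following is a minimal sketch that uses a JDK dynamic proxy instead of Spring's advisor machinery; RecordingTxManager, AccountService, SimpleAccountService, and TxProxySketch are hypothetical names invented for this illustration and are not Spring classes.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a transaction manager; it only records events.
class RecordingTxManager {
    final List<String> events = new ArrayList<>();
    void begin()    { events.add("begin"); }
    void commit()   { events.add("commit"); }
    void rollback() { events.add("rollback"); }
}

// Hypothetical business interface and implementation.
interface AccountService {
    void transfer(int amount);
}

class SimpleAccountService implements AccountService {
    @Override
    public void transfer(int amount) {
        if (amount < 0) {
            throw new IllegalArgumentException("amount must not be negative");
        }
    }
}

class TxProxySketch {
    // Wraps the target so every call runs between begin and commit/rollback.
    static AccountService wrap(AccountService target, RecordingTxManager tm) {
        InvocationHandler handler = (proxy, method, args) -> {
            tm.begin();
            try {
                Object result = method.invoke(target, args);
                tm.commit();
                return result;
            } catch (InvocationTargetException e) {
                tm.rollback();
                throw e.getCause(); // rethrow the original business exception
            }
        };
        return (AccountService) Proxy.newProxyInstance(
                AccountService.class.getClassLoader(),
                new Class<?>[] {AccountService.class},
                handler);
    }
}
```

Calling transfer(10) on the wrapped service records begin and commit, while transfer(-1) records begin and rollback and rethrows the IllegalArgumentException to the caller.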

Version: Spring 5.1.5

1. AOP proxy interception

AOP generates a proxy object for the service bean, so a call to a service method is intercepted by AOP.
CglibAopProxy.DynamicAdvisedInterceptor

public Object intercept(Object proxy, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
    ...
    if (chain.isEmpty() && Modifier.isPublic(method.getModifiers())) {
        Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
        retVal = methodProxy.invoke(target, argsToUse);
    } else {
        retVal = (new CglibAopProxy.CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy)).proceed();
    }
    ...
}
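The branch above either calls the target directly (empty chain) or walks an interceptor chain via proceed(). A simplified sketch of that pattern follows; SketchInterceptor, SketchInvocation, and ChainSketch are hypothetical types made up for this example, and the real CglibMethodInvocation carries considerably more state.

```java
import java.util.List;
import java.util.function.Supplier;

// Hypothetical, simplified counterpart of a method interceptor.
interface SketchInterceptor {
    Object invoke(SketchInvocation invocation);
}

// Walks the interceptor list; the target runs once the list is exhausted.
class SketchInvocation {
    private final List<SketchInterceptor> chain;
    private final Supplier<Object> target;
    private int index = 0;

    SketchInvocation(List<SketchInterceptor> chain, Supplier<Object> target) {
        this.chain = chain;
        this.target = target;
    }

    Object proceed() {
        if (index == chain.size()) {
            return target.get(); // no interceptors left: call the real method
        }
        return chain.get(index++).invoke(this);
    }
}

class ChainSketch {
    static Object call(List<SketchInterceptor> chain, Supplier<Object> target) {
        if (chain.isEmpty()) {
            return target.get(); // shortcut: nothing to intercept
        }
        return new SketchInvocation(chain, target).proceed();
    }
}
```

An interceptor that logs before and after inv.proceed() plays the role the transaction interceptor plays in the next step: its code surrounds the target call.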

2. Interception by the transaction interceptor

TransactionInterceptor

public Object invoke(MethodInvocation invocation) throws Throwable {
    Class<?> targetClass = invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null;
    Method var10001 = invocation.getMethod();
    invocation.getClass();
    // Delegate to the parent class (TransactionAspectSupport)
    return this.invokeWithinTransaction(var10001, targetClass, invocation::proceed);
}

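Class diagram (figure omitted): TransactionInterceptor extends TransactionAspectSupport and implements MethodInterceptor.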

3. Transaction handling

TransactionAspectSupport

protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, TransactionAspectSupport.InvocationCallback invocation) throws Throwable {
    // Transaction attributes
    TransactionAttributeSource tas = this.getTransactionAttributeSource();
    TransactionAttribute txAttr = tas != null ? tas.getTransactionAttribute(method, targetClass) : null;
    // Transaction manager
    PlatformTransactionManager tm = this.determineTransactionManager(txAttr);
    String joinpointIdentification = this.methodIdentification(method, targetClass, txAttr);
    // Return value
    Object result;
    ...
    } else {
        // Create the transaction
        TransactionAspectSupport.TransactionInfo txInfo = this.createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
        result = null;
        try {
            // Run the business method through the callback
            result = invocation.proceedWithInvocation();
        } catch (Throwable var17) {
            // On exception: roll back, or commit if the rollback rules do not match
            this.completeTransactionAfterThrowing(txInfo, var17);
            throw var17;
        } finally {
            // Restore the previous TransactionInfo bound to the thread
            // (auto-commit is restored in DataSourceTransactionManager.doCleanupAfterCompletion)
            this.cleanupTransactionInfo(txInfo);
        }
        // Commit the transaction
        this.commitTransactionAfterReturning(txInfo);
        return result;
    }
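The try/catch/finally layout in invokeWithinTransaction fixes the order of the steps: on success the cleanup in the finally block runs before the commit, and on failure the rollback handling runs before the cleanup. The sketch below reproduces only that control flow; TxTemplateSketch and its event names are hypothetical, and it catches RuntimeException rather than Throwable to stay compact.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch that mirrors only the control flow, not Spring's logic.
class TxTemplateSketch {
    final List<String> events = new ArrayList<>();

    <T> T invokeWithinTransaction(Supplier<T> businessLogic) {
        events.add("create");            // createTransactionIfNecessary
        T result;
        try {
            result = businessLogic.get(); // proceedWithInvocation
        } catch (RuntimeException ex) {
            events.add("rollback");      // completeTransactionAfterThrowing
            throw ex;
        } finally {
            events.add("cleanup");       // cleanupTransactionInfo
        }
        events.add("commit");            // commitTransactionAfterReturning
        return result;
    }
}
```

A successful call records create, cleanup, commit; a call whose business logic throws records create, rollback, cleanup and never reaches the commit step.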

Creating the transaction

protected TransactionAspectSupport.TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
    if (txAttr != null && ((TransactionAttribute)txAttr).getName() == null) {
        txAttr = new DelegatingTransactionAttribute((TransactionAttribute)txAttr) {
            public String getName() {
                return joinpointIdentification;
            }
        };
    }
    TransactionStatus status = null;
    if (txAttr != null) {
        if (tm != null) {
            // Obtain the transaction from the transaction manager
            status = tm.getTransaction((TransactionDefinition)txAttr);
        } else if (this.logger.isDebugEnabled()) {
            this.logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured");
        }
    }
    return this.prepareTransactionInfo(tm, (TransactionAttribute)txAttr, joinpointIdentification, status);
}

4. Obtaining the transaction from the transaction manager

Interface PlatformTransactionManager

public interface PlatformTransactionManager {
    TransactionStatus getTransaction(@Nullable TransactionDefinition var1) throws TransactionException;
    void commit(TransactionStatus var1) throws TransactionException;
    void rollback(TransactionStatus var1) throws TransactionException;
}

Abstract implementation AbstractPlatformTransactionManager

public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
    // Obtain the transaction object
    Object transaction = this.doGetTransaction();
    ......
    // Handle the case where a transaction already exists
    if (this.isExistingTransaction(transaction)) {
        return this.handleExistingTransaction((TransactionDefinition)definition, transaction, debugEnabled);
    }
    ....
    // Branch on the transaction's propagation behavior
    } else {
        AbstractPlatformTransactionManager.SuspendedResourcesHolder suspendedResources = this.suspend((Object)null);
        if (debugEnabled) {
            this.logger.debug("Creating new transaction with name [" + ((TransactionDefinition)definition).getName() + "]: " + definition);
        }
        try {
            // 2 == SYNCHRONIZATION_NEVER
            boolean newSynchronization = this.getTransactionSynchronization() != 2;
            DefaultTransactionStatus status = this.newTransactionStatus((TransactionDefinition)definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            // Begin the transaction
            this.doBegin(transaction, (TransactionDefinition)definition);
            this.prepareSynchronization(status, (TransactionDefinition)definition);
            return status;
        } catch (Error | RuntimeException var7) {
            this.resume((Object)null, suspendedResources);
            throw var7;
        }
    }
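The elided parts of getTransaction choose a path from two inputs: whether a transaction already exists, and the propagation behavior. The following sketch summarizes that decision as a pure function. PropagationSketch and its Action values are hypothetical labels for this illustration; the real code throws IllegalTransactionStateException rather than IllegalStateException, and NESTED may use a nested begin instead of a savepoint depending on the manager.

```java
// Hypothetical summary of the propagation decision; not Spring's actual code.
class PropagationSketch {
    enum Propagation { REQUIRED, SUPPORTS, MANDATORY, REQUIRES_NEW, NOT_SUPPORTED, NEVER, NESTED }

    enum Action {
        JOIN_EXISTING, SUSPEND_AND_BEGIN_NEW, SUSPEND_AND_RUN_WITHOUT_TX,
        NESTED_SAVEPOINT, BEGIN_NEW, RUN_WITHOUT_TX
    }

    static Action decide(boolean existingTransaction, Propagation propagation) {
        if (existingTransaction) {
            // Corresponds to handleExistingTransaction(...)
            switch (propagation) {
                case NEVER:
                    throw new IllegalStateException("Existing transaction found for propagation NEVER");
                case NOT_SUPPORTED:
                    return Action.SUSPEND_AND_RUN_WITHOUT_TX;
                case REQUIRES_NEW:
                    return Action.SUSPEND_AND_BEGIN_NEW;
                case NESTED:
                    return Action.NESTED_SAVEPOINT;
                default: // REQUIRED, SUPPORTS, MANDATORY
                    return Action.JOIN_EXISTING;
            }
        }
        switch (propagation) {
            case MANDATORY:
                throw new IllegalStateException("No existing transaction found for propagation MANDATORY");
            case REQUIRED:
            case REQUIRES_NEW:
            case NESTED:
                return Action.BEGIN_NEW; // the else branch shown above, ending in doBegin
            default: // SUPPORTS, NOT_SUPPORTED, NEVER
                return Action.RUN_WITHOUT_TX;
        }
    }
}
```

With the default propagation (REQUIRED), the first transactional method on a thread begins a new transaction and any transactional method it calls through a proxy joins it.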

Concrete subclass DataSourceTransactionManager

protected Object doGetTransaction() {
    // Build the data source transaction object
    DataSourceTransactionManager.DataSourceTransactionObject txObject = new DataSourceTransactionManager.DataSourceTransactionObject();
    // Whether savepoints are allowed
    txObject.setSavepointAllowed(this.isNestedTransactionAllowed());
    // Look up the connection holder already bound to the current thread, if any
    ConnectionHolder conHolder = (ConnectionHolder)TransactionSynchronizationManager.getResource(this.obtainDataSource());
    txObject.setConnectionHolder(conHolder, false);
    return txObject;
}

5. Beginning the transaction

DataSourceTransactionManager

protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)transaction;
    Connection con = null;
    try {
        if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
            Connection newCon = this.obtainDataSource().getConnection();
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
            }
            // Store the new connection in the transaction object
            txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
        }
        txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
        // Get the connection
        con = txObject.getConnectionHolder().getConnection();
        Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
        txObject.setPreviousIsolationLevel(previousIsolationLevel);
        if (con.getAutoCommit()) {
            txObject.setMustRestoreAutoCommit(true);
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
            }
            // Switch the connection to manual commit
            con.setAutoCommit(false);
        }
        this.prepareTransactionalConnection(con, definition);
        txObject.getConnectionHolder().setTransactionActive(true);
        int timeout = this.determineTimeout(definition);
        if (timeout != -1) {
            txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
        }
        if (txObject.isNewConnectionHolder()) {
            TransactionSynchronizationManager.bindResource(this.obtainDataSource(), txObject.getConnectionHolder());
        }
    } catch (Throwable var7) {
        if (txObject.isNewConnectionHolder()) {
            DataSourceUtils.releaseConnection(con, this.obtainDataSource());
            txObject.setConnectionHolder((ConnectionHolder)null, false);
        }
        throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", var7);
    }
}
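doGetTransaction looks up a connection holder by data source with getResource, and doBegin registers a new one with bindResource. Both rely on per-thread storage, so every thread sees only its own connection. A minimal sketch of such a registry follows; ResourceBindingSketch is a hypothetical class, and the real TransactionSynchronizationManager does more (for example, unwrapping keys and handling void holders).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-thread resource registry, keyed by e.g. a DataSource.
class ResourceBindingSketch {
    private static final ThreadLocal<Map<Object, Object>> RESOURCES =
            ThreadLocal.withInitial(HashMap::new);

    // Binds a value for the current thread; rejects a second binding for the same key.
    static void bind(Object key, Object value) {
        if (RESOURCES.get().putIfAbsent(key, value) != null) {
            throw new IllegalStateException("Already bound for key: " + key);
        }
    }

    // Returns the value bound for the current thread, or null if none.
    static Object get(Object key) {
        return RESOURCES.get().get(key);
    }

    // Removes and returns the value bound for the current thread.
    static Object unbind(Object key) {
        return RESOURCES.get().remove(key);
    }
}
```

Because the map lives in a ThreadLocal, a value bound on one thread is invisible to another, which is why a transaction does not follow work handed off to a different thread.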

Database connection pool
DruidPooledConnection

public void setAutoCommit(boolean autoCommit) throws SQLException {
    this.checkState();
    boolean useLocalSessionState = this.holder.getDataSource().isUseLocalSessionState();
    if (!useLocalSessionState || autoCommit != this.holder.underlyingAutoCommit) {
        try {
            // Apply the commit mode to the underlying connection
            this.conn.setAutoCommit(autoCommit);
            this.holder.setUnderlyingAutoCommit(autoCommit);
        } catch (SQLException var4) {
            this.handleException(var4, (String)null);
        }
    }
}

MySQL JDBC driver
ConnectionImpl

public void setAutoCommit(final boolean autoCommitFlag) throws SQLException {
    .......
    try {
        boolean needsSetOnServer = true;
        if ((Boolean)this.useLocalSessionState.getValue() && this.session.getServerSession().isAutoCommit() == autoCommitFlag) {
            needsSetOnServer = false;
        } else if (!(Boolean)this.autoReconnect.getValue()) {
            needsSetOnServer = this.getSession().isSetNeededForAutoCommitMode(autoCommitFlag);
        }
        // Record the new auto-commit mode in the local session state
        this.session.getServerSession().setAutoCommit(autoCommitFlag);
        if (needsSetOnServer) {
            // Send the SET autocommit statement to the server
            this.session.execSQL((Query)null, autoCommitFlag ? "SET autocommit=1" : "SET autocommit=0", -1, (NativePacketPayload)null, false, this.nullStatementResultSetFactory, this.database, (ColumnDefinition)null, false);
        }
    }
    .....
}
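Both the pool and the driver track the auto-commit flag locally so that a redundant change does not cost a server round trip. The sketch below isolates that caching idea under simplified assumptions; AutoCommitCacheSketch is hypothetical, it ignores the autoReconnect branch, and the counter stands in for executing the SET autocommit statement.

```java
// Hypothetical sketch of caching the auto-commit flag on the client side.
class AutoCommitCacheSketch {
    // JDBC connections start in auto-commit mode.
    private boolean cachedAutoCommit = true;
    // Counts how often a statement would have been sent to the server.
    int serverRoundTrips = 0;

    void setAutoCommit(boolean autoCommit, boolean useLocalSessionState) {
        // Skip the server call only when local state is trusted and unchanged.
        boolean needsSetOnServer = !(useLocalSessionState && cachedAutoCommit == autoCommit);
        cachedAutoCommit = autoCommit;
        if (needsSetOnServer) {
            serverRoundTrips++; // stands in for "SET autocommit=0" or "SET autocommit=1"
        }
    }
}
```

With useLocalSessionState enabled, repeating the current mode is a no-op on the wire; with it disabled, every call reaches the server in this simplified model.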

6. Invoking the business logic through the callback

TransactionAspectSupport
invocation.proceedWithInvocation()

7. Committing the transaction

TransactionAspectSupport
→AbstractPlatformTransactionManager (platform transaction manager)
→DataSourceTransactionManager (data source transaction manager)
→DruidPooledConnection (database connection pool)
→ConnectionImpl (MySQL JDBC driver)
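At the bottom of this chain the work reduces to a few java.sql.Connection calls: switch to manual commit, run the work, commit or roll back, then restore auto-commit. The following sketch shows that sequence in plain JDBC. JdbcTxSketch is a hypothetical helper, and recordingConnection is a test double built with a dynamic proxy so the example runs without a database; neither is part of Spring, Druid, or the MySQL driver.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;

class JdbcTxSketch {
    // Runs the work inside a manually committed JDBC transaction.
    static void runInTransaction(Connection con, Runnable work) throws SQLException {
        boolean mustRestoreAutoCommit = con.getAutoCommit();
        if (mustRestoreAutoCommit) {
            con.setAutoCommit(false); // begin: switch to manual commit
        }
        try {
            work.run();
            con.commit();
        } catch (RuntimeException ex) {
            con.rollback();
            throw ex;
        } finally {
            if (mustRestoreAutoCommit) {
                con.setAutoCommit(true); // cleanup: restore the original mode
            }
        }
    }

    // Test double (assumption for this sketch): records calls instead of using a database.
    static Connection recordingConnection(List<String> calls) {
        boolean[] autoCommit = {true};
        InvocationHandler handler = (proxy, method, args) -> {
            switch (method.getName()) {
                case "getAutoCommit":
                    return autoCommit[0];
                case "setAutoCommit":
                    autoCommit[0] = (Boolean) args[0];
                    calls.add("setAutoCommit(" + args[0] + ")");
                    return null;
                case "commit":
                case "rollback":
                    calls.add(method.getName());
                    return null;
                default:
                    throw new UnsupportedOperationException(method.getName());
            }
        };
        return (Connection) Proxy.newProxyInstance(
                Connection.class.getClassLoader(),
                new Class<?>[] {Connection.class},
                handler);
    }
}
```

For work that completes normally the recorded calls are setAutoCommit(false), commit, setAutoCommit(true); with a real DataSource, the connection would come from getConnection() instead of the test double.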

8. Rolling back the transaction

TransactionAspectSupport
→AbstractPlatformTransactionManager (platform transaction manager)
→DataSourceTransactionManager (data source transaction manager)
→DruidPooledConnection (database connection pool)
→ConnectionImpl (MySQL JDBC driver)
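Whether this rollback chain is entered at all depends on the exception type. Without rollbackFor or noRollbackFor settings, Spring's default rule rolls back on RuntimeException and Error, and commits when a checked exception is thrown. The sketch below encodes only that default rule; RollbackRuleSketch is a hypothetical class and does not model custom rollback rules.

```java
// Hypothetical sketch of the default rollback rule only.
class RollbackRuleSketch {
    // Default rule: unchecked exceptions and errors trigger a rollback.
    static boolean defaultRollbackOn(Throwable ex) {
        return ex instanceof RuntimeException || ex instanceof Error;
    }

    // What happens to the transaction when the business method throws.
    static String completeAfterThrowing(Throwable ex) {
        return defaultRollbackOn(ex) ? "rollback" : "commit";
    }
}
```

This is why a method that throws a checked exception such as IOException still commits unless the annotation declares rollbackFor for it.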

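9. Cleanup: restoring thread-local state and auto-commit

TransactionAspectSupport
cleanupTransactionInfo() calls TransactionInfo.restoreThreadLocalStatus(), which restores the TransactionInfo previously bound to the thread.
Auto-commit itself is restored after commit or rollback in DataSourceTransactionManager.doCleanupAfterCompletion(), when doBegin has set mustRestoreAutoCommit.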
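restoreThreadLocalStatus() does not touch the connection: it puts back the transaction info that was current on the thread before this method's info was bound, so nested transactional calls unwind like a stack. A minimal sketch of that save-and-restore pattern, where TxInfoSketch is a hypothetical class modeled loosely on TransactionInfo:

```java
// Hypothetical sketch of binding and restoring per-thread transaction info.
class TxInfoSketch {
    private static final ThreadLocal<TxInfoSketch> HOLDER = new ThreadLocal<>();

    final String name;
    private TxInfoSketch previous;

    TxInfoSketch(String name) {
        this.name = name;
    }

    // Remembers whatever was current, then makes this info current.
    void bindToThread() {
        previous = HOLDER.get();
        HOLDER.set(this);
    }

    // Makes the remembered info current again (null for the outermost call).
    void restoreThreadLocalStatus() {
        HOLDER.set(previous);
    }

    static TxInfoSketch current() {
        return HOLDER.get();
    }
}
```

Binding "outer" and then "inner" makes "inner" current; restoring "inner" makes "outer" current again, and restoring "outer" leaves nothing bound.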
