dotnetcore/FreeSql

在 asp.net core 中使用 UnitOfWorkManager 实现多种事务传播

2881099 opened this issue · 17 comments

本篇文章内容引导,如何在 asp.net core 项目中使用特性(注解) 的方式管理事务。

UnitOfWorkManager 只可以管理 Repository 仓储对象的事务,直接 fsql.Insert<T>() 是不行的!!但是可以用 repository.Orm.Insert<T>!!repository.Orm 是特殊实现的 IFreeSql,与 当前事务保持一致。

支持六种传播方式(propagation),意味着跨方法的事务非常方便,并且支持同步异步:

  • Requierd:如果当前没有事务,就新建一个事务,如果已存在一个事务中,加入到这个事务中,默认的选择。
  • Supports:支持当前事务,如果没有当前事务,就以非事务方法执行。
  • Mandatory:使用当前事务,如果没有当前事务,就抛出异常。
  • NotSupported:以非事务方式执行操作,如果当前存在事务,就把当前事务挂起。
  • Never:以非事务方式执行操作,如果当前事务存在则抛出异常。
  • Nested:以嵌套事务方式执行。

第一步:引入动态代理库

肉夹馍:https://github.com/inversionhourglass/Rougamo

dotnet add package Rougamo.Fody

[AttributeUsage(AttributeTargets.Method)]
public class TransactionalAttribute : Rougamo.MoAttribute
{
    public Propagation Propagation { get; set; } = Propagation.Required;
    public IsolationLevel IsolationLevel { get => m_IsolationLevel.Value; set => m_IsolationLevel = value; }
    IsolationLevel? m_IsolationLevel;

    static AsyncLocal<IServiceProvider> m_ServiceProvider = new AsyncLocal<IServiceProvider>();
    public static void SetServiceProvider(IServiceProvider serviceProvider) => m_ServiceProvider.Value = serviceProvider;

    IUnitOfWork _uow;
    public override void OnEntry(MethodContext context)
    {
        var uowManager = m_ServiceProvider.Value.GetService(typeof(UnitOfWorkManager)) as UnitOfWorkManager;
        _uow = uowManager.Begin(this.Propagation, this.m_IsolationLevel);
    }
    public override void OnExit(MethodContext context)
    {
        if (typeof(Task).IsAssignableFrom(context.RealReturnType))
            ((Task)context.ReturnValue).ContinueWith(t => _OnExit());
        else _OnExit();

        void _OnExit()
        {
            try
            {
                if (context.Exception == null) _uow.Commit();
                else _uow.Rollback();
            }
            finally
            {
                _uow.Dispose();
            }
        }
    }
}
UnitOfWorkManager 成员 说明
IUnitOfWork Current 返回当前的工作单元
void Binding(repository) 将仓储的事务交给它管理
IUnitOfWork Begin(propagation, isolationLevel) 创建工作单元

第二步:配置 Startup.cs 注入、中间件

//Startup.cs
public void ConfigureServices(IServiceCollection services)
{
    services.AddSingleton<IFreeSql>(fsql);
    services.AddScoped<UnitOfWorkManager>();
    services.AddFreeRepository(null, typeof(Startup).Assembly);
   //批量注入 Service
}

public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
    app.Use(async (context, next) =>
    {
        TransactionalAttribute.SetServiceProvider(context.RequestServices);
        await next();
    });
}

第三步:在 Controller 或者 Service 或者 Repository 中使用事务特性

public class SongService
{
    readonly IBaseRepository<Song> _repoSong;
    readonly IBaseRepository<Detail> _repoDetail;
    readonly SongRepository _repoSong2;

    public SongService(IBaseRepository<Song> repoSong, IBaseRepository<Detail> repoDetail, SongRepository repoSong2)
    {
        _repoSong = repoSong;
        _repoDetail = repoDetail;
        _repoSong2 = repoSong2;
    }

    [Transactional]
    public virtual void Test1()
    {
        //这里 _repoSong、_repoDetail、_repoSong2 所有操作都是一个工作单元
        this.Test2();
    }

    [Transactional(Propagation = Propagation.Nested)]
    public virtual void Test2() //嵌套事务,新的(不使用 Test1 的事务)
    {
        //这里 _repoSong、_repoDetail、_repoSong2 所有操作都是一个工作单元
    }
}

是不是进方法就开事务呢?

不一定是真实事务,有可能是虚的,就是一个假的 unitofwork(不带事务)

也有可能是延用上一次的事务

也有可能是新开事务,具体要看传播模式

重写仓储实现

以上使用的是泛型仓储,那我们如果是重写一个仓储 如何保持和UnitOfWorkManager同一个事务呢。
继承现有的DefaultRepository<,>仓储,实现自定义的仓储SongRepository.cs,

public class SongRepository : DefaultRepository<Song, int>, ISongRepository
{
    public SongRepository(UnitOfWorkManager uowm) : base(uowm?.Orm, uowm)
    {
    }
    public List<Song> GetSongs()
    {
        return Select.Page(1, 10).ToList();
    }
}

其中接口。ISongRepository.cs

public interface ISongRepository : IBaseRepository<Song, int>
{
    List<Song> GetSongs();
}

在 startup.cs 注入此服务

services.AddScoped<ISongRepository, SongRepository>();

事务传播使用过程中发现一个BUG
如下面的代码

public void FunMain(){
     //代码1:
     //事务处理方法
      FuncA();

      //代码2:
      //读取、前面FuncA中添加、更新的数据
       .......
}

[Transaction]
public virtual  void FunA(){
      //代码2.1
      //查询数据,或不访问数据库
      .........

      //代码2.2
      //添加、更新数据到数据库
      FuncA_1();
}

[Transaction]
public virtual  void FuncA_1(){
      //代码2.2.1
      //添加、更新数据到数据库
      .......
}

执行完“代码1”后,“代码2”访问不到前面的事务中提交的数据,只有当方法FunMain全部结束后,数据才真正的提交到数据库。
我分析原因可能如下:
1、方法FunA和FuncA_1都标记了事务
2、首先进入FunA,UnitOfWorkManager标记启动事务,
3、“代码2.1”到“代码2.2”之间并没有对数据库执行“增、删、改”操作,所以并没有真正启动事务
4、然后“代码2.2”中,FunA中调用了FuncA_1,FuncA_1中传递了工作单元,UnitOfWorkManager标记启动事务
5、FuncA_1中对数据库执行了“增、删、改”操作,真正启动了事务
6、最后,FuncA_1执行完成提交事务,FunA执行完成提交事务

在第3步并没有真正启动事务,可能导致了Bug,执行完第6步后并没有真正把数据提交到数据库。
如果在第3步中,增加修改数据库的代码,那么第3步时就会真正启动事务,执行完第6步后,数据库里就有提交的数据。

另外请问,如果启用了主从库。
当启用事务时,会自动读取主库吗?
如果类似上面第3步的情况,会自动读取主库吗?

我确认了一下,开启事务或使用工作单元标记事务后,都是默认读取主库

有没有demo,我试试

我是直接在现有的项目中测试的,Demo一下子剥离不出来

TransactionAttribute 的 动态代理,有执行 OnBefore(Begin) OnAfter(Commit) 吗

fsql.Aop.TraceBefore
fsql.Aop.TraceAfter

这两个事件监视起来

OK,我试试 fsql.Aop.TraceBefore, fsql.Aop.TraceAfter

上面的那段代码,
fsql.Aop.TraceBefore:执行“代码1“时,启动了一次事务
fsql.Aop.TraceAfter:执行完”代码1“时,这个跟踪没有被触发,事务没有成功提交

经过测试,类似上面的代码,在同一个类中
在标记事务的方法内,调用另一个标记事务方法时。
跟踪结果是:
fsql.Aop.TraceBefore:事务在第一次时启动了
fsql.Aop.TraceAfter:方法执行结束后,事务并没有真正提交。只有当前线程执行结束后,才会真正提交事务。

与我猜测的,第3步是否更新数据库没有关系。

其实可以换个测试方式

var uowm = new UnitOfWorkManager(fsql);
var repo = new DefultRepository<t>(fsql, uowm);

using (var uow1 = uow.Begin())
{

using (var uow2 = uow.Begin())
{
repo.Insert(...);
uow2.Commit();
}
repo.Where(...).ToList(); //是否变化
uow1.Commit();
}

像这样整理一个 demo 传上来

@yus1977 这样测试可以吗?

@yus1977 这样测试可以吗?

OK,我找时间测试一下

经过测试,上面的代码没有问题,UnitOfWorkManager本身没有问题。
是我这边的问题,写的AOP有BUG。
我用的是Autofac,所以照着前面的TransactionalAttribute 重新写了AOP,TransactionInterceptor。
我的事务在Service对象上,当同一个对象多个标记了事务的方法嵌套时,抄过来的代码就会出问题

原因如下:
因为一个Servier实例通过AOP编织后,绑定的TransactionInterceptor是同一个实例。
如下面的代码中

IUnitOfWork _uow;
........
Task OnBefore(UnitOfWorkManager uowm)
    {
        _uow = uowm.Begin(this.Propagation, this.IsolationLevel);
        return Task.FromResult(false);
    }

同一个实例多次启用事务后,_uow会被后面启用的事务覆盖。
而第二次以后启动的事务创建的的工作单元是UnitOfWorkVirtual。
因此不能简单的使用变量来存储创建的IUnitOfWork

下面是完整的代码
AOP使用是的是 Autofac +Castle.DynamicProxy + Castle.Core.AsyncInterceptor

    /// <summary>
    /// 事务拦截器TranAOP, 支持同步和异步方法
    /// </summary>
    public class TransactionInterceptor : AsyncInterceptorBase
    {
        public TransactionInterceptor(UnitOfWorkManager uowManager, IConfiguration configuration)
        {
            _uowManager = uowManager;

            this.configuration = configuration;
            transactionMonitor = configuration["ContextAutoSetup:TransactionMonitor"].ToBool();
        }

        private readonly UnitOfWorkManager _uowManager;
        private readonly IConfiguration configuration;
        bool transactionMonitor = false;

        /// <summary>
        /// 无返回值的 异步/同步 方法拦截
        /// </summary>
        /// <param name="invocation"></param>
        /// <param name="proceed"></param>
        /// <returns></returns>
        protected override async Task InterceptAsync(IInvocation invocation, Func<IInvocation, Task> proceed)
        {
            var methodInfo = invocation.MethodInvocationTarget ?? invocation.Method;
            //对当前方法的特性验证
            if (methodInfo.HasAttribute<TransactionAttribute>())
            {
                using (var unitOfWork = BeginTransaction(methodInfo))
                {
                    try
                    {
                        await proceed(invocation).ConfigureAwait(false);

                        CommitTransaction(unitOfWork);
                    }
                    catch (Exception ex)
                    {
                        RollbackTransaction(unitOfWork);
                        throw ex;
                    }
                }
            }
            else
                await proceed(invocation).ConfigureAwait(false);
        }

        /// <summary>
        /// 有返回值的 异步/同步 方法拦截
        /// </summary>
        /// <param name="invocation"></param>
        /// <param name="proceed"></param>
        /// <returns></returns>
        protected override async Task<TResult> InterceptAsync<TResult>(IInvocation invocation, Func<IInvocation, Task<TResult>> proceed)
        {
            var methodInfo = invocation.MethodInvocationTarget ?? invocation.Method;
            //对当前方法的特性验证
            if (methodInfo.HasAttribute<TransactionAttribute>())
            {
                using (var unitOfWork = BeginTransaction(methodInfo))
                {
                    try
                    {
                        var result = await proceed(invocation).ConfigureAwait(false);

                        CommitTransaction(unitOfWork);

                        return result;
                    }
                    catch (Exception ex)
                    {
                        RollbackTransaction(unitOfWork);
                        throw ex;
                    }
                }
            }
            else
                return await proceed(invocation).ConfigureAwait(false);
        }

        /// <summary>
        /// 开始事务
        /// </summary>
        /// <param name="methodInfo"></param>
        private IUnitOfWork BeginTransaction(MethodInfo methodInfo)
        {
            WriteConsole(0);

            TransactionAttribute attribute = methodInfo.GetCustomAttribute<TransactionAttribute>();
            //启动事务
            return _uowManager.Begin(attribute.Propagation, attribute.IsolationLevel);
        }

        /// <summary>
        /// //提交事务
        /// </summary>
        private void CommitTransaction(IUnitOfWork unitOfWork)
        {
            unitOfWork.Commit();

            WriteConsole(1);
        }

        /// <summary>
        /// 回滚事务
        /// </summary>
        private void RollbackTransaction(IUnitOfWork unitOfWork)
        {
            unitOfWork.Rollback();

            WriteConsole(2);
        }

        /// <summary>
        /// 控制台输出信息
        /// </summary>
        /// <param name="msg"></param>
        /// <param name="type">0开始事务,1提交事务,2回滚事务</param>
        private void WriteConsole(int type)
        {
            if (transactionMonitor)
            {
                string msg = "";
                switch (type)
                {
                    case 0:
                        msg = $"***Transaction Begin***\n";
                        break;
                    case 1:
                        msg = $"***Transactio Commit***\n";
                        break;
                    case 2:
                        msg = $"***Transaction* Rollback**\n";
                        break;
                }

                ConsoleHelper.WriteSuccessLine(msg);
            }
        }
    }
}

IdleBus + UnitOfWorkManager

默认 UnitOfWorkManager 只能绑定一个 FreeSql 使用,所以 Scoped 生命范围内不切换的情况下,可以实现 IdleBus + UnitOfWorkManager 以来实现。

第一步:定义 IdleBus 扩展方法

public static class IdleBusExtesions
{
    static AsyncLocal<string> AsyncLocalTenantId = new AsyncLocal<string>();
    public static IdleBus<IFreeSql> ChangeTenant(this IdleBus<IFreeSql> ib, string tenantId)
    {
        AsyncLocalTenantId.Value = tenantId;
        return ib;
    }
    public static IFreeSql Get(this IdleBus<IFreeSql> ib) => ib.Get(AsyncLocalTenantId.Value ?? "default");
    public static IBaseRepository<T> GetRepository<T>(this IdleBus<IFreeSql> ib) where T : class => ib.Get().GetRepository<T>();
}

第二步:定义 IFreeSql 代理类实现

class FreeSqlProxy : IFreeSql
{
    readonly IFreeSql _orm;

    public FreeSqlProxy(IFreeSql fsql)
    {
        _orm = fsql;
    }

    public IAdo Ado => _orm.Ado;
    public IAop Aop => _orm.Aop;
    public ICodeFirst CodeFirst => _orm.CodeFirst;
    public IDbFirst DbFirst => _orm.DbFirst;
    public GlobalFilter GlobalFilter => _orm.GlobalFilter;
    //关键在此处,释放无任何操作
    public void Dispose() { }

    public void Transaction(Action handler) => _orm.Transaction(handler);
    public void Transaction(IsolationLevel isolationLevel, Action handler) => _orm.Transaction(isolationLevel, handler);

    public ISelect<T1> Select<T1>() where T1 : class
    {
        return _orm.Select<T1>();
    }
    public ISelect<T1> Select<T1>(object dywhere) where T1 : class => Select<T1>().WhereDynamic(dywhere);

    public IDelete<T1> Delete<T1>() where T1 : class
    {
        return _orm.Delete<T1>();
    }
    public IDelete<T1> Delete<T1>(object dywhere) where T1 : class => Delete<T1>().WhereDynamic(dywhere);

    public IUpdate<T1> Update<T1>() where T1 : class
    {
        return _orm.Update<T1>();
    }
    public IUpdate<T1> Update<T1>(object dywhere) where T1 : class => Update<T1>().WhereDynamic(dywhere);

    public IInsert<T1> Insert<T1>() where T1 : class
    {
        return _orm.Insert<T1>();
    }
    public IInsert<T1> Insert<T1>(T1 source) where T1 : class => Insert<T1>().AppendData(source);
    public IInsert<T1> Insert<T1>(T1[] source) where T1 : class => Insert<T1>().AppendData(source);
    public IInsert<T1> Insert<T1>(List<T1> source) where T1 : class => Insert<T1>().AppendData(source);
    public IInsert<T1> Insert<T1>(IEnumerable<T1> source) where T1 : class => Insert<T1>().AppendData(source);

    public IInsertOrUpdate<T1> InsertOrUpdate<T1>() where T1 : class
    {
        return _orm.InsertOrUpdate<T1>();
    }
}

第三步:改造 Startup.cs 注入

public static IdleBus<IFreeSql> ib = new IdleBus<IFreeSql>();

public void ConfigureServices(IServiceCollection services)
{
    services.AddControllersWithViews();

    //ib.Register(..);
    //注意此时注入 AddScoped IFreeSql
    services.AddScoped<IFreeSql>(serviceProvider => new FreeSqlProxy(ib.Get()));
    services.AddScoped<UnitOfWorkManager>();
    services.AddFreeRepository(null, typeof(Startup).Assembly);
}

第四步:使用方法请参照 一楼

.net 6.0,按照上面的式例,在方法添加Transactional特性,但是没有生效,不是进入事务里,这个是什么原因引起吗?
Service相关代码:
[Transactional]
private async Task SaveFlowInfoAsync(FlowInfoAddInput flowInfoInput, CancellationToken cancellationToken)
{
//using IUnitOfWork unitOfWork = _unitOfWorkManager.Begin(isolationLevel: IsolationLevel.Snapshot);
//try
//{
await this._flowDetailService.AddAsync(flowInfoInput, cancellationToken);
//其他业务逻辑
。。。

//    unitOfWork.Commit();
//}
//catch (Exception e)
//{
//    unitOfWork.Rollback();
//    LogUtils.LogErr(this, $"流程提交异常:{e.Message}。PARAMS:{flowInfoInput.FlowSubmitInput.ToJson()}", e);
//    throw new CustomException(ExceptionConst.FlowSubmitFail);
//}

}

programe.cs代码:
var app = builder.Build();

//使用Correlation中间件
app.UseCorrelationId();
if (app.Environment.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
//使用自定义的异常中间件
app.UseErrorHandling();
app.UseStaticFiles();
app.UseLocalLogViewer();
app.Use(async (context, next) =>
{
TransactionalAttribute.SetServiceProvider(context.RequestServices);
await next();
});
//使用路由中间件
app.UseRouting()
//使用终端中间件
.UseEndpoints(endpoints =>
{
endpoints.MapGet("/", async context =>
{
await context.Response.WriteAsync("Hello World!");
});
endpoints.MapControllers();
});

app.Run();