dtm-labs/dtm

能否将BranchBarrier中Call的提交和回滚提取出来

yeyudekuangxiang opened this issue · 4 comments

能否将BranchBarrier中Call的提交和回滚提取出来,这样就可以使用gorm的Transaction或者其他方式处理提交和回滚

func (bb *BranchBarrier) Call(tx *sql.Tx, busiCall BarrierBusiFunc) (rerr error) {
	defer dtmimp.DeferDo(&rerr, func() error {
		return tx.Commit()
	}, func() error {
		return tx.Rollback()
	})
	rerr = bb.CallWithOutDefer(tx,busiCall)
	return 
}
func (bb *BranchBarrier) CallWithOutDefer(tx *sql.Tx,busiCall BarrierBusiFunc) (rerr error) {
	bid := bb.newBarrierID()
	originOp := map[string]string{
		dtmimp.OpCancel:     dtmimp.OpTry,    // tcc
		dtmimp.OpCompensate: dtmimp.OpAction, // saga
		dtmimp.OpRollback:   dtmimp.OpAction, // workflow
	}[bb.Op]

	originAffected, oerr := dtmimp.InsertBarrier(tx, bb.TransType, bb.Gid, bb.BranchID, originOp, bid, bb.Op, bb.DBType, bb.BarrierTableName)
	currentAffected, rerr := dtmimp.InsertBarrier(tx, bb.TransType, bb.Gid, bb.BranchID, bb.Op, bid, bb.Op, bb.DBType, bb.BarrierTableName)
	logger.Debugf("originAffected: %d currentAffected: %d", originAffected, currentAffected)

	if rerr == nil && bb.Op == dtmimp.MsgDoOp && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected.
		return ErrDuplicated
	}

	if rerr == nil {
		rerr = oerr
	}

	if (bb.Op == dtmimp.OpCancel || bb.Op == dtmimp.OpCompensate || bb.Op == dtmimp.OpRollback) && originAffected > 0 || // null compensate
		currentAffected == 0 { // repeated request or dangled request
		return
	}
	if rerr == nil {
		rerr = busiCall(tx)
	}
	return
}

使用案例

func TestName(t *testing.T) {
	gormDB, err := gorm.Open()
	msg := dtmgrpc.NewMsgGrpc("", "")
	msg.DoAndSubmit("", func(bb *dtmcli.BranchBarrier) error {
		return gormDB.Transaction(func(gormTx *gorm.DB) error {
			return bb.CallWithOutDefer(gormTx.Statement.ConnPool.(*sql.Tx), func(tx *sql.Tx) error {
				return gormTx.Create(&User{
					Id:   1,
					Name: "neil",
				}).Error
			})
		})
	})
}
yedf2 commented

现在就已经支持使用gorm的事务进行处理,参见: https://dtm.pub/ref/sdk.html#orm

现在就已经支持使用gorm的事务进行处理,参见: https://dtm.pub/ref/sdk.html#orm

这个我看到了,不过我们是UserModel里面有一个*gorm.DB字段。如果在UserModel初始化的时候传给它一个已经开启了事务的*gorm.DB,然后再在UserModel里面使用 u.db.Begin()就会报错。
如果我们使用gorm的事务嵌套就可以了,但是需要dtm提供一个不自动commit/rollback的BranchBarrier.Call方法,也就是我上面说的简单的把现有的Call方法拆成两个方法,一个自动commit/rollback,一个由使用者自己去处理commit/rollback,更加灵活一点

func TestName(t *testing.T) {
	gormDB,_:=gorm.Open(mysql.Open("dsn"))
	//正常情况
	um:=UserModel{db: gormDB}
	um.Insert(User{})
	
	//异常情况 报错 invalid transaction
	gormTx:=gormDB.Begin()
	um2:=UserModel{db: gormTx}
	um2.Insert(User{})
}

type User struct {
	Id int64
}
type UserModel struct {
	db *gorm.DB
}

func (u UserModel) Insert(v User) error {
	tx := u.db.Begin()
	msg := dtmcli.NewMsg("server", "gid")
	return msg.DoAndSubmit("", func(bb *dtmcli.BranchBarrier) error {
		return bb.Call(tx.Statement.ConnPool.(*sql.Tx), func(tx *sql.Tx) error {
			_, err := tx.Exec("insert into user values (?)", v.Id)
			return err
		})
	})
}
yedf2 commented

感觉你的这个场景需求会不会太小众了?为什么一定要自己开启和结束事务,希望能够详细说明一下为什么需要支持这种场景

应该是我考虑的有问题,不好意思,感谢回答