diff --git a/go/mysql/sqlerror/constants.go b/go/mysql/sqlerror/constants.go index a247ca15aa4..42425fa68a4 100644 --- a/go/mysql/sqlerror/constants.go +++ b/go/mysql/sqlerror/constants.go @@ -34,8 +34,9 @@ func (e ErrorCode) ToString() string { // See above reference for more information on each code. const ( // Vitess specific errors, (100-999) - ERNotReplica = ErrorCode(100) - ERNonAtomicCommit = ErrorCode(301) + ERNotReplica = ErrorCode(100) + ERNonAtomicCommit = ErrorCode(301) + ERInAtomicRecovery = ErrorCode(302) // unknown ERUnknownError = ErrorCode(1105) diff --git a/go/vt/vtgate/tx_conn.go b/go/vt/vtgate/tx_conn.go index 0aeab986ea6..d0bfc88bdd4 100644 --- a/go/vt/vtgate/tx_conn.go +++ b/go/vt/vtgate/tx_conn.go @@ -61,6 +61,16 @@ var txAccessModeToEOTxAccessMode = map[sqlparser.TxAccessMode]querypb.ExecuteOpt sqlparser.ReadOnly: querypb.ExecuteOptions_READ_ONLY, } +type CommitPhase int + +const ( + COMMIT2PC_CREATETRANSACTION = iota + COMMIT2PC_PREPARE = iota + COMMIT2PC_STARTCOMMIT = iota + COMMIT2PC_PREPARECOMMIT + COMMIT2PC_CONCLUDE +) + // Begin begins a new transaction. If one is already in progress, it commits it // and starts a new one. func (txc *TxConn) Begin(ctx context.Context, session *SafeSession, txAccessModes []sqlparser.TxAccessMode) error { @@ -178,7 +188,7 @@ func (txc *TxConn) commitNormal(ctx context.Context, session *SafeSession) error } // commit2PC will not used the pinned tablets - to make sure we use the current source, we need to use the gateway's queryservice -func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) error { +func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) (err error) { if len(session.PreSessions) != 0 || len(session.PostSessions) != 0 { _ = txc.Rollback(ctx, session) return vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "pre or post actions not allowed for 2PC commits") @@ -195,8 +205,29 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) error { } mmShard := session.ShardSessions[0] dtid := dtids.New(mmShard) - err := txc.tabletGateway.CreateTransaction(ctx, mmShard.Target, dtid, participants) - if err != nil { + + txPhase := COMMIT2PC_CREATETRANSACTION + defer func() { + if err == nil { + return + } + warningMsg := fmt.Sprintf("%s distributed transaction ID failed during", dtid) + switch txPhase { + case COMMIT2PC_CREATETRANSACTION: + warningMsg += " transaction record creation" + case COMMIT2PC_PREPARE: + warningMsg += " transaction prepare phase" + case COMMIT2PC_STARTCOMMIT: + warningMsg += " metadata manager commit" + case COMMIT2PC_PREPARECOMMIT: + warningMsg += " resource manager commit" + case COMMIT2PC_CONCLUDE: + warningMsg += " transaction conclusion" + } + session.RecordWarning(&querypb.QueryWarning{Code: uint32(sqlerror.ERInAtomicRecovery), Message: warningMsg}) + }() + + if err = txc.tabletGateway.CreateTransaction(ctx, mmShard.Target, dtid, participants); err != nil { // Normal rollback is safe because nothing was prepared yet. _ = txc.Rollback(ctx, session) return err @@ -209,6 +240,7 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) error { } } + txPhase = COMMIT2PC_PREPARE err = txc.runSessions(ctx, session.ShardSessions[1:], session.logging, func(ctx context.Context, s *vtgatepb.Session_ShardSession, logging *executeLogger) error { if DebugTwoPc { // Test code to simulate a failure during RM prepare @@ -235,6 +267,7 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) error { } } + txPhase = COMMIT2PC_STARTCOMMIT err = txc.tabletGateway.StartCommit(ctx, mmShard.Target, mmShard.TransactionId, dtid) if err != nil { return err @@ -247,6 +280,7 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) error { } } + txPhase = COMMIT2PC_PREPARECOMMIT err = txc.runSessions(ctx, session.ShardSessions[1:], session.logging, func(ctx context.Context, s *vtgatepb.Session_ShardSession, logging *executeLogger) error { if DebugTwoPc { // Test code to simulate a failure during RM prepare @@ -260,6 +294,7 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) error { return err } + txPhase = COMMIT2PC_CONCLUDE return txc.tabletGateway.ConcludeTransaction(ctx, mmShard.Target, dtid) }