Skip to content

Commit

Permalink
feat: record warning on 2pc commit failure
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <harshit@planetscale.com>
  • Loading branch information
harshit-gangal committed Jul 24, 2024
1 parent f8758ac commit 68eb896
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 5 deletions.
5 changes: 3 additions & 2 deletions go/mysql/sqlerror/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ func (e ErrorCode) ToString() string {
// See above reference for more information on each code.
const (
// Vitess specific errors, (100-999)
ERNotReplica = ErrorCode(100)
ERNonAtomicCommit = ErrorCode(301)
ERNotReplica = ErrorCode(100)
ERNonAtomicCommit = ErrorCode(301)
ERInAtomicRecovery = ErrorCode(302)

// unknown
ERUnknownError = ErrorCode(1105)
Expand Down
41 changes: 38 additions & 3 deletions go/vt/vtgate/tx_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,16 @@ var txAccessModeToEOTxAccessMode = map[sqlparser.TxAccessMode]querypb.ExecuteOpt
sqlparser.ReadOnly: querypb.ExecuteOptions_READ_ONLY,
}

type CommitPhase int

const (
COMMIT2PC_CREATETRANSACTION = iota
COMMIT2PC_PREPARE = iota
COMMIT2PC_STARTCOMMIT = iota
COMMIT2PC_PREPARECOMMIT
COMMIT2PC_CONCLUDE
)

// Begin begins a new transaction. If one is already in progress, it commits it
// and starts a new one.
func (txc *TxConn) Begin(ctx context.Context, session *SafeSession, txAccessModes []sqlparser.TxAccessMode) error {
Expand Down Expand Up @@ -178,7 +188,7 @@ func (txc *TxConn) commitNormal(ctx context.Context, session *SafeSession) error
}

// commit2PC will not used the pinned tablets - to make sure we use the current source, we need to use the gateway's queryservice
func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) error {
func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) (err error) {
if len(session.PreSessions) != 0 || len(session.PostSessions) != 0 {
_ = txc.Rollback(ctx, session)
return vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "pre or post actions not allowed for 2PC commits")
Expand All @@ -195,8 +205,29 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) error {
}
mmShard := session.ShardSessions[0]
dtid := dtids.New(mmShard)
err := txc.tabletGateway.CreateTransaction(ctx, mmShard.Target, dtid, participants)
if err != nil {

txPhase := COMMIT2PC_CREATETRANSACTION
defer func() {
if err == nil {
return
}
warningMsg := fmt.Sprintf("%s distributed transaction ID failed during", dtid)
switch txPhase {
case COMMIT2PC_CREATETRANSACTION:
warningMsg += " transaction record creation"
case COMMIT2PC_PREPARE:
warningMsg += " transaction prepare phase"
case COMMIT2PC_STARTCOMMIT:
warningMsg += " metadata manager commit"
case COMMIT2PC_PREPARECOMMIT:
warningMsg += " resource manager commit"
case COMMIT2PC_CONCLUDE:
warningMsg += " transaction conclusion"
}
session.RecordWarning(&querypb.QueryWarning{Code: uint32(sqlerror.ERInAtomicRecovery), Message: warningMsg})
}()

if err = txc.tabletGateway.CreateTransaction(ctx, mmShard.Target, dtid, participants); err != nil {
// Normal rollback is safe because nothing was prepared yet.
_ = txc.Rollback(ctx, session)
return err
Expand All @@ -209,6 +240,7 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) error {
}
}

txPhase = COMMIT2PC_PREPARE
err = txc.runSessions(ctx, session.ShardSessions[1:], session.logging, func(ctx context.Context, s *vtgatepb.Session_ShardSession, logging *executeLogger) error {
if DebugTwoPc {
// Test code to simulate a failure during RM prepare
Expand All @@ -235,6 +267,7 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) error {
}
}

txPhase = COMMIT2PC_STARTCOMMIT
err = txc.tabletGateway.StartCommit(ctx, mmShard.Target, mmShard.TransactionId, dtid)
if err != nil {
return err
Expand All @@ -247,6 +280,7 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) error {
}
}

txPhase = COMMIT2PC_PREPARECOMMIT
err = txc.runSessions(ctx, session.ShardSessions[1:], session.logging, func(ctx context.Context, s *vtgatepb.Session_ShardSession, logging *executeLogger) error {
if DebugTwoPc {
// Test code to simulate a failure during RM prepare
Expand All @@ -260,6 +294,7 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) error {
return err
}

txPhase = COMMIT2PC_CONCLUDE
return txc.tabletGateway.ConcludeTransaction(ctx, mmShard.Target, dtid)
}

Expand Down

0 comments on commit 68eb896

Please sign in to comment.