diff --git a/go/vt/vterrors/code.go b/go/vt/vterrors/code.go index d485c930b77..857ba538ebe 100644 --- a/go/vt/vterrors/code.go +++ b/go/vt/vterrors/code.go @@ -98,6 +98,7 @@ var ( VT09024 = errorWithoutState("VT09024", vtrpcpb.Code_FAILED_PRECONDITION, "could not map %v to a unique keyspace id: %v", "Unable to determine the shard for the given row.") VT10001 = errorWithoutState("VT10001", vtrpcpb.Code_ABORTED, "foreign key constraints are not allowed", "Foreign key constraints are not allowed, see https://vitess.io/blog/2021-06-15-online-ddl-why-no-fk/.") + VT10002 = errorWithoutState("VT10002", vtrpcpb.Code_ABORTED, "atomic distributed transaction not allowed: %s", "The distributed transaction cannot be committed. A rollback decision is taken.") VT12001 = errorWithoutState("VT12001", vtrpcpb.Code_UNIMPLEMENTED, "unsupported: %s", "This statement is unsupported by Vitess. Please rewrite your query to use supported syntax.") VT12002 = errorWithoutState("VT12002", vtrpcpb.Code_UNIMPLEMENTED, "unsupported: cross-shard foreign keys", "Vitess does not support cross shard foreign keys.") @@ -182,6 +183,7 @@ var ( VT09023, VT09024, VT10001, + VT10002, VT12001, VT12002, VT13001, diff --git a/go/vt/vtgate/tx_conn.go b/go/vt/vtgate/tx_conn.go index f8b08def10c..e388740ee6a 100644 --- a/go/vt/vtgate/tx_conn.go +++ b/go/vt/vtgate/tx_conn.go @@ -458,21 +458,21 @@ func (txc *TxConn) resolveTx(ctx context.Context, target *querypb.Target, transa case querypb.TransactionState_PREPARE: // If state is PREPARE, make a decision to rollback and // fallthrough to the rollback workflow. - if err := txc.tabletGateway.SetRollback(ctx, target, transaction.Dtid, mmShard.TransactionId); err != nil { + if err = txc.tabletGateway.SetRollback(ctx, target, transaction.Dtid, mmShard.TransactionId); err != nil { return err } fallthrough case querypb.TransactionState_ROLLBACK: - if err := txc.resumeRollback(ctx, target, transaction); err != nil { + if err = txc.resumeRollback(ctx, target, transaction); err != nil { return err } case querypb.TransactionState_COMMIT: - if err := txc.resumeCommit(ctx, target, transaction); err != nil { + if err = txc.resumeCommit(ctx, target, transaction); err != nil { return err } default: // Should never happen. - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid state: %v", transaction.State) + return vterrors.VT13001(fmt.Sprintf("invalid state: %v", transaction.State)) } return nil } diff --git a/go/vt/vttablet/tabletserver/dt_executor.go b/go/vt/vttablet/tabletserver/dt_executor.go index edf4438b8b2..5f4e7644766 100644 --- a/go/vt/vttablet/tabletserver/dt_executor.go +++ b/go/vt/vttablet/tabletserver/dt_executor.go @@ -63,14 +63,14 @@ func (dte *DTExecutor) Prepare(transactionID int64, dtid string) error { // If no queries were executed, we just rollback. if len(conn.TxProperties().Queries) == 0 { - conn.Release(tx.TxRollback) + dte.te.txPool.RollbackAndRelease(dte.ctx, conn) return nil } // If the connection is tainted, we cannot prepare it. As there could be temporary tables involved. if conn.IsTainted() { - conn.Release(tx.TxRollback) - return vterrors.VT12001("cannot prepare the transaction on a reserved connection") + dte.te.txPool.RollbackAndRelease(dte.ctx, conn) + return vterrors.VT10002("cannot prepare the transaction on a reserved connection") } err = dte.te.preparedPool.Put(conn, dtid) @@ -88,30 +88,34 @@ func (dte *DTExecutor) Prepare(transactionID int64, dtid string) error { // CommitPrepared commits a prepared transaction. If the operation // fails, an error counter is incremented and the transaction is // marked as failed in the redo log. -func (dte *DTExecutor) CommitPrepared(dtid string) error { +func (dte *DTExecutor) CommitPrepared(dtid string) (err error) { if !dte.te.twopcEnabled { return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled") } defer dte.te.env.Stats().QueryTimings.Record("COMMIT_PREPARED", time.Now()) - conn, err := dte.te.preparedPool.FetchForCommit(dtid) + var conn *StatefulConnection + conn, err = dte.te.preparedPool.FetchForCommit(dtid) if err != nil { return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "cannot commit dtid %s, state: %v", dtid, err) } + // No connection means the transaction was already committed. if conn == nil { return nil } // We have to use a context that will never give up, // even if the original context expires. ctx := trace.CopySpan(context.Background(), dte.ctx) - defer dte.te.txPool.RollbackAndRelease(ctx, conn) - err = dte.te.twoPC.DeleteRedo(ctx, conn, dtid) - if err != nil { - dte.markFailed(ctx, dtid) + defer func() { + if err != nil { + dte.markFailed(ctx, dtid) + log.Warningf("failed to commit the prepared transaction '%s' with error: %v", dtid, err) + } + dte.te.txPool.RollbackAndRelease(ctx, conn) + }() + if err = dte.te.twoPC.DeleteRedo(ctx, conn, dtid); err != nil { return err } - _, err = dte.te.txPool.Commit(ctx, conn) - if err != nil { - dte.markFailed(ctx, dtid) + if _, err = dte.te.txPool.Commit(ctx, conn); err != nil { return err } dte.te.preparedPool.Forget(dtid) @@ -207,6 +211,15 @@ func (dte *DTExecutor) StartCommit(transactionID int64, dtid string) error { } defer dte.te.txPool.RollbackAndRelease(dte.ctx, conn) + // If the connection is tainted, we cannot take a commit decision on it. + if conn.IsTainted() { + dte.inTransaction(func(conn *StatefulConnection) error { + return dte.te.twoPC.Transition(dte.ctx, conn, dtid, querypb.TransactionState_ROLLBACK) + }) + // return the error, defer call above will roll back the transaction. + return vterrors.VT10002("cannot commit the transaction on a reserved connection") + } + err = dte.te.twoPC.Transition(dte.ctx, conn, dtid, querypb.TransactionState_COMMIT) if err != nil { return err @@ -228,6 +241,9 @@ func (dte *DTExecutor) SetRollback(dtid string, transactionID int64) error { // If the transaction is still open, it will be rolled back. // Otherwise, it would have been rolled back by other means, like a timeout or vttablet/mysql restart. dte.te.Rollback(dte.ctx, transactionID) + } else { + // This is a warning because it should not happen in normal operation. + log.Warningf("SetRollback called with no transactionID for dtid %s", dtid) } return dte.inTransaction(func(conn *StatefulConnection) error { diff --git a/go/vt/vttablet/tabletserver/dt_executor_test.go b/go/vt/vttablet/tabletserver/dt_executor_test.go index 448dd63bf5a..410374c0fb7 100644 --- a/go/vt/vttablet/tabletserver/dt_executor_test.go +++ b/go/vt/vttablet/tabletserver/dt_executor_test.go @@ -82,7 +82,7 @@ func TestDTExecutorPrepareResevedConn(t *testing.T) { txe.te.Reserve(ctx, nil, txid, nil) err := txe.Prepare(txid, "aa") - require.ErrorContains(t, err, "VT12001: unsupported: cannot prepare the transaction on a reserved connection") + require.ErrorContains(t, err, "VT10002: atomic distributed transaction not allowed: cannot prepare the transaction on a reserved connection") } func TestTxExecutorPrepareNotInTx(t *testing.T) { diff --git a/go/vt/vttablet/tabletserver/tx_prep_pool.go b/go/vt/vttablet/tabletserver/tx_prep_pool.go index 89547570cfc..22e0ce295c0 100644 --- a/go/vt/vttablet/tabletserver/tx_prep_pool.go +++ b/go/vt/vttablet/tabletserver/tx_prep_pool.go @@ -23,8 +23,8 @@ import ( ) var ( - errPrepCommitting = errors.New("committing") - errPrepFailed = errors.New("failed") + errPrepCommitting = errors.New("locked for committing") + errPrepFailed = errors.New("failed to commit") ) // TxPreparedPool manages connections for prepared transactions. diff --git a/go/vt/vttablet/tabletserver/tx_prep_pool_test.go b/go/vt/vttablet/tabletserver/tx_prep_pool_test.go index a1cf50edb56..cd2b5a180c1 100644 --- a/go/vt/vttablet/tabletserver/tx_prep_pool_test.go +++ b/go/vt/vttablet/tabletserver/tx_prep_pool_test.go @@ -19,6 +19,7 @@ package tabletserver import ( "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -82,38 +83,28 @@ func TestPrepFetchForCommit(t *testing.T) { conn := &StatefulConnection{} got, err := pp.FetchForCommit("aa") require.NoError(t, err) - if got != nil { - t.Errorf("Get(aa): %v, want nil", got) - } + assert.Nil(t, got) + pp.Put(conn, "aa") got, err = pp.FetchForCommit("aa") require.NoError(t, err) - if got != conn { - t.Errorf("pp.Get(aa): %p, want %p", got, conn) - } + assert.Equal(t, conn, got) + _, err = pp.FetchForCommit("aa") - want := "committing" - if err == nil || err.Error() != want { - t.Errorf("FetchForCommit err: %v, want %s", err, want) - } + assert.ErrorContains(t, err, "locked for committing") + pp.SetFailed("aa") _, err = pp.FetchForCommit("aa") - want = "failed" - if err == nil || err.Error() != want { - t.Errorf("FetchForCommit err: %v, want %s", err, want) - } + assert.ErrorContains(t, err, "failed to commit") + pp.SetFailed("bb") _, err = pp.FetchForCommit("bb") - want = "failed" - if err == nil || err.Error() != want { - t.Errorf("FetchForCommit err: %v, want %s", err, want) - } + assert.ErrorContains(t, err, "failed to commit") + pp.Forget("aa") got, err = pp.FetchForCommit("aa") require.NoError(t, err) - if got != nil { - t.Errorf("Get(aa): %v, want nil", got) - } + assert.Nil(t, got) } func TestPrepFetchAll(t *testing.T) {