Skip to content

Commit

Permalink
fix: prepare to rollback and release the transaction on reserved conn…
Browse files Browse the repository at this point in the history
…ection, log warn on failing to commit a prepare transaction, fail to accept a commit on MM

Signed-off-by: Harshit Gangal <harshit@planetscale.com>
  • Loading branch information
harshit-gangal committed Aug 6, 2024
1 parent 623e820 commit bf89751
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 40 deletions.
2 changes: 2 additions & 0 deletions go/vt/vterrors/code.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ var (
VT09024 = errorWithoutState("VT09024", vtrpcpb.Code_FAILED_PRECONDITION, "could not map %v to a unique keyspace id: %v", "Unable to determine the shard for the given row.")

VT10001 = errorWithoutState("VT10001", vtrpcpb.Code_ABORTED, "foreign key constraints are not allowed", "Foreign key constraints are not allowed, see https://vitess.io/blog/2021-06-15-online-ddl-why-no-fk/.")
VT10002 = errorWithoutState("VT10002", vtrpcpb.Code_ABORTED, "atomic distributed transaction not allowed: %s", "The distributed transaction cannot be committed. A rollback decision is taken.")

VT12001 = errorWithoutState("VT12001", vtrpcpb.Code_UNIMPLEMENTED, "unsupported: %s", "This statement is unsupported by Vitess. Please rewrite your query to use supported syntax.")
VT12002 = errorWithoutState("VT12002", vtrpcpb.Code_UNIMPLEMENTED, "unsupported: cross-shard foreign keys", "Vitess does not support cross shard foreign keys.")
Expand Down Expand Up @@ -182,6 +183,7 @@ var (
VT09023,
VT09024,
VT10001,
VT10002,
VT12001,
VT12002,
VT13001,
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtgate/tx_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,21 +458,21 @@ func (txc *TxConn) resolveTx(ctx context.Context, target *querypb.Target, transa
case querypb.TransactionState_PREPARE:
// If state is PREPARE, make a decision to rollback and
// fallthrough to the rollback workflow.
if err := txc.tabletGateway.SetRollback(ctx, target, transaction.Dtid, mmShard.TransactionId); err != nil {
if err = txc.tabletGateway.SetRollback(ctx, target, transaction.Dtid, mmShard.TransactionId); err != nil {
return err
}
fallthrough
case querypb.TransactionState_ROLLBACK:
if err := txc.resumeRollback(ctx, target, transaction); err != nil {
if err = txc.resumeRollback(ctx, target, transaction); err != nil {
return err
}
case querypb.TransactionState_COMMIT:
if err := txc.resumeCommit(ctx, target, transaction); err != nil {
if err = txc.resumeCommit(ctx, target, transaction); err != nil {
return err
}
default:
// Should never happen.
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid state: %v", transaction.State)
return vterrors.VT13001(fmt.Sprintf("invalid state: %v", transaction.State))
}
return nil
}
Expand Down
40 changes: 28 additions & 12 deletions go/vt/vttablet/tabletserver/dt_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,14 @@ func (dte *DTExecutor) Prepare(transactionID int64, dtid string) error {

// If no queries were executed, we just rollback.
if len(conn.TxProperties().Queries) == 0 {
conn.Release(tx.TxRollback)
dte.te.txPool.RollbackAndRelease(dte.ctx, conn)
return nil
}

// If the connection is tainted, we cannot prepare it. As there could be temporary tables involved.
if conn.IsTainted() {
conn.Release(tx.TxRollback)
return vterrors.VT12001("cannot prepare the transaction on a reserved connection")
dte.te.txPool.RollbackAndRelease(dte.ctx, conn)
return vterrors.VT10002("cannot prepare the transaction on a reserved connection")
}

err = dte.te.preparedPool.Put(conn, dtid)
Expand All @@ -88,30 +88,34 @@ func (dte *DTExecutor) Prepare(transactionID int64, dtid string) error {
// CommitPrepared commits a prepared transaction. If the operation
// fails, an error counter is incremented and the transaction is
// marked as failed in the redo log.
func (dte *DTExecutor) CommitPrepared(dtid string) error {
func (dte *DTExecutor) CommitPrepared(dtid string) (err error) {
if !dte.te.twopcEnabled {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled")
}
defer dte.te.env.Stats().QueryTimings.Record("COMMIT_PREPARED", time.Now())
conn, err := dte.te.preparedPool.FetchForCommit(dtid)
var conn *StatefulConnection
conn, err = dte.te.preparedPool.FetchForCommit(dtid)
if err != nil {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "cannot commit dtid %s, state: %v", dtid, err)
}
// No connection means the transaction was already committed.
if conn == nil {
return nil
}
// We have to use a context that will never give up,
// even if the original context expires.
ctx := trace.CopySpan(context.Background(), dte.ctx)
defer dte.te.txPool.RollbackAndRelease(ctx, conn)
err = dte.te.twoPC.DeleteRedo(ctx, conn, dtid)
if err != nil {
dte.markFailed(ctx, dtid)
defer func() {
if err != nil {
dte.markFailed(ctx, dtid)
log.Warningf("failed to commit the prepared transaction '%s' with error: %v", dtid, err)
}
dte.te.txPool.RollbackAndRelease(ctx, conn)
}()
if err = dte.te.twoPC.DeleteRedo(ctx, conn, dtid); err != nil {
return err
}
_, err = dte.te.txPool.Commit(ctx, conn)
if err != nil {
dte.markFailed(ctx, dtid)
if _, err = dte.te.txPool.Commit(ctx, conn); err != nil {
return err
}
dte.te.preparedPool.Forget(dtid)
Expand Down Expand Up @@ -207,6 +211,15 @@ func (dte *DTExecutor) StartCommit(transactionID int64, dtid string) error {
}
defer dte.te.txPool.RollbackAndRelease(dte.ctx, conn)

// If the connection is tainted, we cannot take a commit decision on it.
if conn.IsTainted() {
dte.inTransaction(func(conn *StatefulConnection) error {
return dte.te.twoPC.Transition(dte.ctx, conn, dtid, querypb.TransactionState_ROLLBACK)
})
// return the error, defer call above will roll back the transaction.
return vterrors.VT10002("cannot commit the transaction on a reserved connection")
}

err = dte.te.twoPC.Transition(dte.ctx, conn, dtid, querypb.TransactionState_COMMIT)
if err != nil {
return err
Expand All @@ -228,6 +241,9 @@ func (dte *DTExecutor) SetRollback(dtid string, transactionID int64) error {
// If the transaction is still open, it will be rolled back.
// Otherwise, it would have been rolled back by other means, like a timeout or vttablet/mysql restart.
dte.te.Rollback(dte.ctx, transactionID)
} else {
// This is a warning because it should not happen in normal operation.
log.Warningf("SetRollback called with no transactionID for dtid %s", dtid)
}

return dte.inTransaction(func(conn *StatefulConnection) error {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/dt_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestDTExecutorPrepareResevedConn(t *testing.T) {
txe.te.Reserve(ctx, nil, txid, nil)

err := txe.Prepare(txid, "aa")
require.ErrorContains(t, err, "VT12001: unsupported: cannot prepare the transaction on a reserved connection")
require.ErrorContains(t, err, "VT10002: atomic distributed transaction not allowed: cannot prepare the transaction on a reserved connection")
}

func TestTxExecutorPrepareNotInTx(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/tx_prep_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
)

var (
errPrepCommitting = errors.New("committing")
errPrepFailed = errors.New("failed")
errPrepCommitting = errors.New("locked for committing")
errPrepFailed = errors.New("failed to commit")
)

// TxPreparedPool manages connections for prepared transactions.
Expand Down
33 changes: 12 additions & 21 deletions go/vt/vttablet/tabletserver/tx_prep_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package tabletserver
import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -82,38 +83,28 @@ func TestPrepFetchForCommit(t *testing.T) {
conn := &StatefulConnection{}
got, err := pp.FetchForCommit("aa")
require.NoError(t, err)
if got != nil {
t.Errorf("Get(aa): %v, want nil", got)
}
assert.Nil(t, got)

pp.Put(conn, "aa")
got, err = pp.FetchForCommit("aa")
require.NoError(t, err)
if got != conn {
t.Errorf("pp.Get(aa): %p, want %p", got, conn)
}
assert.Equal(t, conn, got)

_, err = pp.FetchForCommit("aa")
want := "committing"
if err == nil || err.Error() != want {
t.Errorf("FetchForCommit err: %v, want %s", err, want)
}
assert.ErrorContains(t, err, "locked for committing")

pp.SetFailed("aa")
_, err = pp.FetchForCommit("aa")
want = "failed"
if err == nil || err.Error() != want {
t.Errorf("FetchForCommit err: %v, want %s", err, want)
}
assert.ErrorContains(t, err, "failed to commit")

pp.SetFailed("bb")
_, err = pp.FetchForCommit("bb")
want = "failed"
if err == nil || err.Error() != want {
t.Errorf("FetchForCommit err: %v, want %s", err, want)
}
assert.ErrorContains(t, err, "failed to commit")

pp.Forget("aa")
got, err = pp.FetchForCommit("aa")
require.NoError(t, err)
if got != nil {
t.Errorf("Get(aa): %v, want nil", got)
}
assert.Nil(t, got)
}

func TestPrepFetchAll(t *testing.T) {
Expand Down

0 comments on commit bf89751

Please sign in to comment.