Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vttablet api distributed transaction changes #16506

Merged
merged 4 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go/vt/vterrors/code.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ var (
VT09024 = errorWithoutState("VT09024", vtrpcpb.Code_FAILED_PRECONDITION, "could not map %v to a unique keyspace id: %v", "Unable to determine the shard for the given row.")

VT10001 = errorWithoutState("VT10001", vtrpcpb.Code_ABORTED, "foreign key constraints are not allowed", "Foreign key constraints are not allowed, see https://vitess.io/blog/2021-06-15-online-ddl-why-no-fk/.")
VT10002 = errorWithoutState("VT10002", vtrpcpb.Code_ABORTED, "atomic distributed transaction not allowed: %s", "The distributed transaction cannot be committed. A rollback decision is taken.")

VT12001 = errorWithoutState("VT12001", vtrpcpb.Code_UNIMPLEMENTED, "unsupported: %s", "This statement is unsupported by Vitess. Please rewrite your query to use supported syntax.")
VT12002 = errorWithoutState("VT12002", vtrpcpb.Code_UNIMPLEMENTED, "unsupported: cross-shard foreign keys", "Vitess does not support cross shard foreign keys.")
Expand Down Expand Up @@ -182,6 +183,7 @@ var (
VT09023,
VT09024,
VT10001,
VT10002,
VT12001,
VT12002,
VT13001,
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtgate/tx_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,21 +458,21 @@ func (txc *TxConn) resolveTx(ctx context.Context, target *querypb.Target, transa
case querypb.TransactionState_PREPARE:
// If state is PREPARE, make a decision to rollback and
// fallthrough to the rollback workflow.
if err := txc.tabletGateway.SetRollback(ctx, target, transaction.Dtid, mmShard.TransactionId); err != nil {
if err = txc.tabletGateway.SetRollback(ctx, target, transaction.Dtid, mmShard.TransactionId); err != nil {
return err
}
fallthrough
case querypb.TransactionState_ROLLBACK:
if err := txc.resumeRollback(ctx, target, transaction); err != nil {
if err = txc.resumeRollback(ctx, target, transaction); err != nil {
return err
}
case querypb.TransactionState_COMMIT:
if err := txc.resumeCommit(ctx, target, transaction); err != nil {
if err = txc.resumeCommit(ctx, target, transaction); err != nil {
return err
}
default:
// Should never happen.
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid state: %v", transaction.State)
return vterrors.VT13001(fmt.Sprintf("invalid state: %v", transaction.State))
}
return nil
}
Expand Down
40 changes: 28 additions & 12 deletions go/vt/vttablet/tabletserver/dt_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,14 @@ func (dte *DTExecutor) Prepare(transactionID int64, dtid string) error {

// If no queries were executed, we just rollback.
if len(conn.TxProperties().Queries) == 0 {
conn.Release(tx.TxRollback)
dte.te.txPool.RollbackAndRelease(dte.ctx, conn)
return nil
}

// If the connection is tainted, we cannot prepare it. As there could be temporary tables involved.
if conn.IsTainted() {
conn.Release(tx.TxRollback)
return vterrors.VT12001("cannot prepare the transaction on a reserved connection")
dte.te.txPool.RollbackAndRelease(dte.ctx, conn)
return vterrors.VT10002("cannot prepare the transaction on a reserved connection")
}

err = dte.te.preparedPool.Put(conn, dtid)
Expand All @@ -88,30 +88,34 @@ func (dte *DTExecutor) Prepare(transactionID int64, dtid string) error {
// CommitPrepared commits a prepared transaction. If the operation
// fails, an error counter is incremented and the transaction is
// marked as failed in the redo log.
func (dte *DTExecutor) CommitPrepared(dtid string) error {
func (dte *DTExecutor) CommitPrepared(dtid string) (err error) {
if !dte.te.twopcEnabled {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled")
}
defer dte.te.env.Stats().QueryTimings.Record("COMMIT_PREPARED", time.Now())
conn, err := dte.te.preparedPool.FetchForCommit(dtid)
var conn *StatefulConnection
conn, err = dte.te.preparedPool.FetchForCommit(dtid)
if err != nil {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "cannot commit dtid %s, state: %v", dtid, err)
}
// No connection means the transaction was already committed.
if conn == nil {
return nil
}
// We have to use a context that will never give up,
// even if the original context expires.
ctx := trace.CopySpan(context.Background(), dte.ctx)
defer dte.te.txPool.RollbackAndRelease(ctx, conn)
err = dte.te.twoPC.DeleteRedo(ctx, conn, dtid)
if err != nil {
dte.markFailed(ctx, dtid)
defer func() {
if err != nil {
dte.markFailed(ctx, dtid)
log.Warningf("failed to commit the prepared transaction '%s' with error: %v", dtid, err)
}
dte.te.txPool.RollbackAndRelease(ctx, conn)
}()
if err = dte.te.twoPC.DeleteRedo(ctx, conn, dtid); err != nil {
return err
}
_, err = dte.te.txPool.Commit(ctx, conn)
if err != nil {
dte.markFailed(ctx, dtid)
if _, err = dte.te.txPool.Commit(ctx, conn); err != nil {
return err
}
dte.te.preparedPool.Forget(dtid)
Expand Down Expand Up @@ -207,6 +211,15 @@ func (dte *DTExecutor) StartCommit(transactionID int64, dtid string) error {
}
defer dte.te.txPool.RollbackAndRelease(dte.ctx, conn)

// If the connection is tainted, we cannot take a commit decision on it.
if conn.IsTainted() {
dte.inTransaction(func(conn *StatefulConnection) error {
return dte.te.twoPC.Transition(dte.ctx, conn, dtid, querypb.TransactionState_ROLLBACK)
})
// return the error, defer call above will roll back the transaction.
return vterrors.VT10002("cannot commit the transaction on a reserved connection")
}

err = dte.te.twoPC.Transition(dte.ctx, conn, dtid, querypb.TransactionState_COMMIT)
if err != nil {
return err
Expand All @@ -228,6 +241,9 @@ func (dte *DTExecutor) SetRollback(dtid string, transactionID int64) error {
// If the transaction is still open, it will be rolled back.
// Otherwise, it would have been rolled back by other means, like a timeout or vttablet/mysql restart.
dte.te.Rollback(dte.ctx, transactionID)
} else {
// This is a warning because it should not happen in normal operation.
log.Warningf("SetRollback called with no transactionID for dtid %s", dtid)
}

return dte.inTransaction(func(conn *StatefulConnection) error {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/dt_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestDTExecutorPrepareResevedConn(t *testing.T) {
txe.te.Reserve(ctx, nil, txid, nil)

err := txe.Prepare(txid, "aa")
require.ErrorContains(t, err, "VT12001: unsupported: cannot prepare the transaction on a reserved connection")
require.ErrorContains(t, err, "VT10002: atomic distributed transaction not allowed: cannot prepare the transaction on a reserved connection")
}

func TestTxExecutorPrepareNotInTx(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/tx_prep_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
)

var (
errPrepCommitting = errors.New("committing")
errPrepFailed = errors.New("failed")
errPrepCommitting = errors.New("locked for committing")
errPrepFailed = errors.New("failed to commit")
)

// TxPreparedPool manages connections for prepared transactions.
Expand Down
33 changes: 12 additions & 21 deletions go/vt/vttablet/tabletserver/tx_prep_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package tabletserver
import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -82,38 +83,28 @@ func TestPrepFetchForCommit(t *testing.T) {
conn := &StatefulConnection{}
got, err := pp.FetchForCommit("aa")
require.NoError(t, err)
if got != nil {
t.Errorf("Get(aa): %v, want nil", got)
}
assert.Nil(t, got)

pp.Put(conn, "aa")
got, err = pp.FetchForCommit("aa")
require.NoError(t, err)
if got != conn {
t.Errorf("pp.Get(aa): %p, want %p", got, conn)
}
assert.Equal(t, conn, got)

_, err = pp.FetchForCommit("aa")
want := "committing"
if err == nil || err.Error() != want {
t.Errorf("FetchForCommit err: %v, want %s", err, want)
}
assert.ErrorContains(t, err, "locked for committing")

pp.SetFailed("aa")
_, err = pp.FetchForCommit("aa")
want = "failed"
if err == nil || err.Error() != want {
t.Errorf("FetchForCommit err: %v, want %s", err, want)
}
assert.ErrorContains(t, err, "failed to commit")

pp.SetFailed("bb")
_, err = pp.FetchForCommit("bb")
want = "failed"
if err == nil || err.Error() != want {
t.Errorf("FetchForCommit err: %v, want %s", err, want)
}
assert.ErrorContains(t, err, "failed to commit")

pp.Forget("aa")
got, err = pp.FetchForCommit("aa")
require.NoError(t, err)
if got != nil {
t.Errorf("Get(aa): %v, want nil", got)
}
assert.Nil(t, got)
}

func TestPrepFetchAll(t *testing.T) {
Expand Down
Loading