Skip to content

Commit

Permalink
reject prepare when connection is a reserved connection
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <harshit@planetscale.com>
  • Loading branch information
harshit-gangal committed Jul 2, 2024
1 parent 05d7e76 commit 22dea71
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,27 +31,34 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
)

// TxExecutor is used for executing a transactional request.
// TODO: merge this with tx_engine
type TxExecutor struct {
// TODO(sougou): Parameterize this.
// DTExecutor is used for executing a distributed transactional request.
type DTExecutor struct {
ctx context.Context
logStats *tabletenv.LogStats
te *TxEngine
}

// NewDTExecutor creates a new distributed transaction executor.
func NewDTExecutor(ctx context.Context, te *TxEngine, logStats *tabletenv.LogStats) *DTExecutor {
return &DTExecutor{
ctx: ctx,
te: te,
logStats: logStats,
}
}

// Prepare performs a prepare on a connection including the redo log work.
// If there is any failure, an error is returned. No cleanup is performed.
// A subsequent call to RollbackPrepared, which is required by the 2PC
// protocol, will perform all the cleanup.
func (txe *TxExecutor) Prepare(transactionID int64, dtid string) error {
if !txe.te.twopcEnabled {
func (dte *DTExecutor) Prepare(transactionID int64, dtid string) error {
if !dte.te.twopcEnabled {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled")
}
defer txe.te.env.Stats().QueryTimings.Record("PREPARE", time.Now())
txe.logStats.TransactionID = transactionID
defer dte.te.env.Stats().QueryTimings.Record("PREPARE", time.Now())
dte.logStats.TransactionID = transactionID

conn, err := txe.te.txPool.GetAndLock(transactionID, "for prepare")
conn, err := dte.te.txPool.GetAndLock(transactionID, "for prepare")
if err != nil {
return err
}
Expand All @@ -62,27 +69,33 @@ func (txe *TxExecutor) Prepare(transactionID int64, dtid string) error {
return nil
}

err = txe.te.preparedPool.Put(conn, dtid)
// If the connection is tainted, we cannot prepare it. As there could be temporary tables involved.
if conn.IsTainted() {
conn.Release(tx.TxRollback)
return vterrors.VT12001("cannot prepare the transaction on a reserved connection")
}

err = dte.te.preparedPool.Put(conn, dtid)
if err != nil {
txe.te.txPool.RollbackAndRelease(txe.ctx, conn)
dte.te.txPool.RollbackAndRelease(dte.ctx, conn)
return vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "prepare failed for transaction %d: %v", transactionID, err)
}

return txe.inTransaction(func(localConn *StatefulConnection) error {
return txe.te.twoPC.SaveRedo(txe.ctx, localConn, dtid, conn.TxProperties().Queries)
return dte.inTransaction(func(localConn *StatefulConnection) error {
return dte.te.twoPC.SaveRedo(dte.ctx, localConn, dtid, conn.TxProperties().Queries)
})

}

// CommitPrepared commits a prepared transaction. If the operation
// fails, an error counter is incremented and the transaction is
// marked as failed in the redo log.
func (txe *TxExecutor) CommitPrepared(dtid string) error {
if !txe.te.twopcEnabled {
func (dte *DTExecutor) CommitPrepared(dtid string) error {
if !dte.te.twopcEnabled {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled")
}
defer txe.te.env.Stats().QueryTimings.Record("COMMIT_PREPARED", time.Now())
conn, err := txe.te.preparedPool.FetchForCommit(dtid)
defer dte.te.env.Stats().QueryTimings.Record("COMMIT_PREPARED", time.Now())
conn, err := dte.te.preparedPool.FetchForCommit(dtid)
if err != nil {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "cannot commit dtid %s, state: %v", dtid, err)
}
Expand All @@ -91,19 +104,19 @@ func (txe *TxExecutor) CommitPrepared(dtid string) error {
}
// We have to use a context that will never give up,
// even if the original context expires.
ctx := trace.CopySpan(context.Background(), txe.ctx)
defer txe.te.txPool.RollbackAndRelease(ctx, conn)
err = txe.te.twoPC.DeleteRedo(ctx, conn, dtid)
ctx := trace.CopySpan(context.Background(), dte.ctx)
defer dte.te.txPool.RollbackAndRelease(ctx, conn)
err = dte.te.twoPC.DeleteRedo(ctx, conn, dtid)
if err != nil {
txe.markFailed(ctx, dtid)
dte.markFailed(ctx, dtid)
return err
}
_, err = txe.te.txPool.Commit(ctx, conn)
_, err = dte.te.txPool.Commit(ctx, conn)
if err != nil {
txe.markFailed(ctx, dtid)
dte.markFailed(ctx, dtid)
return err
}
txe.te.preparedPool.Forget(dtid)
dte.te.preparedPool.Forget(dtid)
return nil
}

Expand All @@ -113,23 +126,23 @@ func (txe *TxExecutor) CommitPrepared(dtid string) error {
// state of the transaction in the redo log as failed. If the
// state change does not succeed, it just logs the event.
// The function uses the passed in context that has no timeout
// instead of TxExecutor's context.
func (txe *TxExecutor) markFailed(ctx context.Context, dtid string) {
txe.te.env.Stats().InternalErrors.Add("TwopcCommit", 1)
txe.te.preparedPool.SetFailed(dtid)
conn, _, _, err := txe.te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil)
// instead of DTExecutor's context.
func (dte *DTExecutor) markFailed(ctx context.Context, dtid string) {
dte.te.env.Stats().InternalErrors.Add("TwopcCommit", 1)
dte.te.preparedPool.SetFailed(dtid)
conn, _, _, err := dte.te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil)
if err != nil {
log.Errorf("markFailed: Begin failed for dtid %s: %v", dtid, err)
return
}
defer txe.te.txPool.RollbackAndRelease(ctx, conn)
defer dte.te.txPool.RollbackAndRelease(ctx, conn)

if err = txe.te.twoPC.UpdateRedo(ctx, conn, dtid, RedoStateFailed); err != nil {
if err = dte.te.twoPC.UpdateRedo(ctx, conn, dtid, RedoStateFailed); err != nil {
log.Errorf("markFailed: UpdateRedo failed for dtid %s: %v", dtid, err)
return
}

if _, err = txe.te.txPool.Commit(ctx, conn); err != nil {
if _, err = dte.te.txPool.Commit(ctx, conn); err != nil {
log.Errorf("markFailed: Commit failed for dtid %s: %v", dtid, err)
}
}
Expand All @@ -152,126 +165,126 @@ func (txe *TxExecutor) markFailed(ctx context.Context, dtid string) {
// If so, it must be set to 0, and the function will not attempt that
// step. If the original transaction is still alive, the transaction
// killer will be the one to eventually roll it back.
func (txe *TxExecutor) RollbackPrepared(dtid string, originalID int64) error {
if !txe.te.twopcEnabled {
func (dte *DTExecutor) RollbackPrepared(dtid string, originalID int64) error {
if !dte.te.twopcEnabled {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled")
}
defer txe.te.env.Stats().QueryTimings.Record("ROLLBACK_PREPARED", time.Now())
defer dte.te.env.Stats().QueryTimings.Record("ROLLBACK_PREPARED", time.Now())
defer func() {
if preparedConn := txe.te.preparedPool.FetchForRollback(dtid); preparedConn != nil {
txe.te.txPool.RollbackAndRelease(txe.ctx, preparedConn)
if preparedConn := dte.te.preparedPool.FetchForRollback(dtid); preparedConn != nil {
dte.te.txPool.RollbackAndRelease(dte.ctx, preparedConn)
}
if originalID != 0 {
txe.te.Rollback(txe.ctx, originalID)
dte.te.Rollback(dte.ctx, originalID)
}
}()
return txe.inTransaction(func(conn *StatefulConnection) error {
return txe.te.twoPC.DeleteRedo(txe.ctx, conn, dtid)
return dte.inTransaction(func(conn *StatefulConnection) error {
return dte.te.twoPC.DeleteRedo(dte.ctx, conn, dtid)
})
}

// CreateTransaction creates the metadata for a 2PC transaction.
func (txe *TxExecutor) CreateTransaction(dtid string, participants []*querypb.Target) error {
if !txe.te.twopcEnabled {
func (dte *DTExecutor) CreateTransaction(dtid string, participants []*querypb.Target) error {
if !dte.te.twopcEnabled {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled")
}
defer txe.te.env.Stats().QueryTimings.Record("CREATE_TRANSACTION", time.Now())
return txe.inTransaction(func(conn *StatefulConnection) error {
return txe.te.twoPC.CreateTransaction(txe.ctx, conn, dtid, participants)
defer dte.te.env.Stats().QueryTimings.Record("CREATE_TRANSACTION", time.Now())
return dte.inTransaction(func(conn *StatefulConnection) error {
return dte.te.twoPC.CreateTransaction(dte.ctx, conn, dtid, participants)
})
}

// StartCommit atomically commits the transaction along with the
// decision to commit the associated 2pc transaction.
func (txe *TxExecutor) StartCommit(transactionID int64, dtid string) error {
if !txe.te.twopcEnabled {
func (dte *DTExecutor) StartCommit(transactionID int64, dtid string) error {
if !dte.te.twopcEnabled {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled")
}
defer txe.te.env.Stats().QueryTimings.Record("START_COMMIT", time.Now())
txe.logStats.TransactionID = transactionID
defer dte.te.env.Stats().QueryTimings.Record("START_COMMIT", time.Now())
dte.logStats.TransactionID = transactionID

conn, err := txe.te.txPool.GetAndLock(transactionID, "for 2pc commit")
conn, err := dte.te.txPool.GetAndLock(transactionID, "for 2pc commit")
if err != nil {
return err
}
defer txe.te.txPool.RollbackAndRelease(txe.ctx, conn)
defer dte.te.txPool.RollbackAndRelease(dte.ctx, conn)

err = txe.te.twoPC.Transition(txe.ctx, conn, dtid, querypb.TransactionState_COMMIT)
err = dte.te.twoPC.Transition(dte.ctx, conn, dtid, querypb.TransactionState_COMMIT)
if err != nil {
return err
}
_, err = txe.te.txPool.Commit(txe.ctx, conn)
_, err = dte.te.txPool.Commit(dte.ctx, conn)
return err
}

// SetRollback transitions the 2pc transaction to the Rollback state.
// If a transaction id is provided, that transaction is also rolled back.
func (txe *TxExecutor) SetRollback(dtid string, transactionID int64) error {
if !txe.te.twopcEnabled {
func (dte *DTExecutor) SetRollback(dtid string, transactionID int64) error {
if !dte.te.twopcEnabled {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled")
}
defer txe.te.env.Stats().QueryTimings.Record("SET_ROLLBACK", time.Now())
txe.logStats.TransactionID = transactionID
defer dte.te.env.Stats().QueryTimings.Record("SET_ROLLBACK", time.Now())
dte.logStats.TransactionID = transactionID

if transactionID != 0 {
txe.te.Rollback(txe.ctx, transactionID)
dte.te.Rollback(dte.ctx, transactionID)
}

return txe.inTransaction(func(conn *StatefulConnection) error {
return txe.te.twoPC.Transition(txe.ctx, conn, dtid, querypb.TransactionState_ROLLBACK)
return dte.inTransaction(func(conn *StatefulConnection) error {
return dte.te.twoPC.Transition(dte.ctx, conn, dtid, querypb.TransactionState_ROLLBACK)
})
}

// ConcludeTransaction deletes the 2pc transaction metadata
// essentially resolving it.
func (txe *TxExecutor) ConcludeTransaction(dtid string) error {
if !txe.te.twopcEnabled {
func (dte *DTExecutor) ConcludeTransaction(dtid string) error {
if !dte.te.twopcEnabled {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled")
}
defer txe.te.env.Stats().QueryTimings.Record("RESOLVE", time.Now())
defer dte.te.env.Stats().QueryTimings.Record("RESOLVE", time.Now())

return txe.inTransaction(func(conn *StatefulConnection) error {
return txe.te.twoPC.DeleteTransaction(txe.ctx, conn, dtid)
return dte.inTransaction(func(conn *StatefulConnection) error {
return dte.te.twoPC.DeleteTransaction(dte.ctx, conn, dtid)
})
}

// ReadTransaction returns the metadata for the specified dtid.
func (txe *TxExecutor) ReadTransaction(dtid string) (*querypb.TransactionMetadata, error) {
if !txe.te.twopcEnabled {
func (dte *DTExecutor) ReadTransaction(dtid string) (*querypb.TransactionMetadata, error) {
if !dte.te.twopcEnabled {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled")
}
return txe.te.twoPC.ReadTransaction(txe.ctx, dtid)
return dte.te.twoPC.ReadTransaction(dte.ctx, dtid)
}

// ReadTwopcInflight returns info about all in-flight 2pc transactions.
func (txe *TxExecutor) ReadTwopcInflight() (distributed []*tx.DistributedTx, prepared, failed []*tx.PreparedTx, err error) {
if !txe.te.twopcEnabled {
func (dte *DTExecutor) ReadTwopcInflight() (distributed []*tx.DistributedTx, prepared, failed []*tx.PreparedTx, err error) {
if !dte.te.twopcEnabled {
return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled")
}
prepared, failed, err = txe.te.twoPC.ReadAllRedo(txe.ctx)
prepared, failed, err = dte.te.twoPC.ReadAllRedo(dte.ctx)
if err != nil {
return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "Could not read redo: %v", err)
}
distributed, err = txe.te.twoPC.ReadAllTransactions(txe.ctx)
distributed, err = dte.te.twoPC.ReadAllTransactions(dte.ctx)
if err != nil {
return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "Could not read redo: %v", err)
}
return distributed, prepared, failed, nil
}

func (txe *TxExecutor) inTransaction(f func(*StatefulConnection) error) error {
conn, _, _, err := txe.te.txPool.Begin(txe.ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil)
func (dte *DTExecutor) inTransaction(f func(*StatefulConnection) error) error {
conn, _, _, err := dte.te.txPool.Begin(dte.ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil)
if err != nil {
return err
}
defer txe.te.txPool.RollbackAndRelease(txe.ctx, conn)
defer dte.te.txPool.RollbackAndRelease(dte.ctx, conn)

err = f(conn)
if err != nil {
return err
}

_, err = txe.te.txPool.Commit(txe.ctx, conn)
_, err = dte.te.txPool.Commit(dte.ctx, conn)
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,22 @@ func TestTxExecutorPrepare(t *testing.T) {
require.NoError(t, err)
}

// TestTxExecutorPrepareResevedConn tests the case where a reserved connection is used for prepare.
func TestDTExecutorPrepareResevedConn(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
txe, tsv, db := newTestTxExecutor(t, ctx)
defer db.Close()
defer tsv.StopService()
txid := newTxForPrep(ctx, tsv)

// Reserve a connection
txe.te.Reserve(ctx, nil, txid, nil)

err := txe.Prepare(txid, "aa")
require.ErrorContains(t, err, "VT12001: unsupported: cannot prepare the transaction on a reserved connection")
}

func TestTxExecutorPrepareNotInTx(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -533,7 +549,7 @@ func TestNoTwopc(t *testing.T) {
}
}

func newTestTxExecutor(t *testing.T, ctx context.Context) (txe *TxExecutor, tsv *TabletServer, db *fakesqldb.DB) {
func newTestTxExecutor(t *testing.T, ctx context.Context) (txe *DTExecutor, tsv *TabletServer, db *fakesqldb.DB) {
db = setUpQueryExecutorTest(t)
logStats := tabletenv.NewLogStats(ctx, "TestTxExecutor")
tsv = newTestTabletServer(ctx, smallTxPool, db)
Expand All @@ -542,15 +558,15 @@ func newTestTxExecutor(t *testing.T, ctx context.Context) (txe *TxExecutor, tsv
db.AddQuery("delete from _vt.redo_state where dtid = 'aa'", &sqltypes.Result{})
db.AddQuery("delete from _vt.redo_statement where dtid = 'aa'", &sqltypes.Result{})
db.AddQuery("update test_table set `name` = 2 where pk = 1 limit 10001", &sqltypes.Result{})
return &TxExecutor{
return &DTExecutor{
ctx: ctx,
logStats: logStats,
te: tsv.te,
}, tsv, db
}

// newShortAgeExecutor is same as newTestTxExecutor, but shorter transaction abandon age.
func newShortAgeExecutor(t *testing.T, ctx context.Context) (txe *TxExecutor, tsv *TabletServer, db *fakesqldb.DB) {
func newShortAgeExecutor(t *testing.T, ctx context.Context) (txe *DTExecutor, tsv *TabletServer, db *fakesqldb.DB) {
db = setUpQueryExecutorTest(t)
logStats := tabletenv.NewLogStats(ctx, "TestTxExecutor")
tsv = newTestTabletServer(ctx, smallTxPool|shortTwopcAge, db)
Expand All @@ -559,19 +575,19 @@ func newShortAgeExecutor(t *testing.T, ctx context.Context) (txe *TxExecutor, ts
db.AddQuery("delete from _vt.redo_state where dtid = 'aa'", &sqltypes.Result{})
db.AddQuery("delete from _vt.redo_statement where dtid = 'aa'", &sqltypes.Result{})
db.AddQuery("update test_table set `name` = 2 where pk = 1 limit 10001", &sqltypes.Result{})
return &TxExecutor{
return &DTExecutor{
ctx: ctx,
logStats: logStats,
te: tsv.te,
}, tsv, db
}

// newNoTwopcExecutor is same as newTestTxExecutor, but 2pc disabled.
func newNoTwopcExecutor(t *testing.T, ctx context.Context) (txe *TxExecutor, tsv *TabletServer, db *fakesqldb.DB) {
func newNoTwopcExecutor(t *testing.T, ctx context.Context) (txe *DTExecutor, tsv *TabletServer, db *fakesqldb.DB) {
db = setUpQueryExecutorTest(t)
logStats := tabletenv.NewLogStats(ctx, "TestTxExecutor")
tsv = newTestTabletServer(ctx, noTwopc, db)
return &TxExecutor{
return &DTExecutor{
ctx: ctx,
logStats: logStats,
te: tsv.te,
Expand Down
Loading

0 comments on commit 22dea71

Please sign in to comment.