From 22dea710a4607647030beb5862ff2dcc6939f9e1 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Tue, 2 Jul 2024 21:42:45 +0530 Subject: [PATCH] reject prepare when connection is a reserved connection Signed-off-by: Harshit Gangal --- .../{tx_executor.go => dt_executor.go} | 165 ++++++++++-------- ...x_executor_test.go => dt_executor_test.go} | 28 ++- go/vt/vttablet/tabletserver/tabletserver.go | 54 +----- go/vt/vttablet/tabletserver/twopcz.go | 2 +- 4 files changed, 121 insertions(+), 128 deletions(-) rename go/vt/vttablet/tabletserver/{tx_executor.go => dt_executor.go} (57%) rename go/vt/vttablet/tabletserver/{tx_executor_test.go => dt_executor_test.go} (95%) diff --git a/go/vt/vttablet/tabletserver/tx_executor.go b/go/vt/vttablet/tabletserver/dt_executor.go similarity index 57% rename from go/vt/vttablet/tabletserver/tx_executor.go rename to go/vt/vttablet/tabletserver/dt_executor.go index 93d18a200f9..baaa8b43a4e 100644 --- a/go/vt/vttablet/tabletserver/tx_executor.go +++ b/go/vt/vttablet/tabletserver/dt_executor.go @@ -31,27 +31,34 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" ) -// TxExecutor is used for executing a transactional request. -// TODO: merge this with tx_engine -type TxExecutor struct { - // TODO(sougou): Parameterize this. +// DTExecutor is used for executing a distributed transactional request. +type DTExecutor struct { ctx context.Context logStats *tabletenv.LogStats te *TxEngine } +// NewDTExecutor creates a new distributed transaction executor. +func NewDTExecutor(ctx context.Context, te *TxEngine, logStats *tabletenv.LogStats) *DTExecutor { + return &DTExecutor{ + ctx: ctx, + te: te, + logStats: logStats, + } +} + // Prepare performs a prepare on a connection including the redo log work. // If there is any failure, an error is returned. No cleanup is performed. // A subsequent call to RollbackPrepared, which is required by the 2PC // protocol, will perform all the cleanup. -func (txe *TxExecutor) Prepare(transactionID int64, dtid string) error { - if !txe.te.twopcEnabled { +func (dte *DTExecutor) Prepare(transactionID int64, dtid string) error { + if !dte.te.twopcEnabled { return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled") } - defer txe.te.env.Stats().QueryTimings.Record("PREPARE", time.Now()) - txe.logStats.TransactionID = transactionID + defer dte.te.env.Stats().QueryTimings.Record("PREPARE", time.Now()) + dte.logStats.TransactionID = transactionID - conn, err := txe.te.txPool.GetAndLock(transactionID, "for prepare") + conn, err := dte.te.txPool.GetAndLock(transactionID, "for prepare") if err != nil { return err } @@ -62,14 +69,20 @@ func (txe *TxExecutor) Prepare(transactionID int64, dtid string) error { return nil } - err = txe.te.preparedPool.Put(conn, dtid) + // If the connection is tainted, we cannot prepare it. As there could be temporary tables involved. + if conn.IsTainted() { + conn.Release(tx.TxRollback) + return vterrors.VT12001("cannot prepare the transaction on a reserved connection") + } + + err = dte.te.preparedPool.Put(conn, dtid) if err != nil { - txe.te.txPool.RollbackAndRelease(txe.ctx, conn) + dte.te.txPool.RollbackAndRelease(dte.ctx, conn) return vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "prepare failed for transaction %d: %v", transactionID, err) } - return txe.inTransaction(func(localConn *StatefulConnection) error { - return txe.te.twoPC.SaveRedo(txe.ctx, localConn, dtid, conn.TxProperties().Queries) + return dte.inTransaction(func(localConn *StatefulConnection) error { + return dte.te.twoPC.SaveRedo(dte.ctx, localConn, dtid, conn.TxProperties().Queries) }) } @@ -77,12 +90,12 @@ func (txe *TxExecutor) Prepare(transactionID int64, dtid string) error { // CommitPrepared commits a prepared transaction. If the operation // fails, an error counter is incremented and the transaction is // marked as failed in the redo log. -func (txe *TxExecutor) CommitPrepared(dtid string) error { - if !txe.te.twopcEnabled { +func (dte *DTExecutor) CommitPrepared(dtid string) error { + if !dte.te.twopcEnabled { return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled") } - defer txe.te.env.Stats().QueryTimings.Record("COMMIT_PREPARED", time.Now()) - conn, err := txe.te.preparedPool.FetchForCommit(dtid) + defer dte.te.env.Stats().QueryTimings.Record("COMMIT_PREPARED", time.Now()) + conn, err := dte.te.preparedPool.FetchForCommit(dtid) if err != nil { return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "cannot commit dtid %s, state: %v", dtid, err) } @@ -91,19 +104,19 @@ func (txe *TxExecutor) CommitPrepared(dtid string) error { } // We have to use a context that will never give up, // even if the original context expires. - ctx := trace.CopySpan(context.Background(), txe.ctx) - defer txe.te.txPool.RollbackAndRelease(ctx, conn) - err = txe.te.twoPC.DeleteRedo(ctx, conn, dtid) + ctx := trace.CopySpan(context.Background(), dte.ctx) + defer dte.te.txPool.RollbackAndRelease(ctx, conn) + err = dte.te.twoPC.DeleteRedo(ctx, conn, dtid) if err != nil { - txe.markFailed(ctx, dtid) + dte.markFailed(ctx, dtid) return err } - _, err = txe.te.txPool.Commit(ctx, conn) + _, err = dte.te.txPool.Commit(ctx, conn) if err != nil { - txe.markFailed(ctx, dtid) + dte.markFailed(ctx, dtid) return err } - txe.te.preparedPool.Forget(dtid) + dte.te.preparedPool.Forget(dtid) return nil } @@ -113,23 +126,23 @@ func (txe *TxExecutor) CommitPrepared(dtid string) error { // state of the transaction in the redo log as failed. If the // state change does not succeed, it just logs the event. // The function uses the passed in context that has no timeout -// instead of TxExecutor's context. -func (txe *TxExecutor) markFailed(ctx context.Context, dtid string) { - txe.te.env.Stats().InternalErrors.Add("TwopcCommit", 1) - txe.te.preparedPool.SetFailed(dtid) - conn, _, _, err := txe.te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil) +// instead of DTExecutor's context. +func (dte *DTExecutor) markFailed(ctx context.Context, dtid string) { + dte.te.env.Stats().InternalErrors.Add("TwopcCommit", 1) + dte.te.preparedPool.SetFailed(dtid) + conn, _, _, err := dte.te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil) if err != nil { log.Errorf("markFailed: Begin failed for dtid %s: %v", dtid, err) return } - defer txe.te.txPool.RollbackAndRelease(ctx, conn) + defer dte.te.txPool.RollbackAndRelease(ctx, conn) - if err = txe.te.twoPC.UpdateRedo(ctx, conn, dtid, RedoStateFailed); err != nil { + if err = dte.te.twoPC.UpdateRedo(ctx, conn, dtid, RedoStateFailed); err != nil { log.Errorf("markFailed: UpdateRedo failed for dtid %s: %v", dtid, err) return } - if _, err = txe.te.txPool.Commit(ctx, conn); err != nil { + if _, err = dte.te.txPool.Commit(ctx, conn); err != nil { log.Errorf("markFailed: Commit failed for dtid %s: %v", dtid, err) } } @@ -152,126 +165,126 @@ func (txe *TxExecutor) markFailed(ctx context.Context, dtid string) { // If so, it must be set to 0, and the function will not attempt that // step. If the original transaction is still alive, the transaction // killer will be the one to eventually roll it back. -func (txe *TxExecutor) RollbackPrepared(dtid string, originalID int64) error { - if !txe.te.twopcEnabled { +func (dte *DTExecutor) RollbackPrepared(dtid string, originalID int64) error { + if !dte.te.twopcEnabled { return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled") } - defer txe.te.env.Stats().QueryTimings.Record("ROLLBACK_PREPARED", time.Now()) + defer dte.te.env.Stats().QueryTimings.Record("ROLLBACK_PREPARED", time.Now()) defer func() { - if preparedConn := txe.te.preparedPool.FetchForRollback(dtid); preparedConn != nil { - txe.te.txPool.RollbackAndRelease(txe.ctx, preparedConn) + if preparedConn := dte.te.preparedPool.FetchForRollback(dtid); preparedConn != nil { + dte.te.txPool.RollbackAndRelease(dte.ctx, preparedConn) } if originalID != 0 { - txe.te.Rollback(txe.ctx, originalID) + dte.te.Rollback(dte.ctx, originalID) } }() - return txe.inTransaction(func(conn *StatefulConnection) error { - return txe.te.twoPC.DeleteRedo(txe.ctx, conn, dtid) + return dte.inTransaction(func(conn *StatefulConnection) error { + return dte.te.twoPC.DeleteRedo(dte.ctx, conn, dtid) }) } // CreateTransaction creates the metadata for a 2PC transaction. -func (txe *TxExecutor) CreateTransaction(dtid string, participants []*querypb.Target) error { - if !txe.te.twopcEnabled { +func (dte *DTExecutor) CreateTransaction(dtid string, participants []*querypb.Target) error { + if !dte.te.twopcEnabled { return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled") } - defer txe.te.env.Stats().QueryTimings.Record("CREATE_TRANSACTION", time.Now()) - return txe.inTransaction(func(conn *StatefulConnection) error { - return txe.te.twoPC.CreateTransaction(txe.ctx, conn, dtid, participants) + defer dte.te.env.Stats().QueryTimings.Record("CREATE_TRANSACTION", time.Now()) + return dte.inTransaction(func(conn *StatefulConnection) error { + return dte.te.twoPC.CreateTransaction(dte.ctx, conn, dtid, participants) }) } // StartCommit atomically commits the transaction along with the // decision to commit the associated 2pc transaction. -func (txe *TxExecutor) StartCommit(transactionID int64, dtid string) error { - if !txe.te.twopcEnabled { +func (dte *DTExecutor) StartCommit(transactionID int64, dtid string) error { + if !dte.te.twopcEnabled { return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled") } - defer txe.te.env.Stats().QueryTimings.Record("START_COMMIT", time.Now()) - txe.logStats.TransactionID = transactionID + defer dte.te.env.Stats().QueryTimings.Record("START_COMMIT", time.Now()) + dte.logStats.TransactionID = transactionID - conn, err := txe.te.txPool.GetAndLock(transactionID, "for 2pc commit") + conn, err := dte.te.txPool.GetAndLock(transactionID, "for 2pc commit") if err != nil { return err } - defer txe.te.txPool.RollbackAndRelease(txe.ctx, conn) + defer dte.te.txPool.RollbackAndRelease(dte.ctx, conn) - err = txe.te.twoPC.Transition(txe.ctx, conn, dtid, querypb.TransactionState_COMMIT) + err = dte.te.twoPC.Transition(dte.ctx, conn, dtid, querypb.TransactionState_COMMIT) if err != nil { return err } - _, err = txe.te.txPool.Commit(txe.ctx, conn) + _, err = dte.te.txPool.Commit(dte.ctx, conn) return err } // SetRollback transitions the 2pc transaction to the Rollback state. // If a transaction id is provided, that transaction is also rolled back. -func (txe *TxExecutor) SetRollback(dtid string, transactionID int64) error { - if !txe.te.twopcEnabled { +func (dte *DTExecutor) SetRollback(dtid string, transactionID int64) error { + if !dte.te.twopcEnabled { return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled") } - defer txe.te.env.Stats().QueryTimings.Record("SET_ROLLBACK", time.Now()) - txe.logStats.TransactionID = transactionID + defer dte.te.env.Stats().QueryTimings.Record("SET_ROLLBACK", time.Now()) + dte.logStats.TransactionID = transactionID if transactionID != 0 { - txe.te.Rollback(txe.ctx, transactionID) + dte.te.Rollback(dte.ctx, transactionID) } - return txe.inTransaction(func(conn *StatefulConnection) error { - return txe.te.twoPC.Transition(txe.ctx, conn, dtid, querypb.TransactionState_ROLLBACK) + return dte.inTransaction(func(conn *StatefulConnection) error { + return dte.te.twoPC.Transition(dte.ctx, conn, dtid, querypb.TransactionState_ROLLBACK) }) } // ConcludeTransaction deletes the 2pc transaction metadata // essentially resolving it. -func (txe *TxExecutor) ConcludeTransaction(dtid string) error { - if !txe.te.twopcEnabled { +func (dte *DTExecutor) ConcludeTransaction(dtid string) error { + if !dte.te.twopcEnabled { return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled") } - defer txe.te.env.Stats().QueryTimings.Record("RESOLVE", time.Now()) + defer dte.te.env.Stats().QueryTimings.Record("RESOLVE", time.Now()) - return txe.inTransaction(func(conn *StatefulConnection) error { - return txe.te.twoPC.DeleteTransaction(txe.ctx, conn, dtid) + return dte.inTransaction(func(conn *StatefulConnection) error { + return dte.te.twoPC.DeleteTransaction(dte.ctx, conn, dtid) }) } // ReadTransaction returns the metadata for the specified dtid. -func (txe *TxExecutor) ReadTransaction(dtid string) (*querypb.TransactionMetadata, error) { - if !txe.te.twopcEnabled { +func (dte *DTExecutor) ReadTransaction(dtid string) (*querypb.TransactionMetadata, error) { + if !dte.te.twopcEnabled { return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled") } - return txe.te.twoPC.ReadTransaction(txe.ctx, dtid) + return dte.te.twoPC.ReadTransaction(dte.ctx, dtid) } // ReadTwopcInflight returns info about all in-flight 2pc transactions. -func (txe *TxExecutor) ReadTwopcInflight() (distributed []*tx.DistributedTx, prepared, failed []*tx.PreparedTx, err error) { - if !txe.te.twopcEnabled { +func (dte *DTExecutor) ReadTwopcInflight() (distributed []*tx.DistributedTx, prepared, failed []*tx.PreparedTx, err error) { + if !dte.te.twopcEnabled { return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled") } - prepared, failed, err = txe.te.twoPC.ReadAllRedo(txe.ctx) + prepared, failed, err = dte.te.twoPC.ReadAllRedo(dte.ctx) if err != nil { return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "Could not read redo: %v", err) } - distributed, err = txe.te.twoPC.ReadAllTransactions(txe.ctx) + distributed, err = dte.te.twoPC.ReadAllTransactions(dte.ctx) if err != nil { return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "Could not read redo: %v", err) } return distributed, prepared, failed, nil } -func (txe *TxExecutor) inTransaction(f func(*StatefulConnection) error) error { - conn, _, _, err := txe.te.txPool.Begin(txe.ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil) +func (dte *DTExecutor) inTransaction(f func(*StatefulConnection) error) error { + conn, _, _, err := dte.te.txPool.Begin(dte.ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil) if err != nil { return err } - defer txe.te.txPool.RollbackAndRelease(txe.ctx, conn) + defer dte.te.txPool.RollbackAndRelease(dte.ctx, conn) err = f(conn) if err != nil { return err } - _, err = txe.te.txPool.Commit(txe.ctx, conn) + _, err = dte.te.txPool.Commit(dte.ctx, conn) if err != nil { return err } diff --git a/go/vt/vttablet/tabletserver/tx_executor_test.go b/go/vt/vttablet/tabletserver/dt_executor_test.go similarity index 95% rename from go/vt/vttablet/tabletserver/tx_executor_test.go rename to go/vt/vttablet/tabletserver/dt_executor_test.go index c3949240147..6637cc83841 100644 --- a/go/vt/vttablet/tabletserver/tx_executor_test.go +++ b/go/vt/vttablet/tabletserver/dt_executor_test.go @@ -71,6 +71,22 @@ func TestTxExecutorPrepare(t *testing.T) { require.NoError(t, err) } +// TestTxExecutorPrepareResevedConn tests the case where a reserved connection is used for prepare. +func TestDTExecutorPrepareResevedConn(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + txe, tsv, db := newTestTxExecutor(t, ctx) + defer db.Close() + defer tsv.StopService() + txid := newTxForPrep(ctx, tsv) + + // Reserve a connection + txe.te.Reserve(ctx, nil, txid, nil) + + err := txe.Prepare(txid, "aa") + require.ErrorContains(t, err, "VT12001: unsupported: cannot prepare the transaction on a reserved connection") +} + func TestTxExecutorPrepareNotInTx(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -533,7 +549,7 @@ func TestNoTwopc(t *testing.T) { } } -func newTestTxExecutor(t *testing.T, ctx context.Context) (txe *TxExecutor, tsv *TabletServer, db *fakesqldb.DB) { +func newTestTxExecutor(t *testing.T, ctx context.Context) (txe *DTExecutor, tsv *TabletServer, db *fakesqldb.DB) { db = setUpQueryExecutorTest(t) logStats := tabletenv.NewLogStats(ctx, "TestTxExecutor") tsv = newTestTabletServer(ctx, smallTxPool, db) @@ -542,7 +558,7 @@ func newTestTxExecutor(t *testing.T, ctx context.Context) (txe *TxExecutor, tsv db.AddQuery("delete from _vt.redo_state where dtid = 'aa'", &sqltypes.Result{}) db.AddQuery("delete from _vt.redo_statement where dtid = 'aa'", &sqltypes.Result{}) db.AddQuery("update test_table set `name` = 2 where pk = 1 limit 10001", &sqltypes.Result{}) - return &TxExecutor{ + return &DTExecutor{ ctx: ctx, logStats: logStats, te: tsv.te, @@ -550,7 +566,7 @@ func newTestTxExecutor(t *testing.T, ctx context.Context) (txe *TxExecutor, tsv } // newShortAgeExecutor is same as newTestTxExecutor, but shorter transaction abandon age. -func newShortAgeExecutor(t *testing.T, ctx context.Context) (txe *TxExecutor, tsv *TabletServer, db *fakesqldb.DB) { +func newShortAgeExecutor(t *testing.T, ctx context.Context) (txe *DTExecutor, tsv *TabletServer, db *fakesqldb.DB) { db = setUpQueryExecutorTest(t) logStats := tabletenv.NewLogStats(ctx, "TestTxExecutor") tsv = newTestTabletServer(ctx, smallTxPool|shortTwopcAge, db) @@ -559,7 +575,7 @@ func newShortAgeExecutor(t *testing.T, ctx context.Context) (txe *TxExecutor, ts db.AddQuery("delete from _vt.redo_state where dtid = 'aa'", &sqltypes.Result{}) db.AddQuery("delete from _vt.redo_statement where dtid = 'aa'", &sqltypes.Result{}) db.AddQuery("update test_table set `name` = 2 where pk = 1 limit 10001", &sqltypes.Result{}) - return &TxExecutor{ + return &DTExecutor{ ctx: ctx, logStats: logStats, te: tsv.te, @@ -567,11 +583,11 @@ func newShortAgeExecutor(t *testing.T, ctx context.Context) (txe *TxExecutor, ts } // newNoTwopcExecutor is same as newTestTxExecutor, but 2pc disabled. -func newNoTwopcExecutor(t *testing.T, ctx context.Context) (txe *TxExecutor, tsv *TabletServer, db *fakesqldb.DB) { +func newNoTwopcExecutor(t *testing.T, ctx context.Context) (txe *DTExecutor, tsv *TabletServer, db *fakesqldb.DB) { db = setUpQueryExecutorTest(t) logStats := tabletenv.NewLogStats(ctx, "TestTxExecutor") tsv = newTestTabletServer(ctx, noTwopc, db) - return &TxExecutor{ + return &DTExecutor{ ctx: ctx, logStats: logStats, te: tsv.te, diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index eb140454c2a..7a3d7cc5c06 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -645,11 +645,7 @@ func (tsv *TabletServer) Prepare(ctx context.Context, target *querypb.Target, tr "Prepare", "prepare", nil, target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { - txe := &TxExecutor{ - ctx: ctx, - logStats: logStats, - te: tsv.te, - } + txe := NewDTExecutor(ctx, tsv.te, logStats) return txe.Prepare(transactionID, dtid) }, ) @@ -662,11 +658,7 @@ func (tsv *TabletServer) CommitPrepared(ctx context.Context, target *querypb.Tar "CommitPrepared", "commit_prepared", nil, target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { - txe := &TxExecutor{ - ctx: ctx, - logStats: logStats, - te: tsv.te, - } + txe := NewDTExecutor(ctx, tsv.te, logStats) return txe.CommitPrepared(dtid) }, ) @@ -679,11 +671,7 @@ func (tsv *TabletServer) RollbackPrepared(ctx context.Context, target *querypb.T "RollbackPrepared", "rollback_prepared", nil, target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { - txe := &TxExecutor{ - ctx: ctx, - logStats: logStats, - te: tsv.te, - } + txe := NewDTExecutor(ctx, tsv.te, logStats) return txe.RollbackPrepared(dtid, originalID) }, ) @@ -696,11 +684,7 @@ func (tsv *TabletServer) CreateTransaction(ctx context.Context, target *querypb. "CreateTransaction", "create_transaction", nil, target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { - txe := &TxExecutor{ - ctx: ctx, - logStats: logStats, - te: tsv.te, - } + txe := NewDTExecutor(ctx, tsv.te, logStats) return txe.CreateTransaction(dtid, participants) }, ) @@ -714,11 +698,7 @@ func (tsv *TabletServer) StartCommit(ctx context.Context, target *querypb.Target "StartCommit", "start_commit", nil, target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { - txe := &TxExecutor{ - ctx: ctx, - logStats: logStats, - te: tsv.te, - } + txe := NewDTExecutor(ctx, tsv.te, logStats) return txe.StartCommit(transactionID, dtid) }, ) @@ -732,11 +712,7 @@ func (tsv *TabletServer) SetRollback(ctx context.Context, target *querypb.Target "SetRollback", "set_rollback", nil, target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { - txe := &TxExecutor{ - ctx: ctx, - logStats: logStats, - te: tsv.te, - } + txe := NewDTExecutor(ctx, tsv.te, logStats) return txe.SetRollback(dtid, transactionID) }, ) @@ -750,11 +726,7 @@ func (tsv *TabletServer) ConcludeTransaction(ctx context.Context, target *queryp "ConcludeTransaction", "conclude_transaction", nil, target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { - txe := &TxExecutor{ - ctx: ctx, - logStats: logStats, - te: tsv.te, - } + txe := NewDTExecutor(ctx, tsv.te, logStats) return txe.ConcludeTransaction(dtid) }, ) @@ -767,11 +739,7 @@ func (tsv *TabletServer) ReadTransaction(ctx context.Context, target *querypb.Ta "ReadTransaction", "read_transaction", nil, target, nil, true, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { - txe := &TxExecutor{ - ctx: ctx, - logStats: logStats, - te: tsv.te, - } + txe := NewDTExecutor(ctx, tsv.te, logStats) metadata, err = txe.ReadTransaction(dtid) return err }, @@ -1778,11 +1746,7 @@ func (tsv *TabletServer) registerQueryListHandlers(queryLists []*QueryList) { func (tsv *TabletServer) registerTwopczHandler() { tsv.exporter.HandleFunc("/twopcz", func(w http.ResponseWriter, r *http.Request) { ctx := tabletenv.LocalContext() - txe := &TxExecutor{ - ctx: ctx, - logStats: tabletenv.NewLogStats(ctx, "twopcz"), - te: tsv.te, - } + txe := NewDTExecutor(ctx, tsv.te, tabletenv.NewLogStats(ctx, "twopcz")) twopczHandler(txe, w, r) }) } diff --git a/go/vt/vttablet/tabletserver/twopcz.go b/go/vt/vttablet/tabletserver/twopcz.go index e9547c311bd..51ed457c679 100644 --- a/go/vt/vttablet/tabletserver/twopcz.go +++ b/go/vt/vttablet/tabletserver/twopcz.go @@ -130,7 +130,7 @@ var ( `)) ) -func twopczHandler(txe *TxExecutor, w http.ResponseWriter, r *http.Request) { +func twopczHandler(txe *DTExecutor, w http.ResponseWriter, r *http.Request) { if err := acl.CheckAccessHTTP(r, acl.DEBUGGING); err != nil { acl.SendError(w, err) return