From ae7214d9bcd4db23c7aeeb566757951501aba158 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Tue, 23 Jul 2024 15:54:01 +0530 Subject: [PATCH] Distributed Transaction Resolver (#16381) Signed-off-by: Harshit Gangal Signed-off-by: Manan Gupta Co-authored-by: Manan Gupta --- .../cluster_endtoend_vtgate_transaction.yml | 2 +- Makefile | 4 + go/cmd/vtgateclienttest/services/fallback.go | 4 - go/cmd/vtgateclienttest/services/terminal.go | 4 - .../transaction/restart/main_test.go | 0 .../transaction/restart/schema.sql | 0 .../rollback/txn_rollback_shutdown_test.go | 0 .../{vtgate => }/transaction/schema.sql | 0 .../transaction/single/main_test.go | 0 .../transaction/single/schema.sql | 0 .../transaction/single/vschema.json | 0 .../transaction/twopc/main_test.go | 37 +- .../{vtgate => }/transaction/twopc/schema.sql | 0 .../transaction/twopc/twopc_test.go | 442 ++++++++++++++++-- .../transaction/twopc/vschema.json | 0 .../{vtgate => }/transaction/tx_test.go | 0 .../{vtgate => }/transaction/vschema.json | 0 go/vt/proto/vtgateservice/vtgateservice.pb.go | 95 ++-- .../vtgateservice/vtgateservice_grpc.pb.go | 40 -- go/vt/vitessdriver/fakeserver_test.go | 9 - go/vt/vtgate/debug_2pc.go | 21 + go/vt/vtgate/fakerpcvtgateconn/conn.go | 5 - go/vt/vtgate/grpcvtgateconn/conn.go | 9 - go/vt/vtgate/grpcvtgateconn/suite_test.go | 16 - go/vt/vtgate/grpcvtgateservice/server.go | 12 - go/vt/vtgate/production.go | 28 ++ go/vt/vtgate/tx_conn.go | 138 +++++- go/vt/vtgate/tx_conn_test.go | 171 ++++--- go/vt/vtgate/txresolver/tx_resolver.go | 104 +++++ go/vt/vtgate/vtgate.go | 13 +- go/vt/vtgate/vtgateconn/vtgateconn.go | 8 - go/vt/vtgate/vtgateservice/interface.go | 3 - go/vt/vttablet/endtoend/transaction_test.go | 14 +- go/vt/vttablet/sandboxconn/sandboxconn.go | 53 ++- go/vt/vttablet/tabletserver/dt_executor.go | 2 + go/vt/vttablet/tabletserver/twopc.go | 5 +- go/vt/vttablet/tabletserver/twopc_test.go | 13 +- proto/vtgateservice.proto | 4 - test.go | 4 + test/ci_workflow_gen.go | 10 +- 
test/config.json | 10 +- test/config_partial_keyspace.json | 6 +- test/templates/cluster_endtoend_test.tpl | 2 +- 43 files changed, 911 insertions(+), 377 deletions(-) rename go/test/endtoend/{vtgate => }/transaction/restart/main_test.go (100%) rename go/test/endtoend/{vtgate => }/transaction/restart/schema.sql (100%) rename go/test/endtoend/{vtgate => }/transaction/rollback/txn_rollback_shutdown_test.go (100%) rename go/test/endtoend/{vtgate => }/transaction/schema.sql (100%) rename go/test/endtoend/{vtgate => }/transaction/single/main_test.go (100%) rename go/test/endtoend/{vtgate => }/transaction/single/schema.sql (100%) rename go/test/endtoend/{vtgate => }/transaction/single/vschema.json (100%) rename go/test/endtoend/{vtgate => }/transaction/twopc/main_test.go (90%) rename go/test/endtoend/{vtgate => }/transaction/twopc/schema.sql (100%) rename go/test/endtoend/{vtgate => }/transaction/twopc/twopc_test.go (53%) rename go/test/endtoend/{vtgate => }/transaction/twopc/vschema.json (100%) rename go/test/endtoend/{vtgate => }/transaction/tx_test.go (100%) rename go/test/endtoend/{vtgate => }/transaction/vschema.json (100%) create mode 100644 go/vt/vtgate/debug_2pc.go create mode 100644 go/vt/vtgate/production.go create mode 100644 go/vt/vtgate/txresolver/tx_resolver.go diff --git a/.github/workflows/cluster_endtoend_vtgate_transaction.yml b/.github/workflows/cluster_endtoend_vtgate_transaction.yml index 41a84f42d3f..85cf22efcc1 100644 --- a/.github/workflows/cluster_endtoend_vtgate_transaction.yml +++ b/.github/workflows/cluster_endtoend_vtgate_transaction.yml @@ -136,7 +136,7 @@ jobs: set -exo pipefail # run the tests however you normally do, then produce a JUnit XML file - eatmydata -- go run test.go -docker=false -follow -shard vtgate_transaction | tee -a output.txt | go-junit-report -set-exit-code > report.xml + eatmydata -- go run test.go -docker=false -follow -shard vtgate_transaction -build-tag=debug2PC | tee -a output.txt | go-junit-report -set-exit-code > 
report.xml - name: Print test output and Record test result in launchable if PR is not a draft if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' && always() diff --git a/Makefile b/Makefile index e46f5d1ca5a..ab367408873 100644 --- a/Makefile +++ b/Makefile @@ -81,6 +81,7 @@ endif bash ./build.env go build -trimpath $(EXTRA_BUILD_FLAGS) $(VT_GO_PARALLEL) \ -ldflags "$(EXTRA_BUILD_LDFLAGS) $(shell tools/build_version_flags.sh)" \ + -tags "$(EXTRA_BUILD_TAGS)" \ -o ${VTROOTBIN} ./go/... # build the vitess binaries statically @@ -94,6 +95,7 @@ endif CGO_ENABLED=0 go build \ -trimpath $(EXTRA_BUILD_FLAGS) $(VT_GO_PARALLEL) \ -ldflags "$(EXTRA_BUILD_LDFLAGS) $(shell tools/build_version_flags.sh)" \ + -tags "$(EXTRA_BUILD_TAGS)" \ -o ${VTROOTBIN} ./go/... ifndef NOVTADMINBUILD echo "Building VTAdmin Web, disable VTAdmin build by setting 'NOVTADMINBUILD'" @@ -116,6 +118,7 @@ endif CGO_ENABLED=0 GOOS=${GOOS} GOARCH=${GOARCH} go build \ -trimpath $(EXTRA_BUILD_FLAGS) $(VT_GO_PARALLEL) \ -ldflags "$(EXTRA_BUILD_LDFLAGS) $(shell tools/build_version_flags.sh)" \ + -tags "$(EXTRA_BUILD_TAGS)" \ -o ${VTROOTBIN}/${GOOS}_${GOARCH} ./go/... @if [ ! -x "${VTROOTBIN}/${GOOS}_${GOARCH}/vttablet" ]; then \ @@ -130,6 +133,7 @@ endif go build -trimpath \ $(EXTRA_BUILD_FLAGS) $(VT_GO_PARALLEL) \ -ldflags "$(EXTRA_BUILD_LDFLAGS) $(shell tools/build_version_flags.sh)" \ + -tags "$(EXTRA_BUILD_TAGS)" \ -gcflags -'N -l' \ -o ${VTROOTBIN} ./go/... 
diff --git a/go/cmd/vtgateclienttest/services/fallback.go b/go/cmd/vtgateclienttest/services/fallback.go index 72175fe01ce..dab0e912ddb 100644 --- a/go/cmd/vtgateclienttest/services/fallback.go +++ b/go/cmd/vtgateclienttest/services/fallback.go @@ -60,10 +60,6 @@ func (c fallbackClient) CloseSession(ctx context.Context, session *vtgatepb.Sess return c.fallback.CloseSession(ctx, session) } -func (c fallbackClient) ResolveTransaction(ctx context.Context, dtid string) error { - return c.fallback.ResolveTransaction(ctx, dtid) -} - func (c fallbackClient) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, flags *vtgatepb.VStreamFlags, send func([]*binlogdatapb.VEvent) error) error { return c.fallback.VStream(ctx, tabletType, vgtid, filter, flags, send) } diff --git a/go/cmd/vtgateclienttest/services/terminal.go b/go/cmd/vtgateclienttest/services/terminal.go index 7245be547ac..8fa321e2606 100644 --- a/go/cmd/vtgateclienttest/services/terminal.go +++ b/go/cmd/vtgateclienttest/services/terminal.go @@ -71,10 +71,6 @@ func (c *terminalClient) CloseSession(ctx context.Context, session *vtgatepb.Ses return errTerminal } -func (c *terminalClient) ResolveTransaction(ctx context.Context, dtid string) error { - return errTerminal -} - func (c *terminalClient) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, flags *vtgatepb.VStreamFlags, send func([]*binlogdatapb.VEvent) error) error { return errTerminal } diff --git a/go/test/endtoend/vtgate/transaction/restart/main_test.go b/go/test/endtoend/transaction/restart/main_test.go similarity index 100% rename from go/test/endtoend/vtgate/transaction/restart/main_test.go rename to go/test/endtoend/transaction/restart/main_test.go diff --git a/go/test/endtoend/vtgate/transaction/restart/schema.sql b/go/test/endtoend/transaction/restart/schema.sql similarity index 100% rename from 
go/test/endtoend/vtgate/transaction/restart/schema.sql rename to go/test/endtoend/transaction/restart/schema.sql diff --git a/go/test/endtoend/vtgate/transaction/rollback/txn_rollback_shutdown_test.go b/go/test/endtoend/transaction/rollback/txn_rollback_shutdown_test.go similarity index 100% rename from go/test/endtoend/vtgate/transaction/rollback/txn_rollback_shutdown_test.go rename to go/test/endtoend/transaction/rollback/txn_rollback_shutdown_test.go diff --git a/go/test/endtoend/vtgate/transaction/schema.sql b/go/test/endtoend/transaction/schema.sql similarity index 100% rename from go/test/endtoend/vtgate/transaction/schema.sql rename to go/test/endtoend/transaction/schema.sql diff --git a/go/test/endtoend/vtgate/transaction/single/main_test.go b/go/test/endtoend/transaction/single/main_test.go similarity index 100% rename from go/test/endtoend/vtgate/transaction/single/main_test.go rename to go/test/endtoend/transaction/single/main_test.go diff --git a/go/test/endtoend/vtgate/transaction/single/schema.sql b/go/test/endtoend/transaction/single/schema.sql similarity index 100% rename from go/test/endtoend/vtgate/transaction/single/schema.sql rename to go/test/endtoend/transaction/single/schema.sql diff --git a/go/test/endtoend/vtgate/transaction/single/vschema.json b/go/test/endtoend/transaction/single/vschema.json similarity index 100% rename from go/test/endtoend/vtgate/transaction/single/vschema.json rename to go/test/endtoend/transaction/single/vschema.json diff --git a/go/test/endtoend/vtgate/transaction/twopc/main_test.go b/go/test/endtoend/transaction/twopc/main_test.go similarity index 90% rename from go/test/endtoend/vtgate/transaction/twopc/main_test.go rename to go/test/endtoend/transaction/twopc/main_test.go index af5c1a395ad..8ac7cfc1f21 100644 --- a/go/test/endtoend/vtgate/transaction/twopc/main_test.go +++ b/go/test/endtoend/transaction/twopc/main_test.go @@ -75,10 +75,11 @@ func TestMain(m *testing.M) { // Set extra args for twopc 
clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs, "--transaction_mode", "TWOPC", + "--grpc_use_effective_callerid", ) clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs, "--twopc_enable", - "--twopc_abandon_age", "3600", + "--twopc_abandon_age", "1", "--queryserver-config-transaction-cap", "3", ) @@ -89,7 +90,7 @@ func TestMain(m *testing.M) { VSchema: VSchema, SidecarDBName: sidecarDBName, } - if err := clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 1, false); err != nil { + if err := clusterInstance.StartKeyspace(*keyspace, []string{"-40", "40-80", "80-"}, 0, false); err != nil { return 1 } @@ -110,22 +111,23 @@ func start(t *testing.T) (*mysql.Conn, func()) { conn, err := mysql.Connect(ctx, &vtParams) require.NoError(t, err) - deleteAll := func() { - tables := []string{"twopc_user"} - for _, table := range tables { - _, _ = utils.ExecAllowError(t, conn, "delete from "+table) - } - } - - deleteAll() - return conn, func() { - deleteAll() conn.Close() - cluster.PanicHandler(t) + cleanup(t) } } +func cleanup(t *testing.T) { + cluster.PanicHandler(t) + + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + require.NoError(t, err) + defer conn.Close() + + _, _ = utils.ExecAllowError(t, conn, "delete from twopc_user") +} + type extractInterestingValues func(dtidMap map[string]string, vals []sqltypes.Value) []sqltypes.Value var tables = map[string]extractInterestingValues{ @@ -171,7 +173,8 @@ func getDTID(dtidMap map[string]string, dtKey string) string { func runVStream(t *testing.T, ctx context.Context, ch chan *binlogdatapb.VEvent, vtgateConn *vtgateconn.VTGateConn) { vgtid := &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{ - {Keyspace: keyspaceName, Shard: "-80", Gtid: "current"}, + {Keyspace: keyspaceName, Shard: "-40", Gtid: "current"}, + {Keyspace: keyspaceName, Shard: "40-80", Gtid: "current"}, {Keyspace: keyspaceName, Shard: "80-", Gtid: "current"}, }} filter := 
&binlogdatapb.Filter{ @@ -211,6 +214,10 @@ func runVStream(t *testing.T, ctx context.Context, ch chan *binlogdatapb.VEvent, } func retrieveTransitions(t *testing.T, ch chan *binlogdatapb.VEvent, tableMap map[string][]*querypb.Field, dtMap map[string]string) map[string][]string { + return retrieveTransitionsWithTimeout(t, ch, tableMap, dtMap, 1*time.Second) +} + +func retrieveTransitionsWithTimeout(t *testing.T, ch chan *binlogdatapb.VEvent, tableMap map[string][]*querypb.Field, dtMap map[string]string, timeout time.Duration) map[string][]string { logTable := make(map[string][]string) keepWaiting := true @@ -229,7 +236,7 @@ func retrieveTransitions(t *testing.T, ch chan *binlogdatapb.VEvent, tableMap ma if re.FieldEvent != nil { tableMap[re.FieldEvent.TableName] = re.FieldEvent.Fields } - case <-time.After(1 * time.Second): + case <-time.After(timeout): keepWaiting = false } } diff --git a/go/test/endtoend/vtgate/transaction/twopc/schema.sql b/go/test/endtoend/transaction/twopc/schema.sql similarity index 100% rename from go/test/endtoend/vtgate/transaction/twopc/schema.sql rename to go/test/endtoend/transaction/twopc/schema.sql diff --git a/go/test/endtoend/vtgate/transaction/twopc/twopc_test.go b/go/test/endtoend/transaction/twopc/twopc_test.go similarity index 53% rename from go/test/endtoend/vtgate/transaction/twopc/twopc_test.go rename to go/test/endtoend/transaction/twopc/twopc_test.go index f18073c5827..2e8f21a453b 100644 --- a/go/test/endtoend/vtgate/transaction/twopc/twopc_test.go +++ b/go/test/endtoend/transaction/twopc/twopc_test.go @@ -23,6 +23,7 @@ import ( "sort" "sync" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -30,6 +31,7 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/utils" + "vitess.io/vitess/go/vt/callerid" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" ) @@ -68,23 +70,33 
@@ func TestDTCommit(t *testing.T) { "delete:[VARCHAR(\"dtid-1\") VARCHAR(\"COMMIT\")]", }, "ks.dt_participant:80-": { - "insert:[VARCHAR(\"dtid-1\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"-80\")]", - "delete:[VARCHAR(\"dtid-1\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"-80\")]", + "insert:[VARCHAR(\"dtid-1\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"40-80\")]", + "insert:[VARCHAR(\"dtid-1\") INT64(2) VARCHAR(\"ks\") VARCHAR(\"-40\")]", + "delete:[VARCHAR(\"dtid-1\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"40-80\")]", + "delete:[VARCHAR(\"dtid-1\") INT64(2) VARCHAR(\"ks\") VARCHAR(\"-40\")]", }, - "ks.redo_state:-80": { + "ks.redo_state:-40": { "insert:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]", "delete:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]", }, - "ks.redo_statement:-80": { + "ks.redo_state:40-80": { + "insert:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]", + "delete:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]", + }, + "ks.redo_statement:-40": { + "insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (10, 'apa')\")]", + "delete:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (10, 'apa')\")]", + }, + "ks.redo_statement:40-80": { "insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (8, 'bar')\")]", - "insert:[VARCHAR(\"dtid-1\") INT64(2) BLOB(\"insert into twopc_user(id, `name`) values (10, 'apa')\")]", "delete:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (8, 'bar')\")]", - "delete:[VARCHAR(\"dtid-1\") INT64(2) BLOB(\"insert into twopc_user(id, `name`) values (10, 'apa')\")]", }, - "ks.twopc_user:-80": { - `insert:[INT64(8) VARCHAR("bar")]`, + "ks.twopc_user:-40": { `insert:[INT64(10) VARCHAR("apa")]`, }, + "ks.twopc_user:40-80": { + `insert:[INT64(8) VARCHAR("bar")]`, + }, "ks.twopc_user:80-": { `insert:[INT64(7) VARCHAR("foo")]`, `insert:[INT64(9) VARCHAR("baz")]`, @@ -107,19 +119,19 @@ func TestDTCommit(t *testing.T) { "delete:[VARCHAR(\"dtid-2\") 
VARCHAR(\"COMMIT\")]", }, "ks.dt_participant:80-": { - "insert:[VARCHAR(\"dtid-2\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"-80\")]", - "delete:[VARCHAR(\"dtid-2\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"-80\")]", + "insert:[VARCHAR(\"dtid-2\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"40-80\")]", + "delete:[VARCHAR(\"dtid-2\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"40-80\")]", }, - "ks.redo_state:-80": { + "ks.redo_state:40-80": { "insert:[VARCHAR(\"dtid-2\") VARCHAR(\"PREPARE\")]", "delete:[VARCHAR(\"dtid-2\") VARCHAR(\"PREPARE\")]", }, - "ks.redo_statement:-80": { + "ks.redo_statement:40-80": { "insert:[VARCHAR(\"dtid-2\") INT64(1) BLOB(\"update twopc_user set `name` = 'newfoo' where id = 8 limit 10001 /* INT64 */\")]", "delete:[VARCHAR(\"dtid-2\") INT64(1) BLOB(\"update twopc_user set `name` = 'newfoo' where id = 8 limit 10001 /* INT64 */\")]", }, - "ks.twopc_user:-80": {"update:[INT64(8) VARCHAR(\"newfoo\")]"}, - "ks.twopc_user:80-": {"update:[INT64(7) VARCHAR(\"newfoo\")]"}, + "ks.twopc_user:40-80": {"update:[INT64(8) VARCHAR(\"newfoo\")]"}, + "ks.twopc_user:80-": {"update:[INT64(7) VARCHAR(\"newfoo\")]"}, } assert.Equal(t, expectations, logTable, "mismatch expected: \n got: %s, want: %s", prettyPrint(logTable), prettyPrint(expectations)) @@ -138,18 +150,18 @@ func TestDTCommit(t *testing.T) { "delete:[VARCHAR(\"dtid-3\") VARCHAR(\"COMMIT\")]", }, "ks.dt_participant:80-": { - "insert:[VARCHAR(\"dtid-3\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"-80\")]", - "delete:[VARCHAR(\"dtid-3\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"-80\")]", + "insert:[VARCHAR(\"dtid-3\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"-40\")]", + "delete:[VARCHAR(\"dtid-3\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"-40\")]", }, - "ks.redo_state:-80": { + "ks.redo_state:-40": { "insert:[VARCHAR(\"dtid-3\") VARCHAR(\"PREPARE\")]", "delete:[VARCHAR(\"dtid-3\") VARCHAR(\"PREPARE\")]", }, - "ks.redo_statement:-80": { + "ks.redo_statement:-40": { "insert:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"delete from twopc_user where id = 10 limit 
10001 /* INT64 */\")]", "delete:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"delete from twopc_user where id = 10 limit 10001 /* INT64 */\")]", }, - "ks.twopc_user:-80": {"delete:[INT64(10) VARCHAR(\"apa\")]"}, + "ks.twopc_user:-40": {"delete:[INT64(10) VARCHAR(\"apa\")]"}, "ks.twopc_user:80-": {"delete:[INT64(9) VARCHAR(\"baz\")]"}, } assert.Equal(t, expectations, logTable, @@ -228,7 +240,8 @@ func TestDTCommitDMLOnlyOnMM(t *testing.T) { // Insert into multiple shards utils.Exec(t, conn, "begin") utils.Exec(t, conn, "insert into twopc_user(id, name) values(7,'foo')") - utils.Exec(t, conn, "select * from twopc_user") + utils.Exec(t, conn, "select * from twopc_user where id = 8") + utils.Exec(t, conn, "select * from twopc_user where id = 10") utils.Exec(t, conn, "commit") tableMap := make(map[string][]*querypb.Field) @@ -241,8 +254,10 @@ func TestDTCommitDMLOnlyOnMM(t *testing.T) { "delete:[VARCHAR(\"dtid-1\") VARCHAR(\"COMMIT\")]", }, "ks.dt_participant:80-": { - "insert:[VARCHAR(\"dtid-1\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"-80\")]", - "delete:[VARCHAR(\"dtid-1\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"-80\")]", + "insert:[VARCHAR(\"dtid-1\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"40-80\")]", + "insert:[VARCHAR(\"dtid-1\") INT64(2) VARCHAR(\"ks\") VARCHAR(\"-40\")]", + "delete:[VARCHAR(\"dtid-1\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"40-80\")]", + "delete:[VARCHAR(\"dtid-1\") INT64(2) VARCHAR(\"ks\") VARCHAR(\"-40\")]", }, "ks.twopc_user:80-": {"insert:[INT64(7) VARCHAR(\"foo\")]"}, } @@ -252,7 +267,8 @@ func TestDTCommitDMLOnlyOnMM(t *testing.T) { // Update from multiple shard utils.Exec(t, conn, "begin") utils.Exec(t, conn, "update twopc_user set name='newfoo' where id = 7") - utils.Exec(t, conn, "select * from twopc_user") + utils.Exec(t, conn, "select * from twopc_user where id = 8") + utils.Exec(t, conn, "select * from twopc_user where id = 10") utils.Exec(t, conn, "commit") logTable = retrieveTransitions(t, ch, tableMap, dtMap) @@ -263,8 +279,10 @@ func 
TestDTCommitDMLOnlyOnMM(t *testing.T) { "delete:[VARCHAR(\"dtid-2\") VARCHAR(\"COMMIT\")]", }, "ks.dt_participant:80-": { - "insert:[VARCHAR(\"dtid-2\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"-80\")]", - "delete:[VARCHAR(\"dtid-2\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"-80\")]", + "insert:[VARCHAR(\"dtid-2\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"40-80\")]", + "insert:[VARCHAR(\"dtid-2\") INT64(2) VARCHAR(\"ks\") VARCHAR(\"-40\")]", + "delete:[VARCHAR(\"dtid-2\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"40-80\")]", + "delete:[VARCHAR(\"dtid-2\") INT64(2) VARCHAR(\"ks\") VARCHAR(\"-40\")]", }, "ks.twopc_user:80-": {"update:[INT64(7) VARCHAR(\"newfoo\")]"}, } @@ -274,7 +292,8 @@ func TestDTCommitDMLOnlyOnMM(t *testing.T) { // DELETE from multiple shard utils.Exec(t, conn, "begin") utils.Exec(t, conn, "delete from twopc_user where id = 7") - utils.Exec(t, conn, "select * from twopc_user") + utils.Exec(t, conn, "select * from twopc_user where id = 8") + utils.Exec(t, conn, "select * from twopc_user where id = 10") utils.Exec(t, conn, "commit") logTable = retrieveTransitions(t, ch, tableMap, dtMap) @@ -285,8 +304,10 @@ func TestDTCommitDMLOnlyOnMM(t *testing.T) { "delete:[VARCHAR(\"dtid-3\") VARCHAR(\"COMMIT\")]", }, "ks.dt_participant:80-": { - "insert:[VARCHAR(\"dtid-3\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"-80\")]", - "delete:[VARCHAR(\"dtid-3\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"-80\")]", + "insert:[VARCHAR(\"dtid-3\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"40-80\")]", + "insert:[VARCHAR(\"dtid-3\") INT64(2) VARCHAR(\"ks\") VARCHAR(\"-40\")]", + "delete:[VARCHAR(\"dtid-3\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"40-80\")]", + "delete:[VARCHAR(\"dtid-3\") INT64(2) VARCHAR(\"ks\") VARCHAR(\"-40\")]", }, "ks.twopc_user:80-": {"delete:[INT64(7) VARCHAR(\"newfoo\")]"}, } @@ -321,12 +342,12 @@ func TestDTCommitDMLOnlyOnRM(t *testing.T) { dtMap := make(map[string]string) logTable := retrieveTransitions(t, ch, tableMap, dtMap) expectations := map[string][]string{ - "ks.dt_state:-80": { + 
"ks.dt_state:40-80": { "insert:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]", "update:[VARCHAR(\"dtid-1\") VARCHAR(\"COMMIT\")]", "delete:[VARCHAR(\"dtid-1\") VARCHAR(\"COMMIT\")]", }, - "ks.dt_participant:-80": { + "ks.dt_participant:40-80": { "insert:[VARCHAR(\"dtid-1\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"80-\")]", "delete:[VARCHAR(\"dtid-1\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"80-\")]", }, @@ -351,12 +372,12 @@ func TestDTCommitDMLOnlyOnRM(t *testing.T) { logTable = retrieveTransitions(t, ch, tableMap, dtMap) expectations = map[string][]string{ - "ks.dt_state:-80": { + "ks.dt_state:40-80": { "insert:[VARCHAR(\"dtid-2\") VARCHAR(\"PREPARE\")]", "update:[VARCHAR(\"dtid-2\") VARCHAR(\"COMMIT\")]", "delete:[VARCHAR(\"dtid-2\") VARCHAR(\"COMMIT\")]", }, - "ks.dt_participant:-80": { + "ks.dt_participant:40-80": { "insert:[VARCHAR(\"dtid-2\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"80-\")]", "delete:[VARCHAR(\"dtid-2\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"80-\")]", }, @@ -381,12 +402,12 @@ func TestDTCommitDMLOnlyOnRM(t *testing.T) { logTable = retrieveTransitions(t, ch, tableMap, dtMap) expectations = map[string][]string{ - "ks.dt_state:-80": { + "ks.dt_state:40-80": { "insert:[VARCHAR(\"dtid-3\") VARCHAR(\"PREPARE\")]", "update:[VARCHAR(\"dtid-3\") VARCHAR(\"COMMIT\")]", "delete:[VARCHAR(\"dtid-3\") VARCHAR(\"COMMIT\")]", }, - "ks.dt_participant:-80": { + "ks.dt_participant:40-80": { "insert:[VARCHAR(\"dtid-3\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"80-\")]", "delete:[VARCHAR(\"dtid-3\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"80-\")]", }, @@ -430,7 +451,7 @@ func TestDTPrepareFailOnRM(t *testing.T) { utils.Exec(t, conn2, "begin") utils.Exec(t, conn2, "insert into twopc_user(id, name) values(9,'baz')") - utils.Exec(t, conn2, "insert into twopc_user(id, name) values(10,'apa')") + utils.Exec(t, conn2, "insert into twopc_user(id, name) values(18,'apa')") var wg sync.WaitGroup wg.Add(2) @@ -465,31 +486,31 @@ func TestDTPrepareFailOnRM(t *testing.T) { "delete:[VARCHAR(\"dtid-2\") 
VARCHAR(\"ROLLBACK\")]", }, "ks.dt_participant:80-": { - "insert:[VARCHAR(\"dtid-1\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"-80\")]", - "insert:[VARCHAR(\"dtid-2\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"-80\")]", - "delete:[VARCHAR(\"dtid-1\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"-80\")]", - "delete:[VARCHAR(\"dtid-2\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"-80\")]", + "insert:[VARCHAR(\"dtid-1\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"40-80\")]", + "insert:[VARCHAR(\"dtid-2\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"40-80\")]", + "delete:[VARCHAR(\"dtid-1\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"40-80\")]", + "delete:[VARCHAR(\"dtid-2\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"40-80\")]", }, - "ks.redo_state:-80": { + "ks.redo_state:40-80": { "insert:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]", "delete:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]", }, - "ks.redo_statement:-80": { /* flexi Expectation */ }, - "ks.twopc_user:-80": { /* flexi Expectation */ }, - "ks.twopc_user:80-": { /* flexi Expectation */ }, + "ks.redo_statement:40-80": { /* flexi Expectation */ }, + "ks.twopc_user:40-80": { /* flexi Expectation */ }, + "ks.twopc_user:80-": { /* flexi Expectation */ }, } flexiExpectations := map[string][2][]string{ - "ks.redo_statement:-80": {{ + "ks.redo_statement:40-80": {{ "insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (8, 'bar')\")]", "delete:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (8, 'bar')\")]", }, { - "insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (10, 'apa')\")]", - "delete:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (10, 'apa')\")]", + "insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (18, 'apa')\")]", + "delete:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (18, 'apa')\")]", }}, - "ks.twopc_user:-80": {{ + "ks.twopc_user:40-80": {{ "insert:[INT64(8) 
VARCHAR(\"bar\")]", }, { - "insert:[INT64(10) VARCHAR(\"apa\")]", + "insert:[INT64(18) VARCHAR(\"apa\")]", }}, "ks.twopc_user:80-": {{ "insert:[INT64(7) VARCHAR(\"foo\")]", @@ -521,3 +542,328 @@ func compareMaps(t *testing.T, expected, actual map[string][]string, flexibleExp } } } + +// TestDTResolveAfterMMCommit tests that transaction is committed on recovery +// failure after MM commit. +func TestDTResolveAfterMMCommit(t *testing.T) { + defer cleanup(t) + + vtgateConn, err := cluster.DialVTGate(context.Background(), t.Name(), vtgateGrpcAddress, "dt_user", "") + require.NoError(t, err) + defer vtgateConn.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ch := make(chan *binlogdatapb.VEvent) + runVStream(t, ctx, ch, vtgateConn) + + conn := vtgateConn.Session("", nil) + qCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + // Insert into multiple shards + _, err = conn.Execute(qCtx, "begin", nil) + require.NoError(t, err) + _, err = conn.Execute(qCtx, "insert into twopc_user(id, name) values(7,'foo')", nil) + require.NoError(t, err) + _, err = conn.Execute(qCtx, "insert into twopc_user(id, name) values(8,'bar')", nil) + require.NoError(t, err) + _, err = conn.Execute(qCtx, "insert into twopc_user(id, name) values(9,'baz')", nil) + require.NoError(t, err) + _, err = conn.Execute(qCtx, "insert into twopc_user(id, name) values(10,'apa')", nil) + require.NoError(t, err) + + // The caller ID is used to simulate the failure at the desired point. + newCtx := callerid.NewContext(qCtx, callerid.NewEffectiveCallerID("MMCommitted_FailNow", "", ""), nil) + _, err = conn.Execute(newCtx, "commit", nil) + require.ErrorContains(t, err, "Fail After MM commit") + + // Below check ensures that the transaction is resolved by the resolver on receiving unresolved transaction signal from MM. 
+ tableMap := make(map[string][]*querypb.Field) + dtMap := make(map[string]string) + logTable := retrieveTransitionsWithTimeout(t, ch, tableMap, dtMap, 2*time.Second) + expectations := map[string][]string{ + "ks.dt_state:80-": { + "insert:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]", + "update:[VARCHAR(\"dtid-1\") VARCHAR(\"COMMIT\")]", + "delete:[VARCHAR(\"dtid-1\") VARCHAR(\"COMMIT\")]", + }, + "ks.dt_participant:80-": { + "insert:[VARCHAR(\"dtid-1\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"40-80\")]", + "insert:[VARCHAR(\"dtid-1\") INT64(2) VARCHAR(\"ks\") VARCHAR(\"-40\")]", + "delete:[VARCHAR(\"dtid-1\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"40-80\")]", + "delete:[VARCHAR(\"dtid-1\") INT64(2) VARCHAR(\"ks\") VARCHAR(\"-40\")]", + }, + "ks.redo_state:-40": { + "insert:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]", + "delete:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]", + }, + "ks.redo_state:40-80": { + "insert:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]", + "delete:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]", + }, + "ks.redo_statement:-40": { + "insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (10, 'apa')\")]", + "delete:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (10, 'apa')\")]", + }, + "ks.redo_statement:40-80": { + "insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (8, 'bar')\")]", + "delete:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (8, 'bar')\")]", + }, + "ks.twopc_user:-40": { + `insert:[INT64(10) VARCHAR("apa")]`, + }, + "ks.twopc_user:40-80": { + `insert:[INT64(8) VARCHAR("bar")]`, + }, + "ks.twopc_user:80-": { + `insert:[INT64(7) VARCHAR("foo")]`, + `insert:[INT64(9) VARCHAR("baz")]`, + }, + } + assert.Equal(t, expectations, logTable, + "mismatch expected: \n got: %s, want: %s", prettyPrint(logTable), prettyPrint(expectations)) +} + +// TestDTResolveAfterRMPrepare tests that transaction is rolled back on recovery +// 
failure after RM prepare and before MM commit. +func TestDTResolveAfterRMPrepare(t *testing.T) { + vtgateConn, err := cluster.DialVTGate(context.Background(), t.Name(), vtgateGrpcAddress, "dt_user", "") + require.NoError(t, err) + defer vtgateConn.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ch := make(chan *binlogdatapb.VEvent) + runVStream(t, ctx, ch, vtgateConn) + + conn := vtgateConn.Session("", nil) + qCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + // Insert into multiple shards + _, err = conn.Execute(qCtx, "begin", nil) + require.NoError(t, err) + _, err = conn.Execute(qCtx, "insert into twopc_user(id, name) values(7,'foo')", nil) + require.NoError(t, err) + _, err = conn.Execute(qCtx, "insert into twopc_user(id, name) values(8,'bar')", nil) + require.NoError(t, err) + + // The caller ID is used to simulate the failure at the desired point. + newCtx := callerid.NewContext(qCtx, callerid.NewEffectiveCallerID("RMPrepared_FailNow", "", ""), nil) + _, err = conn.Execute(newCtx, "commit", nil) + require.ErrorContains(t, err, "Fail After RM prepared") + + // Below check ensures that the transaction is resolved by the resolver on receiving unresolved transaction signal from MM. 
+ tableMap := make(map[string][]*querypb.Field) + dtMap := make(map[string]string) + logTable := retrieveTransitionsWithTimeout(t, ch, tableMap, dtMap, 2*time.Second) + expectations := map[string][]string{ + "ks.dt_state:80-": { + "insert:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]", + "update:[VARCHAR(\"dtid-1\") VARCHAR(\"ROLLBACK\")]", + "delete:[VARCHAR(\"dtid-1\") VARCHAR(\"ROLLBACK\")]", + }, + "ks.dt_participant:80-": { + "insert:[VARCHAR(\"dtid-1\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"40-80\")]", + "delete:[VARCHAR(\"dtid-1\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"40-80\")]", + }, + "ks.redo_state:40-80": { + "insert:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]", + "delete:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]", + }, + "ks.redo_statement:40-80": { + "insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (8, 'bar')\")]", + "delete:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (8, 'bar')\")]", + }, + } + assert.Equal(t, expectations, logTable, + "mismatch expected: \n got: %s, want: %s", prettyPrint(logTable), prettyPrint(expectations)) +} + +// TestDTResolveDuringRMPrepare tests that transaction is rolled back on recovery +// failure after semi RM prepare. 
+func TestDTResolveDuringRMPrepare(t *testing.T) { + vtgateConn, err := cluster.DialVTGate(context.Background(), t.Name(), vtgateGrpcAddress, "dt_user", "") + require.NoError(t, err) + defer vtgateConn.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ch := make(chan *binlogdatapb.VEvent) + runVStream(t, ctx, ch, vtgateConn) + + conn := vtgateConn.Session("", nil) + qCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + // Insert into multiple shards + _, err = conn.Execute(qCtx, "begin", nil) + require.NoError(t, err) + _, err = conn.Execute(qCtx, "insert into twopc_user(id, name) values(7,'foo')", nil) + require.NoError(t, err) + _, err = conn.Execute(qCtx, "insert into twopc_user(id, name) values(8,'bar')", nil) + require.NoError(t, err) + _, err = conn.Execute(qCtx, "insert into twopc_user(id, name) values(10,'bar')", nil) + require.NoError(t, err) + + // The caller ID is used to simulate the failure at the desired point. + newCtx := callerid.NewContext(qCtx, callerid.NewEffectiveCallerID("RMPrepare_-40_FailNow", "", ""), nil) + _, err = conn.Execute(newCtx, "commit", nil) + require.ErrorContains(t, err, "Fail During RM prepare") + + // Below check ensures that the transaction is resolved by the resolver on receiving unresolved transaction signal from MM. 
+ tableMap := make(map[string][]*querypb.Field) + dtMap := make(map[string]string) + logTable := retrieveTransitionsWithTimeout(t, ch, tableMap, dtMap, 2*time.Second) + expectations := map[string][]string{ + "ks.dt_state:80-": { + "insert:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]", + "update:[VARCHAR(\"dtid-1\") VARCHAR(\"ROLLBACK\")]", + "delete:[VARCHAR(\"dtid-1\") VARCHAR(\"ROLLBACK\")]", + }, + "ks.dt_participant:80-": { + "insert:[VARCHAR(\"dtid-1\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"40-80\")]", + "insert:[VARCHAR(\"dtid-1\") INT64(2) VARCHAR(\"ks\") VARCHAR(\"-40\")]", + "delete:[VARCHAR(\"dtid-1\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"40-80\")]", + "delete:[VARCHAR(\"dtid-1\") INT64(2) VARCHAR(\"ks\") VARCHAR(\"-40\")]", + }, + "ks.redo_state:40-80": { + "insert:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]", + "delete:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]", + }, + "ks.redo_statement:40-80": { + "insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (8, 'bar')\")]", + "delete:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (8, 'bar')\")]", + }, + } + assert.Equal(t, expectations, logTable, + "mismatch expected: \n got: %s, want: %s", prettyPrint(logTable), prettyPrint(expectations)) +} + +// TestDTResolveDuringRMCommit tests that transaction is committed on recovery +// failure after semi RM commit. 
+func TestDTResolveDuringRMCommit(t *testing.T) { + defer cleanup(t) + + vtgateConn, err := cluster.DialVTGate(context.Background(), t.Name(), vtgateGrpcAddress, "dt_user", "") + require.NoError(t, err) + defer vtgateConn.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ch := make(chan *binlogdatapb.VEvent) + runVStream(t, ctx, ch, vtgateConn) + + conn := vtgateConn.Session("", nil) + qCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + // Insert into multiple shards + _, err = conn.Execute(qCtx, "begin", nil) + require.NoError(t, err) + _, err = conn.Execute(qCtx, "insert into twopc_user(id, name) values(7,'foo')", nil) + require.NoError(t, err) + _, err = conn.Execute(qCtx, "insert into twopc_user(id, name) values(8,'bar')", nil) + require.NoError(t, err) + _, err = conn.Execute(qCtx, "insert into twopc_user(id, name) values(10,'apa')", nil) + require.NoError(t, err) + + // The caller ID is used to simulate the failure at the desired point. + newCtx := callerid.NewContext(qCtx, callerid.NewEffectiveCallerID("RMCommit_-40_FailNow", "", ""), nil) + _, err = conn.Execute(newCtx, "commit", nil) + require.ErrorContains(t, err, "Fail During RM commit") + + // Below check ensures that the transaction is resolved by the resolver on receiving unresolved transaction signal from MM. 
+ tableMap := make(map[string][]*querypb.Field) + dtMap := make(map[string]string) + logTable := retrieveTransitionsWithTimeout(t, ch, tableMap, dtMap, 2*time.Second) + expectations := map[string][]string{ + "ks.dt_state:80-": { + "insert:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]", + "update:[VARCHAR(\"dtid-1\") VARCHAR(\"COMMIT\")]", + "delete:[VARCHAR(\"dtid-1\") VARCHAR(\"COMMIT\")]", + }, + "ks.dt_participant:80-": { + "insert:[VARCHAR(\"dtid-1\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"40-80\")]", + "insert:[VARCHAR(\"dtid-1\") INT64(2) VARCHAR(\"ks\") VARCHAR(\"-40\")]", + "delete:[VARCHAR(\"dtid-1\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"40-80\")]", + "delete:[VARCHAR(\"dtid-1\") INT64(2) VARCHAR(\"ks\") VARCHAR(\"-40\")]", + }, + "ks.redo_state:-40": { + "insert:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]", + "delete:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]", + }, + "ks.redo_state:40-80": { + "insert:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]", + "delete:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]", + }, + "ks.redo_statement:-40": { + "insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (10, 'apa')\")]", + "delete:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (10, 'apa')\")]", + }, + "ks.redo_statement:40-80": { + "insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (8, 'bar')\")]", + "delete:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (8, 'bar')\")]", + }, + "ks.twopc_user:-40": { + `insert:[INT64(10) VARCHAR("apa")]`, + }, + "ks.twopc_user:40-80": { + `insert:[INT64(8) VARCHAR("bar")]`, + }, + "ks.twopc_user:80-": { + `insert:[INT64(7) VARCHAR("foo")]`, + }, + } + assert.Equal(t, expectations, logTable, + "mismatch expected: \n got: %s, want: %s", prettyPrint(logTable), prettyPrint(expectations)) +} + +// TestDTResolveAfterTransactionRecord tests that transaction is rolled back on recovery +// failure after TR created and before 
RM prepare. +func TestDTResolveAfterTransactionRecord(t *testing.T) { + vtgateConn, err := cluster.DialVTGate(context.Background(), t.Name(), vtgateGrpcAddress, "dt_user", "") + require.NoError(t, err) + defer vtgateConn.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ch := make(chan *binlogdatapb.VEvent) + runVStream(t, ctx, ch, vtgateConn) + + conn := vtgateConn.Session("", nil) + qCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + // Insert into multiple shards + _, err = conn.Execute(qCtx, "begin", nil) + require.NoError(t, err) + _, err = conn.Execute(qCtx, "insert into twopc_user(id, name) values(7,'foo')", nil) + require.NoError(t, err) + _, err = conn.Execute(qCtx, "insert into twopc_user(id, name) values(8,'bar')", nil) + require.NoError(t, err) + + // The caller ID is used to simulate the failure at the desired point. + newCtx := callerid.NewContext(qCtx, callerid.NewEffectiveCallerID("TRCreated_FailNow", "", ""), nil) + _, err = conn.Execute(newCtx, "commit", nil) + require.ErrorContains(t, err, "Fail After TR created") + + // Below check ensures that the transaction is resolved by the resolver on receiving unresolved transaction signal from MM. 
+ tableMap := make(map[string][]*querypb.Field) + dtMap := make(map[string]string) + logTable := retrieveTransitionsWithTimeout(t, ch, tableMap, dtMap, 2*time.Second) + expectations := map[string][]string{ + "ks.dt_state:80-": { + "insert:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]", + "update:[VARCHAR(\"dtid-1\") VARCHAR(\"ROLLBACK\")]", + "delete:[VARCHAR(\"dtid-1\") VARCHAR(\"ROLLBACK\")]", + }, + "ks.dt_participant:80-": { + "insert:[VARCHAR(\"dtid-1\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"40-80\")]", + "delete:[VARCHAR(\"dtid-1\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"40-80\")]", + }, + } + assert.Equal(t, expectations, logTable, + "mismatch expected: \n got: %s, want: %s", prettyPrint(logTable), prettyPrint(expectations)) +} diff --git a/go/test/endtoend/vtgate/transaction/twopc/vschema.json b/go/test/endtoend/transaction/twopc/vschema.json similarity index 100% rename from go/test/endtoend/vtgate/transaction/twopc/vschema.json rename to go/test/endtoend/transaction/twopc/vschema.json diff --git a/go/test/endtoend/vtgate/transaction/tx_test.go b/go/test/endtoend/transaction/tx_test.go similarity index 100% rename from go/test/endtoend/vtgate/transaction/tx_test.go rename to go/test/endtoend/transaction/tx_test.go diff --git a/go/test/endtoend/vtgate/transaction/vschema.json b/go/test/endtoend/transaction/vschema.json similarity index 100% rename from go/test/endtoend/vtgate/transaction/vschema.json rename to go/test/endtoend/transaction/vschema.json diff --git a/go/vt/proto/vtgateservice/vtgateservice.pb.go b/go/vt/proto/vtgateservice/vtgateservice.pb.go index fbe32f082e9..165e3c0bbef 100644 --- a/go/vt/proto/vtgateservice/vtgateservice.pb.go +++ b/go/vt/proto/vtgateservice/vtgateservice.pb.go @@ -44,7 +44,7 @@ var file_vtgateservice_proto_rawDesc = []byte{ 0x0a, 0x13, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0d, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 
0x65, 0x1a, 0x0c, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x32, 0x8f, 0x04, 0x0a, 0x06, 0x56, 0x69, 0x74, 0x65, 0x73, 0x73, 0x12, 0x3c, 0x0a, + 0x74, 0x6f, 0x32, 0xb0, 0x03, 0x0a, 0x06, 0x56, 0x69, 0x74, 0x65, 0x73, 0x73, 0x12, 0x3c, 0x0a, 0x07, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x12, 0x16, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, @@ -58,65 +58,56 @@ var file_vtgateservice_proto_rawDesc = []byte{ 0x74, 0x65, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x5d, 0x0a, 0x12, 0x52, 0x65, - 0x73, 0x6f, 0x6c, 0x76, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, - 0x12, 0x21, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x6f, 0x6c, 0x76, - 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x1a, 0x22, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x52, 0x65, 0x73, - 0x6f, 0x6c, 0x76, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x3e, 0x0a, 0x07, 0x56, 0x53, 0x74, - 0x72, 0x65, 0x61, 0x6d, 0x12, 0x16, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x56, 0x53, - 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x76, - 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x3c, 0x0a, 0x07, 0x50, 0x72, 0x65, - 
0x70, 0x61, 0x72, 0x65, 0x12, 0x16, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x50, 0x72, - 0x65, 0x70, 0x61, 0x72, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x76, - 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4b, 0x0a, 0x0c, 0x43, 0x6c, 0x6f, 0x73, 0x65, - 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x1b, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, - 0x2e, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x43, 0x6c, - 0x6f, 0x73, 0x65, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x22, 0x00, 0x42, 0x42, 0x0a, 0x14, 0x69, 0x6f, 0x2e, 0x76, 0x69, 0x74, 0x65, 0x73, - 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x5a, 0x2a, 0x76, 0x69, - 0x74, 0x65, 0x73, 0x73, 0x2e, 0x69, 0x6f, 0x2f, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2f, 0x67, - 0x6f, 0x2f, 0x76, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x76, 0x74, 0x67, 0x61, 0x74, - 0x65, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x3e, 0x0a, 0x07, 0x56, 0x53, + 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x16, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x56, + 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, + 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x3c, 0x0a, 0x07, 0x50, 0x72, + 0x65, 0x70, 0x61, 0x72, 0x65, 0x12, 0x16, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x50, + 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, + 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 
0x2e, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4b, 0x0a, 0x0c, 0x43, 0x6c, 0x6f, 0x73, + 0x65, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x1b, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, + 0x65, 0x2e, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x43, + 0x6c, 0x6f, 0x73, 0x65, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x42, 0x0a, 0x14, 0x69, 0x6f, 0x2e, 0x76, 0x69, 0x74, 0x65, + 0x73, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x5a, 0x2a, 0x76, + 0x69, 0x74, 0x65, 0x73, 0x73, 0x2e, 0x69, 0x6f, 0x2f, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2f, + 0x67, 0x6f, 0x2f, 0x76, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x76, 0x74, 0x67, 0x61, + 0x74, 0x65, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x33, } var file_vtgateservice_proto_goTypes = []any{ - (*vtgate.ExecuteRequest)(nil), // 0: vtgate.ExecuteRequest - (*vtgate.ExecuteBatchRequest)(nil), // 1: vtgate.ExecuteBatchRequest - (*vtgate.StreamExecuteRequest)(nil), // 2: vtgate.StreamExecuteRequest - (*vtgate.ResolveTransactionRequest)(nil), // 3: vtgate.ResolveTransactionRequest - (*vtgate.VStreamRequest)(nil), // 4: vtgate.VStreamRequest - (*vtgate.PrepareRequest)(nil), // 5: vtgate.PrepareRequest - (*vtgate.CloseSessionRequest)(nil), // 6: vtgate.CloseSessionRequest - (*vtgate.ExecuteResponse)(nil), // 7: vtgate.ExecuteResponse - (*vtgate.ExecuteBatchResponse)(nil), // 8: vtgate.ExecuteBatchResponse - (*vtgate.StreamExecuteResponse)(nil), // 9: vtgate.StreamExecuteResponse - (*vtgate.ResolveTransactionResponse)(nil), // 10: vtgate.ResolveTransactionResponse - (*vtgate.VStreamResponse)(nil), // 11: vtgate.VStreamResponse - (*vtgate.PrepareResponse)(nil), // 12: 
vtgate.PrepareResponse - (*vtgate.CloseSessionResponse)(nil), // 13: vtgate.CloseSessionResponse + (*vtgate.ExecuteRequest)(nil), // 0: vtgate.ExecuteRequest + (*vtgate.ExecuteBatchRequest)(nil), // 1: vtgate.ExecuteBatchRequest + (*vtgate.StreamExecuteRequest)(nil), // 2: vtgate.StreamExecuteRequest + (*vtgate.VStreamRequest)(nil), // 3: vtgate.VStreamRequest + (*vtgate.PrepareRequest)(nil), // 4: vtgate.PrepareRequest + (*vtgate.CloseSessionRequest)(nil), // 5: vtgate.CloseSessionRequest + (*vtgate.ExecuteResponse)(nil), // 6: vtgate.ExecuteResponse + (*vtgate.ExecuteBatchResponse)(nil), // 7: vtgate.ExecuteBatchResponse + (*vtgate.StreamExecuteResponse)(nil), // 8: vtgate.StreamExecuteResponse + (*vtgate.VStreamResponse)(nil), // 9: vtgate.VStreamResponse + (*vtgate.PrepareResponse)(nil), // 10: vtgate.PrepareResponse + (*vtgate.CloseSessionResponse)(nil), // 11: vtgate.CloseSessionResponse } var file_vtgateservice_proto_depIdxs = []int32{ 0, // 0: vtgateservice.Vitess.Execute:input_type -> vtgate.ExecuteRequest 1, // 1: vtgateservice.Vitess.ExecuteBatch:input_type -> vtgate.ExecuteBatchRequest 2, // 2: vtgateservice.Vitess.StreamExecute:input_type -> vtgate.StreamExecuteRequest - 3, // 3: vtgateservice.Vitess.ResolveTransaction:input_type -> vtgate.ResolveTransactionRequest - 4, // 4: vtgateservice.Vitess.VStream:input_type -> vtgate.VStreamRequest - 5, // 5: vtgateservice.Vitess.Prepare:input_type -> vtgate.PrepareRequest - 6, // 6: vtgateservice.Vitess.CloseSession:input_type -> vtgate.CloseSessionRequest - 7, // 7: vtgateservice.Vitess.Execute:output_type -> vtgate.ExecuteResponse - 8, // 8: vtgateservice.Vitess.ExecuteBatch:output_type -> vtgate.ExecuteBatchResponse - 9, // 9: vtgateservice.Vitess.StreamExecute:output_type -> vtgate.StreamExecuteResponse - 10, // 10: vtgateservice.Vitess.ResolveTransaction:output_type -> vtgate.ResolveTransactionResponse - 11, // 11: vtgateservice.Vitess.VStream:output_type -> vtgate.VStreamResponse - 12, // 12: 
vtgateservice.Vitess.Prepare:output_type -> vtgate.PrepareResponse - 13, // 13: vtgateservice.Vitess.CloseSession:output_type -> vtgate.CloseSessionResponse - 7, // [7:14] is the sub-list for method output_type - 0, // [0:7] is the sub-list for method input_type + 3, // 3: vtgateservice.Vitess.VStream:input_type -> vtgate.VStreamRequest + 4, // 4: vtgateservice.Vitess.Prepare:input_type -> vtgate.PrepareRequest + 5, // 5: vtgateservice.Vitess.CloseSession:input_type -> vtgate.CloseSessionRequest + 6, // 6: vtgateservice.Vitess.Execute:output_type -> vtgate.ExecuteResponse + 7, // 7: vtgateservice.Vitess.ExecuteBatch:output_type -> vtgate.ExecuteBatchResponse + 8, // 8: vtgateservice.Vitess.StreamExecute:output_type -> vtgate.StreamExecuteResponse + 9, // 9: vtgateservice.Vitess.VStream:output_type -> vtgate.VStreamResponse + 10, // 10: vtgateservice.Vitess.Prepare:output_type -> vtgate.PrepareResponse + 11, // 11: vtgateservice.Vitess.CloseSession:output_type -> vtgate.CloseSessionResponse + 6, // [6:12] is the sub-list for method output_type + 0, // [0:6] is the sub-list for method input_type 0, // [0:0] is the sub-list for extension type_name 0, // [0:0] is the sub-list for extension extendee 0, // [0:0] is the sub-list for field type_name diff --git a/go/vt/proto/vtgateservice/vtgateservice_grpc.pb.go b/go/vt/proto/vtgateservice/vtgateservice_grpc.pb.go index 44dd83d7f0b..80042781649 100644 --- a/go/vt/proto/vtgateservice/vtgateservice_grpc.pb.go +++ b/go/vt/proto/vtgateservice/vtgateservice_grpc.pb.go @@ -39,9 +39,6 @@ type VitessClient interface { // Use this method if the query returns a large number of rows. // API group: v3 StreamExecute(ctx context.Context, in *vtgate.StreamExecuteRequest, opts ...grpc.CallOption) (Vitess_StreamExecuteClient, error) - // ResolveTransaction resolves a transaction. 
- // API group: Transactions - ResolveTransaction(ctx context.Context, in *vtgate.ResolveTransactionRequest, opts ...grpc.CallOption) (*vtgate.ResolveTransactionResponse, error) // VStream streams binlog events from the requested sources. VStream(ctx context.Context, in *vtgate.VStreamRequest, opts ...grpc.CallOption) (Vitess_VStreamClient, error) // Prepare is used by the MySQL server plugin as part of supporting prepared statements. @@ -110,15 +107,6 @@ func (x *vitessStreamExecuteClient) Recv() (*vtgate.StreamExecuteResponse, error return m, nil } -func (c *vitessClient) ResolveTransaction(ctx context.Context, in *vtgate.ResolveTransactionRequest, opts ...grpc.CallOption) (*vtgate.ResolveTransactionResponse, error) { - out := new(vtgate.ResolveTransactionResponse) - err := c.cc.Invoke(ctx, "/vtgateservice.Vitess/ResolveTransaction", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - func (c *vitessClient) VStream(ctx context.Context, in *vtgate.VStreamRequest, opts ...grpc.CallOption) (Vitess_VStreamClient, error) { stream, err := c.cc.NewStream(ctx, &Vitess_ServiceDesc.Streams[1], "/vtgateservice.Vitess/VStream", opts...) if err != nil { @@ -189,9 +177,6 @@ type VitessServer interface { // Use this method if the query returns a large number of rows. // API group: v3 StreamExecute(*vtgate.StreamExecuteRequest, Vitess_StreamExecuteServer) error - // ResolveTransaction resolves a transaction. - // API group: Transactions - ResolveTransaction(context.Context, *vtgate.ResolveTransactionRequest) (*vtgate.ResolveTransactionResponse, error) // VStream streams binlog events from the requested sources. VStream(*vtgate.VStreamRequest, Vitess_VStreamServer) error // Prepare is used by the MySQL server plugin as part of supporting prepared statements. 
@@ -216,9 +201,6 @@ func (UnimplementedVitessServer) ExecuteBatch(context.Context, *vtgate.ExecuteBa func (UnimplementedVitessServer) StreamExecute(*vtgate.StreamExecuteRequest, Vitess_StreamExecuteServer) error { return status.Errorf(codes.Unimplemented, "method StreamExecute not implemented") } -func (UnimplementedVitessServer) ResolveTransaction(context.Context, *vtgate.ResolveTransactionRequest) (*vtgate.ResolveTransactionResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method ResolveTransaction not implemented") -} func (UnimplementedVitessServer) VStream(*vtgate.VStreamRequest, Vitess_VStreamServer) error { return status.Errorf(codes.Unimplemented, "method VStream not implemented") } @@ -298,24 +280,6 @@ func (x *vitessStreamExecuteServer) Send(m *vtgate.StreamExecuteResponse) error return x.ServerStream.SendMsg(m) } -func _Vitess_ResolveTransaction_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(vtgate.ResolveTransactionRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(VitessServer).ResolveTransaction(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/vtgateservice.Vitess/ResolveTransaction", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(VitessServer).ResolveTransaction(ctx, req.(*vtgate.ResolveTransactionRequest)) - } - return interceptor(ctx, in, info, handler) -} - func _Vitess_VStream_Handler(srv interface{}, stream grpc.ServerStream) error { m := new(vtgate.VStreamRequest) if err := stream.RecvMsg(m); err != nil { @@ -388,10 +352,6 @@ var Vitess_ServiceDesc = grpc.ServiceDesc{ MethodName: "ExecuteBatch", Handler: _Vitess_ExecuteBatch_Handler, }, - { - MethodName: "ResolveTransaction", - Handler: _Vitess_ResolveTransaction_Handler, - }, { MethodName: "Prepare", Handler: _Vitess_Prepare_Handler, diff 
--git a/go/vt/vitessdriver/fakeserver_test.go b/go/vt/vitessdriver/fakeserver_test.go index a74e44e682c..a4b17fc65d6 100644 --- a/go/vt/vitessdriver/fakeserver_test.go +++ b/go/vt/vitessdriver/fakeserver_test.go @@ -18,7 +18,6 @@ package vitessdriver import ( "context" - "errors" "fmt" "reflect" @@ -156,14 +155,6 @@ func (f *fakeVTGateService) CloseSession(ctx context.Context, session *vtgatepb. return nil } -// ResolveTransaction is part of the VTGateService interface -func (f *fakeVTGateService) ResolveTransaction(ctx context.Context, dtid string) error { - if dtid != dtid2 { - return errors.New("ResolveTransaction: dtid mismatch") - } - return nil -} - func (f *fakeVTGateService) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, flags *vtgatepb.VStreamFlags, send func([]*binlogdatapb.VEvent) error) error { return nil } diff --git a/go/vt/vtgate/debug_2pc.go b/go/vt/vtgate/debug_2pc.go new file mode 100644 index 00000000000..be859553d2f --- /dev/null +++ b/go/vt/vtgate/debug_2pc.go @@ -0,0 +1,21 @@ +//go:build debug2PC + +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package vtgate + +const DEBUG_2PC = true diff --git a/go/vt/vtgate/fakerpcvtgateconn/conn.go b/go/vt/vtgate/fakerpcvtgateconn/conn.go index 3f6236ea9ec..372ddfb8cfc 100644 --- a/go/vt/vtgate/fakerpcvtgateconn/conn.go +++ b/go/vt/vtgate/fakerpcvtgateconn/conn.go @@ -182,11 +182,6 @@ func (conn *FakeVTGateConn) CloseSession(ctx context.Context, session *vtgatepb. panic("not implemented") } -// ResolveTransaction please see vtgateconn.Impl.ResolveTransaction -func (conn *FakeVTGateConn) ResolveTransaction(ctx context.Context, dtid string) error { - return nil -} - // VStream streams binlog events. func (conn *FakeVTGateConn) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, flags *vtgatepb.VStreamFlags) (vtgateconn.VStreamReader, error) { diff --git a/go/vt/vtgate/grpcvtgateconn/conn.go b/go/vt/vtgate/grpcvtgateconn/conn.go index a681e3661cd..fa16fa0d602 100644 --- a/go/vt/vtgate/grpcvtgateconn/conn.go +++ b/go/vt/vtgate/grpcvtgateconn/conn.go @@ -234,15 +234,6 @@ func (conn *vtgateConn) CloseSession(ctx context.Context, session *vtgatepb.Sess return nil } -func (conn *vtgateConn) ResolveTransaction(ctx context.Context, dtid string) error { - request := &vtgatepb.ResolveTransactionRequest{ - CallerId: callerid.EffectiveCallerIDFromContext(ctx), - Dtid: dtid, - } - _, err := conn.c.ResolveTransaction(ctx, request) - return vterrors.FromGRPC(err) -} - type vstreamAdapter struct { stream vtgateservicepb.Vitess_VStreamClient } diff --git a/go/vt/vtgate/grpcvtgateconn/suite_test.go b/go/vt/vtgate/grpcvtgateconn/suite_test.go index e5cd5c3ac81..0e544b05e66 100644 --- a/go/vt/vtgate/grpcvtgateconn/suite_test.go +++ b/go/vt/vtgate/grpcvtgateconn/suite_test.go @@ -23,7 +23,6 @@ package grpcvtgateconn import ( "context" - "errors" "fmt" "io" "strings" @@ -231,21 +230,6 @@ func (f *fakeVTGateService) CloseSession(ctx context.Context, session *vtgatepb. 
panic("unimplemented") } -// ResolveTransaction is part of the VTGateService interface -func (f *fakeVTGateService) ResolveTransaction(ctx context.Context, dtid string) error { - if f.hasError { - return errTestVtGateError - } - if f.panics { - panic(fmt.Errorf("test forced panic")) - } - f.checkCallerID(ctx, "ResolveTransaction") - if dtid != dtid2 { - return errors.New("ResolveTransaction: dtid mismatch") - } - return nil -} - func (f *fakeVTGateService) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, flags *vtgatepb.VStreamFlags, send func([]*binlogdatapb.VEvent) error) error { panic("unimplemented") } diff --git a/go/vt/vtgate/grpcvtgateservice/server.go b/go/vt/vtgate/grpcvtgateservice/server.go index bf00db4ea1c..d9fef3d4e31 100644 --- a/go/vt/vtgate/grpcvtgateservice/server.go +++ b/go/vt/vtgate/grpcvtgateservice/server.go @@ -247,18 +247,6 @@ func (vtg *VTGate) CloseSession(ctx context.Context, request *vtgatepb.CloseSess }, nil } -// ResolveTransaction is the RPC version of vtgateservice.VTGateService method -func (vtg *VTGate) ResolveTransaction(ctx context.Context, request *vtgatepb.ResolveTransactionRequest) (response *vtgatepb.ResolveTransactionResponse, err error) { - defer vtg.server.HandlePanic(&err) - ctx = withCallerIDContext(ctx, request.CallerId) - vtgErr := vtg.server.ResolveTransaction(ctx, request.Dtid) - response = &vtgatepb.ResolveTransactionResponse{} - if vtgErr == nil { - return response, nil - } - return nil, vterrors.ToGRPC(vtgErr) -} - // VStream is the RPC version of vtgateservice.VTGateService method func (vtg *VTGate) VStream(request *vtgatepb.VStreamRequest, stream vtgateservicepb.Vitess_VStreamServer) (err error) { defer vtg.server.HandlePanic(&err) diff --git a/go/vt/vtgate/production.go b/go/vt/vtgate/production.go new file mode 100644 index 00000000000..d98ed5b2946 --- /dev/null +++ b/go/vt/vtgate/production.go @@ -0,0 +1,28 @@ +//go:build !debug2PC + +/* 
+Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vtgate + +// This file defines debug constants that are always false. +// This file is used for building production code. +// We use go build directives to include a file that defines the constant to true +// when certain tags are provided while building binaries. +// This allows debugging code to be written in the normal code flow without affecting +// production performance. + +const DEBUG_2PC = false diff --git a/go/vt/vtgate/tx_conn.go b/go/vt/vtgate/tx_conn.go index 2eccdc54992..7bb23ea2b26 100644 --- a/go/vt/vtgate/tx_conn.go +++ b/go/vt/vtgate/tx_conn.go @@ -23,17 +23,17 @@ import ( "sync" "vitess.io/vitess/go/mysql/sqlerror" + "vitess.io/vitess/go/vt/callerid" "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/dtids" "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/sqlparser" - "vitess.io/vitess/go/vt/vterrors" - "vitess.io/vitess/go/vt/vttablet/queryservice" - querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet/queryservice" ) // nonAtomicCommitWarnMaxShards limits the number of shard names reported in @@ -201,25 +201,58 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) error { return err } + if DEBUG_2PC 
{ + // Test code to simulate a failure after TR created + if failNow, err := checkTestFailure(callerid.EffectiveCallerIDFromContext(ctx), "TRCreated_FailNow", nil); failNow { + return err + } + } + err = txc.runSessions(ctx, session.ShardSessions[1:], session.logging, func(ctx context.Context, s *vtgatepb.Session_ShardSession, logging *executeLogger) error { + if DEBUG_2PC { + // Test code to simulate a failure during RM prepare + if failNow, err := checkTestFailure(callerid.EffectiveCallerIDFromContext(ctx), "RMPrepare_-40_FailNow", s.Target); failNow { + return err + } + } return txc.tabletGateway.Prepare(ctx, s.Target, s.TransactionId, dtid) }) if err != nil { // TODO(sougou): Perform a more fine-grained cleanup // including unprepared transactions. - if resumeErr := txc.Resolve(ctx, dtid); resumeErr != nil { + if resumeErr := txc.rollbackTx(ctx, dtid, mmShard, session.ShardSessions[1:], session.logging); resumeErr != nil { log.Warningf("Rollback failed after Prepare failure: %v", resumeErr) } // Return the original error even if the previous operation fails. 
return err } + if DEBUG_2PC { + // Test code to simulate a failure after RM prepare + if failNow, err := checkTestFailure(callerid.EffectiveCallerIDFromContext(ctx), "RMPrepared_FailNow", nil); failNow { + return err + } + } + err = txc.tabletGateway.StartCommit(ctx, mmShard.Target, mmShard.TransactionId, dtid) if err != nil { return err } + if DEBUG_2PC { + // Test code to simulate a failure after MM commit + if failNow, err := checkTestFailure(callerid.EffectiveCallerIDFromContext(ctx), "MMCommitted_FailNow", nil); failNow { + return err + } + } + err = txc.runSessions(ctx, session.ShardSessions[1:], session.logging, func(ctx context.Context, s *vtgatepb.Session_ShardSession, logging *executeLogger) error { + if DEBUG_2PC { + // Test code to simulate a failure during RM commit + if failNow, err := checkTestFailure(callerid.EffectiveCallerIDFromContext(ctx), "RMCommit_-40_FailNow", s.Target); failNow { + return err + } + } return txc.tabletGateway.CommitPrepared(ctx, s.Target, dtid) }) if err != nil { @@ -229,6 +262,42 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) error { return txc.tabletGateway.ConcludeTransaction(ctx, mmShard.Target, dtid) } +func checkTestFailure(callerID *vtrpcpb.CallerID, expectCaller string, target *querypb.Target) (bool, error) { + if callerID == nil || callerID.GetPrincipal() != expectCaller { + return false, nil + } + switch callerID.Principal { + case "TRCreated_FailNow": + log.Errorf("Fail After TR created") + // no commit decision is made. Transaction should be rolled back. + return true, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Fail After TR created") + case "RMPrepare_-40_FailNow": + if target.Shard != "-40" { + return false, nil + } + log.Errorf("Fail During RM prepare") + // no commit decision is made. Transaction should be rolled back. 
+ return true, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Fail During RM prepare") + case "RMPrepared_FailNow": + log.Errorf("Fail After RM prepared") + // no commit decision is made. Transaction should be rolled back. + return true, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Fail After RM prepared") + case "MMCommitted_FailNow": + log.Errorf("Fail After MM commit") + // commit decision is made. Transaction should be committed. + return true, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Fail After MM commit") + case "RMCommit_-40_FailNow": + if target.Shard != "-40" { + return false, nil + } + log.Errorf("Fail During RM commit") + // commit decision is made. Transaction should be committed. + return true, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "Fail During RM commit") + default: + return false, nil + } +} + // Rollback rolls back the current transaction. There are no retries on this operation. func (txc *TxConn) Rollback(ctx context.Context, session *SafeSession) error { if !session.InTransaction() { @@ -343,39 +412,49 @@ func (txc *TxConn) ReleaseAll(ctx context.Context, session *SafeSession) error { }) } -// Resolve resolves the specified 2PC transaction. -func (txc *TxConn) Resolve(ctx context.Context, dtid string) error { - mmShard, err := dtids.ShardSession(dtid) +// ResolveTransactions fetches all unresolved transactions and resolves them. 
+func (txc *TxConn) ResolveTransactions(ctx context.Context, target *querypb.Target) error { + transactions, err := txc.tabletGateway.UnresolvedTransactions(ctx, target) if err != nil { return err } - transaction, err := txc.tabletGateway.ReadTransaction(ctx, mmShard.Target, dtid) - if err != nil { - return err + failedResolution := 0 + for _, txRecord := range transactions { + log.Infof("Resolving transaction ID: %s", txRecord.Dtid) + err = txc.resolveTx(ctx, target, txRecord) + if err != nil { + failedResolution++ + log.Errorf("Failed to resolve transaction ID: %s with error: %v", txRecord.Dtid, err) + } } - if transaction == nil || transaction.Dtid == "" { - // It was already resolved. + if failedResolution == 0 { return nil } + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to resolve %d out of %d transactions", failedResolution, len(transactions)) +} + +// resolveTx resolves the specified distributed transaction. +func (txc *TxConn) resolveTx(ctx context.Context, target *querypb.Target, transaction *querypb.TransactionMetadata) error { + mmShard, err := dtids.ShardSession(transaction.Dtid) + if err != nil { + return err + } + switch transaction.State { case querypb.TransactionState_PREPARE: // If state is PREPARE, make a decision to rollback and // fallthrough to the rollback workflow. 
- qs, err := txc.queryService(ctx, mmShard.TabletAlias) - if err != nil { - return err - } - if err := qs.SetRollback(ctx, mmShard.Target, transaction.Dtid, mmShard.TransactionId); err != nil { + if err := txc.tabletGateway.SetRollback(ctx, target, transaction.Dtid, mmShard.TransactionId); err != nil { return err } fallthrough case querypb.TransactionState_ROLLBACK: - if err := txc.resumeRollback(ctx, mmShard.Target, transaction); err != nil { + if err := txc.resumeRollback(ctx, target, transaction); err != nil { return err } case querypb.TransactionState_COMMIT: - if err := txc.resumeCommit(ctx, mmShard.Target, transaction); err != nil { + if err := txc.resumeCommit(ctx, target, transaction); err != nil { return err } default: @@ -385,6 +464,25 @@ func (txc *TxConn) Resolve(ctx context.Context, dtid string) error { return nil } +// rollbackTx rolls back the specified distributed transaction. +func (txc *TxConn) rollbackTx(ctx context.Context, dtid string, mmShard *vtgatepb.Session_ShardSession, participants []*vtgatepb.Session_ShardSession, logging *executeLogger) error { + qs, err := txc.queryService(ctx, mmShard.TabletAlias) + if err != nil { + return err + } + if err := qs.SetRollback(ctx, mmShard.Target, dtid, mmShard.TransactionId); err != nil { + return err + } + err = txc.runSessions(ctx, participants, logging, func(ctx context.Context, session *vtgatepb.Session_ShardSession, logger *executeLogger) error { + return txc.tabletGateway.RollbackPrepared(ctx, session.Target, dtid, session.TransactionId) + }) + if err != nil { + return err + } + return txc.tabletGateway.ConcludeTransaction(ctx, mmShard.Target, dtid) + +} + func (txc *TxConn) resumeRollback(ctx context.Context, target *querypb.Target, transaction *querypb.TransactionMetadata) error { err := txc.runTargets(transaction.Participants, func(t *querypb.Target) error { return txc.tabletGateway.RollbackPrepared(ctx, t, transaction.Dtid, 0) diff --git a/go/vt/vtgate/tx_conn_test.go 
b/go/vt/vtgate/tx_conn_test.go index 4d77ea16c92..74329153936 100644 --- a/go/vt/vtgate/tx_conn_test.go +++ b/go/vt/vtgate/tx_conn_test.go @@ -20,25 +20,24 @@ import ( "context" "fmt" "strconv" + "strings" "testing" "github.com/stretchr/testify/assert" - - "vitess.io/vitess/go/test/utils" - "github.com/stretchr/testify/require" + "vitess.io/vitess/go/event/syslogger" "vitess.io/vitess/go/mysql/sqlerror" + "vitess.io/vitess/go/test/utils" "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/key" - "vitess.io/vitess/go/vt/srvtopo" - "vitess.io/vitess/go/vt/vterrors" - "vitess.io/vitess/go/vt/vttablet/sandboxconn" - querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/srvtopo" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet/sandboxconn" ) var queries = []*querypb.BoundQuery{{Sql: "query1"}} @@ -1024,9 +1023,15 @@ func TestTxConnCommit2PCPrepareFail(t *testing.T) { assert.Contains(t, err.Error(), want, "Commit") assert.EqualValues(t, 1, sbc0.CreateTransactionCount.Load(), "sbc0.CreateTransactionCount") assert.EqualValues(t, 1, sbc1.PrepareCount.Load(), "sbc1.PrepareCount") + // Prepared failed on RM, so no commit on MM or RMs. assert.EqualValues(t, 0, sbc0.StartCommitCount.Load(), "sbc0.StartCommitCount") assert.EqualValues(t, 0, sbc1.CommitPreparedCount.Load(), "sbc1.CommitPreparedCount") - assert.EqualValues(t, 0, sbc0.ConcludeTransactionCount.Load(), "sbc0.ConcludeTransactionCount") + // rollback original transaction on MM + assert.EqualValues(t, 1, sbc0.SetRollbackCount.Load(), "sbc0.SetRollbackCount") + // rollback prepare transaction on RM + assert.EqualValues(t, 1, sbc1.RollbackPreparedCount.Load(), "sbc1.RollbackPreparedCount") + // conclude the transaction. 
+ assert.EqualValues(t, 1, sbc0.ConcludeTransactionCount.Load(), "sbc0.ConcludeTransactionCount") } func TestTxConnCommit2PCStartCommitFail(t *testing.T) { @@ -1179,14 +1184,21 @@ func TestTxConnReservedRollbackFailure(t *testing.T) { assert.EqualValues(t, 1, sbc1.ReleaseCount.Load(), "sbc1.ReleaseCount") } +func getMMTarget() *querypb.Target { + return &querypb.Target{ + Keyspace: "TestTxConn", + Shard: "0", + TabletType: topodatapb.TabletType_PRIMARY, + } +} + func TestTxConnResolveOnPrepare(t *testing.T) { ctx := utils.LeakCheckContext(t) sc, sbc0, sbc1, _, _, _ := newTestTxConnEnv(t, ctx, "TestTxConn") - dtid := "TestTxConn:0:1234" - sbc0.ReadTransactionResults = []*querypb.TransactionMetadata{{ - Dtid: dtid, + sbc0.UnresolvedTransactionsResult = []*querypb.TransactionMetadata{{ + Dtid: "TestTxConn:0:1234", State: querypb.TransactionState_PREPARE, Participants: []*querypb.Target{{ Keyspace: "TestTxConn", @@ -1194,8 +1206,9 @@ func TestTxConnResolveOnPrepare(t *testing.T) { TabletType: topodatapb.TabletType_PRIMARY, }}, }} - err := sc.txConn.Resolve(ctx, dtid) + err := sc.txConn.ResolveTransactions(ctx, getMMTarget()) require.NoError(t, err) + assert.EqualValues(t, 1, sbc0.UnresolvedTransactionsCount.Load(), "sbc0.UnresolvedTransactionsCount") assert.EqualValues(t, 1, sbc0.SetRollbackCount.Load(), "sbc0.SetRollbackCount") assert.EqualValues(t, 1, sbc1.RollbackPreparedCount.Load(), "sbc1.RollbackPreparedCount") assert.EqualValues(t, 0, sbc1.CommitPreparedCount.Load(), "sbc1.CommitPreparedCount") @@ -1207,9 +1220,8 @@ func TestTxConnResolveOnRollback(t *testing.T) { sc, sbc0, sbc1, _, _, _ := newTestTxConnEnv(t, ctx, "TestTxConn") - dtid := "TestTxConn:0:1234" - sbc0.ReadTransactionResults = []*querypb.TransactionMetadata{{ - Dtid: dtid, + sbc0.UnresolvedTransactionsResult = []*querypb.TransactionMetadata{{ + Dtid: "TestTxConn:0:1234", State: querypb.TransactionState_ROLLBACK, Participants: []*querypb.Target{{ Keyspace: "TestTxConn", @@ -1217,8 +1229,9 @@ func 
TestTxConnResolveOnRollback(t *testing.T) { TabletType: topodatapb.TabletType_PRIMARY, }}, }} - require.NoError(t, - sc.txConn.Resolve(ctx, dtid)) + err := sc.txConn.ResolveTransactions(ctx, getMMTarget()) + require.NoError(t, err) + assert.EqualValues(t, 1, sbc0.UnresolvedTransactionsCount.Load(), "sbc0.UnresolvedTransactionsCount") assert.EqualValues(t, 0, sbc0.SetRollbackCount.Load(), "sbc0.SetRollbackCount") assert.EqualValues(t, 1, sbc1.RollbackPreparedCount.Load(), "sbc1.RollbackPreparedCount") assert.EqualValues(t, 0, sbc1.CommitPreparedCount.Load(), "sbc1.CommitPreparedCount") @@ -1230,9 +1243,8 @@ func TestTxConnResolveOnCommit(t *testing.T) { sc, sbc0, sbc1, _, _, _ := newTestTxConnEnv(t, ctx, "TestTxConn") - dtid := "TestTxConn:0:1234" - sbc0.ReadTransactionResults = []*querypb.TransactionMetadata{{ - Dtid: dtid, + sbc0.UnresolvedTransactionsResult = []*querypb.TransactionMetadata{{ + Dtid: "TestTxConn:0:1234", State: querypb.TransactionState_COMMIT, Participants: []*querypb.Target{{ Keyspace: "TestTxConn", @@ -1240,35 +1252,45 @@ func TestTxConnResolveOnCommit(t *testing.T) { TabletType: topodatapb.TabletType_PRIMARY, }}, }} - require.NoError(t, - sc.txConn.Resolve(ctx, dtid)) + err := sc.txConn.ResolveTransactions(ctx, getMMTarget()) + require.NoError(t, err) + assert.EqualValues(t, 1, sbc0.UnresolvedTransactionsCount.Load(), "sbc0.UnresolvedTransactionsCount") assert.EqualValues(t, 0, sbc0.SetRollbackCount.Load(), "sbc0.SetRollbackCount") assert.EqualValues(t, 0, sbc1.RollbackPreparedCount.Load(), "sbc1.RollbackPreparedCount") assert.EqualValues(t, 1, sbc1.CommitPreparedCount.Load(), "sbc1.CommitPreparedCount") assert.EqualValues(t, 1, sbc0.ConcludeTransactionCount.Load(), "sbc0.ConcludeTransactionCount") } -func TestTxConnResolveInvalidDTID(t *testing.T) { +func TestTxConnUnresolvedTransactionsFail(t *testing.T) { ctx := utils.LeakCheckContext(t) - sc, _, _, _, _, _ := newTestTxConnEnv(t, ctx, "TestTxConn") + sc, sbc0, _, _, _, _ := 
newTestTxConnEnv(t, ctx, "TestTxConn") - err := sc.txConn.Resolve(ctx, "abcd") - want := "invalid parts in dtid: abcd" - require.EqualError(t, err, want, "Resolve") + sbc0.MustFailCodes[vtrpcpb.Code_INVALID_ARGUMENT] = 1 + err := sc.txConn.ResolveTransactions(ctx, getMMTarget()) + require.ErrorContains(t, err, "target: TestTxConn.0.primary: INVALID_ARGUMENT error") } -func TestTxConnResolveReadTransactionFail(t *testing.T) { +func TestTxConnResolveInvalidDTID(t *testing.T) { ctx := utils.LeakCheckContext(t) sc, sbc0, _, _, _, _ := newTestTxConnEnv(t, ctx, "TestTxConn") - dtid := "TestTxConn:0:1234" - sbc0.MustFailCodes[vtrpcpb.Code_INVALID_ARGUMENT] = 1 - err := sc.txConn.Resolve(ctx, dtid) - want := "INVALID_ARGUMENT error" - require.Error(t, err) - assert.Contains(t, err.Error(), want, "Resolve") + tl := syslogger.NewTestLogger() + defer tl.Close() + + sbc0.UnresolvedTransactionsResult = []*querypb.TransactionMetadata{{ + Dtid: "abcd", + State: querypb.TransactionState_COMMIT, + Participants: []*querypb.Target{{ + Keyspace: "TestTxConn", + Shard: "1", + TabletType: topodatapb.TabletType_PRIMARY, + }}, + }} + err := sc.txConn.ResolveTransactions(ctx, getMMTarget()) + require.ErrorContains(t, err, "failed to resolve 1 out of 1 transactions") + require.Contains(t, strings.Join(tl.GetAllLogs(), "|"), "invalid parts in dtid: abcd") } func TestTxConnResolveInternalError(t *testing.T) { @@ -1276,9 +1298,11 @@ func TestTxConnResolveInternalError(t *testing.T) { sc, sbc0, _, _, _, _ := newTestTxConnEnv(t, ctx, "TestTxConn") - dtid := "TestTxConn:0:1234" - sbc0.ReadTransactionResults = []*querypb.TransactionMetadata{{ - Dtid: dtid, + tl := syslogger.NewTestLogger() + defer tl.Close() + + sbc0.UnresolvedTransactionsResult = []*querypb.TransactionMetadata{{ + Dtid: "TestTxConn:0:1234", State: querypb.TransactionState_UNKNOWN, Participants: []*querypb.Target{{ Keyspace: "TestTxConn", @@ -1286,10 +1310,9 @@ func TestTxConnResolveInternalError(t *testing.T) { TabletType: 
topodatapb.TabletType_PRIMARY, }}, }} - err := sc.txConn.Resolve(ctx, dtid) - want := "invalid state: UNKNOWN" - require.Error(t, err) - assert.Contains(t, err.Error(), want, "Resolve") + err := sc.txConn.ResolveTransactions(ctx, getMMTarget()) + require.ErrorContains(t, err, "failed to resolve 1 out of 1 transactions") + require.Contains(t, strings.Join(tl.GetAllLogs(), "|"), "invalid state: UNKNOWN") } func TestTxConnResolveSetRollbackFail(t *testing.T) { @@ -1297,9 +1320,11 @@ func TestTxConnResolveSetRollbackFail(t *testing.T) { sc, sbc0, sbc1, _, _, _ := newTestTxConnEnv(t, ctx, "TestTxConn") - dtid := "TestTxConn:0:1234" - sbc0.ReadTransactionResults = []*querypb.TransactionMetadata{{ - Dtid: dtid, + tl := syslogger.NewTestLogger() + defer tl.Close() + + sbc0.UnresolvedTransactionsResult = []*querypb.TransactionMetadata{{ + Dtid: "TestTxConn:0:1234", State: querypb.TransactionState_PREPARE, Participants: []*querypb.Target{{ Keyspace: "TestTxConn", @@ -1308,10 +1333,11 @@ func TestTxConnResolveSetRollbackFail(t *testing.T) { }}, }} sbc0.MustFailSetRollback = 1 - err := sc.txConn.Resolve(ctx, dtid) - want := "error: err" - require.Error(t, err) - assert.Contains(t, err.Error(), want, "Resolve") + err := sc.txConn.ResolveTransactions(ctx, getMMTarget()) + require.ErrorContains(t, err, "failed to resolve 1 out of 1 transactions") + require.Contains(t, strings.Join(tl.GetAllLogs(), "|"), "error: err") + assert.EqualValues(t, 1, sbc0.UnresolvedTransactionsCount.Load(), "sbc0.UnresolvedTransactionsCount") + assert.EqualValues(t, 1, sbc0.UnresolvedTransactionsCount.Load(), "sbc0.UnresolvedTransactionsCount") assert.EqualValues(t, 1, sbc0.SetRollbackCount.Load(), "sbc0.SetRollbackCount") assert.EqualValues(t, 0, sbc1.RollbackPreparedCount.Load(), "sbc1.RollbackPreparedCount") assert.EqualValues(t, 0, sbc1.CommitPreparedCount.Load(), "sbc1.CommitPreparedCount") @@ -1323,9 +1349,11 @@ func TestTxConnResolveRollbackPreparedFail(t *testing.T) { sc, sbc0, sbc1, _, _, _ := 
newTestTxConnEnv(t, ctx, "TestTxConn") - dtid := "TestTxConn:0:1234" - sbc0.ReadTransactionResults = []*querypb.TransactionMetadata{{ - Dtid: dtid, + tl := syslogger.NewTestLogger() + defer tl.Close() + + sbc0.UnresolvedTransactionsResult = []*querypb.TransactionMetadata{{ + Dtid: "TestTxConn:0:1234", State: querypb.TransactionState_ROLLBACK, Participants: []*querypb.Target{{ Keyspace: "TestTxConn", @@ -1334,10 +1362,10 @@ func TestTxConnResolveRollbackPreparedFail(t *testing.T) { }}, }} sbc1.MustFailRollbackPrepared = 1 - err := sc.txConn.Resolve(ctx, dtid) - want := "error: err" - require.Error(t, err) - assert.Contains(t, err.Error(), want, "Resolve") + err := sc.txConn.ResolveTransactions(ctx, getMMTarget()) + require.ErrorContains(t, err, "failed to resolve 1 out of 1 transactions") + require.Contains(t, strings.Join(tl.GetAllLogs(), "|"), "error: err") + assert.EqualValues(t, 1, sbc0.UnresolvedTransactionsCount.Load(), "sbc0.UnresolvedTransactionsCount") assert.EqualValues(t, 0, sbc0.SetRollbackCount.Load(), "sbc0.SetRollbackCount") assert.EqualValues(t, 1, sbc1.RollbackPreparedCount.Load(), "sbc1.RollbackPreparedCount") assert.EqualValues(t, 0, sbc1.CommitPreparedCount.Load(), "sbc1.CommitPreparedCount") @@ -1349,9 +1377,11 @@ func TestTxConnResolveCommitPreparedFail(t *testing.T) { sc, sbc0, sbc1, _, _, _ := newTestTxConnEnv(t, ctx, "TestTxConn") - dtid := "TestTxConn:0:1234" - sbc0.ReadTransactionResults = []*querypb.TransactionMetadata{{ - Dtid: dtid, + tl := syslogger.NewTestLogger() + defer tl.Close() + + sbc0.UnresolvedTransactionsResult = []*querypb.TransactionMetadata{{ + Dtid: "TestTxConn:0:1234", State: querypb.TransactionState_COMMIT, Participants: []*querypb.Target{{ Keyspace: "TestTxConn", @@ -1360,11 +1390,10 @@ func TestTxConnResolveCommitPreparedFail(t *testing.T) { }}, }} sbc1.MustFailCommitPrepared = 1 - err := sc.txConn.Resolve(ctx, dtid) - want := "error: err" - require.Error(t, err) - assert.Contains(t, err.Error(), want, "Resolve") - 
assert.EqualValues(t, 0, sbc0.SetRollbackCount.Load(), "sbc0.SetRollbackCount") + err := sc.txConn.ResolveTransactions(ctx, getMMTarget()) + require.ErrorContains(t, err, "failed to resolve 1 out of 1 transactions") + require.Contains(t, strings.Join(tl.GetAllLogs(), "|"), "error: err") + assert.EqualValues(t, 1, sbc0.UnresolvedTransactionsCount.Load(), "sbc0.UnresolvedTransactionsCount") assert.EqualValues(t, 0, sbc1.RollbackPreparedCount.Load(), "sbc1.RollbackPreparedCount") assert.EqualValues(t, 1, sbc1.CommitPreparedCount.Load(), "sbc1.CommitPreparedCount") assert.EqualValues(t, 0, sbc0.ConcludeTransactionCount.Load(), "sbc0.ConcludeTransactionCount") @@ -1375,9 +1404,11 @@ func TestTxConnResolveConcludeTransactionFail(t *testing.T) { sc, sbc0, sbc1, _, _, _ := newTestTxConnEnv(t, ctx, "TestTxConn") - dtid := "TestTxConn:0:1234" - sbc0.ReadTransactionResults = []*querypb.TransactionMetadata{{ - Dtid: dtid, + tl := syslogger.NewTestLogger() + defer tl.Close() + + sbc0.UnresolvedTransactionsResult = []*querypb.TransactionMetadata{{ + Dtid: "TestTxConn:0:1234", State: querypb.TransactionState_COMMIT, Participants: []*querypb.Target{{ Keyspace: "TestTxConn", @@ -1386,10 +1417,10 @@ func TestTxConnResolveConcludeTransactionFail(t *testing.T) { }}, }} sbc0.MustFailConcludeTransaction = 1 - err := sc.txConn.Resolve(ctx, dtid) - want := "error: err" - require.Error(t, err) - assert.Contains(t, err.Error(), want, "Resolve") + err := sc.txConn.ResolveTransactions(ctx, getMMTarget()) + require.ErrorContains(t, err, "failed to resolve 1 out of 1 transactions") + require.Contains(t, strings.Join(tl.GetAllLogs(), "|"), "error: err") + assert.EqualValues(t, 1, sbc0.UnresolvedTransactionsCount.Load(), "sbc0.UnresolvedTransactionsCount") assert.EqualValues(t, 0, sbc0.SetRollbackCount.Load(), "sbc0.SetRollbackCount") assert.EqualValues(t, 0, sbc1.RollbackPreparedCount.Load(), "sbc1.RollbackPreparedCount") assert.EqualValues(t, 1, sbc1.CommitPreparedCount.Load(), 
"sbc1.CommitPreparedCount") diff --git a/go/vt/vtgate/txresolver/tx_resolver.go b/go/vt/vtgate/txresolver/tx_resolver.go new file mode 100644 index 00000000000..3c96c7bc836 --- /dev/null +++ b/go/vt/vtgate/txresolver/tx_resolver.go @@ -0,0 +1,104 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package txresolver + +import ( + "context" + "sync" + + "vitess.io/vitess/go/vt/discovery" + "vitess.io/vitess/go/vt/log" + querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" +) + +type TxResolver struct { + ch chan *discovery.TabletHealth + cancel context.CancelFunc + + txConn TxConnection + + mu sync.Mutex + resolve map[string]bool +} + +type TxConnection interface { + ResolveTransactions(ctx context.Context, target *querypb.Target) error +} + +func NewTxResolver(ch chan *discovery.TabletHealth, txConn TxConnection) *TxResolver { + return &TxResolver{ + ch: ch, + txConn: txConn, + resolve: make(map[string]bool), + } +} + +func (tr *TxResolver) Start() { + ctx, cancel := context.WithCancel(context.Background()) + tr.cancel = cancel + + go func() { + for { + select { + case <-ctx.Done(): + return + case th := <-tr.ch: + if th.Stats != nil && th.Target.TabletType == topodatapb.TabletType_PRIMARY && th.Stats.TxUnresolved { + go tr.resolveTransactions(ctx, th.Target) + } + } + } + }() +} + +func (tr *TxResolver) Stop() { + if tr.cancel != nil { + log.Info("Stopping transaction resolver") + tr.cancel() + } 
+} + +func (tr *TxResolver) resolveTransactions(ctx context.Context, target *querypb.Target) { + dest := target.Keyspace + ":" + target.Shard + if !tr.tryLockTarget(dest) { + return + } + log.Infof("resolving transactions for shard: %s", dest) + + defer func() { + tr.mu.Lock() + delete(tr.resolve, dest) + tr.mu.Unlock() + }() + err := tr.txConn.ResolveTransactions(ctx, target) + if err != nil { + log.Errorf("failed to resolve transactions for shard: %s, %v", dest, err) + return + } + log.Infof("successfully resolved all the transactions for shard: %s", dest) +} + +func (tr *TxResolver) tryLockTarget(dest string) bool { + tr.mu.Lock() + defer tr.mu.Unlock() + if tr.resolve[dest] { + return false + } + tr.resolve[dest] = true + return true +} diff --git a/go/vt/vtgate/vtgate.go b/go/vt/vtgate/vtgate.go index 9ea5da7a7e3..7d28c0e9697 100644 --- a/go/vt/vtgate/vtgate.go +++ b/go/vt/vtgate/vtgate.go @@ -30,8 +30,6 @@ import ( "github.com/spf13/pflag" - "vitess.io/vitess/go/vt/vtenv" - "vitess.io/vitess/go/acl" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/stats" @@ -51,9 +49,11 @@ import ( "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/srvtopo" "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/vtenv" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" vtschema "vitess.io/vitess/go/vt/vtgate/schema" + "vitess.io/vitess/go/vt/vtgate/txresolver" "vitess.io/vitess/go/vt/vtgate/vtgateservice" ) @@ -284,6 +284,8 @@ func Init( tc := NewTxConn(gw, getTxMode()) // ScatterConn depends on TxConn to perform forced rollbacks. sc := NewScatterConn("VttabletCall", tc, gw) + // TxResolver depends on TxConn to complete distributed transaction. 
+ tr := txresolver.NewTxResolver(gw.hc.Subscribe(), tc) srvResolver := srvtopo.NewResolver(serv, gw, cell) resolver := NewResolver(srvResolver, serv, cell, sc) vsm := newVStreamManager(srvResolver, serv, cell) @@ -360,6 +362,7 @@ func Init( if st != nil && enableSchemaChangeSignal { st.Start() } + tr.Start() srv := initMySQLProtocol(vtgateInst) if srv != nil { servenv.OnTermSync(srv.shutdownMysqlProtocolAndDrain) @@ -370,6 +373,7 @@ func Init( if st != nil && enableSchemaChangeSignal { st.Stop() } + tr.Stop() }) vtgateInst.registerDebugHealthHandler() vtgateInst.registerDebugEnvHandler() @@ -552,11 +556,6 @@ func (vtg *VTGate) CloseSession(ctx context.Context, session *vtgatepb.Session) return vtg.executor.CloseSession(ctx, NewSafeSession(session)) } -// ResolveTransaction resolves the specified 2PC transaction. -func (vtg *VTGate) ResolveTransaction(ctx context.Context, dtid string) error { - return formatError(vtg.txConn.Resolve(ctx, dtid)) -} - // Prepare supports non-streaming prepare statement query with multi shards func (vtg *VTGate) Prepare(ctx context.Context, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable) (newSession *vtgatepb.Session, fld []*querypb.Field, err error) { // In this context, we don't care if we can't fully parse destination diff --git a/go/vt/vtgate/vtgateconn/vtgateconn.go b/go/vt/vtgate/vtgateconn/vtgateconn.go index ae0da3fdf43..38899550c1d 100644 --- a/go/vt/vtgate/vtgateconn/vtgateconn.go +++ b/go/vt/vtgate/vtgateconn/vtgateconn.go @@ -87,11 +87,6 @@ func (conn *VTGateConn) SessionFromPb(sn *vtgatepb.Session) *VTGateSession { } } -// ResolveTransaction resolves the 2pc transaction. -func (conn *VTGateConn) ResolveTransaction(ctx context.Context, dtid string) error { - return conn.impl.ResolveTransaction(ctx, dtid) -} - // Close must be called for releasing resources. 
func (conn *VTGateConn) Close() { conn.impl.Close() @@ -178,9 +173,6 @@ type Impl interface { // CloseSession closes the session provided by rolling back any active transaction. CloseSession(ctx context.Context, session *vtgatepb.Session) error - // ResolveTransaction resolves the specified 2pc transaction. - ResolveTransaction(ctx context.Context, dtid string) error - // VStream streams binlogevents VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, flags *vtgatepb.VStreamFlags) (VStreamReader, error) diff --git a/go/vt/vtgate/vtgateservice/interface.go b/go/vt/vtgate/vtgateservice/interface.go index bbfb2b2657e..c829f553d26 100644 --- a/go/vt/vtgate/vtgateservice/interface.go +++ b/go/vt/vtgate/vtgateservice/interface.go @@ -42,9 +42,6 @@ type VTGateService interface { // but does not affect the query statistics. CloseSession(ctx context.Context, session *vtgatepb.Session) error - // 2PC support - ResolveTransaction(ctx context.Context, dtid string) error - // Update Stream methods VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, flags *vtgatepb.VStreamFlags, send func([]*binlogdatapb.VEvent) error) error diff --git a/go/vt/vttablet/endtoend/transaction_test.go b/go/vt/vttablet/endtoend/transaction_test.go index 94453dd70e7..ad6ff558c40 100644 --- a/go/vt/vttablet/endtoend/transaction_test.go +++ b/go/vt/vttablet/endtoend/transaction_test.go @@ -773,7 +773,7 @@ func TestUnresolvedTransactions(t *testing.T) { client := framework.NewClient() participants := []*querypb.Target{ - {Keyspace: "ks1", Shard: "80-c0"}, + {Keyspace: "ks1", Shard: "80-c0", TabletType: topodatapb.TabletType_PRIMARY}, } err := client.CreateTransaction("dtid01", participants) require.NoError(t, err) @@ -802,16 +802,16 @@ func TestUnresolvedTransactionsOrdering(t *testing.T) { client := framework.NewClient() participants1 := []*querypb.Target{ - {Keyspace: 
"ks1", Shard: "c0-"}, - {Keyspace: "ks1", Shard: "80-c0"}, + {Keyspace: "ks1", Shard: "c0-", TabletType: topodatapb.TabletType_PRIMARY}, + {Keyspace: "ks1", Shard: "80-c0", TabletType: topodatapb.TabletType_PRIMARY}, } participants2 := []*querypb.Target{ - {Keyspace: "ks1", Shard: "-40"}, - {Keyspace: "ks1", Shard: "80-c0"}, + {Keyspace: "ks1", Shard: "-40", TabletType: topodatapb.TabletType_PRIMARY}, + {Keyspace: "ks1", Shard: "80-c0", TabletType: topodatapb.TabletType_PRIMARY}, } participants3 := []*querypb.Target{ - {Keyspace: "ks1", Shard: "c0-"}, - {Keyspace: "ks1", Shard: "-40"}, + {Keyspace: "ks1", Shard: "c0-", TabletType: topodatapb.TabletType_PRIMARY}, + {Keyspace: "ks1", Shard: "-40", TabletType: topodatapb.TabletType_PRIMARY}, } // prepare state err := client.CreateTransaction("dtid01", participants1) diff --git a/go/vt/vttablet/sandboxconn/sandboxconn.go b/go/vt/vttablet/sandboxconn/sandboxconn.go index e7bc3221058..3c7f4c8c445 100644 --- a/go/vt/vttablet/sandboxconn/sandboxconn.go +++ b/go/vt/vttablet/sandboxconn/sandboxconn.go @@ -26,17 +26,15 @@ import ( "time" "vitess.io/vitess/go/mysql/collations" - "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/sqlparser" - "vitess.io/vitess/go/sqltypes" - "vitess.io/vitess/go/vt/vterrors" - "vitess.io/vitess/go/vt/vttablet/queryservice" - + "vitess.io/vitess/go/vt/log" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet/queryservice" ) // SandboxConn satisfies the QueryService interface @@ -65,22 +63,23 @@ type SandboxConn struct { // These Count vars report how often the corresponding // functions were called. 
- ExecCount atomic.Int64 - BeginCount atomic.Int64 - CommitCount atomic.Int64 - RollbackCount atomic.Int64 - AsTransactionCount atomic.Int64 - PrepareCount atomic.Int64 - CommitPreparedCount atomic.Int64 - RollbackPreparedCount atomic.Int64 - CreateTransactionCount atomic.Int64 - StartCommitCount atomic.Int64 - SetRollbackCount atomic.Int64 - ConcludeTransactionCount atomic.Int64 - ReadTransactionCount atomic.Int64 - ReserveCount atomic.Int64 - ReleaseCount atomic.Int64 - GetSchemaCount atomic.Int64 + ExecCount atomic.Int64 + BeginCount atomic.Int64 + CommitCount atomic.Int64 + RollbackCount atomic.Int64 + AsTransactionCount atomic.Int64 + PrepareCount atomic.Int64 + CommitPreparedCount atomic.Int64 + RollbackPreparedCount atomic.Int64 + CreateTransactionCount atomic.Int64 + StartCommitCount atomic.Int64 + SetRollbackCount atomic.Int64 + ConcludeTransactionCount atomic.Int64 + ReadTransactionCount atomic.Int64 + UnresolvedTransactionsCount atomic.Int64 + ReserveCount atomic.Int64 + ReleaseCount atomic.Int64 + GetSchemaCount atomic.Int64 queriesRequireLocking bool queriesMu sync.Mutex @@ -102,6 +101,9 @@ type SandboxConn struct { // ReadTransactionResults is used for returning results for ReadTransaction. ReadTransactionResults []*querypb.TransactionMetadata + // UnresolvedTransactionsResult is used for returning results for UnresolvedTransactions. + UnresolvedTransactionsResult []*querypb.TransactionMetadata + MessageIDs []*querypb.Value // vstream expectations. @@ -429,8 +431,11 @@ func (sbc *SandboxConn) ReadTransaction(ctx context.Context, target *querypb.Tar // UnresolvedTransactions is part of the QueryService interface. 
func (sbc *SandboxConn) UnresolvedTransactions(context.Context, *querypb.Target) ([]*querypb.TransactionMetadata, error) { - // TODO implement me - panic("implement me") + sbc.UnresolvedTransactionsCount.Add(1) + if err := sbc.getError(); err != nil { + return nil, err + } + return sbc.UnresolvedTransactionsResult, nil } // BeginExecute is part of the QueryService interface. diff --git a/go/vt/vttablet/tabletserver/dt_executor.go b/go/vt/vttablet/tabletserver/dt_executor.go index 3bc4d4d98b5..edf4438b8b2 100644 --- a/go/vt/vttablet/tabletserver/dt_executor.go +++ b/go/vt/vttablet/tabletserver/dt_executor.go @@ -225,6 +225,8 @@ func (dte *DTExecutor) SetRollback(dtid string, transactionID int64) error { dte.logStats.TransactionID = transactionID if transactionID != 0 { + // If the transaction is still open, it will be rolled back. + // Otherwise, it would have been rolled back by other means, like a timeout or vttablet/mysql restart. dte.te.Rollback(dte.ctx, transactionID) } diff --git a/go/vt/vttablet/tabletserver/twopc.go b/go/vt/vttablet/tabletserver/twopc.go index fff6d4b0cd8..0bdf4ac0c91 100644 --- a/go/vt/vttablet/tabletserver/twopc.go +++ b/go/vt/vttablet/tabletserver/twopc.go @@ -515,8 +515,9 @@ func (tpc *TwoPC) UnresolvedTransactions(ctx context.Context, abandonTime time.T // Add the current participant (keyspace and shard) to the transaction currentTx.Participants = append(currentTx.Participants, &querypb.Target{ - Keyspace: row[2].ToString(), - Shard: row[3].ToString(), + Keyspace: row[2].ToString(), + Shard: row[3].ToString(), + TabletType: topodatapb.TabletType_PRIMARY, }) } diff --git a/go/vt/vttablet/tabletserver/twopc_test.go b/go/vt/vttablet/tabletserver/twopc_test.go index f0fa77e0ff8..cc9e987664c 100644 --- a/go/vt/vttablet/tabletserver/twopc_test.go +++ b/go/vt/vttablet/tabletserver/twopc_test.go @@ -28,6 +28,7 @@ import ( "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/test/utils" querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb 
"vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/vttablet/tabletserver/tx" ) @@ -429,8 +430,8 @@ func TestUnresolvedTransactions(t *testing.T) { Dtid: "dtid0", State: querypb.TransactionState_PREPARE, Participants: []*querypb.Target{ - {Keyspace: "ks01", Shard: "shard01"}, - {Keyspace: "ks01", Shard: "shard02"}, + {Keyspace: "ks01", Shard: "shard01", TabletType: topodatapb.TabletType_PRIMARY}, + {Keyspace: "ks01", Shard: "shard02", TabletType: topodatapb.TabletType_PRIMARY}, }}}, }, { name: "two unresolved transaction", @@ -445,14 +446,14 @@ func TestUnresolvedTransactions(t *testing.T) { Dtid: "dtid0", State: querypb.TransactionState_COMMIT, Participants: []*querypb.Target{ - {Keyspace: "ks01", Shard: "shard01"}, - {Keyspace: "ks01", Shard: "shard02"}, + {Keyspace: "ks01", Shard: "shard01", TabletType: topodatapb.TabletType_PRIMARY}, + {Keyspace: "ks01", Shard: "shard02", TabletType: topodatapb.TabletType_PRIMARY}, }}, { Dtid: "dtid1", State: querypb.TransactionState_ROLLBACK, Participants: []*querypb.Target{ - {Keyspace: "ks02", Shard: "shard03"}, - {Keyspace: "ks01", Shard: "shard02"}, + {Keyspace: "ks02", Shard: "shard03", TabletType: topodatapb.TabletType_PRIMARY}, + {Keyspace: "ks01", Shard: "shard02", TabletType: topodatapb.TabletType_PRIMARY}, }}, }, }} diff --git a/proto/vtgateservice.proto b/proto/vtgateservice.proto index 745302ecdad..fe6170b3ecc 100644 --- a/proto/vtgateservice.proto +++ b/proto/vtgateservice.proto @@ -48,10 +48,6 @@ service Vitess { // API group: v3 rpc StreamExecute(vtgate.StreamExecuteRequest) returns (stream vtgate.StreamExecuteResponse) {}; - // ResolveTransaction resolves a transaction. - // API group: Transactions - rpc ResolveTransaction(vtgate.ResolveTransactionRequest) returns (vtgate.ResolveTransactionResponse) {}; - // VStream streams binlog events from the requested sources. 
rpc VStream(vtgate.VStreamRequest) returns (stream vtgate.VStreamResponse) {}; diff --git a/test.go b/test.go index 74438cbb504..360b231e889 100755 --- a/test.go +++ b/test.go @@ -98,6 +98,7 @@ var ( dryRun = flag.Bool("dry-run", false, "For each test to be run, it will output the test attributes, but NOT run the tests. Useful while debugging changes to test.go (this file)") remoteStats = flag.String("remote-stats", "", "url to send remote stats") buildVTAdmin = flag.Bool("build-vtadmin", false, "Enable or disable VTAdmin build during 'make build'") + buildTag = flag.String("build-tag", "", "Build tag to create a custom debug build") ) var ( @@ -433,6 +434,9 @@ func main() { if !*buildVTAdmin { command.Env = append(os.Environ(), "NOVTADMINBUILD=1") } + if *buildTag != "" { + command.Env = append(command.Env, fmt.Sprintf(`EXTRA_BUILD_TAGS=%s`, *buildTag)) + } if out, err := command.CombinedOutput(); err != nil { log.Fatalf("make build failed; exit code: %d, error: %v\n%s", command.ProcessState.ExitCode(), err, out) diff --git a/test/ci_workflow_gen.go b/test/ci_workflow_gen.go index 9ead7f07963..7ad71b84040 100644 --- a/test/ci_workflow_gen.go +++ b/test/ci_workflow_gen.go @@ -122,6 +122,10 @@ var ( "vttablet_prscomplex", } + buildTag = map[string]string{ + "vtgate_transaction": "debug2PC", + } + vitessTesterMap = map[string]string{ "vtgate": "./go/test/endtoend/vtgate/vitess_tester", } @@ -158,6 +162,7 @@ type unitTest struct { type clusterTest struct { Name, Shard, Platform string FileName string + BuildTag string MemoryCheck bool MakeTools, InstallXtraBackup bool Docker bool @@ -245,8 +250,9 @@ func generateClusterWorkflows(list []string, tpl string) { for _, cluster := range clusters { for _, mysqlVersion := range clusterMySQLVersions() { test := &clusterTest{ - Name: fmt.Sprintf("Cluster (%s)", cluster), - Shard: cluster, + Name: fmt.Sprintf("Cluster (%s)", cluster), + Shard: cluster, + BuildTag: buildTag[cluster], } cores16Clusters := 
canonnizeList(clusterRequiring16CoresMachines) for _, cores16Cluster := range cores16Clusters { diff --git a/test/config.json b/test/config.json index 8b48ccc3ec0..1cdf92127ef 100644 --- a/test/config.json +++ b/test/config.json @@ -799,7 +799,7 @@ }, "vtgate_transaction": { "File": "unused.go", - "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/transaction"], + "Args": ["vitess.io/vitess/go/test/endtoend/transaction"], "Command": [], "Manual": false, "Shard": "vtgate_transaction", @@ -808,7 +808,7 @@ }, "vtgate_transaction_restart": { "File": "unused.go", - "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/transaction/restart"], + "Args": ["vitess.io/vitess/go/test/endtoend/transaction/restart"], "Command": [], "Manual": false, "Shard": "vtgate_transaction", @@ -817,7 +817,7 @@ }, "vtgate_transaction_rollback": { "File": "unused.go", - "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/transaction/rollback"], + "Args": ["vitess.io/vitess/go/test/endtoend/transaction/rollback"], "Command": [], "Manual": false, "Shard": "vtgate_transaction", @@ -826,7 +826,7 @@ }, "vtgate_transaction_single": { "File": "unused.go", - "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/transaction/single"], + "Args": ["vitess.io/vitess/go/test/endtoend/transaction/single"], "Command": [], "Manual": false, "Shard": "vtgate_transaction", @@ -835,7 +835,7 @@ }, "vtgate_transaction_twopc": { "File": "unused.go", - "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/transaction/twopc"], + "Args": ["vitess.io/vitess/go/test/endtoend/transaction/twopc"], "Command": [], "Manual": false, "Shard": "vtgate_transaction", diff --git a/test/config_partial_keyspace.json b/test/config_partial_keyspace.json index 38149fe7acd..bcd445d34bc 100644 --- a/test/config_partial_keyspace.json +++ b/test/config_partial_keyspace.json @@ -29,7 +29,7 @@ }, "vtgate_transaction_partial_keyspace": { "File": "unused.go", - "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/transaction"], + "Args": 
["vitess.io/vitess/go/test/endtoend/transaction"], "Command": [], "Manual": false, "Shard": "vtgate_partial_keyspace", @@ -38,7 +38,7 @@ }, "vtgate_transaction_rollback_partial_keyspace": { "File": "unused.go", - "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/transaction/rollback"], + "Args": ["vitess.io/vitess/go/test/endtoend/transaction/rollback"], "Command": [], "Manual": false, "Shard": "vtgate_partial_keyspace", @@ -47,7 +47,7 @@ }, "vtgate_transaction_single_partial_keyspace": { "File": "unused.go", - "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/transaction/single"], + "Args": ["vitess.io/vitess/go/test/endtoend/transaction/single"], "Command": [], "Manual": false, "Shard": "vtgate_partial_keyspace", diff --git a/test/templates/cluster_endtoend_test.tpl b/test/templates/cluster_endtoend_test.tpl index 46078cfcc0c..78a1a4616d0 100644 --- a/test/templates/cluster_endtoend_test.tpl +++ b/test/templates/cluster_endtoend_test.tpl @@ -207,7 +207,7 @@ jobs: {{end}} # run the tests however you normally do, then produce a JUnit XML file - eatmydata -- go run test.go -docker={{if .Docker}}true -flavor={{.Platform}}{{else}}false{{end}} -follow -shard {{.Shard}}{{if .PartialKeyspace}} -partial-keyspace=true {{end}} | tee -a output.txt | go-junit-report -set-exit-code > report.xml + eatmydata -- go run test.go -docker={{if .Docker}}true -flavor={{.Platform}}{{else}}false{{end}} -follow -shard {{.Shard}}{{if .PartialKeyspace}} -partial-keyspace=true {{end}}{{if .BuildTag}} -build-tag={{.BuildTag}} {{end}} | tee -a output.txt | go-junit-report -set-exit-code > report.xml - name: Print test output and Record test result in launchable if PR is not a draft if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' && always()