diff --git a/go/test/endtoend/transaction/twopc/twopc_test.go b/go/test/endtoend/transaction/twopc/twopc_test.go index 53c1780f373..5aab1f5a2e2 100644 --- a/go/test/endtoend/transaction/twopc/twopc_test.go +++ b/go/test/endtoend/transaction/twopc/twopc_test.go @@ -964,6 +964,72 @@ func testWarningAndTransactionStatus(t *testing.T, conn *vtgateconn.VTGateSessio } } +// TestReadingUnresolvedTransactions tests the reading of unresolved transactions +func TestReadingUnresolvedTransactions(t *testing.T) { + testcases := []struct { + name string + queries []string + }{ + { + name: "show transaction status for explicit keyspace", + queries: []string{ + fmt.Sprintf("show unresolved transactions for %v", keyspaceName), + }, + }, + { + name: "show transaction status with use command", + queries: []string{ + fmt.Sprintf("use %v", keyspaceName), + "show unresolved transactions", + }, + }, + } + for _, testcase := range testcases { + t.Run(testcase.name, func(t *testing.T) { + conn, closer := start(t) + defer closer() + // Start an atomic transaction. + utils.Exec(t, conn, "begin") + // Insert rows such that they go to all the three shards. Given that we have sharded the table `twopc_t1` on reverse_bits + // it is very easy to figure out what value will end up in which shard. + utils.Exec(t, conn, "insert into twopc_t1(id, col) values(4, 4)") + utils.Exec(t, conn, "insert into twopc_t1(id, col) values(6, 4)") + utils.Exec(t, conn, "insert into twopc_t1(id, col) values(9, 4)") + // We want to delay the commit on one of the shards to simulate slow commits on a shard. + writeTestCommunicationFile(t, DebugDelayCommitShard, "80-") + defer deleteFile(DebugDelayCommitShard) + writeTestCommunicationFile(t, DebugDelayCommitTime, "5") + defer deleteFile(DebugDelayCommitTime) + // We will execute a commit in a go routine, because we know it will take some time to complete. + // While the commit is ongoing, we would like to check that we see the unresolved transaction. + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + _, err := utils.ExecAllowError(t, conn, "commit") + if err != nil { + log.Errorf("Error in commit - %v", err) + } + }() + // Allow enough time for the commit to have started. + time.Sleep(1 * time.Second) + var lastRes *sqltypes.Result + newConn, err := mysql.Connect(context.Background(), &vtParams) + require.NoError(t, err) + defer newConn.Close() + for _, query := range testcase.queries { + lastRes = utils.Exec(t, newConn, query) + } + require.NotNil(t, lastRes) + require.Len(t, lastRes.Rows, 1) + // This verifies that we already decided to commit the transaction, but it is still unresolved. + assert.Contains(t, fmt.Sprintf("%v", lastRes.Rows), `VARCHAR("COMMIT")`) + // Wait for the commit to have returned. + wg.Wait() + }) + } +} + // TestDisruptions tests that atomic transactions persevere through various disruptions. func TestDisruptions(t *testing.T) { testcases := []struct {