Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Distributed Transaction: Action on commit prepared or redo prepared failure #16803

Merged
merged 11 commits into from
Sep 24, 2024
1 change: 1 addition & 0 deletions go/vt/sidecardb/schema/twopc/redo_state.sql
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@ CREATE TABLE IF NOT EXISTS redo_state(
dtid varbinary(512) NOT NULL,
state bigint NOT NULL,
time_created bigint NOT NULL,
message text,
primary key(dtid)
) ENGINE = InnoDB CHARSET = utf8mb4
2 changes: 1 addition & 1 deletion go/vt/vttablet/endtoend/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ var tableACLConfig = `{
},
{
"name": "vitess_twopc",
"table_names_or_prefixes": ["dt_state"],
"table_names_or_prefixes": ["dt_state", "redo_state"],
"readers": ["dev"],
"writers": ["dev"],
"admins": ["dev"]
Expand Down
41 changes: 41 additions & 0 deletions go/vt/vttablet/endtoend/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -873,3 +873,44 @@ func TestUnresolvedTransactionsOrdering(t *testing.T) {
assert.Equal(t, want[i].Participants, transaction.Participants)
}
}

// TestCommitPreparedFail tests the case where the commit_prepared fails when the query is killed.
// The transaction remains in the prepare state.
func TestCommitPreparedFail(t *testing.T) {
client := framework.NewClient()
defer client.RollbackPrepared("aa", 0)

_, err := client.BeginExecute(`insert into vitess_test (intval, floatval, charval, binval) values(4, null, null, null)`, nil, nil)
require.NoError(t, err)
connRes, err := client.Execute(`select connection_id()`, nil)
require.NoError(t, err)
err = client.Prepare("aa")
require.NoError(t, err)

client2 := framework.NewClient()
_, err = client2.BeginExecute(`select * from _vt.redo_state for update`, nil, nil)
require.NoError(t, err)

ch := make(chan any)
go func() {
err = client.CommitPrepared("aa")
require.ErrorContains(t, err, "commit_prepared")
ch <- nil
}()
time.Sleep(100 * time.Millisecond)

dbaConnector := framework.Server.Config().DB.DbaWithDB()
conn, err := dbaConnector.Connect(context.Background())
require.NoError(t, err)
defer conn.Close()

_, err = conn.ExecuteFetch(fmt.Sprintf("kill query %s", connRes.Rows[0][0].ToString()), 1, false)
require.NoError(t, err)

client2.Release()
<-ch

qr, err := client.Execute("select dtid, state, message from _vt.redo_state where dtid = 'aa'", nil)
require.NoError(t, err)
require.Equal(t, `[[VARBINARY("aa") INT64(1) TEXT("Query execution was interrupted (errno 1317) (sqlstate 70100) during query: delete from _vt.redo_state where dtid = 'aa'")]]`, fmt.Sprintf("%v", qr.Rows))
}
29 changes: 1 addition & 28 deletions go/vt/vttablet/tabletserver/dt_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ func (dte *DTExecutor) CommitPrepared(dtid string) (err error) {
ctx := trace.CopySpan(context.Background(), dte.ctx)
defer func() {
if err != nil {
dte.markFailed(ctx, dtid)
log.Warningf("failed to commit the prepared transaction '%s' with error: %v", dtid, err)
dte.te.checkErrorAndMarkFailed(ctx, dtid, err, "TwopcCommit")
}
dte.te.txPool.RollbackAndRelease(ctx, conn)
}()
Expand All @@ -172,33 +172,6 @@ func (dte *DTExecutor) CommitPrepared(dtid string) (err error) {
return nil
}

// markFailed does the necessary work to mark a CommitPrepared
// as failed. It marks the dtid as failed in the prepared pool,
// increments the InternalErros counter, and also changes the
// state of the transaction in the redo log as failed. If the
// state change does not succeed, it just logs the event.
// The function uses the passed in context that has no timeout
// instead of DTExecutor's context.
func (dte *DTExecutor) markFailed(ctx context.Context, dtid string) {
dte.te.env.Stats().InternalErrors.Add("TwopcCommit", 1)
dte.te.preparedPool.SetFailed(dtid)
conn, _, _, err := dte.te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil)
if err != nil {
log.Errorf("markFailed: Begin failed for dtid %s: %v", dtid, err)
return
}
defer dte.te.txPool.RollbackAndRelease(ctx, conn)

if err = dte.te.twoPC.UpdateRedo(ctx, conn, dtid, RedoStateFailed); err != nil {
log.Errorf("markFailed: UpdateRedo failed for dtid %s: %v", dtid, err)
return
}

if _, err = dte.te.txPool.Commit(ctx, conn); err != nil {
log.Errorf("markFailed: Commit failed for dtid %s: %v", dtid, err)
}
}

// RollbackPrepared rolls back a prepared transaction. This function handles
// the case of an incomplete prepare.
//
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vttablet/tabletserver/dt_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"vitess.io/vitess/go/event/syslogger"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/vt/vtenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/rules"
"vitess.io/vitess/go/vt/vttablet/tabletserver/schema"
Expand Down Expand Up @@ -683,6 +684,7 @@ func newTestTxExecutor(t *testing.T, ctx context.Context) (txe *DTExecutor, tsv
db.AddQuery("delete from _vt.redo_state where dtid = 'aa'", &sqltypes.Result{})
db.AddQuery("delete from _vt.redo_statement where dtid = 'aa'", &sqltypes.Result{})
db.AddQuery("update test_table set `name` = 2 where pk = 1 limit 10001", &sqltypes.Result{})
db.AddRejectedQuery("bogus", sqlerror.NewSQLError(sqlerror.ERUnknownError, sqlerror.SSUnknownSQLState, "bogus query"))
return &DTExecutor{
ctx: ctx,
logStats: logStats,
Expand Down
11 changes: 10 additions & 1 deletion go/vt/vttablet/tabletserver/tabletserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,12 +231,14 @@ func TestTabletServerRedoLogIsKeptBetweenRestarts(t *testing.T) {
{Type: sqltypes.Uint64},
{Type: sqltypes.Uint64},
{Type: sqltypes.VarBinary},
{Type: sqltypes.Text},
},
Rows: [][]sqltypes.Value{{
sqltypes.NewVarBinary("dtid0"),
sqltypes.NewInt64(RedoStatePrepared),
sqltypes.NewVarBinary(""),
sqltypes.NewVarBinary("update test_table set `name` = 2 where pk = 1 limit 10001"),
sqltypes.NULL,
}},
})
turnOnTxEngine()
Expand All @@ -257,22 +259,26 @@ func TestTabletServerRedoLogIsKeptBetweenRestarts(t *testing.T) {
{Type: sqltypes.Uint64},
{Type: sqltypes.Uint64},
{Type: sqltypes.VarBinary},
{Type: sqltypes.Text},
},
Rows: [][]sqltypes.Value{{
sqltypes.NewVarBinary("bogus"),
sqltypes.NewInt64(RedoStatePrepared),
sqltypes.NewVarBinary(""),
sqltypes.NewVarBinary("bogus"),
sqltypes.NULL,
}, {
sqltypes.NewVarBinary("a:b:10"),
sqltypes.NewInt64(RedoStatePrepared),
sqltypes.NewVarBinary(""),
sqltypes.NewVarBinary("update test_table set `name` = 2 where pk = 1 limit 10001"),
sqltypes.NULL,
}, {
sqltypes.NewVarBinary("a:b:20"),
sqltypes.NewInt64(RedoStateFailed),
sqltypes.NewVarBinary(""),
sqltypes.NewVarBinary("unused"),
sqltypes.TestValue(sqltypes.Text, "deadlock detected, transaction rolled back"),
}},
})
turnOnTxEngine()
Expand All @@ -282,7 +288,10 @@ func TestTabletServerRedoLogIsKeptBetweenRestarts(t *testing.T) {
Sql: "update test_table set `name` = 2 where pk = 1 limit 10001",
Tables: []string{"test_table"}}}
utils.MustMatch(t, want, got, "Prepared queries")
wantFailed := map[string]error{"a:b:20": errPrepFailed}
wantFailed := map[string]error{
"bogus": errPrepFailed, // The query is rejected by database so added to failed list.
"a:b:20": errPrepFailed, // The DTID is already in failed state.
}
utils.MustMatch(t, tsv.te.preparedPool.reserved, wantFailed, fmt.Sprintf("Failed dtids: %v, want %v", tsv.te.preparedPool.reserved, wantFailed))
// Verify last id got adjusted.
assert.EqualValues(t, 20, tsv.te.txPool.scp.lastID.Load(), "tsv.te.txPool.lastID.Get()")
Expand Down
18 changes: 10 additions & 8 deletions go/vt/vttablet/tabletserver/twopc.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ const (
// DTStateRollback represents the ROLLBACK state for dt_state.
DTStateRollback = querypb.TransactionState_ROLLBACK

readAllRedo = `select t.dtid, t.state, t.time_created, s.statement
readAllRedo = `select t.dtid, t.state, t.time_created, s.statement, t.message
from %s.redo_state t
join %s.redo_statement s on t.dtid = s.dtid
order by t.dtid, s.id`
Expand Down Expand Up @@ -109,8 +109,8 @@ func (tpc *TwoPC) initializeQueries() {
"insert into %s.redo_statement(dtid, id, statement) values %a",
dbname, ":vals")
tpc.updateRedoTx = sqlparser.BuildParsedQuery(
"update %s.redo_state set state = %a where dtid = %a",
dbname, ":state", ":dtid")
"update %s.redo_state set state = %a, message = %a where dtid = %a",
dbname, ":state", ":message", ":dtid")
tpc.deleteRedoTx = sqlparser.BuildParsedQuery(
"delete from %s.redo_state where dtid = %a",
dbname, ":dtid")
Expand Down Expand Up @@ -200,10 +200,11 @@ func (tpc *TwoPC) SaveRedo(ctx context.Context, conn *StatefulConnection, dtid s
}

// UpdateRedo changes the state of the redo log for the dtid.
func (tpc *TwoPC) UpdateRedo(ctx context.Context, conn *StatefulConnection, dtid string, state int) error {
func (tpc *TwoPC) UpdateRedo(ctx context.Context, conn *StatefulConnection, dtid string, state int, message string) error {
bindVars := map[string]*querypb.BindVariable{
"dtid": sqltypes.StringBindVariable(dtid),
"state": sqltypes.Int64BindVariable(int64(state)),
"dtid": sqltypes.StringBindVariable(dtid),
"state": sqltypes.Int64BindVariable(int64(state)),
"message": sqltypes.StringBindVariable(message),
}
_, err := tpc.exec(ctx, conn, tpc.updateRedoTx, bindVars)
return err
Expand Down Expand Up @@ -244,8 +245,9 @@ func (tpc *TwoPC) ReadAllRedo(ctx context.Context) (prepared, failed []*tx.Prepa
// which is harmless.
tm, _ := row[2].ToCastInt64()
curTx = &tx.PreparedTx{
Dtid: dtid,
Time: time.Unix(0, tm),
Dtid: dtid,
Time: time.Unix(0, tm),
Message: row[4].ToString(),
}
st, err := row[1].ToCastInt64()
if err != nil {
Expand Down
32 changes: 23 additions & 9 deletions go/vt/vttablet/tabletserver/twopc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,14 @@ func TestReadAllRedo(t *testing.T) {
{Type: sqltypes.Int64},
{Type: sqltypes.Int64},
{Type: sqltypes.VarChar},
{Type: sqltypes.Text},
},
Rows: [][]sqltypes.Value{{
sqltypes.NewVarBinary("dtid0"),
sqltypes.NewInt64(RedoStatePrepared),
sqltypes.NewVarBinary("1"),
sqltypes.NewVarBinary("stmt01"),
sqltypes.NULL,
}},
})
prepared, failed, err = tpc.ReadAllRedo(ctx)
Expand All @@ -96,17 +98,20 @@ func TestReadAllRedo(t *testing.T) {
{Type: sqltypes.Int64},
{Type: sqltypes.Int64},
{Type: sqltypes.VarChar},
{Type: sqltypes.Text},
},
Rows: [][]sqltypes.Value{{
sqltypes.NewVarBinary("dtid0"),
sqltypes.NewInt64(RedoStatePrepared),
sqltypes.NewVarBinary("1"),
sqltypes.NewVarBinary("stmt01"),
sqltypes.NULL,
}, {
sqltypes.NewVarBinary("dtid0"),
sqltypes.NewInt64(RedoStatePrepared),
sqltypes.NewVarBinary("1"),
sqltypes.NewVarBinary("stmt02"),
sqltypes.NULL,
}},
})
prepared, failed, err = tpc.ReadAllRedo(ctx)
Expand All @@ -131,22 +136,26 @@ func TestReadAllRedo(t *testing.T) {
{Type: sqltypes.Int64},
{Type: sqltypes.Int64},
{Type: sqltypes.VarChar},
{Type: sqltypes.Text},
},
Rows: [][]sqltypes.Value{{
sqltypes.NewVarBinary("dtid0"),
sqltypes.NewInt64(RedoStatePrepared),
sqltypes.NewVarBinary("1"),
sqltypes.NewVarBinary("stmt01"),
sqltypes.NULL,
}, {
sqltypes.NewVarBinary("dtid0"),
sqltypes.NewInt64(RedoStatePrepared),
sqltypes.NewVarBinary("1"),
sqltypes.NewVarBinary("stmt02"),
sqltypes.NULL,
}, {
sqltypes.NewVarBinary("dtid1"),
sqltypes.NewInt64(RedoStatePrepared),
sqltypes.NewVarBinary("1"),
sqltypes.NewVarBinary("stmt11"),
sqltypes.NULL,
}},
})
prepared, failed, err = tpc.ReadAllRedo(ctx)
Expand Down Expand Up @@ -175,37 +184,44 @@ func TestReadAllRedo(t *testing.T) {
{Type: sqltypes.Int64},
{Type: sqltypes.Int64},
{Type: sqltypes.VarChar},
{Type: sqltypes.Text},
},
Rows: [][]sqltypes.Value{{
sqltypes.NewVarBinary("dtid0"),
sqltypes.NewInt64(RedoStatePrepared),
sqltypes.NewVarBinary("1"),
sqltypes.NewVarBinary("stmt01"),
sqltypes.NULL,
}, {
sqltypes.NewVarBinary("dtid0"),
sqltypes.NewInt64(RedoStatePrepared),
sqltypes.NewVarBinary("1"),
sqltypes.NewVarBinary("stmt02"),
sqltypes.NULL,
}, {
sqltypes.NewVarBinary("dtid1"),
sqltypes.NewVarBinary("Failed"),
sqltypes.NewInt64(RedoStateFailed),
sqltypes.NewVarBinary("1"),
sqltypes.NewVarBinary("stmt11"),
sqltypes.TestValue(sqltypes.Text, "error1"),
}, {
sqltypes.NewVarBinary("dtid2"),
sqltypes.NewVarBinary("Failed"),
sqltypes.NewInt64(RedoStateFailed),
sqltypes.NewVarBinary("1"),
sqltypes.NewVarBinary("stmt21"),
sqltypes.TestValue(sqltypes.Text, "error2"),
}, {
sqltypes.NewVarBinary("dtid2"),
sqltypes.NewVarBinary("Failed"),
sqltypes.NewInt64(RedoStateFailed),
sqltypes.NewVarBinary("1"),
sqltypes.NewVarBinary("stmt22"),
sqltypes.TestValue(sqltypes.Text, "error2"),
}, {
sqltypes.NewVarBinary("dtid3"),
sqltypes.NewInt64(RedoStatePrepared),
sqltypes.NewVarBinary("1"),
sqltypes.NewVarBinary("stmt31"),
sqltypes.NULL,
}},
})
prepared, failed, err = tpc.ReadAllRedo(ctx)
Expand All @@ -221,21 +237,19 @@ func TestReadAllRedo(t *testing.T) {
Queries: []string{"stmt31"},
Time: time.Unix(0, 1),
}}
if !reflect.DeepEqual(prepared, want) {
t.Errorf("ReadAllRedo: %s, want %s", jsonStr(prepared), jsonStr(want))
}
utils.MustMatch(t, want, prepared)
wantFailed := []*tx.PreparedTx{{
Dtid: "dtid1",
Queries: []string{"stmt11"},
Time: time.Unix(0, 1),
Message: "error1",
}, {
Dtid: "dtid2",
Queries: []string{"stmt21", "stmt22"},
Time: time.Unix(0, 1),
Message: "error2",
}}
if !reflect.DeepEqual(failed, wantFailed) {
t.Errorf("ReadAllRedo failed): %s, want %s", jsonStr(failed), jsonStr(wantFailed))
}
utils.MustMatch(t, wantFailed, failed)
}

func TestReadAllTransactions(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions go/vt/vttablet/tabletserver/tx/twopc.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,5 @@ type PreparedTx struct {
Dtid string
Queries []string
Time time.Time
Message string
}
Loading
Loading