Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc: retrieve unresolved transactions #16356

Merged
merged 5 commits into from
Jul 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,516 changes: 839 additions & 677 deletions go/vt/proto/query/query.pb.go

Large diffs are not rendered by default.

433 changes: 433 additions & 0 deletions go/vt/proto/query/query_vtproto.pb.go

Large diffs are not rendered by default.

350 changes: 180 additions & 170 deletions go/vt/proto/queryservice/queryservice.pb.go

Large diffs are not rendered by default.

38 changes: 38 additions & 0 deletions go/vt/proto/queryservice/queryservice_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions go/vt/vtcombo/tablet_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,12 @@ func (itc *internalTabletConn) ReadTransaction(ctx context.Context, target *quer
return metadata, tabletconn.ErrorFromGRPC(vterrors.ToGRPC(err))
}

// UnresolvedTransactions is part of queryservice.QueryService
func (itc *internalTabletConn) UnresolvedTransactions(ctx context.Context, target *querypb.Target) (transactions []*querypb.TransactionMetadata, err error) {
transactions, err = itc.tablet.qsc.QueryService().UnresolvedTransactions(ctx, target)
return transactions, tabletconn.ErrorFromGRPC(vterrors.ToGRPC(err))
}

// BeginExecute is part of queryservice.QueryService
func (itc *internalTabletConn) BeginExecute(
ctx context.Context,
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vttablet/endtoend/framework/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@ func (client *QueryClient) ReadTransaction(dtid string) (*querypb.TransactionMet
return client.server.ReadTransaction(client.ctx, client.target, dtid)
}

// UnresolvedTransactions invokes the UnresolvedTransactions API of TabletServer.
func (client *QueryClient) UnresolvedTransactions() ([]*querypb.TransactionMetadata, error) {
return client.server.UnresolvedTransactions(client.ctx, client.target)
}

// SetServingType is for testing transitions.
// It currently supports only primary->replica and back.
func (client *QueryClient) SetServingType(tabletType topodatapb.TabletType) error {
Expand Down
29 changes: 29 additions & 0 deletions go/vt/vttablet/endtoend/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -737,3 +737,32 @@ func TestManualTwopcz(t *testing.T) {
fmt.Print("Sleeping for 30 seconds\n")
time.Sleep(30 * time.Second)
}

// TestUnresolvedTransactions tests the UnresolvedTransactions API.
func TestUnresolvedTransactions(t *testing.T) {
client := framework.NewClient()

participants := []*querypb.Target{
{Keyspace: "ks1", Shard: "-80"},
{Keyspace: "ks1", Shard: "80-"},
}
err := client.CreateTransaction("dtid01", participants)
require.NoError(t, err)

// expected no transaction to show here, as 1 second not passed.
transactions, err := client.UnresolvedTransactions()
require.NoError(t, err)
require.Empty(t, transactions)

// abandon age is 1 second.
time.Sleep(2 * time.Second)

transactions, err = client.UnresolvedTransactions()
require.NoError(t, err)
want := []*querypb.TransactionMetadata{{
Dtid: "dtid01",
State: querypb.TransactionState_PREPARE,
Participants: participants,
}}
utils.MustMatch(t, want, transactions)
}
15 changes: 15 additions & 0 deletions go/vt/vttablet/grpcqueryservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,21 @@ func (q *query) ReadTransaction(ctx context.Context, request *querypb.ReadTransa
return &querypb.ReadTransactionResponse{Metadata: result}, nil
}

// UnresolvedTransactions is part of the queryservice.QueryServer interface
func (q *query) UnresolvedTransactions(ctx context.Context, request *querypb.UnresolvedTransactionsRequest) (response *querypb.UnresolvedTransactionsResponse, err error) {
defer q.server.HandlePanic(&err)
ctx = callerid.NewContext(callinfo.GRPCCallInfo(ctx),
request.EffectiveCallerId,
request.ImmediateCallerId,
)
transactions, err := q.server.UnresolvedTransactions(ctx, request.Target)
if err != nil {
return nil, vterrors.ToGRPC(err)
}

return &querypb.UnresolvedTransactionsResponse{Transactions: transactions}, nil
}

// BeginExecute is part of the queryservice.QueryServer interface
func (q *query) BeginExecute(ctx context.Context, request *querypb.BeginExecuteRequest) (response *querypb.BeginExecuteResponse, err error) {
defer q.server.HandlePanic(&err)
Expand Down
20 changes: 20 additions & 0 deletions go/vt/vttablet/grpctabletconn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,26 @@ func (conn *gRPCQueryClient) ReadTransaction(ctx context.Context, target *queryp
return response.Metadata, nil
}

// UnresolvedTransactions returns all unresolved distributed transactions.
func (conn *gRPCQueryClient) UnresolvedTransactions(ctx context.Context, target *querypb.Target) ([]*querypb.TransactionMetadata, error) {
conn.mu.RLock()
defer conn.mu.RUnlock()
if conn.cc == nil {
return nil, tabletconn.ConnClosed
}

req := &querypb.UnresolvedTransactionsRequest{
Target: target,
EffectiveCallerId: callerid.EffectiveCallerIDFromContext(ctx),
ImmediateCallerId: callerid.ImmediateCallerIDFromContext(ctx),
}
response, err := conn.c.UnresolvedTransactions(ctx, req)
if err != nil {
return nil, tabletconn.ErrorFromGRPC(err)
}
return response.Transactions, nil
}

// BeginExecute starts a transaction and runs an Execute.
func (conn *gRPCQueryClient) BeginExecute(ctx context.Context, target *querypb.Target, preQueries []string, query string, bindVars map[string]*querypb.BindVariable, reservedID int64, options *querypb.ExecuteOptions) (state queryservice.TransactionState, result *sqltypes.Result, err error) {
conn.mu.RLock()
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vttablet/queryservice/queryservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ type QueryService interface {
// ReadTransaction returns the metadata for the specified dtid.
ReadTransaction(ctx context.Context, target *querypb.Target, dtid string) (metadata *querypb.TransactionMetadata, err error)

// UnresolvedTransactions returns the list of unresolved distributed transactions.
UnresolvedTransactions(ctx context.Context, target *querypb.Target) ([]*querypb.TransactionMetadata, error)

// Execute for query execution
Execute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]*querypb.BindVariable, transactionID, reservedID int64, options *querypb.ExecuteOptions) (*sqltypes.Result, error)
// StreamExecute for query execution with streaming
Expand Down
9 changes: 9 additions & 0 deletions go/vt/vttablet/queryservice/wrapped.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,15 @@ func (ws *wrappedService) ReadTransaction(ctx context.Context, target *querypb.T
return metadata, err
}

func (ws *wrappedService) UnresolvedTransactions(ctx context.Context, target *querypb.Target) (transactions []*querypb.TransactionMetadata, err error) {
err = ws.wrapper(ctx, target, ws.impl, "UnresolvedTransactions", false, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) {
var innerErr error
transactions, innerErr = conn.UnresolvedTransactions(ctx, target)
return canRetry(ctx, innerErr), innerErr
})
return transactions, err
}

func (ws *wrappedService) Execute(ctx context.Context, target *querypb.Target, query string, bindVars map[string]*querypb.BindVariable, transactionID, reservedID int64, options *querypb.ExecuteOptions) (qr *sqltypes.Result, err error) {
inDedicatedConn := transactionID != 0 || reservedID != 0
err = ws.wrapper(ctx, target, ws.impl, "Execute", inDedicatedConn, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) {
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vttablet/sandboxconn/sandboxconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,12 @@ func (sbc *SandboxConn) ReadTransaction(ctx context.Context, target *querypb.Tar
return nil, nil
}

// UnresolvedTransactions is part of the QueryService interface.
func (sbc *SandboxConn) UnresolvedTransactions(context.Context, *querypb.Target) ([]*querypb.TransactionMetadata, error) {
// TODO implement me
panic("implement me")
}

// BeginExecute is part of the QueryService interface.
func (sbc *SandboxConn) BeginExecute(ctx context.Context, target *querypb.Target, preQueries []string, query string, bindVars map[string]*querypb.BindVariable, reservedID int64, options *querypb.ExecuteOptions) (queryservice.TransactionState, *sqltypes.Result, error) {
sbc.panicIfNeeded()
Expand Down
12 changes: 12 additions & 0 deletions go/vt/vttablet/tabletconntest/fakequeryservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,18 @@ func (f *FakeQueryService) ReadTransaction(ctx context.Context, target *querypb.
return Metadata, nil
}

// UnresolvedTransactions is part of the queryservice.QueryService interface
func (f *FakeQueryService) UnresolvedTransactions(ctx context.Context, target *querypb.Target) ([]*querypb.TransactionMetadata, error) {
if f.HasError {
return nil, f.TabletError
}
if f.Panics {
panic(fmt.Errorf("test-triggered panic"))
}
f.checkTargetCallerID(ctx, "UnresolvedTransactions", target)
return []*querypb.TransactionMetadata{Metadata}, nil
}

// ExecuteQuery is a fake test query.
const ExecuteQuery = "executeQuery"

Expand Down
30 changes: 30 additions & 0 deletions go/vt/vttablet/tabletconntest/tabletconntest.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,33 @@ func testReadTransactionPanics(t *testing.T, conn queryservice.QueryService, f *
})
}

func testUnresolvedTransactions(t *testing.T, conn queryservice.QueryService, f *FakeQueryService) {
t.Log("testUnresolvedTransactions")
ctx := context.Background()
ctx = callerid.NewContext(ctx, TestCallerID, TestVTGateCallerID)
transactions, err := conn.UnresolvedTransactions(ctx, TestTarget)
require.NoError(t, err)
require.True(t, proto.Equal(transactions[0], Metadata))
}

func testUnresolvedTransactionsError(t *testing.T, conn queryservice.QueryService, f *FakeQueryService) {
t.Log("testUnresolvedTransactionsError")
f.HasError = true
testErrorHelper(t, f, "UnresolvedTransactions", func(ctx context.Context) error {
_, err := conn.UnresolvedTransactions(ctx, TestTarget)
return err
})
f.HasError = false
}

func testUnresolvedTransactionsPanics(t *testing.T, conn queryservice.QueryService, f *FakeQueryService) {
t.Log("testUnresolvedTransactionsPanics")
testPanicHelper(t, f, "UnresolvedTransactions", func(ctx context.Context) error {
_, err := conn.UnresolvedTransactions(ctx, TestTarget)
return err
})
}

func testExecute(t *testing.T, conn queryservice.QueryService, f *FakeQueryService) {
t.Log("testExecute")
f.ExpectedTransactionID = ExecuteTransactionID
Expand Down Expand Up @@ -936,6 +963,7 @@ func TestSuite(ctx context.Context, t *testing.T, protocol string, tablet *topod
testSetRollback,
testConcludeTransaction,
testReadTransaction,
testUnresolvedTransactions,
testExecute,
testBeginExecute,
testStreamExecute,
Expand All @@ -956,6 +984,7 @@ func TestSuite(ctx context.Context, t *testing.T, protocol string, tablet *topod
testSetRollbackError,
testConcludeTransactionError,
testReadTransactionError,
testUnresolvedTransactionsError,
testExecuteError,
testBeginExecuteErrorInBegin,
testBeginExecuteErrorInExecute,
Expand All @@ -979,6 +1008,7 @@ func TestSuite(ctx context.Context, t *testing.T, protocol string, tablet *topod
testSetRollbackPanics,
testConcludeTransactionPanics,
testReadTransactionPanics,
testUnresolvedTransactionsPanics,
testExecutePanics,
testBeginExecutePanics,
testStreamExecutePanics,
Expand Down
9 changes: 6 additions & 3 deletions go/vt/vttablet/tabletserver/dt_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,13 @@ import (
"context"
"time"

"vitess.io/vitess/go/vt/vttablet/tabletserver/tx"

"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/log"

querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tx"
)

// DTExecutor is used for executing a distributed transactional request.
Expand Down Expand Up @@ -290,3 +288,8 @@ func (dte *DTExecutor) inTransaction(f func(*StatefulConnection) error) error {
}
return nil
}

// UnresolvedTransactions returns the list of unresolved distributed transactions.
func (dte *DTExecutor) UnresolvedTransactions() ([]*querypb.TransactionMetadata, error) {
return dte.te.twoPC.UnresolvedTransactions(dte.ctx, time.Now().Add(-dte.te.abandonAge))
}
24 changes: 19 additions & 5 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ import (
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/mysqlctl"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/srvtopo"
Expand All @@ -66,11 +70,6 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletserver/txserializer"
"vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler"
"vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

// logPoolFull is for throttling transaction / query pool full messages in the log.
Expand Down Expand Up @@ -747,6 +746,21 @@ func (tsv *TabletServer) ReadTransaction(ctx context.Context, target *querypb.Ta
return metadata, err
}

// UnresolvedTransactions returns the unresolved distributed transaction record.
func (tsv *TabletServer) UnresolvedTransactions(ctx context.Context, target *querypb.Target) (transactions []*querypb.TransactionMetadata, err error) {
err = tsv.execRequest(
ctx, tsv.loadQueryTimeout(),
"UnresolvedTransactions", "unresolved_transaction", nil,
target, nil, false, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
txe := NewDTExecutor(ctx, tsv.te, logStats)
transactions, err = txe.UnresolvedTransactions()
return err
},
)
return
}

// Execute executes the query and returns the result as response.
func (tsv *TabletServer) Execute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]*querypb.BindVariable, transactionID, reservedID int64, options *querypb.ExecuteOptions) (result *sqltypes.Result, err error) {
span, ctx := trace.NewSpan(ctx, "TabletServer.Execute")
Expand Down
Loading
Loading