From 8c9b33b42e8245c86365742ddbf14897180540f4 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Fri, 30 Jun 2023 12:43:27 -0400 Subject: [PATCH] stash commit Signed-off-by: Peter Broadhurst --- pkg/apitypes/managed_tx.go | 2 + pkg/fftm/route_post_transaction_suspend.go | 45 +++++++++++++ .../route_post_transaction_suspend_test.go | 67 +++++++++++++++++++ pkg/fftm/transaction_management.go | 12 ++++ pkg/txhandler/simple/policyloop.go | 42 ++++++++---- pkg/txhandler/simple/policyloop_test.go | 51 +++++++++++++- .../simple/simple_transaction_handler.go | 10 +++ pkg/txhandler/txhandler.go | 2 + 8 files changed, 218 insertions(+), 13 deletions(-) create mode 100644 pkg/fftm/route_post_transaction_suspend.go create mode 100644 pkg/fftm/route_post_transaction_suspend_test.go diff --git a/pkg/apitypes/managed_tx.go b/pkg/apitypes/managed_tx.go index 464b5e6f..7479083e 100644 --- a/pkg/apitypes/managed_tx.go +++ b/pkg/apitypes/managed_tx.go @@ -34,6 +34,8 @@ const ( TxStatusSucceeded TxStatus = "Succeeded" // TxStatusFailed happens when an error is reported by the infrastructure runtime TxStatusFailed TxStatus = "Failed" + // TxStatusSuspended indicates we are not actively doing any work with this transaction right now, until it's resumed to pending again + TxStatusSuspended TxStatus = "Suspended" ) // TxSubStatus is an intermediate status a transaction may go through diff --git a/pkg/fftm/route_post_transaction_suspend.go b/pkg/fftm/route_post_transaction_suspend.go new file mode 100644 index 00000000..c7848bd7 --- /dev/null +++ b/pkg/fftm/route_post_transaction_suspend.go @@ -0,0 +1,45 @@ +// Copyright © 2023 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fftm + +import ( + "net/http" + + "github.com/hyperledger/firefly-common/pkg/ffapi" + "github.com/hyperledger/firefly-transaction-manager/internal/tmmsgs" + "github.com/hyperledger/firefly-transaction-manager/pkg/apitypes" +) + +var postTransactionSuspend = func(m *manager) *ffapi.Route { + return &ffapi.Route{ + Name: "postTransactionSuspend", + Path: "/transactions/{transactionId}", + Method: http.MethodDelete, + PathParams: []*ffapi.PathParam{ + {Name: "transactionId", Description: tmmsgs.APIParamTransactionID}, + }, + QueryParams: nil, + Description: tmmsgs.APIEndpointPostTransactionSuspend, + JSONInputValue: nil, + JSONOutputValue: func() interface{} { return &apitypes.ManagedTX{} }, + JSONOutputCodes: []int{http.StatusOK, http.StatusAccepted}, + JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) { + r.SuccessStatus, output, err = m.requestTransactionDeletion(r.Req.Context(), r.PP["transactionId"]) + return output, err + }, + } +} diff --git a/pkg/fftm/route_post_transaction_suspend_test.go b/pkg/fftm/route_post_transaction_suspend_test.go new file mode 100644 index 00000000..3025ba77 --- /dev/null +++ b/pkg/fftm/route_post_transaction_suspend_test.go @@ -0,0 +1,67 @@ +// Copyright © 2022 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fftm + +import ( + "fmt" + "testing" + + "github.com/go-resty/resty/v2" + "github.com/hyperledger/firefly-transaction-manager/mocks/txhandlermocks" + "github.com/hyperledger/firefly-transaction-manager/pkg/apitypes" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +func TestPostTransactionSuspend(t *testing.T) { + + url, m, done := newTestManager(t) + defer done() + tx := newTestTxn(t, m, "0x0aaaaa", 10001, apitypes.TxStatusSucceeded) + txID := tx.ID + + err := m.Start() + assert.NoError(t, err) + + var txOut *apitypes.ManagedTX + emptyInput := map[string]interface{}{} + res, err := resty.New().R(). + SetResult(&txOut). + Body(emptyInput). + Post(fmt.Sprintf("%s/transactions/%s/suspend", url, txID)) + assert.NoError(t, err) + assert.Equal(t, 202, res.StatusCode()) + assert.Equal(t, txID, txOut.ID) +} + +func TestPostTransactionSuspendFailed(t *testing.T) { + url, m, done := newTestManager(t) + defer done() + + err := m.Start() + assert.NoError(t, err) + mth := txhandlermocks.TransactionHandler{} + mth.On("HandleCancelTransaction", mock.Anything, "1234").Return(nil, fmt.Errorf("error")).Once() + m.txHandler = &mth + + var txOut *apitypes.ManagedTX + res, err := resty.New().R(). + SetResult(&txOut). + Delete(fmt.Sprintf("%s/transactions/%s", url, "1234")) + assert.NoError(t, err) + assert.Equal(t, 500, res.StatusCode()) +} diff --git a/pkg/fftm/transaction_management.go b/pkg/fftm/transaction_management.go index 665fadd8..2059895b 100644 --- a/pkg/fftm/transaction_management.go +++ b/pkg/fftm/transaction_management.go @@ -96,3 +96,15 @@ func (m *manager) requestTransactionDeletion(ctx context.Context, txID string) ( return http.StatusAccepted, canceledTx, nil } + +func (m *manager) requestTransactionSuspend(ctx context.Context, txID string) (status int, transaction *apitypes.ManagedTX, err error) { + + canceledTx, err := m.txHandler.HandleSuspendTransaction(ctx, txID) + + if err != nil { + return http.StatusInternalServerError, nil, err + } + + return http.StatusAccepted, canceledTx, nil + +} diff --git a/pkg/txhandler/simple/policyloop.go b/pkg/txhandler/simple/policyloop.go index ed697344..eef81c67 100644 --- a/pkg/txhandler/simple/policyloop.go +++ b/pkg/txhandler/simple/policyloop.go @@ -40,6 +40,7 @@ type policyEngineAPIRequestType int const ( policyEngineAPIRequestTypeDelete policyEngineAPIRequestType = iota + policyEngineAPIRequestTypeSuspend ) type policyEngineAPIRequest struct { @@ -170,7 +171,7 @@ func (sth *simpleTransactionHandler) policyLoopCycle(ctx context.Context, inflig // Go through executing the policy engine against them for _, pending := range sth.inflight { - err := sth.execPolicy(ctx, pending, false) + err := sth.execPolicy(ctx, pending) if err != nil { log.L(ctx).Errorf("Failed policy cycle transaction=%s operation=%s: %s", pending.mtx.TransactionHash, pending.mtx.ID, err) } @@ -220,8 +221,8 @@ func (sth *simpleTransactionHandler) processPolicyAPIRequests(ctx context.Contex } switch request.requestType { - case policyEngineAPIRequestTypeDelete: - if err := sth.execPolicy(ctx, pending, true); err != nil { + case policyEngineAPIRequestTypeDelete, policyEngineAPIRequestTypeSuspend: + if err := sth.execPolicy(ctx, pending, request.requestType); err != nil { request.response <- policyEngineAPIResponse{err: err} } else { res := policyEngineAPIResponse{tx: pending.mtx, status: http.StatusAccepted} @@ -239,7 +240,7 @@ func (sth *simpleTransactionHandler) processPolicyAPIRequests(ctx context.Contex } -func (sth *simpleTransactionHandler) pendingToRunContext(baseCtx context.Context, pending *pendingState, syncDeleteRequest bool) (ctx *RunContext, err error) { +func (sth *simpleTransactionHandler) pendingToRunContext(baseCtx context.Context, pending *pendingState, syncRequest ...policyEngineAPIRequestType) (ctx *RunContext, err error) { // Take a snapshot of the pending state under the lock sth.mux.Lock() @@ -254,7 +255,16 @@ func (sth *simpleTransactionHandler) pendingToRunContext(baseCtx context.Context } confirmNotify := pending.confirmNotify receiptNotify := pending.receiptNotify - if syncDeleteRequest && mtx.DeleteRequested == nil { + for _, sr := range syncRequest { + switch sr { + case policyEngineAPIRequestTypeDelete: + ctx.Deleting = true + case policyEngineAPIRequestTypeSuspend: + ctx.Suspending = true + } + } + + if ctx.Deleting && mtx.DeleteRequested == nil { mtx.DeleteRequested = fftypes.Now() ctx.UpdateType = Update // might change to delete later ctx.TXUpdates.DeleteRequested = mtx.DeleteRequested @@ -299,9 +309,9 @@ func (sth *simpleTransactionHandler) pendingToRunContext(baseCtx context.Context return ctx, nil } -func (sth *simpleTransactionHandler) execPolicy(baseCtx context.Context, pending *pendingState, syncDeleteRequest bool) (err error) { +func (sth *simpleTransactionHandler) execPolicy(baseCtx context.Context, pending *pendingState, syncRequest ...policyEngineAPIRequestType) (err error) { - ctx, err := sth.pendingToRunContext(baseCtx, pending, syncDeleteRequest) + ctx, err := sth.pendingToRunContext(baseCtx, pending, syncRequest...) if err != nil { return nil } @@ -309,7 +319,7 @@ func (sth *simpleTransactionHandler) execPolicy(baseCtx context.Context, pending completed := false switch { - case ctx.Confirmed && !syncDeleteRequest: + case ctx.Confirmed && !ctx.Deleting: completed = true ctx.UpdateType = Update if ctx.Receipt != nil && ctx.Receipt.Success { @@ -319,13 +329,20 @@ func (sth *simpleTransactionHandler) execPolicy(baseCtx context.Context, pending mtx.Status = apitypes.TxStatusFailed ctx.TXUpdates.Status = &mtx.Status } - + case ctx.Suspending: + // Whole cycle is a no-op if we're not pending + if mtx.Status == apitypes.TxStatusPending { + completed = true + ctx.UpdateType = Update + mtx.Status = apitypes.TxStatusSuspended + ctx.TXUpdates.Status = &mtx.Status + } default: // We get woken for lots of reasons to go through the policy loop, but we only want // to drive the policy engine at regular intervals. // So we track the last time we ran the policy engine against each pending item. // We always call the policy engine on every loop, when deletion has been requested. - if syncDeleteRequest || time.Since(pending.lastPolicyCycle) > sth.policyLoopInterval { + if ctx.Deleting || time.Since(pending.lastPolicyCycle) > sth.policyLoopInterval { // Pass the state to the pluggable policy engine to potentially perform more actions against it, // such as submitting for the first time, or raising the gas etc. @@ -398,11 +415,12 @@ func (sth *simpleTransactionHandler) flushChanges(ctx *RunContext, pending *pend } if completed { pending.remove = true // for the next time round the loop - log.L(ctx).Infof("Transaction %s marked complete (status=%s): %s", mtx.ID, mtx.Status, err) + log.L(ctx).Infof("Transaction %s removed from tracking (status=%s): %s", mtx.ID, mtx.Status, err) sth.markInflightStale() // if and only if the transaction is now resolved dispatch an event to event handler - // and discard any handling errors + // and discard any handling errors. + // Note that TxStatusSuspended has no action here. if mtx.Status == apitypes.TxStatusSucceeded { _ = sth.toolkit.EventHandler.HandleEvent(ctx, apitypes.ManagedTransactionEvent{ Type: apitypes.ManagedTXProcessSucceeded, diff --git a/pkg/txhandler/simple/policyloop_test.go b/pkg/txhandler/simple/policyloop_test.go index 12d3c5a1..eecf8954 100644 --- a/pkg/txhandler/simple/policyloop_test.go +++ b/pkg/txhandler/simple/policyloop_test.go @@ -892,6 +892,55 @@ func TestExecPolicyDeleteInflightSync(t *testing.T) { } +func TestExecPolicySuspendInflightSync(t *testing.T) { + f, tk, _, conf := newTestTransactionHandlerFactory(t) + conf.Set(FixedGasPrice, `12345`) + + th, err := f.NewTransactionHandler(context.Background(), conf) + assert.NoError(t, err) + + sth := th.(*simpleTransactionHandler) + sth.ctx = context.Background() + sth.Init(sth.ctx, tk) + eh := &fftm.ManagedTransactionEventHandler{ + Ctx: context.Background(), + TxHandler: sth, + } + mc := &confirmationsmocks.Manager{} + mc.On("Notify", mock.Anything).Return(nil) + eh.ConfirmationManager = mc + mws := &wsmocks.WebSocketServer{} + mws.On("SendReply", mock.Anything).Return(nil).Maybe() + + eh.WsServer = mws + sth.toolkit.EventHandler = eh + mp := sth.toolkit.TXPersistence.(*persistencemocks.Persistence) + mp.On("InsertTransactionWithNextNonce", sth.ctx, mock.Anything, mock.Anything).Return(nil, nil).Once() + mp.On("AddSubStatusAction", sth.ctx, mock.Anything, apitypes.TxSubStatusReceived, apitypes.TxActionAssignNonce, mock.Anything, mock.Anything).Return(nil) + tx := sendSampleTX(t, sth, "0xaaaaa", 12345, "") + sth.inflight = []*pendingState{{mtx: tx}} + mp.On("UpdateTransaction", mock.AnythingOfType("*simple.RunContext"), tx.ID, mock.MatchedBy(func(updates *apitypes.TXUpdates) bool { + return updates.Status != nil && *updates.Status == apitypes.TxStatusSuspended + })).Return(nil) + + req := &policyEngineAPIRequest{ + requestType: policyEngineAPIRequestTypeSuspend, + txID: tx.ID, + response: make(chan policyEngineAPIResponse, 1), + } + sth.policyEngineAPIRequests = append(sth.policyEngineAPIRequests, req) + + sth.processPolicyAPIRequests(sth.ctx) + + res := <-req.response + assert.NoError(t, res.err) + assert.Equal(t, http.StatusOK, res.status) + assert.True(t, sth.inflight[0].remove) + + mp.AssertExpectations(t) + +} + func TestExecPolicyIdempotentCancellation(t *testing.T) { f, tk, _, conf := newTestTransactionHandlerFactory(t) conf.Set(FixedGasPrice, `12345`) @@ -1104,7 +1153,7 @@ func TestExecPolicyUpdateNewInfo(t *testing.T) { FirstSubmit: fftypes.Now(), }, receipt: &ffcapi.TransactionReceiptResponse{}, - }, false) + }) assert.NoError(t, err) } diff --git a/pkg/txhandler/simple/simple_transaction_handler.go b/pkg/txhandler/simple/simple_transaction_handler.go index bcacc7ee..105503e3 100644 --- a/pkg/txhandler/simple/simple_transaction_handler.go +++ b/pkg/txhandler/simple/simple_transaction_handler.go @@ -66,6 +66,8 @@ type RunContext struct { Receipt *ffcapi.TransactionReceiptResponse Confirmations *apitypes.ConfirmationsNotification Confirmed bool + Suspending bool + Deleting bool // Input/output SubStatus apitypes.TxSubStatus Info *simplePolicyInfo // must be updated in-place and set UpdatedInfo to true as well as UpdateType = Update @@ -284,6 +286,14 @@ func (sth *simpleTransactionHandler) HandleCancelTransaction(ctx context.Context return res.tx, nil } +func (sth *simpleTransactionHandler) HandleSuspendTransaction(ctx context.Context, txID string) (mtx *apitypes.ManagedTX, err error) { + res := sth.policyEngineAPIRequest(ctx, &policyEngineAPIRequest{ + requestType: policyEngineAPIRequestTypeSuspend, + txID: txID, + }) + return res.tx, nil +} + func (sth *simpleTransactionHandler) createManagedTx(ctx context.Context, txID string, txHeaders *ffcapi.TransactionHeaders, gas *fftypes.FFBigInt, transactionData string) (*apitypes.ManagedTX, error) { if gas != nil { diff --git a/pkg/txhandler/txhandler.go b/pkg/txhandler/txhandler.go index 87d422d3..8e4a7b2f 100644 --- a/pkg/txhandler/txhandler.go +++ b/pkg/txhandler/txhandler.go @@ -107,6 +107,8 @@ type TransactionHandler interface { HandleNewContractDeployment(ctx context.Context, txReq *apitypes.ContractDeployRequest) (mtx *apitypes.ManagedTX, err error) // HandleCancelTransaction - handles event of cancelling a managed transaction HandleCancelTransaction(ctx context.Context, txID string) (mtx *apitypes.ManagedTX, err error) + // HandleSuspendTransaction - handles event of suspending a managed transaction + HandleSuspendTransaction(ctx context.Context, txID string) (mtx *apitypes.ManagedTX, err error) // Informational events: // HandleTransactionConfirmations - handles confirmations of blockchain transactions for a managed transaction