From 581fe26f338095af25464447ebfd94eca21a34e6 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Fri, 30 Jun 2023 13:58:20 -0400 Subject: [PATCH] Add suspend/resume transaction actions to FFTM and simple policy engine Signed-off-by: Peter Broadhurst --- internal/tmmsgs/en_api_descriptions.go | 2 + mocks/txhandlermocks/transaction_handler.go | 52 +++++ pkg/fftm/route_post_transaction_resume.go | 45 +++++ .../route_post_transaction_resume_test.go | 67 +++++++ pkg/fftm/route_post_transaction_suspend.go | 8 +- .../route_post_transaction_suspend_test.go | 8 +- pkg/fftm/routes.go | 2 + pkg/fftm/transaction_management.go | 12 ++ pkg/txhandler/simple/policyloop.go | 51 +++-- pkg/txhandler/simple/policyloop_test.go | 188 +++++++++++++++++- .../simple/simple_transaction_handler.go | 19 +- pkg/txhandler/txhandler.go | 2 + 12 files changed, 413 insertions(+), 43 deletions(-) create mode 100644 pkg/fftm/route_post_transaction_resume.go create mode 100644 pkg/fftm/route_post_transaction_resume_test.go diff --git a/internal/tmmsgs/en_api_descriptions.go b/internal/tmmsgs/en_api_descriptions.go index 74ee6d4b..9762b65b 100644 --- a/internal/tmmsgs/en_api_descriptions.go +++ b/internal/tmmsgs/en_api_descriptions.go @@ -58,6 +58,8 @@ var ( APIEndpointPostRootQueryOutput = ffm("api.endpoints.post.root.query.output", "The data result of a query against a smart contract") APIEndpointPostSubscriptionReset = ffm("api.endpoints.post.subscription.reset", "Reset listener - route deprecated in favor of /eventstreams/{streamId}/listeners/{listenerId}/reset") APIEndpointPostSubscriptions = ffm("api.endpoints.post.subscriptions", "Create new listener - route deprecated in favor of /eventstreams/{streamId}/listeners") + APIEndpointPostTransactionSuspend = ffm("api.endpoints.post.transactions.suspend", "Suspend processing on a pending transaction (no-op for completed transactions)") + APIEndpointPostTransactionResume = ffm("api.endpoints.post.transactions.resume", "Resume processing on a suspended transaction") APIParamStreamID = ffm("api.params.streamId", "Event Stream ID") APIParamListenerID = ffm("api.params.listenerId", "Listener ID") diff --git a/mocks/txhandlermocks/transaction_handler.go b/mocks/txhandlermocks/transaction_handler.go index 97eecf5d..0fc2bcae 100644 --- a/mocks/txhandlermocks/transaction_handler.go +++ b/mocks/txhandlermocks/transaction_handler.go @@ -97,6 +97,58 @@ func (_m *TransactionHandler) HandleNewTransaction(ctx context.Context, txReq *a return r0, r1 } +// HandleResumeTransaction provides a mock function with given fields: ctx, txID +func (_m *TransactionHandler) HandleResumeTransaction(ctx context.Context, txID string) (*apitypes.ManagedTX, error) { + ret := _m.Called(ctx, txID) + + var r0 *apitypes.ManagedTX + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) (*apitypes.ManagedTX, error)); ok { + return rf(ctx, txID) + } + if rf, ok := ret.Get(0).(func(context.Context, string) *apitypes.ManagedTX); ok { + r0 = rf(ctx, txID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*apitypes.ManagedTX) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, txID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// HandleSuspendTransaction provides a mock function with given fields: ctx, txID +func (_m *TransactionHandler) HandleSuspendTransaction(ctx context.Context, txID string) (*apitypes.ManagedTX, error) { + ret := _m.Called(ctx, txID) + + var r0 *apitypes.ManagedTX + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) (*apitypes.ManagedTX, error)); ok { + return rf(ctx, txID) + } + if rf, ok := ret.Get(0).(func(context.Context, string) *apitypes.ManagedTX); ok { + r0 = rf(ctx, txID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*apitypes.ManagedTX) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, txID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // HandleTransactionConfirmations provides a mock function with given fields: ctx, txID, notification func (_m *TransactionHandler) HandleTransactionConfirmations(ctx context.Context, txID string, notification *apitypes.ConfirmationsNotification) error { ret := _m.Called(ctx, txID, notification) diff --git a/pkg/fftm/route_post_transaction_resume.go b/pkg/fftm/route_post_transaction_resume.go new file mode 100644 index 00000000..0d815e32 --- /dev/null +++ b/pkg/fftm/route_post_transaction_resume.go @@ -0,0 +1,45 @@ +// Copyright © 2023 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fftm + +import ( + "net/http" + + "github.com/hyperledger/firefly-common/pkg/ffapi" + "github.com/hyperledger/firefly-transaction-manager/internal/tmmsgs" + "github.com/hyperledger/firefly-transaction-manager/pkg/apitypes" +) + +var postTransactionResume = func(m *manager) *ffapi.Route { + return &ffapi.Route{ + Name: "postTransactionResume", + Path: "/transactions/{transactionId}/resume", + Method: http.MethodPost, + PathParams: []*ffapi.PathParam{ + {Name: "transactionId", Description: tmmsgs.APIParamTransactionID}, + }, + QueryParams: nil, + Description: tmmsgs.APIEndpointPostTransactionResume, + JSONInputValue: func() interface{} { return &struct{}{} }, + JSONOutputValue: func() interface{} { return &apitypes.ManagedTX{} }, + JSONOutputCodes: []int{http.StatusOK, http.StatusAccepted}, + JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) { + r.SuccessStatus, output, err = m.requestTransactionResume(r.Req.Context(), r.PP["transactionId"]) + return output, err + }, + } +} diff --git a/pkg/fftm/route_post_transaction_resume_test.go b/pkg/fftm/route_post_transaction_resume_test.go new file mode 100644 index 00000000..40b2e148 --- /dev/null +++ b/pkg/fftm/route_post_transaction_resume_test.go @@ -0,0 +1,67 @@ +// Copyright © 2022 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fftm + +import ( + "fmt" + "testing" + + "github.com/go-resty/resty/v2" + "github.com/hyperledger/firefly-transaction-manager/mocks/txhandlermocks" + "github.com/hyperledger/firefly-transaction-manager/pkg/apitypes" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +func TestPostTransactionResume(t *testing.T) { + + url, m, done := newTestManager(t) + defer done() + tx := newTestTxn(t, m, "0x0aaaaa", 10001, apitypes.TxStatusSucceeded) + txID := tx.ID + + err := m.Start() + assert.NoError(t, err) + + var txOut *apitypes.ManagedTX + res, err := resty.New().R(). + SetResult(&txOut). + SetBody(struct{}{}). + Post(fmt.Sprintf("%s/transactions/%s/resume", url, txID)) + assert.NoError(t, err) + assert.Equal(t, 202, res.StatusCode()) + assert.Equal(t, txID, txOut.ID) +} + +func TestPostTransactionResumeFailed(t *testing.T) { + url, m, done := newTestManager(t) + defer done() + + err := m.Start() + assert.NoError(t, err) + mth := txhandlermocks.TransactionHandler{} + mth.On("HandleResumeTransaction", mock.Anything, "1234").Return(nil, fmt.Errorf("error")).Once() + m.txHandler = &mth + + var txOut *apitypes.ManagedTX + res, err := resty.New().R(). + SetResult(&txOut). + SetBody(struct{}{}). + Post(fmt.Sprintf("%s/transactions/%s/resume", url, "1234")) + assert.NoError(t, err) + assert.Equal(t, 500, res.StatusCode()) +} diff --git a/pkg/fftm/route_post_transaction_suspend.go b/pkg/fftm/route_post_transaction_suspend.go index c7848bd7..ad9de3dc 100644 --- a/pkg/fftm/route_post_transaction_suspend.go +++ b/pkg/fftm/route_post_transaction_suspend.go @@ -27,18 +27,18 @@ import ( var postTransactionSuspend = func(m *manager) *ffapi.Route { return &ffapi.Route{ Name: "postTransactionSuspend", - Path: "/transactions/{transactionId}", - Method: http.MethodDelete, + Path: "/transactions/{transactionId}/suspend", + Method: http.MethodPost, PathParams: []*ffapi.PathParam{ {Name: "transactionId", Description: tmmsgs.APIParamTransactionID}, }, QueryParams: nil, Description: tmmsgs.APIEndpointPostTransactionSuspend, - JSONInputValue: nil, + JSONInputValue: func() interface{} { return &struct{}{} }, JSONOutputValue: func() interface{} { return &apitypes.ManagedTX{} }, JSONOutputCodes: []int{http.StatusOK, http.StatusAccepted}, JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) { - r.SuccessStatus, output, err = m.requestTransactionDeletion(r.Req.Context(), r.PP["transactionId"]) + r.SuccessStatus, output, err = m.requestTransactionSuspend(r.Req.Context(), r.PP["transactionId"]) return output, err }, } diff --git a/pkg/fftm/route_post_transaction_suspend_test.go b/pkg/fftm/route_post_transaction_suspend_test.go index 3025ba77..8cd121d2 100644 --- a/pkg/fftm/route_post_transaction_suspend_test.go +++ b/pkg/fftm/route_post_transaction_suspend_test.go @@ -38,10 +38,9 @@ func TestPostTransactionSuspend(t *testing.T) { assert.NoError(t, err) var txOut *apitypes.ManagedTX - emptyInput := map[string]interface{}{} res, err := resty.New().R(). SetResult(&txOut). - Body(emptyInput). + SetBody(struct{}{}). Post(fmt.Sprintf("%s/transactions/%s/suspend", url, txID)) assert.NoError(t, err) assert.Equal(t, 202, res.StatusCode()) @@ -55,13 +54,14 @@ func TestPostTransactionSuspendFailed(t *testing.T) { err := m.Start() assert.NoError(t, err) mth := txhandlermocks.TransactionHandler{} - mth.On("HandleCancelTransaction", mock.Anything, "1234").Return(nil, fmt.Errorf("error")).Once() + mth.On("HandleSuspendTransaction", mock.Anything, "1234").Return(nil, fmt.Errorf("error")).Once() m.txHandler = &mth var txOut *apitypes.ManagedTX res, err := resty.New().R(). SetResult(&txOut). - Delete(fmt.Sprintf("%s/transactions/%s", url, "1234")) + SetBody(struct{}{}). + Post(fmt.Sprintf("%s/transactions/%s/suspend", url, "1234")) assert.NoError(t, err) assert.Equal(t, 500, res.StatusCode()) } diff --git a/pkg/fftm/routes.go b/pkg/fftm/routes.go index c98634d0..d6472040 100644 --- a/pkg/fftm/routes.go +++ b/pkg/fftm/routes.go @@ -51,5 +51,7 @@ func (m *manager) routes() []*ffapi.Route { postSubscriptions(m), getAddressBalance(m), getGasPrice(m), + postTransactionSuspend(m), + postTransactionResume(m), } } diff --git a/pkg/fftm/transaction_management.go b/pkg/fftm/transaction_management.go index 2059895b..384d054f 100644 --- a/pkg/fftm/transaction_management.go +++ b/pkg/fftm/transaction_management.go @@ -108,3 +108,15 @@ func (m *manager) requestTransactionSuspend(ctx context.Context, txID string) (s return http.StatusAccepted, canceledTx, nil } + +func (m *manager) requestTransactionResume(ctx context.Context, txID string) (status int, transaction *apitypes.ManagedTX, err error) { + + canceledTx, err := m.txHandler.HandleResumeTransaction(ctx, txID) + + if err != nil { + return http.StatusInternalServerError, nil, err + } + + return http.StatusAccepted, canceledTx, nil + +} diff --git a/pkg/txhandler/simple/policyloop.go b/pkg/txhandler/simple/policyloop.go index eef81c67..7ee9a632 100644 --- a/pkg/txhandler/simple/policyloop.go +++ b/pkg/txhandler/simple/policyloop.go @@ -39,8 +39,10 @@ const metricsGaugeTransactionsInflightFreeDescription = "Number of transactions type policyEngineAPIRequestType int const ( - policyEngineAPIRequestTypeDelete policyEngineAPIRequestType = iota - policyEngineAPIRequestTypeSuspend + ActionNone policyEngineAPIRequestType = iota + ActionDelete + ActionSuspend + ActionResume ) type policyEngineAPIRequest struct { @@ -171,7 +173,7 @@ func (sth *simpleTransactionHandler) policyLoopCycle(ctx context.Context, inflig // Go through executing the policy engine against them for _, pending := range sth.inflight { - err := sth.execPolicy(ctx, pending) + err := sth.execPolicy(ctx, pending, nil) if err != nil { log.L(ctx).Errorf("Failed policy cycle transaction=%s operation=%s: %s", pending.mtx.TransactionHash, pending.mtx.ID, err) } @@ -221,12 +223,12 @@ func (sth *simpleTransactionHandler) processPolicyAPIRequests(ctx context.Contex } switch request.requestType { - case policyEngineAPIRequestTypeDelete, policyEngineAPIRequestTypeSuspend: - if err := sth.execPolicy(ctx, pending, request.requestType); err != nil { + case ActionDelete, ActionSuspend, ActionResume: + if err := sth.execPolicy(ctx, pending, &request.requestType); err != nil { request.response <- policyEngineAPIResponse{err: err} } else { res := policyEngineAPIResponse{tx: pending.mtx, status: http.StatusAccepted} - if pending.remove { + if pending.remove || request.requestType == ActionResume /* always sync */ { res.status = http.StatusOK // synchronously completed } request.response <- res @@ -240,7 +242,7 @@ func (sth *simpleTransactionHandler) processPolicyAPIRequests(ctx context.Contex } -func (sth *simpleTransactionHandler) pendingToRunContext(baseCtx context.Context, pending *pendingState, syncRequest ...policyEngineAPIRequestType) (ctx *RunContext, err error) { +func (sth *simpleTransactionHandler) pendingToRunContext(baseCtx context.Context, pending *pendingState, syncRequest *policyEngineAPIRequestType) (ctx *RunContext, err error) { // Take a snapshot of the pending state under the lock sth.mux.Lock() @@ -255,16 +257,11 @@ func (sth *simpleTransactionHandler) pendingToRunContext(baseCtx context.Context } confirmNotify := pending.confirmNotify receiptNotify := pending.receiptNotify - for _, sr := range syncRequest { - switch sr { - case policyEngineAPIRequestTypeDelete: - ctx.Deleting = true - case policyEngineAPIRequestTypeSuspend: - ctx.Suspending = true - } + if syncRequest != nil { + ctx.SyncAction = *syncRequest } - if ctx.Deleting && mtx.DeleteRequested == nil { + if ctx.SyncAction == ActionDelete && mtx.DeleteRequested == nil { mtx.DeleteRequested = fftypes.Now() ctx.UpdateType = Update // might change to delete later ctx.TXUpdates.DeleteRequested = mtx.DeleteRequested @@ -309,9 +306,9 @@ func (sth *simpleTransactionHandler) pendingToRunContext(baseCtx context.Context return ctx, nil } -func (sth *simpleTransactionHandler) execPolicy(baseCtx context.Context, pending *pendingState, syncRequest ...policyEngineAPIRequestType) (err error) { +func (sth *simpleTransactionHandler) execPolicy(baseCtx context.Context, pending *pendingState, syncRequest *policyEngineAPIRequestType) (err error) { - ctx, err := sth.pendingToRunContext(baseCtx, pending, syncRequest...) + ctx, err := sth.pendingToRunContext(baseCtx, pending, syncRequest) if err != nil { return nil } @@ -319,7 +316,7 @@ func (sth *simpleTransactionHandler) execPolicy(baseCtx context.Context, pending completed := false switch { - case ctx.Confirmed && !ctx.Deleting: + case ctx.Confirmed && ctx.SyncAction != ActionDelete: completed = true ctx.UpdateType = Update if ctx.Receipt != nil && ctx.Receipt.Success { @@ -329,20 +326,27 @@ func (sth *simpleTransactionHandler) execPolicy(baseCtx context.Context, pending mtx.Status = apitypes.TxStatusFailed ctx.TXUpdates.Status = &mtx.Status } - case ctx.Suspending: + case ctx.SyncAction == ActionSuspend: // Whole cycle is a no-op if we're not pending if mtx.Status == apitypes.TxStatusPending { - completed = true ctx.UpdateType = Update + completed = true // drop it out of the loop mtx.Status = apitypes.TxStatusSuspended ctx.TXUpdates.Status = &mtx.Status } + case ctx.SyncAction == ActionResume: + // Whole cycle is a no-op if we're not suspended + if mtx.Status == apitypes.TxStatusSuspended { + ctx.UpdateType = Update + mtx.Status = apitypes.TxStatusPending + ctx.TXUpdates.Status = &mtx.Status + } default: // We get woken for lots of reasons to go through the policy loop, but we only want // to drive the policy engine at regular intervals. // So we track the last time we ran the policy engine against each pending item. // We always call the policy engine on every loop, when deletion has been requested. - if ctx.Deleting || time.Since(pending.lastPolicyCycle) > sth.policyLoopInterval { + if ctx.SyncAction == ActionDelete || time.Since(pending.lastPolicyCycle) > sth.policyLoopInterval { // Pass the state to the pluggable policy engine to potentially perform more actions against it, // such as submitting for the first time, or raising the gas etc. @@ -413,7 +417,10 @@ func (sth *simpleTransactionHandler) flushChanges(ctx *RunContext, pending *pend log.L(ctx).Errorf("Failed to update transaction %s (status=%s): %s", mtx.ID, mtx.Status, err) return err } - if completed { + if ctx.SyncAction == ActionResume { + log.L(ctx).Infof("Transaction %s resumed", mtx.ID) + sth.markInflightStale() // this won't be in the in-flight set, so we need to pull it in if there's space + } else if completed { pending.remove = true // for the next time round the loop log.L(ctx).Infof("Transaction %s removed from tracking (status=%s): %s", mtx.ID, mtx.Status, err) sth.markInflightStale() diff --git a/pkg/txhandler/simple/policyloop_test.go b/pkg/txhandler/simple/policyloop_test.go index eecf8954..5ad5fcd4 100644 --- a/pkg/txhandler/simple/policyloop_test.go +++ b/pkg/txhandler/simple/policyloop_test.go @@ -795,7 +795,7 @@ func TestExecPolicyGetTxFail(t *testing.T) { mp.On("GetTransactionByID", sth.ctx, tx.ID).Return(nil, fmt.Errorf("pop")) req := &policyEngineAPIRequest{ - requestType: policyEngineAPIRequestTypeDelete, + requestType: ActionDelete, txID: tx.ID, response: make(chan policyEngineAPIResponse, 1), } @@ -830,7 +830,7 @@ func TestExecPolicyDeleteFail(t *testing.T) { mp.On("DeleteTransaction", mock.Anything, tx.ID).Return(fmt.Errorf("pop")) req := &policyEngineAPIRequest{ - requestType: policyEngineAPIRequestTypeDelete, + requestType: ActionDelete, txID: tx.ID, response: make(chan policyEngineAPIResponse, 1), } @@ -875,7 +875,7 @@ func TestExecPolicyDeleteInflightSync(t *testing.T) { mp.On("DeleteTransaction", mock.Anything, tx.ID).Return(nil) req := &policyEngineAPIRequest{ - requestType: policyEngineAPIRequestTypeDelete, + requestType: ActionDelete, txID: tx.ID, response: make(chan policyEngineAPIResponse, 1), } @@ -924,7 +924,7 @@ func TestExecPolicySuspendInflightSync(t *testing.T) { })).Return(nil) req := &policyEngineAPIRequest{ - requestType: policyEngineAPIRequestTypeSuspend, + requestType: ActionSuspend, txID: tx.ID, response: make(chan policyEngineAPIResponse, 1), } @@ -941,6 +941,180 @@ func TestExecPolicySuspendInflightSync(t *testing.T) { } +func TestExecPolicyResumeSync(t *testing.T) { + f, tk, _, conf := newTestTransactionHandlerFactory(t) + conf.Set(FixedGasPrice, `12345`) + + th, err := f.NewTransactionHandler(context.Background(), conf) + assert.NoError(t, err) + + sth := th.(*simpleTransactionHandler) + sth.ctx = context.Background() + sth.Init(sth.ctx, tk) + eh := &fftm.ManagedTransactionEventHandler{ + Ctx: context.Background(), + TxHandler: sth, + } + mc := &confirmationsmocks.Manager{} + mc.On("Notify", mock.Anything).Return(nil) + eh.ConfirmationManager = mc + mws := &wsmocks.WebSocketServer{} + mws.On("SendReply", mock.Anything).Return(nil).Maybe() + + eh.WsServer = mws + sth.toolkit.EventHandler = eh + mp := sth.toolkit.TXPersistence.(*persistencemocks.Persistence) + mp.On("InsertTransactionWithNextNonce", sth.ctx, mock.Anything, mock.Anything).Return(nil, nil).Once() + mp.On("AddSubStatusAction", sth.ctx, mock.Anything, apitypes.TxSubStatusReceived, apitypes.TxActionAssignNonce, mock.Anything, mock.Anything).Return(nil) + tx := sendSampleTX(t, sth, "0xaaaaa", 12345, "") + tx.Status = apitypes.TxStatusSuspended + mp.On("UpdateTransaction", mock.AnythingOfType("*simple.RunContext"), tx.ID, mock.MatchedBy(func(updates *apitypes.TXUpdates) bool { + tx.Status = apitypes.TxStatusPending + return updates.Status != nil && *updates.Status == apitypes.TxStatusPending + })).Return(nil) + mp.On("GetTransactionByID", mock.Anything, tx.ID).Return(tx, nil) + + req := &policyEngineAPIRequest{ + requestType: ActionResume, + txID: tx.ID, + response: make(chan policyEngineAPIResponse, 1), + } + sth.policyEngineAPIRequests = append(sth.policyEngineAPIRequests, req) + + sth.processPolicyAPIRequests(sth.ctx) + + res := <-req.response + assert.NoError(t, res.err) + assert.Equal(t, http.StatusOK, res.status) + + mp.AssertExpectations(t) + +} + +func TestExecHandleResumeQueuesResumeOk(t *testing.T) { + + f, tk, _, conf := newTestTransactionHandlerFactory(t) + conf.Set(FixedGasPrice, `12345`) + + th, err := f.NewTransactionHandler(context.Background(), conf) + assert.NoError(t, err) + + sth := th.(*simpleTransactionHandler) + sth.ctx = context.Background() + sth.Init(sth.ctx, tk) + + result := make(chan error) + go func() { + _, err := sth.HandleResumeTransaction(sth.ctx, "tx1") + result <- err + }() + + for len(sth.policyEngineAPIRequests) == 0 { + time.Sleep(1 * time.Millisecond) + } + req := sth.policyEngineAPIRequests[0] + assert.Equal(t, ActionResume, req.requestType) + req.response <- policyEngineAPIResponse{} + + err = <-result + assert.NoError(t, err) + +} + +func TestExecHandleResumeQueuesSuspendOk(t *testing.T) { + + f, tk, _, conf := newTestTransactionHandlerFactory(t) + conf.Set(FixedGasPrice, `12345`) + + th, err := f.NewTransactionHandler(context.Background(), conf) + assert.NoError(t, err) + + sth := th.(*simpleTransactionHandler) + sth.ctx = context.Background() + sth.Init(sth.ctx, tk) + + result := make(chan error) + go func() { + _, err := sth.HandleSuspendTransaction(sth.ctx, "tx1") + result <- err + }() + + for len(sth.policyEngineAPIRequests) == 0 { + time.Sleep(1 * time.Millisecond) + } + req := sth.policyEngineAPIRequests[0] + assert.Equal(t, ActionSuspend, req.requestType) + req.response <- policyEngineAPIResponse{} + + err = <-result + assert.NoError(t, err) + +} + +func TestExecHandleResumeQueuesResumeErr(t *testing.T) { + + f, tk, _, conf := newTestTransactionHandlerFactory(t) + conf.Set(FixedGasPrice, `12345`) + + th, err := f.NewTransactionHandler(context.Background(), conf) + assert.NoError(t, err) + + sth := th.(*simpleTransactionHandler) + sth.ctx = context.Background() + sth.Init(sth.ctx, tk) + + result := make(chan error) + go func() { + _, err := sth.HandleResumeTransaction(sth.ctx, "tx1") + result <- err + }() + + for len(sth.policyEngineAPIRequests) == 0 { + time.Sleep(1 * time.Millisecond) + } + req := sth.policyEngineAPIRequests[0] + assert.Equal(t, ActionResume, req.requestType) + req.response <- policyEngineAPIResponse{ + err: fmt.Errorf("pop"), + } + + err = <-result + assert.Regexp(t, "pop", err) + +} + +func TestExecHandleResumeQueuesSuspendErr(t *testing.T) { + + f, tk, _, conf := newTestTransactionHandlerFactory(t) + conf.Set(FixedGasPrice, `12345`) + + th, err := f.NewTransactionHandler(context.Background(), conf) + assert.NoError(t, err) + + sth := th.(*simpleTransactionHandler) + sth.ctx = context.Background() + sth.Init(sth.ctx, tk) + + result := make(chan error) + go func() { + _, err := sth.HandleSuspendTransaction(sth.ctx, "tx1") + result <- err + }() + + for len(sth.policyEngineAPIRequests) == 0 { + time.Sleep(1 * time.Millisecond) + } + req := sth.policyEngineAPIRequests[0] + assert.Equal(t, ActionSuspend, req.requestType) + req.response <- policyEngineAPIResponse{ + err: fmt.Errorf("pop"), + } + + err = <-result + assert.Regexp(t, "pop", err) + +} + func TestExecPolicyIdempotentCancellation(t *testing.T) { f, tk, _, conf := newTestTransactionHandlerFactory(t) conf.Set(FixedGasPrice, `12345`) @@ -1032,7 +1206,7 @@ func TestPendingTransactionGetsRemoved(t *testing.T) { // add a delete request req := &policyEngineAPIRequest{ - requestType: policyEngineAPIRequestTypeDelete, + requestType: ActionDelete, txID: testTxID.String(), response: make(chan policyEngineAPIResponse, 1), } @@ -1067,7 +1241,7 @@ func TestExecPolicyDeleteNotFound(t *testing.T) { mp.On("GetTransactionByID", sth.ctx, "bad-id").Return(nil, nil) req := &policyEngineAPIRequest{ - requestType: policyEngineAPIRequestTypeDelete, + requestType: ActionDelete, txID: "bad-id", response: make(chan policyEngineAPIResponse, 1), } @@ -1153,7 +1327,7 @@ func TestExecPolicyUpdateNewInfo(t *testing.T) { FirstSubmit: fftypes.Now(), }, receipt: &ffcapi.TransactionReceiptResponse{}, - }) + }, nil) assert.NoError(t, err) } diff --git a/pkg/txhandler/simple/simple_transaction_handler.go b/pkg/txhandler/simple/simple_transaction_handler.go index 105503e3..6aef804e 100644 --- a/pkg/txhandler/simple/simple_transaction_handler.go +++ b/pkg/txhandler/simple/simple_transaction_handler.go @@ -66,8 +66,7 @@ type RunContext struct { Receipt *ffcapi.TransactionReceiptResponse Confirmations *apitypes.ConfirmationsNotification Confirmed bool - Suspending bool - Deleting bool + SyncAction policyEngineAPIRequestType // Input/output SubStatus apitypes.TxSubStatus Info *simplePolicyInfo // must be updated in-place and set UpdatedInfo to true as well as UpdateType = Update @@ -280,18 +279,26 @@ func (sth *simpleTransactionHandler) HandleNewContractDeployment(ctx context.Con func (sth *simpleTransactionHandler) HandleCancelTransaction(ctx context.Context, txID string) (mtx *apitypes.ManagedTX, err error) { res := sth.policyEngineAPIRequest(ctx, &policyEngineAPIRequest{ - requestType: policyEngineAPIRequestTypeDelete, + requestType: ActionDelete, txID: txID, }) - return res.tx, nil + return res.tx, res.err } func (sth *simpleTransactionHandler) HandleSuspendTransaction(ctx context.Context, txID string) (mtx *apitypes.ManagedTX, err error) { res := sth.policyEngineAPIRequest(ctx, &policyEngineAPIRequest{ - requestType: policyEngineAPIRequestTypeSuspend, + requestType: ActionSuspend, txID: txID, }) - return res.tx, nil + return res.tx, res.err +} + +func (sth *simpleTransactionHandler) HandleResumeTransaction(ctx context.Context, txID string) (mtx *apitypes.ManagedTX, err error) { + res := sth.policyEngineAPIRequest(ctx, &policyEngineAPIRequest{ + requestType: ActionResume, + txID: txID, + }) + return res.tx, res.err } func (sth *simpleTransactionHandler) createManagedTx(ctx context.Context, txID string, txHeaders *ffcapi.TransactionHeaders, gas *fftypes.FFBigInt, transactionData string) (*apitypes.ManagedTX, error) { diff --git a/pkg/txhandler/txhandler.go b/pkg/txhandler/txhandler.go index 8e4a7b2f..bba6fb9f 100644 --- a/pkg/txhandler/txhandler.go +++ b/pkg/txhandler/txhandler.go @@ -109,6 +109,8 @@ type TransactionHandler interface { HandleCancelTransaction(ctx context.Context, txID string) (mtx *apitypes.ManagedTX, err error) // HandleSuspendTransaction - handles event of suspending a managed transaction HandleSuspendTransaction(ctx context.Context, txID string) (mtx *apitypes.ManagedTX, err error) + // HandleResumeTransaction - handles event of resuming a suspended managed transaction + HandleResumeTransaction(ctx context.Context, txID string) (mtx *apitypes.ManagedTX, err error) // Informational events: // HandleTransactionConfirmations - handles confirmations of blockchain transactions for a managed transaction