Skip to content

Commit

Permalink
Add suspend/resume transaction actions to FFTM and simple policy engine
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
  • Loading branch information
peterbroadhurst committed Jun 30, 2023
1 parent 8c9b33b commit 581fe26
Show file tree
Hide file tree
Showing 12 changed files with 413 additions and 43 deletions.
2 changes: 2 additions & 0 deletions internal/tmmsgs/en_api_descriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ var (
APIEndpointPostRootQueryOutput = ffm("api.endpoints.post.root.query.output", "The data result of a query against a smart contract")
APIEndpointPostSubscriptionReset = ffm("api.endpoints.post.subscription.reset", "Reset listener - route deprecated in favor of /eventstreams/{streamId}/listeners/{listenerId}/reset")
APIEndpointPostSubscriptions = ffm("api.endpoints.post.subscriptions", "Create new listener - route deprecated in favor of /eventstreams/{streamId}/listeners")
APIEndpointPostTransactionSuspend = ffm("api.endpoints.post.transactions.suspend", "Suspend processing on a pending transaction (no-op for completed transactions)")
APIEndpointPostTransactionResume = ffm("api.endpoints.post.transactions.resume", "Resume processing on a suspended transaction")

APIParamStreamID = ffm("api.params.streamId", "Event Stream ID")
APIParamListenerID = ffm("api.params.listenerId", "Listener ID")
Expand Down
52 changes: 52 additions & 0 deletions mocks/txhandlermocks/transaction_handler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 45 additions & 0 deletions pkg/fftm/route_post_transaction_resume.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright © 2023 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package fftm

import (
"net/http"

"github.com/hyperledger/firefly-common/pkg/ffapi"
"github.com/hyperledger/firefly-transaction-manager/internal/tmmsgs"
"github.com/hyperledger/firefly-transaction-manager/pkg/apitypes"
)

var postTransactionResume = func(m *manager) *ffapi.Route {
return &ffapi.Route{
Name: "postTransactionResume",
Path: "/transactions/{transactionId}/resume",
Method: http.MethodPost,
PathParams: []*ffapi.PathParam{
{Name: "transactionId", Description: tmmsgs.APIParamTransactionID},
},
QueryParams: nil,
Description: tmmsgs.APIEndpointPostTransactionResume,
JSONInputValue: func() interface{} { return &struct{}{} },
JSONOutputValue: func() interface{} { return &apitypes.ManagedTX{} },
JSONOutputCodes: []int{http.StatusOK, http.StatusAccepted},
JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) {
r.SuccessStatus, output, err = m.requestTransactionResume(r.Req.Context(), r.PP["transactionId"])
return output, err
},
}
}
67 changes: 67 additions & 0 deletions pkg/fftm/route_post_transaction_resume_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright © 2022 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package fftm

import (
"fmt"
"testing"

"github.com/go-resty/resty/v2"
"github.com/hyperledger/firefly-transaction-manager/mocks/txhandlermocks"
"github.com/hyperledger/firefly-transaction-manager/pkg/apitypes"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

func TestPostTransactionResume(t *testing.T) {

url, m, done := newTestManager(t)
defer done()
tx := newTestTxn(t, m, "0x0aaaaa", 10001, apitypes.TxStatusSucceeded)
txID := tx.ID

err := m.Start()
assert.NoError(t, err)

var txOut *apitypes.ManagedTX
res, err := resty.New().R().
SetResult(&txOut).
SetBody(struct{}{}).
Post(fmt.Sprintf("%s/transactions/%s/resume", url, txID))
assert.NoError(t, err)
assert.Equal(t, 202, res.StatusCode())
assert.Equal(t, txID, txOut.ID)
}

func TestPostTransactionResumeFailed(t *testing.T) {
url, m, done := newTestManager(t)
defer done()

err := m.Start()
assert.NoError(t, err)
mth := txhandlermocks.TransactionHandler{}
mth.On("HandleResumeTransaction", mock.Anything, "1234").Return(nil, fmt.Errorf("error")).Once()
m.txHandler = &mth

var txOut *apitypes.ManagedTX
res, err := resty.New().R().
SetResult(&txOut).
SetBody(struct{}{}).
Post(fmt.Sprintf("%s/transactions/%s/resume", url, "1234"))
assert.NoError(t, err)
assert.Equal(t, 500, res.StatusCode())
}
8 changes: 4 additions & 4 deletions pkg/fftm/route_post_transaction_suspend.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,18 @@ import (
var postTransactionSuspend = func(m *manager) *ffapi.Route {
return &ffapi.Route{
Name: "postTransactionSuspend",
Path: "/transactions/{transactionId}",
Method: http.MethodDelete,
Path: "/transactions/{transactionId}/suspend",
Method: http.MethodPost,
PathParams: []*ffapi.PathParam{
{Name: "transactionId", Description: tmmsgs.APIParamTransactionID},
},
QueryParams: nil,
Description: tmmsgs.APIEndpointPostTransactionSuspend,
JSONInputValue: nil,
JSONInputValue: func() interface{} { return &struct{}{} },
JSONOutputValue: func() interface{} { return &apitypes.ManagedTX{} },
JSONOutputCodes: []int{http.StatusOK, http.StatusAccepted},
JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) {
r.SuccessStatus, output, err = m.requestTransactionDeletion(r.Req.Context(), r.PP["transactionId"])
r.SuccessStatus, output, err = m.requestTransactionSuspend(r.Req.Context(), r.PP["transactionId"])
return output, err
},
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/fftm/route_post_transaction_suspend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,9 @@ func TestPostTransactionSuspend(t *testing.T) {
assert.NoError(t, err)

var txOut *apitypes.ManagedTX
emptyInput := map[string]interface{}{}
res, err := resty.New().R().
SetResult(&txOut).
Body(emptyInput).
SetBody(struct{}{}).
Post(fmt.Sprintf("%s/transactions/%s/suspend", url, txID))
assert.NoError(t, err)
assert.Equal(t, 202, res.StatusCode())
Expand All @@ -55,13 +54,14 @@ func TestPostTransactionSuspendFailed(t *testing.T) {
err := m.Start()
assert.NoError(t, err)
mth := txhandlermocks.TransactionHandler{}
mth.On("HandleCancelTransaction", mock.Anything, "1234").Return(nil, fmt.Errorf("error")).Once()
mth.On("HandleSuspendTransaction", mock.Anything, "1234").Return(nil, fmt.Errorf("error")).Once()
m.txHandler = &mth

var txOut *apitypes.ManagedTX
res, err := resty.New().R().
SetResult(&txOut).
Delete(fmt.Sprintf("%s/transactions/%s", url, "1234"))
SetBody(struct{}{}).
Post(fmt.Sprintf("%s/transactions/%s/suspend", url, "1234"))
assert.NoError(t, err)
assert.Equal(t, 500, res.StatusCode())
}
2 changes: 2 additions & 0 deletions pkg/fftm/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,7 @@ func (m *manager) routes() []*ffapi.Route {
postSubscriptions(m),
getAddressBalance(m),
getGasPrice(m),
postTransactionSuspend(m),
postTransactionResume(m),
}
}
12 changes: 12 additions & 0 deletions pkg/fftm/transaction_management.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,15 @@ func (m *manager) requestTransactionSuspend(ctx context.Context, txID string) (s
return http.StatusAccepted, canceledTx, nil

}

func (m *manager) requestTransactionResume(ctx context.Context, txID string) (status int, transaction *apitypes.ManagedTX, err error) {

canceledTx, err := m.txHandler.HandleResumeTransaction(ctx, txID)

if err != nil {
return http.StatusInternalServerError, nil, err
}

return http.StatusAccepted, canceledTx, nil

}
51 changes: 29 additions & 22 deletions pkg/txhandler/simple/policyloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@ const metricsGaugeTransactionsInflightFreeDescription = "Number of transactions
type policyEngineAPIRequestType int

const (
policyEngineAPIRequestTypeDelete policyEngineAPIRequestType = iota
policyEngineAPIRequestTypeSuspend
ActionNone policyEngineAPIRequestType = iota
ActionDelete
ActionSuspend
ActionResume
)

type policyEngineAPIRequest struct {
Expand Down Expand Up @@ -171,7 +173,7 @@ func (sth *simpleTransactionHandler) policyLoopCycle(ctx context.Context, inflig
// Go through executing the policy engine against them

for _, pending := range sth.inflight {
err := sth.execPolicy(ctx, pending)
err := sth.execPolicy(ctx, pending, nil)
if err != nil {
log.L(ctx).Errorf("Failed policy cycle transaction=%s operation=%s: %s", pending.mtx.TransactionHash, pending.mtx.ID, err)
}
Expand Down Expand Up @@ -221,12 +223,12 @@ func (sth *simpleTransactionHandler) processPolicyAPIRequests(ctx context.Contex
}

switch request.requestType {
case policyEngineAPIRequestTypeDelete, policyEngineAPIRequestTypeSuspend:
if err := sth.execPolicy(ctx, pending, request.requestType); err != nil {
case ActionDelete, ActionSuspend, ActionResume:
if err := sth.execPolicy(ctx, pending, &request.requestType); err != nil {
request.response <- policyEngineAPIResponse{err: err}
} else {
res := policyEngineAPIResponse{tx: pending.mtx, status: http.StatusAccepted}
if pending.remove {
if pending.remove || request.requestType == ActionResume /* always sync */ {
res.status = http.StatusOK // synchronously completed
}
request.response <- res
Expand All @@ -240,7 +242,7 @@ func (sth *simpleTransactionHandler) processPolicyAPIRequests(ctx context.Contex

}

func (sth *simpleTransactionHandler) pendingToRunContext(baseCtx context.Context, pending *pendingState, syncRequest ...policyEngineAPIRequestType) (ctx *RunContext, err error) {
func (sth *simpleTransactionHandler) pendingToRunContext(baseCtx context.Context, pending *pendingState, syncRequest *policyEngineAPIRequestType) (ctx *RunContext, err error) {

// Take a snapshot of the pending state under the lock
sth.mux.Lock()
Expand All @@ -255,16 +257,11 @@ func (sth *simpleTransactionHandler) pendingToRunContext(baseCtx context.Context
}
confirmNotify := pending.confirmNotify
receiptNotify := pending.receiptNotify
for _, sr := range syncRequest {
switch sr {
case policyEngineAPIRequestTypeDelete:
ctx.Deleting = true
case policyEngineAPIRequestTypeSuspend:
ctx.Suspending = true
}
if syncRequest != nil {
ctx.SyncAction = *syncRequest
}

if ctx.Deleting && mtx.DeleteRequested == nil {
if ctx.SyncAction == ActionDelete && mtx.DeleteRequested == nil {
mtx.DeleteRequested = fftypes.Now()
ctx.UpdateType = Update // might change to delete later
ctx.TXUpdates.DeleteRequested = mtx.DeleteRequested
Expand Down Expand Up @@ -309,17 +306,17 @@ func (sth *simpleTransactionHandler) pendingToRunContext(baseCtx context.Context
return ctx, nil
}

func (sth *simpleTransactionHandler) execPolicy(baseCtx context.Context, pending *pendingState, syncRequest ...policyEngineAPIRequestType) (err error) {
func (sth *simpleTransactionHandler) execPolicy(baseCtx context.Context, pending *pendingState, syncRequest *policyEngineAPIRequestType) (err error) {

ctx, err := sth.pendingToRunContext(baseCtx, pending, syncRequest...)
ctx, err := sth.pendingToRunContext(baseCtx, pending, syncRequest)
if err != nil {
return nil
}
mtx := ctx.TX

completed := false
switch {
case ctx.Confirmed && !ctx.Deleting:
case ctx.Confirmed && ctx.SyncAction != ActionDelete:
completed = true
ctx.UpdateType = Update
if ctx.Receipt != nil && ctx.Receipt.Success {
Expand All @@ -329,20 +326,27 @@ func (sth *simpleTransactionHandler) execPolicy(baseCtx context.Context, pending
mtx.Status = apitypes.TxStatusFailed
ctx.TXUpdates.Status = &mtx.Status
}
case ctx.Suspending:
case ctx.SyncAction == ActionSuspend:
// Whole cycle is a no-op if we're not pending
if mtx.Status == apitypes.TxStatusPending {
completed = true
ctx.UpdateType = Update
completed = true // drop it out of the loop
mtx.Status = apitypes.TxStatusSuspended
ctx.TXUpdates.Status = &mtx.Status
}
case ctx.SyncAction == ActionResume:
// Whole cycle is a no-op if we're not suspended
if mtx.Status == apitypes.TxStatusSuspended {
ctx.UpdateType = Update
mtx.Status = apitypes.TxStatusPending
ctx.TXUpdates.Status = &mtx.Status
}
default:
// We get woken for lots of reasons to go through the policy loop, but we only want
// to drive the policy engine at regular intervals.
// So we track the last time we ran the policy engine against each pending item.
// We always call the policy engine on every loop, when deletion has been requested.
if ctx.Deleting || time.Since(pending.lastPolicyCycle) > sth.policyLoopInterval {
if ctx.SyncAction == ActionDelete || time.Since(pending.lastPolicyCycle) > sth.policyLoopInterval {
// Pass the state to the pluggable policy engine to potentially perform more actions against it,
// such as submitting for the first time, or raising the gas etc.

Expand Down Expand Up @@ -413,7 +417,10 @@ func (sth *simpleTransactionHandler) flushChanges(ctx *RunContext, pending *pend
log.L(ctx).Errorf("Failed to update transaction %s (status=%s): %s", mtx.ID, mtx.Status, err)
return err
}
if completed {
if ctx.SyncAction == ActionResume {
log.L(ctx).Infof("Transaction %s resumed", mtx.ID)
sth.markInflightStale() // this won't be in the in-flight set, so we need to pull it in if there's space
} else if completed {
pending.remove = true // for the next time round the loop
log.L(ctx).Infof("Transaction %s removed from tracking (status=%s): %s", mtx.ID, mtx.Status, err)
sth.markInflightStale()
Expand Down
Loading

0 comments on commit 581fe26

Please sign in to comment.