Skip to content

Commit

Permalink
stash commit
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
  • Loading branch information
peterbroadhurst committed Jun 30, 2023
1 parent ec94e7b commit 8c9b33b
Show file tree
Hide file tree
Showing 8 changed files with 218 additions and 13 deletions.
2 changes: 2 additions & 0 deletions pkg/apitypes/managed_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ const (
TxStatusSucceeded TxStatus = "Succeeded"
// TxStatusFailed happens when an error is reported by the infrastructure runtime
TxStatusFailed TxStatus = "Failed"
// TxStatusSuspended indicates we are not actively doing any work with this transaction right now, until it's resumed to pending again
TxStatusSuspended TxStatus = "Suspended"
)

// TxSubStatus is an intermediate status a transaction may go through
Expand Down
45 changes: 45 additions & 0 deletions pkg/fftm/route_post_transaction_suspend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright © 2023 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package fftm

import (
"net/http"

"github.com/hyperledger/firefly-common/pkg/ffapi"
"github.com/hyperledger/firefly-transaction-manager/internal/tmmsgs"
"github.com/hyperledger/firefly-transaction-manager/pkg/apitypes"
)

var postTransactionSuspend = func(m *manager) *ffapi.Route {
return &ffapi.Route{
Name: "postTransactionSuspend",
Path: "/transactions/{transactionId}",
Method: http.MethodDelete,
PathParams: []*ffapi.PathParam{
{Name: "transactionId", Description: tmmsgs.APIParamTransactionID},
},
QueryParams: nil,
Description: tmmsgs.APIEndpointPostTransactionSuspend,
JSONInputValue: nil,
JSONOutputValue: func() interface{} { return &apitypes.ManagedTX{} },
JSONOutputCodes: []int{http.StatusOK, http.StatusAccepted},
JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) {
r.SuccessStatus, output, err = m.requestTransactionDeletion(r.Req.Context(), r.PP["transactionId"])
return output, err
},
}
}
67 changes: 67 additions & 0 deletions pkg/fftm/route_post_transaction_suspend_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright © 2022 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package fftm

import (
"fmt"
"testing"

"github.com/go-resty/resty/v2"
"github.com/hyperledger/firefly-transaction-manager/mocks/txhandlermocks"
"github.com/hyperledger/firefly-transaction-manager/pkg/apitypes"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

func TestPostTransactionSuspend(t *testing.T) {

url, m, done := newTestManager(t)
defer done()
tx := newTestTxn(t, m, "0x0aaaaa", 10001, apitypes.TxStatusSucceeded)
txID := tx.ID

err := m.Start()
assert.NoError(t, err)

var txOut *apitypes.ManagedTX
emptyInput := map[string]interface{}{}
res, err := resty.New().R().
SetResult(&txOut).
Body(emptyInput).
Post(fmt.Sprintf("%s/transactions/%s/suspend", url, txID))
assert.NoError(t, err)
assert.Equal(t, 202, res.StatusCode())
assert.Equal(t, txID, txOut.ID)
}

func TestPostTransactionSuspendFailed(t *testing.T) {
url, m, done := newTestManager(t)
defer done()

err := m.Start()
assert.NoError(t, err)
mth := txhandlermocks.TransactionHandler{}
mth.On("HandleCancelTransaction", mock.Anything, "1234").Return(nil, fmt.Errorf("error")).Once()
m.txHandler = &mth

var txOut *apitypes.ManagedTX
res, err := resty.New().R().
SetResult(&txOut).
Delete(fmt.Sprintf("%s/transactions/%s", url, "1234"))
assert.NoError(t, err)
assert.Equal(t, 500, res.StatusCode())
}
12 changes: 12 additions & 0 deletions pkg/fftm/transaction_management.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,15 @@ func (m *manager) requestTransactionDeletion(ctx context.Context, txID string) (
return http.StatusAccepted, canceledTx, nil

}

func (m *manager) requestTransactionSuspend(ctx context.Context, txID string) (status int, transaction *apitypes.ManagedTX, err error) {

canceledTx, err := m.txHandler.HandleSuspendTransaction(ctx, txID)

if err != nil {
return http.StatusInternalServerError, nil, err
}

return http.StatusAccepted, canceledTx, nil

}
42 changes: 30 additions & 12 deletions pkg/txhandler/simple/policyloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type policyEngineAPIRequestType int

const (
policyEngineAPIRequestTypeDelete policyEngineAPIRequestType = iota
policyEngineAPIRequestTypeSuspend
)

type policyEngineAPIRequest struct {
Expand Down Expand Up @@ -170,7 +171,7 @@ func (sth *simpleTransactionHandler) policyLoopCycle(ctx context.Context, inflig
// Go through executing the policy engine against them

for _, pending := range sth.inflight {
err := sth.execPolicy(ctx, pending, false)
err := sth.execPolicy(ctx, pending)
if err != nil {
log.L(ctx).Errorf("Failed policy cycle transaction=%s operation=%s: %s", pending.mtx.TransactionHash, pending.mtx.ID, err)
}
Expand Down Expand Up @@ -220,8 +221,8 @@ func (sth *simpleTransactionHandler) processPolicyAPIRequests(ctx context.Contex
}

switch request.requestType {
case policyEngineAPIRequestTypeDelete:
if err := sth.execPolicy(ctx, pending, true); err != nil {
case policyEngineAPIRequestTypeDelete, policyEngineAPIRequestTypeSuspend:
if err := sth.execPolicy(ctx, pending, request.requestType); err != nil {
request.response <- policyEngineAPIResponse{err: err}
} else {
res := policyEngineAPIResponse{tx: pending.mtx, status: http.StatusAccepted}
Expand All @@ -239,7 +240,7 @@ func (sth *simpleTransactionHandler) processPolicyAPIRequests(ctx context.Contex

}

func (sth *simpleTransactionHandler) pendingToRunContext(baseCtx context.Context, pending *pendingState, syncDeleteRequest bool) (ctx *RunContext, err error) {
func (sth *simpleTransactionHandler) pendingToRunContext(baseCtx context.Context, pending *pendingState, syncRequest ...policyEngineAPIRequestType) (ctx *RunContext, err error) {

// Take a snapshot of the pending state under the lock
sth.mux.Lock()
Expand All @@ -254,7 +255,16 @@ func (sth *simpleTransactionHandler) pendingToRunContext(baseCtx context.Context
}
confirmNotify := pending.confirmNotify
receiptNotify := pending.receiptNotify
if syncDeleteRequest && mtx.DeleteRequested == nil {
for _, sr := range syncRequest {
switch sr {
case policyEngineAPIRequestTypeDelete:
ctx.Deleting = true
case policyEngineAPIRequestTypeSuspend:
ctx.Suspending = true
}
}

if ctx.Deleting && mtx.DeleteRequested == nil {
mtx.DeleteRequested = fftypes.Now()
ctx.UpdateType = Update // might change to delete later
ctx.TXUpdates.DeleteRequested = mtx.DeleteRequested
Expand Down Expand Up @@ -299,17 +309,17 @@ func (sth *simpleTransactionHandler) pendingToRunContext(baseCtx context.Context
return ctx, nil
}

func (sth *simpleTransactionHandler) execPolicy(baseCtx context.Context, pending *pendingState, syncDeleteRequest bool) (err error) {
func (sth *simpleTransactionHandler) execPolicy(baseCtx context.Context, pending *pendingState, syncRequest ...policyEngineAPIRequestType) (err error) {

ctx, err := sth.pendingToRunContext(baseCtx, pending, syncDeleteRequest)
ctx, err := sth.pendingToRunContext(baseCtx, pending, syncRequest...)
if err != nil {
return nil
}
mtx := ctx.TX

completed := false
switch {
case ctx.Confirmed && !syncDeleteRequest:
case ctx.Confirmed && !ctx.Deleting:
completed = true
ctx.UpdateType = Update
if ctx.Receipt != nil && ctx.Receipt.Success {
Expand All @@ -319,13 +329,20 @@ func (sth *simpleTransactionHandler) execPolicy(baseCtx context.Context, pending
mtx.Status = apitypes.TxStatusFailed
ctx.TXUpdates.Status = &mtx.Status
}

case ctx.Suspending:
// Whole cycle is a no-op if we're not pending
if mtx.Status == apitypes.TxStatusPending {
completed = true
ctx.UpdateType = Update
mtx.Status = apitypes.TxStatusSuspended
ctx.TXUpdates.Status = &mtx.Status
}
default:
// We get woken for lots of reasons to go through the policy loop, but we only want
// to drive the policy engine at regular intervals.
// So we track the last time we ran the policy engine against each pending item.
// We always call the policy engine on every loop, when deletion has been requested.
if syncDeleteRequest || time.Since(pending.lastPolicyCycle) > sth.policyLoopInterval {
if ctx.Deleting || time.Since(pending.lastPolicyCycle) > sth.policyLoopInterval {
// Pass the state to the pluggable policy engine to potentially perform more actions against it,
// such as submitting for the first time, or raising the gas etc.

Expand Down Expand Up @@ -398,11 +415,12 @@ func (sth *simpleTransactionHandler) flushChanges(ctx *RunContext, pending *pend
}
if completed {
pending.remove = true // for the next time round the loop
log.L(ctx).Infof("Transaction %s marked complete (status=%s): %s", mtx.ID, mtx.Status, err)
log.L(ctx).Infof("Transaction %s removed from tracking (status=%s): %s", mtx.ID, mtx.Status, err)
sth.markInflightStale()

// if and only if the transaction is now resolved dispatch an event to event handler
// and discard any handling errors
// and discard any handling errors.
// Note that TxStatusSuspended has no action here.
if mtx.Status == apitypes.TxStatusSucceeded {
_ = sth.toolkit.EventHandler.HandleEvent(ctx, apitypes.ManagedTransactionEvent{
Type: apitypes.ManagedTXProcessSucceeded,
Expand Down
51 changes: 50 additions & 1 deletion pkg/txhandler/simple/policyloop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -892,6 +892,55 @@ func TestExecPolicyDeleteInflightSync(t *testing.T) {

}

func TestExecPolicySuspendInflightSync(t *testing.T) {
f, tk, _, conf := newTestTransactionHandlerFactory(t)
conf.Set(FixedGasPrice, `12345`)

th, err := f.NewTransactionHandler(context.Background(), conf)
assert.NoError(t, err)

sth := th.(*simpleTransactionHandler)
sth.ctx = context.Background()
sth.Init(sth.ctx, tk)
eh := &fftm.ManagedTransactionEventHandler{
Ctx: context.Background(),
TxHandler: sth,
}
mc := &confirmationsmocks.Manager{}
mc.On("Notify", mock.Anything).Return(nil)
eh.ConfirmationManager = mc
mws := &wsmocks.WebSocketServer{}
mws.On("SendReply", mock.Anything).Return(nil).Maybe()

eh.WsServer = mws
sth.toolkit.EventHandler = eh
mp := sth.toolkit.TXPersistence.(*persistencemocks.Persistence)
mp.On("InsertTransactionWithNextNonce", sth.ctx, mock.Anything, mock.Anything).Return(nil, nil).Once()
mp.On("AddSubStatusAction", sth.ctx, mock.Anything, apitypes.TxSubStatusReceived, apitypes.TxActionAssignNonce, mock.Anything, mock.Anything).Return(nil)
tx := sendSampleTX(t, sth, "0xaaaaa", 12345, "")
sth.inflight = []*pendingState{{mtx: tx}}
mp.On("UpdateTransaction", mock.AnythingOfType("*simple.RunContext"), tx.ID, mock.MatchedBy(func(updates *apitypes.TXUpdates) bool {
return updates.Status != nil && *updates.Status == apitypes.TxStatusSuspended
})).Return(nil)

req := &policyEngineAPIRequest{
requestType: policyEngineAPIRequestTypeSuspend,
txID: tx.ID,
response: make(chan policyEngineAPIResponse, 1),
}
sth.policyEngineAPIRequests = append(sth.policyEngineAPIRequests, req)

sth.processPolicyAPIRequests(sth.ctx)

res := <-req.response
assert.NoError(t, res.err)
assert.Equal(t, http.StatusOK, res.status)
assert.True(t, sth.inflight[0].remove)

mp.AssertExpectations(t)

}

func TestExecPolicyIdempotentCancellation(t *testing.T) {
f, tk, _, conf := newTestTransactionHandlerFactory(t)
conf.Set(FixedGasPrice, `12345`)
Expand Down Expand Up @@ -1104,7 +1153,7 @@ func TestExecPolicyUpdateNewInfo(t *testing.T) {
FirstSubmit: fftypes.Now(),
},
receipt: &ffcapi.TransactionReceiptResponse{},
}, false)
})
assert.NoError(t, err)

}
Expand Down
10 changes: 10 additions & 0 deletions pkg/txhandler/simple/simple_transaction_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ type RunContext struct {
Receipt *ffcapi.TransactionReceiptResponse
Confirmations *apitypes.ConfirmationsNotification
Confirmed bool
Suspending bool
Deleting bool
// Input/output
SubStatus apitypes.TxSubStatus
Info *simplePolicyInfo // must be updated in-place and set UpdatedInfo to true as well as UpdateType = Update
Expand Down Expand Up @@ -284,6 +286,14 @@ func (sth *simpleTransactionHandler) HandleCancelTransaction(ctx context.Context
return res.tx, nil
}

func (sth *simpleTransactionHandler) HandleSuspendTransaction(ctx context.Context, txID string) (mtx *apitypes.ManagedTX, err error) {
res := sth.policyEngineAPIRequest(ctx, &policyEngineAPIRequest{
requestType: policyEngineAPIRequestTypeSuspend,
txID: txID,
})
return res.tx, nil
}

func (sth *simpleTransactionHandler) createManagedTx(ctx context.Context, txID string, txHeaders *ffcapi.TransactionHeaders, gas *fftypes.FFBigInt, transactionData string) (*apitypes.ManagedTX, error) {

if gas != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/txhandler/txhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ type TransactionHandler interface {
HandleNewContractDeployment(ctx context.Context, txReq *apitypes.ContractDeployRequest) (mtx *apitypes.ManagedTX, err error)
// HandleCancelTransaction - handles event of cancelling a managed transaction
HandleCancelTransaction(ctx context.Context, txID string) (mtx *apitypes.ManagedTX, err error)
// HandleSuspendTransaction - handles event of suspending a managed transaction
HandleSuspendTransaction(ctx context.Context, txID string) (mtx *apitypes.ManagedTX, err error)

// Informational events:
// HandleTransactionConfirmations - handles confirmations of blockchain transactions for a managed transaction
Expand Down

0 comments on commit 8c9b33b

Please sign in to comment.