diff --git a/.vscode/settings.json b/.vscode/settings.json index 5ff7c562..0bc36382 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -84,6 +84,7 @@ "Tracef", "txcommon", "txcommonmocks", + "txhistory", "txid", "txns", "txtype", diff --git a/Makefile b/Makefile index 2fb5969e..004db9b8 100644 --- a/Makefile +++ b/Makefile @@ -33,6 +33,7 @@ endef $(eval $(call makemock, pkg/ffcapi, API, ffcapimocks)) $(eval $(call makemock, pkg/policyengine, PolicyEngine, policyenginemocks)) +$(eval $(call makemock, pkg/txhistory, Manager, txhistorymocks)) $(eval $(call makemock, internal/confirmations, Manager, confirmationsmocks)) $(eval $(call makemock, internal/persistence, Persistence, persistencemocks)) $(eval $(call makemock, internal/ws, WebSocketChannels, wsmocks)) diff --git a/config.md b/config.md index 118133e0..68fc7e01 100644 --- a/config.md +++ b/config.md @@ -241,9 +241,7 @@ |Key|Description|Type|Default Value| |---|-----------|----|-------------| -|maxHistoryActions|The number of actions to store per historical status updates|`int`|`50` |maxHistoryCount|The number of historical status updates to retain in the operation|`int`|`50` -|maxHistorySummaryCount|The number of historical status summary records to retain in the operation|`int`|`50` |maxInFlight|The maximum number of transactions to have in-flight with the policy engine / blockchain transaction pool|`int`|`100` |nonceStateTimeout|How old the most recently submitted transaction record in our local state needs to be, before we make a request to the node to query the next nonce for a signing address|[`time.Duration`](https://pkg.go.dev/time#Duration)|`1h` diff --git a/internal/confirmations/confirmations.go b/internal/confirmations/confirmations.go index e01bc034..670706ad 100644 --- a/internal/confirmations/confirmations.go +++ b/internal/confirmations/confirmations.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2023 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -13,6 +13,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + package confirmations import ( diff --git a/internal/tmconfig/tmconfig.go b/internal/tmconfig/tmconfig.go index f94c0e3a..bc06df90 100644 --- a/internal/tmconfig/tmconfig.go +++ b/internal/tmconfig/tmconfig.go @@ -31,8 +31,6 @@ var ( ConfirmationsStaleReceiptTimeout = ffc("confirmations.staleReceiptTimeout") ConfirmationsNotificationQueueLength = ffc("confirmations.notificationQueueLength") TransactionsMaxHistoryCount = ffc("transactions.maxHistoryCount") - TransactionsMaxHistorySummaryCount = ffc("transactions.maxHistorySummaryCount") - TransactionsMaxHistoryActions = ffc("transactions.maxHistoryActions") TransactionsMaxInFlight = ffc("transactions.maxInFlight") TransactionsNonceStateTimeout = ffc("transactions.nonceStateTimeout") PolicyLoopInterval = ffc("policyloop.interval") @@ -77,8 +75,6 @@ var MetricsConfig config.Section func setDefaults() { viper.SetDefault(string(TransactionsMaxInFlight), 100) viper.SetDefault(string(TransactionsMaxHistoryCount), 50) - viper.SetDefault(string(TransactionsMaxHistorySummaryCount), 50) - viper.SetDefault(string(TransactionsMaxHistoryActions), 50) viper.SetDefault(string(TransactionsNonceStateTimeout), "1h") viper.SetDefault(string(ConfirmationsRequired), 20) viper.SetDefault(string(ConfirmationsBlockQueueLength), 50) diff --git a/mocks/confirmationsmocks/manager.go b/mocks/confirmationsmocks/manager.go index 059e3894..fe54e4b9 100644 --- a/mocks/confirmationsmocks/manager.go +++ b/mocks/confirmationsmocks/manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v1.0.0. DO NOT EDIT. +// Code generated by mockery v2.14.1. DO NOT EDIT. package confirmationsmocks @@ -69,3 +69,18 @@ func (_m *Manager) Start() { func (_m *Manager) Stop() { _m.Called() } + +type mockConstructorTestingTNewManager interface { + mock.TestingT + Cleanup(func()) +} + +// NewManager creates a new instance of Manager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewManager(t mockConstructorTestingTNewManager) *Manager { + mock := &Manager{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/mocks/eventsmocks/stream.go b/mocks/eventsmocks/stream.go index 85b2bc05..7d0bc64e 100644 --- a/mocks/eventsmocks/stream.go +++ b/mocks/eventsmocks/stream.go @@ -1,4 +1,4 @@ -// Code generated by mockery v1.0.0. DO NOT EDIT. +// Code generated by mockery v2.14.1. DO NOT EDIT. package eventsmocks @@ -139,3 +139,18 @@ func (_m *Stream) UpdateSpec(ctx context.Context, updates *apitypes.EventStream) return r0 } + +type mockConstructorTestingTNewStream interface { + mock.TestingT + Cleanup(func()) +} + +// NewStream creates a new instance of Stream. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewStream(t mockConstructorTestingTNewStream) *Stream { + mock := &Stream{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/mocks/persistencemocks/persistence.go b/mocks/persistencemocks/persistence.go index 309fa3dd..3e2f0951 100644 --- a/mocks/persistencemocks/persistence.go +++ b/mocks/persistencemocks/persistence.go @@ -1,4 +1,4 @@ -// Code generated by mockery v1.0.0. DO NOT EDIT. +// Code generated by mockery v2.14.1. DO NOT EDIT. package persistencemocks @@ -388,3 +388,18 @@ func (_m *Persistence) WriteTransaction(ctx context.Context, tx *apitypes.Manage return r0 } + +type mockConstructorTestingTNewPersistence interface { + mock.TestingT + Cleanup(func()) +} + +// NewPersistence creates a new instance of Persistence. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewPersistence(t mockConstructorTestingTNewPersistence) *Persistence { + mock := &Persistence{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/mocks/policyenginemocks/policy_engine.go b/mocks/policyenginemocks/policy_engine.go index 693704df..030a4214 100644 --- a/mocks/policyenginemocks/policy_engine.go +++ b/mocks/policyenginemocks/policy_engine.go @@ -1,4 +1,4 @@ -// Code generated by mockery v1.0.0. DO NOT EDIT. +// Code generated by mockery v2.14.1. DO NOT EDIT. package policyenginemocks @@ -20,25 +20,25 @@ type PolicyEngine struct { } // Execute provides a mock function with given fields: ctx, cAPI, mtx -func (_m *PolicyEngine) Execute(ctx context.Context, cAPI ffcapi.API, mtx *apitypes.ManagedTX) (policyengine.UpdateType, ffcapi.ErrorReason, error) { +func (_m *PolicyEngine) Execute(ctx context.Context, cAPI *policyengine.ToolkitAPI, mtx *apitypes.ManagedTX) (policyengine.UpdateType, ffcapi.ErrorReason, error) { ret := _m.Called(ctx, cAPI, mtx) var r0 policyengine.UpdateType - if rf, ok := ret.Get(0).(func(context.Context, ffcapi.API, *apitypes.ManagedTX) policyengine.UpdateType); ok { + if rf, ok := ret.Get(0).(func(context.Context, *policyengine.ToolkitAPI, *apitypes.ManagedTX) policyengine.UpdateType); ok { r0 = rf(ctx, cAPI, mtx) } else { r0 = ret.Get(0).(policyengine.UpdateType) } var r1 ffcapi.ErrorReason - if rf, ok := ret.Get(1).(func(context.Context, ffcapi.API, *apitypes.ManagedTX) ffcapi.ErrorReason); ok { + if rf, ok := ret.Get(1).(func(context.Context, *policyengine.ToolkitAPI, *apitypes.ManagedTX) ffcapi.ErrorReason); ok { r1 = rf(ctx, cAPI, mtx) } else { r1 = ret.Get(1).(ffcapi.ErrorReason) } var r2 error - if rf, ok := ret.Get(2).(func(context.Context, ffcapi.API, *apitypes.ManagedTX) error); ok { + if rf, ok := ret.Get(2).(func(context.Context, *policyengine.ToolkitAPI, *apitypes.ManagedTX) error); ok { r2 = rf(ctx, cAPI, mtx) } else { r2 = ret.Error(2) @@ -46,3 +46,18 @@ func (_m *PolicyEngine) Execute(ctx context.Context, cAPI ffcapi.API, mtx *apity return r0, r1, r2 } + +type mockConstructorTestingTNewPolicyEngine interface { + mock.TestingT + Cleanup(func()) +} + +// NewPolicyEngine creates a new instance of PolicyEngine. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewPolicyEngine(t mockConstructorTestingTNewPolicyEngine) *PolicyEngine { + mock := &PolicyEngine{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/mocks/txhistorymocks/manager.go b/mocks/txhistorymocks/manager.go new file mode 100644 index 00000000..7c6a7909 --- /dev/null +++ b/mocks/txhistorymocks/manager.go @@ -0,0 +1,59 @@ +// Code generated by mockery v2.14.1. DO NOT EDIT. + +package txhistorymocks + +import ( + context "context" + + apitypes "github.com/hyperledger/firefly-transaction-manager/pkg/apitypes" + + fftypes "github.com/hyperledger/firefly-common/pkg/fftypes" + + mock "github.com/stretchr/testify/mock" +) + +// Manager is an autogenerated mock type for the Manager type +type Manager struct { + mock.Mock +} + +// AddSubStatusAction provides a mock function with given fields: ctx, mtx, action, info, err +func (_m *Manager) AddSubStatusAction(ctx context.Context, mtx *apitypes.ManagedTX, action apitypes.TxAction, info *fftypes.JSONAny, err *fftypes.JSONAny) { + _m.Called(ctx, mtx, action, info, err) +} + +// CurrentSubStatus provides a mock function with given fields: ctx, mtx +func (_m *Manager) CurrentSubStatus(ctx context.Context, mtx *apitypes.ManagedTX) *apitypes.TxHistoryStateTransitionEntry { + ret := _m.Called(ctx, mtx) + + var r0 *apitypes.TxHistoryStateTransitionEntry + if rf, ok := ret.Get(0).(func(context.Context, *apitypes.ManagedTX) *apitypes.TxHistoryStateTransitionEntry); ok { + r0 = rf(ctx, mtx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*apitypes.TxHistoryStateTransitionEntry) + } + } + + return r0 +} + +// SetSubStatus provides a mock function with given fields: ctx, mtx, subStatus +func (_m *Manager) SetSubStatus(ctx context.Context, mtx *apitypes.ManagedTX, subStatus apitypes.TxSubStatus) { + _m.Called(ctx, mtx, subStatus) +} + +type mockConstructorTestingTNewManager interface { + mock.TestingT + Cleanup(func()) +} + +// NewManager creates a new instance of Manager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewManager(t mockConstructorTestingTNewManager) *Manager { + mock := &Manager{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/mocks/wsmocks/web_socket_channels.go b/mocks/wsmocks/web_socket_channels.go index e5a4b240..ee728c83 100644 --- a/mocks/wsmocks/web_socket_channels.go +++ b/mocks/wsmocks/web_socket_channels.go @@ -1,4 +1,4 @@ -// Code generated by mockery v1.0.0. DO NOT EDIT. +// Code generated by mockery v2.14.1. DO NOT EDIT. package wsmocks @@ -50,3 +50,18 @@ func (_m *WebSocketChannels) GetChannels(topic string) (chan<- interface{}, chan func (_m *WebSocketChannels) SendReply(message interface{}) { _m.Called(message) } + +type mockConstructorTestingTNewWebSocketChannels interface { + mock.TestingT + Cleanup(func()) +} + +// NewWebSocketChannels creates a new instance of WebSocketChannels. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewWebSocketChannels(t mockConstructorTestingTNewWebSocketChannels) *WebSocketChannels { + mock := &WebSocketChannels{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/apitypes/managed_tx.go b/pkg/apitypes/managed_tx.go index e05498c7..77c3a0c6 100644 --- a/pkg/apitypes/managed_tx.go +++ b/pkg/apitypes/managed_tx.go @@ -17,12 +17,7 @@ package apitypes import ( - "context" - "encoding/json" - "fmt" - "github.com/hyperledger/firefly-common/pkg/fftypes" - "github.com/hyperledger/firefly-common/pkg/log" "github.com/hyperledger/firefly-transaction-manager/internal/confirmations" "github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi" ) @@ -55,17 +50,22 @@ const ( TxSubStatusFailed TxSubStatus = "Failed" ) -type TxHistoryRecord struct { - Time *fftypes.FFTime `json:"time"` - Status TxSubStatus `json:"subStatus"` - Actions []*TxActionEntry `json:"actions"` - Info string `json:"info,omitempty"` - Error string `json:"error,omitempty"` +// TxHistoryStateTransitionEntry represents a state that the policy engine that manages transaction submission has entered, +// and a list of the actions attempted within that state in order to attempt to move to the next state. +type TxHistoryStateTransitionEntry struct { + Status TxSubStatus `json:"subStatus"` // the subStatus we entered + Time *fftypes.FFTime `json:"time"` // the time we transitioned to this subStatus + Actions []*TxHistoryActionEntry `json:"actions"` // the unique actions we attempted while in this sub-status } -type TxHistorySummaryRecord struct { - FirstOccurrence *fftypes.FFTime `json:"firstOccurence"` - Status TxSubStatus `json:"subStatus"` +// TxHistorySummaryEntry records summarize the transaction history, by recording the number of times each +// subStatus was entered. Because the detailed history might wrap, this means we can retain some basic +// information about the complete history of the transaction beyond the life of the individual history records. +type TxHistorySummaryEntry struct { + Status TxSubStatus `json:"subStatus,omitempty"` + Action TxAction `json:"action,omitempty"` + FirstOccurrence *fftypes.FFTime `json:"firstOccurrence"` + LastOccurrence *fftypes.FFTime `json:"lastOccurrence"` Count int `json:"count"` } @@ -87,8 +87,12 @@ const ( TxActionConfirmTransaction TxAction = "Confirm" ) -// An action taken in order to progress a transaction, e.g. retrieve gas price from an oracle -type TxActionEntry struct { +// An action taken in order to progress a transaction, e.g. retrieve gas price from an oracle. +// Actions are retaining similarly to the TxHistorySummaryEntry records, where we have a finite +// list based on the action name. Actions are only added to the list once, then updated +// when they occur multiple times. So if we are retrying the same set of actions over and over +// again the list of actions does not grow. +type TxHistoryActionEntry struct { Time *fftypes.FFTime `json:"time"` Action TxAction `json:"action"` LastOccurrence *fftypes.FFTime `json:"lastOccurrence"` @@ -133,8 +137,8 @@ type ManagedTX struct { LastSubmit *fftypes.FFTime `json:"lastSubmit,omitempty"` Receipt *ffcapi.TransactionReceiptResponse `json:"receipt,omitempty"` ErrorMessage string `json:"errorMessage,omitempty"` - History []*TxHistoryRecord `json:"history,omitempty"` - HistorySummary []*TxHistorySummaryRecord `json:"historySummary,omitempty"` + History []*TxHistoryStateTransitionEntry `json:"history,omitempty"` + HistorySummary []*TxHistorySummaryEntry `json:"historySummary,omitempty"` Confirmations []confirmations.BlockInfo `json:"confirmations,omitempty"` } @@ -162,135 +166,3 @@ type TransactionUpdateReply struct { ProtocolID string `json:"protocolId"` TransactionHash string `json:"transactionHash,omitempty"` } - -func (mtx *ManagedTX) CurrentSubStatus(ctx context.Context) *TxHistoryRecord { - if len(mtx.History) > 0 { - return mtx.History[len(mtx.History)-1] - } - return nil -} - -// Transaction sub-status entries can be added for a given transaction so a caller -// can see discrete steps in a transaction moving to confirmation on the blockchain. -// For example a transaction might have a sub-status of "Stale" if a transaction has -// been in pending state for a given period of time. In order to progress the transaction -// while it's in a given sub-status, certain actions might be taken (such as retrieving -// the latest gas price for the chain). See AddSubStatusAction(). Since a transaction -// might go through many sub-status changes before being confirmed on chain the list of -// entries is capped at the configured number and FIFO approach used to keep within that cap. -func (mtx *ManagedTX) AddSubStatus(ctx context.Context, subStatus TxSubStatus) { - // See if the status being added is the same as the current status. If so we won't create - // a new record, just increment the total count - if len(mtx.History) > 0 { - if mtx.History[len(mtx.History)-1].Status == subStatus { - return - } - log.L(ctx).Debugf("Entered sub-status %s", subStatus) - } - - // Do we need to remove the oldest entry to make space for this one? - if len(mtx.History) > 50 { // TODO - get from config - mtx.History = mtx.History[1:] - } else { - // If this is a change in status add a new record - newStatus := &TxHistoryRecord{ - Time: fftypes.Now(), - Status: subStatus, - Actions: make([]*TxActionEntry, 0), - } - mtx.History = append(mtx.History, newStatus) - - // As was as detailed sub-status records (which might be a long list and early entries - // get purged at some point) we keep a separate list of all the discrete types of sub-status - // we've ever seen for this transaction along with a count of them. This means an early sub-status - // (e.g. "queued") followed by 100s of different sub-status types will still be recorded - newHistorySummary := true - for _, statusType := range mtx.HistorySummary { - if statusType.Status == subStatus { - // Just increment the counter - statusType.Count++ - newHistorySummary = false - break - } - } - - if newHistorySummary { - if len(mtx.HistorySummary) < 50 { // TODO - get from config - mtx.HistorySummary = append(mtx.HistorySummary, &TxHistorySummaryRecord{Status: subStatus, Count: 1, FirstOccurrence: fftypes.Now()}) - } else { - log.L(ctx).Warnf("Reached maximum number of history summary records. New summary status will be not be recorded.") - } - } - } -} - -// Make sure the provided value can be serialised to JSON -func ensureValidJSON(value *fftypes.JSONAny) *fftypes.JSONAny { - if json.Valid([]byte(*value)) { - // Already valid - return value - } - - // Convert to hex and wrap in a valid struct - hex := fmt.Sprintf("%x", []byte(*value)) - return fftypes.JSONAnyPtr(`{"invalidJson":"` + hex + `"}`) -} - -// When a transaction is in a given sub-status (e.g. "Stale") the blockchain connector -// may perform certain actions to move it out of the status. For example it might -// retrieve the current gas price for the chain. TxAction's represent an action taken -// while in a given sub-status. In order to limit the number of TxAction entries in -// a TxSubStatusEntry each action has a count of the number of occurrences and a -// latest timestamp to indicate when it was last executed. There is a last error field -// which can be used to indicate the most recent error that occurred, for example an -// HTTP 4xx return code from a gas oracle. There is also an information field to record -// arbitrary data about the action, for example the gas price retrieved from an oracle. -func (mtx *ManagedTX) AddSubStatusAction(ctx context.Context, action TxAction, info *fftypes.JSONAny, error *fftypes.JSONAny) { - // An action always exists within a sub-status. If a sub-status hasn't been recorded yet we don't record the action - if len(mtx.History) > 0 { - - // See if this action exists in the list already since we only want to update the single entry, not - // add a new one - currentSubStatus := mtx.History[len(mtx.History)-1] - for _, entry := range currentSubStatus.Actions { - if entry.Action == action { - entry.Count++ - entry.LastOccurrence = fftypes.Now() - - if error != nil { - entry.LastError = ensureValidJSON(error) - entry.LastErrorTime = fftypes.Now() - } - - if info != nil { - entry.LastInfo = ensureValidJSON(info) - } - return - } - } - - // This action hasn't been recorded yet in this sub-status. Add a new entry for it. - if len(currentSubStatus.Actions) >= 50 { // TODO - get from config - log.L(ctx).Warn("Number of unique sub-status actions. New action detail will not be recorded.") - } else { - // If this is an entirely new status add it to the list - newAction := &TxActionEntry{ - Time: fftypes.Now(), - Action: action, - LastOccurrence: fftypes.Now(), - Count: 1, - } - - if error != nil { - newAction.LastError = ensureValidJSON(error) - newAction.LastErrorTime = fftypes.Now() - } - - if info != nil { - newAction.LastInfo = ensureValidJSON(info) - } - - currentSubStatus.Actions = append(currentSubStatus.Actions, newAction) - } - } -} diff --git a/pkg/fftm/manager.go b/pkg/fftm/manager.go index a6f1e87a..64efbcc5 100644 --- a/pkg/fftm/manager.go +++ b/pkg/fftm/manager.go @@ -39,6 +39,7 @@ import ( "github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi" "github.com/hyperledger/firefly-transaction-manager/pkg/policyengine" "github.com/hyperledger/firefly-transaction-manager/pkg/policyengines" + "github.com/hyperledger/firefly-transaction-manager/pkg/txhistory" ) type Manager interface { @@ -70,7 +71,6 @@ type manager struct { ctx context.Context cancelCtx func() retry *retry.Retry - connector ffcapi.API confirmations confirmations.Manager policyEngine policyengine.PolicyEngine apiServer httpserver.HTTPServer @@ -81,6 +81,10 @@ type manager struct { inflightUpdate chan bool inflight []*pendingState + txhistory txhistory.Manager + connector ffcapi.API + tkAPI *policyengine.ToolkitAPI + mux sync.Mutex policyEngineAPIRequests []*policyEngineAPIRequest lockedNonces map[string]*lockedNonce @@ -96,12 +100,9 @@ type manager struct { debugServer *http.Server debugServerDone chan struct{} - policyLoopInterval time.Duration - nonceStateTimeout time.Duration - maxHistoryCount int - maxHistorySummaryCount int - maxHistoryActions int - maxInFlight int + policyLoopInterval time.Duration + nonceStateTimeout time.Duration + maxInFlight int } func InitConfig() { @@ -123,27 +124,29 @@ func NewManager(ctx context.Context, connector ffcapi.API) (Manager, error) { func newManager(ctx context.Context, connector ffcapi.API) *manager { m := &manager{ - connector: connector, - lockedNonces: make(map[string]*lockedNonce), - apiServerDone: make(chan error), - metricsServerDone: make(chan error), - metricsEnabled: config.GetBool(tmconfig.MetricsEnabled), - eventStreams: make(map[fftypes.UUID]events.Stream), - streamsByName: make(map[string]*fftypes.UUID), - metricsManager: metrics.NewMetricsManager(ctx), - policyLoopInterval: config.GetDuration(tmconfig.PolicyLoopInterval), - maxHistoryCount: config.GetInt(tmconfig.TransactionsMaxHistoryCount), - maxHistorySummaryCount: config.GetInt(tmconfig.TransactionsMaxHistorySummaryCount), - maxHistoryActions: config.GetInt(tmconfig.TransactionsMaxHistoryActions), - maxInFlight: config.GetInt(tmconfig.TransactionsMaxInFlight), - nonceStateTimeout: config.GetDuration(tmconfig.TransactionsNonceStateTimeout), - inflightStale: make(chan bool, 1), - inflightUpdate: make(chan bool, 1), + connector: connector, + lockedNonces: make(map[string]*lockedNonce), + apiServerDone: make(chan error), + metricsServerDone: make(chan error), + metricsEnabled: config.GetBool(tmconfig.MetricsEnabled), + eventStreams: make(map[fftypes.UUID]events.Stream), + streamsByName: make(map[string]*fftypes.UUID), + metricsManager: metrics.NewMetricsManager(ctx), + policyLoopInterval: config.GetDuration(tmconfig.PolicyLoopInterval), + maxInFlight: config.GetInt(tmconfig.TransactionsMaxInFlight), + nonceStateTimeout: config.GetDuration(tmconfig.TransactionsNonceStateTimeout), + inflightStale: make(chan bool, 1), + inflightUpdate: make(chan bool, 1), retry: &retry.Retry{ InitialDelay: config.GetDuration(tmconfig.PolicyLoopRetryInitDelay), MaximumDelay: config.GetDuration(tmconfig.PolicyLoopRetryMaxDelay), Factor: config.GetFloat64(tmconfig.PolicyLoopRetryFactor), }, + txhistory: txhistory.NewTxHistoryManager(ctx), + } + m.tkAPI = &policyengine.ToolkitAPI{ + Connector: m.connector, + TXHistory: m.txhistory, } m.ctx, m.cancelCtx = context.WithCancel(ctx) return m diff --git a/pkg/fftm/policyloop.go b/pkg/fftm/policyloop.go index 042cd348..f59c890b 100644 --- a/pkg/fftm/policyloop.go +++ b/pkg/fftm/policyloop.go @@ -196,7 +196,7 @@ func (m *manager) execPolicy(ctx context.Context, pending *pendingState, syncDel completed := false var receiptProtocolID string var lastStatusChange *fftypes.FFTime - currentSubStatus := pending.mtx.CurrentSubStatus(ctx) + currentSubStatus := m.txhistory.CurrentSubStatus(ctx, pending.mtx) if currentSubStatus != nil { lastStatusChange = currentSubStatus.Time @@ -237,7 +237,7 @@ func (m *manager) execPolicy(ctx context.Context, pending *pendingState, syncDel // Pass the state to the pluggable policy engine to potentially perform more actions against it, // such as submitting for the first time, or raising the gas etc. - update, updateReason, updateErr = m.policyEngine.Execute(ctx, m.connector, pending.mtx) + update, updateReason, updateErr = m.policyEngine.Execute(ctx, m.tkAPI, pending.mtx) if updateErr != nil { log.L(ctx).Errorf("Policy engine returned error for transaction %s reason=%s: %s", mtx.ID, updateReason, err) update = policyengine.UpdateYes @@ -255,8 +255,8 @@ func (m *manager) execPolicy(ctx context.Context, pending *pendingState, syncDel } } - if mtx.CurrentSubStatus(ctx) != nil { - if !mtx.CurrentSubStatus(ctx).Time.Equal(lastStatusChange) { + if m.txhistory.CurrentSubStatus(ctx, mtx) != nil { + if !m.txhistory.CurrentSubStatus(ctx, mtx).Time.Equal(lastStatusChange) { update = policyengine.UpdateYes } } @@ -341,7 +341,7 @@ func (m *manager) trackSubmittedTransaction(ctx context.Context, pending *pendin pending.mtx.Receipt = receipt m.mux.Unlock() log.L(m.ctx).Debugf("Receipt received for transaction %s at nonce %s / %d - hash: %s", pending.mtx.ID, pending.mtx.TransactionHeaders.From, pending.mtx.Nonce.Int64(), pending.mtx.TransactionHash) - pending.mtx.AddSubStatusAction(ctx, apitypes.TxActionReceiveReceipt, fftypes.JSONAnyPtr(`{"protocolId":"`+receipt.ProtocolID+`"}`), nil) + m.txhistory.AddSubStatusAction(ctx, pending.mtx, apitypes.TxActionReceiveReceipt, fftypes.JSONAnyPtr(`{"protocolId":"`+receipt.ProtocolID+`"}`), nil) m.markInflightUpdate() }, Confirmed: func(ctx context.Context, confirmations []confirmations.BlockInfo) { @@ -351,8 +351,8 @@ func (m *manager) trackSubmittedTransaction(ctx context.Context, pending *pendin pending.mtx.Confirmations = confirmations m.mux.Unlock() log.L(m.ctx).Debugf("Confirmed transaction %s at nonce %s / %d - hash: %s", pending.mtx.ID, pending.mtx.TransactionHeaders.From, pending.mtx.Nonce.Int64(), pending.mtx.TransactionHash) - pending.mtx.AddSubStatusAction(ctx, apitypes.TxActionConfirmTransaction, nil, nil) - pending.mtx.AddSubStatus(ctx, apitypes.TxSubStatusConfirmed) + m.txhistory.AddSubStatusAction(ctx, pending.mtx, apitypes.TxActionConfirmTransaction, nil, nil) + m.txhistory.SetSubStatus(ctx, pending.mtx, apitypes.TxSubStatusConfirmed) m.markInflightUpdate() }, }, diff --git a/pkg/fftm/policyloop_test.go b/pkg/fftm/policyloop_test.go index 985f4b78..3a6b3efb 100644 --- a/pkg/fftm/policyloop_test.go +++ b/pkg/fftm/policyloop_test.go @@ -33,6 +33,7 @@ import ( "github.com/hyperledger/firefly-transaction-manager/pkg/apitypes" "github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi" "github.com/hyperledger/firefly-transaction-manager/pkg/policyengine" + "github.com/hyperledger/firefly-transaction-manager/pkg/txhistory" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" ) @@ -303,7 +304,8 @@ func TestPolicyLoopUpdateFail(t *testing.T) { }, } - m.inflight[0].mtx.AddSubStatus(m.ctx, apitypes.TxSubStatusReceived) + h := txhistory.NewTxHistoryManager(m.ctx) + h.SetSubStatus(m.ctx, m.inflight[0].mtx, apitypes.TxSubStatusReceived) mpe := &policyenginemocks.PolicyEngine{} m.policyEngine = mpe mpe.On("Execute", mock.Anything, mock.Anything, mock.Anything). diff --git a/pkg/fftm/send_tx.go b/pkg/fftm/send_tx.go index 607ec693..57edce52 100644 --- a/pkg/fftm/send_tx.go +++ b/pkg/fftm/send_tx.go @@ -92,8 +92,8 @@ func (m *manager) submitPreparedTX(ctx context.Context, txID string, txHeaders * Status: apitypes.TxStatusPending, } - mtx.AddSubStatus(ctx, apitypes.TxSubStatusReceived) - mtx.AddSubStatusAction(ctx, apitypes.TxActionAssignNonce, fftypes.JSONAnyPtr(`{"nonce":"`+mtx.Nonce.String()+`"}`), nil) + m.txhistory.SetSubStatus(ctx, mtx, apitypes.TxSubStatusReceived) + m.txhistory.AddSubStatusAction(ctx, mtx, apitypes.TxActionAssignNonce, fftypes.JSONAnyPtr(`{"nonce":"`+mtx.Nonce.String()+`"}`), nil) if err = m.persistence.WriteTransaction(m.ctx, mtx, true); err != nil { return nil, err diff --git a/pkg/policyengine/policyengine.go b/pkg/policyengine/policyengine.go index 5d9aceb0..93e13e21 100644 --- a/pkg/policyengine/policyengine.go +++ b/pkg/policyengine/policyengine.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2023 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -21,11 +21,17 @@ import ( "github.com/hyperledger/firefly-transaction-manager/pkg/apitypes" "github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi" + "github.com/hyperledger/firefly-transaction-manager/pkg/txhistory" ) // UpdateType informs FFTM whether the transaction needs an update to be persisted after this execution of the policy engine type UpdateType int +type ToolkitAPI struct { + Connector ffcapi.API + TXHistory txhistory.Manager +} + const ( UpdateNo UpdateType = iota // Instructs that no update is necessary UpdateYes // Instructs that the transaction should be updated in persistence @@ -33,5 +39,5 @@ const ( ) type PolicyEngine interface { - Execute(ctx context.Context, cAPI ffcapi.API, mtx *apitypes.ManagedTX) (updateType UpdateType, reason ffcapi.ErrorReason, err error) + Execute(ctx context.Context, cAPI *ToolkitAPI, mtx *apitypes.ManagedTX) (updateType UpdateType, reason ffcapi.ErrorReason, err error) } diff --git a/pkg/policyengines/simple/simple_policy_engine.go b/pkg/policyengines/simple/simple_policy_engine.go index 829862d3..743f343a 100644 --- a/pkg/policyengines/simple/simple_policy_engine.go +++ b/pkg/policyengines/simple/simple_policy_engine.go @@ -112,8 +112,7 @@ func (p *simplePolicyEngine) withPolicyInfo(ctx context.Context, mtx *apitypes.M return update, reason, err } -func (p *simplePolicyEngine) submitTX(ctx context.Context, cAPI ffcapi.API, mtx *apitypes.ManagedTX) (reason ffcapi.ErrorReason, err error) { - defer mtx.AddSubStatus(ctx, apitypes.TxSubStatusTracking) +func (p *simplePolicyEngine) submitTX(ctx context.Context, tk *policyengine.ToolkitAPI, mtx *apitypes.ManagedTX) (reason ffcapi.ErrorReason, err error) { sendTX := &ffcapi.TransactionSendRequest{ TransactionHeaders: mtx.TransactionHeaders, GasPrice: mtx.GasPrice, @@ -122,13 +121,13 @@ func (p *simplePolicyEngine) submitTX(ctx context.Context, cAPI ffcapi.API, mtx sendTX.TransactionHeaders.Nonce = (*fftypes.FFBigInt)(mtx.Nonce.Int()) sendTX.TransactionHeaders.Gas = (*fftypes.FFBigInt)(mtx.Gas.Int()) log.L(ctx).Debugf("Sending transaction %s at nonce %s / %d (lastSubmit=%s)", mtx.ID, mtx.TransactionHeaders.From, mtx.Nonce.Int64(), mtx.LastSubmit) - res, reason, err := cAPI.TransactionSend(ctx, sendTX) + res, reason, err := tk.Connector.TransactionSend(ctx, sendTX) if err == nil { - mtx.AddSubStatusAction(ctx, apitypes.TxActionSubmitTransaction, fftypes.JSONAnyPtr(`{"reason":"`+string(reason)+`"}`), nil) + tk.TXHistory.AddSubStatusAction(ctx, mtx, apitypes.TxActionSubmitTransaction, fftypes.JSONAnyPtr(`{"reason":"`+string(reason)+`"}`), nil) mtx.TransactionHash = res.TransactionHash mtx.LastSubmit = fftypes.Now() } else { - mtx.AddSubStatusAction(ctx, apitypes.TxActionSubmitTransaction, fftypes.JSONAnyPtr(`{"reason":"`+string(reason)+`"}`), fftypes.JSONAnyPtr(`{"error":"`+err.Error()+`"}`)) + tk.TXHistory.AddSubStatusAction(ctx, mtx, apitypes.TxActionSubmitTransaction, fftypes.JSONAnyPtr(`{"reason":"`+string(reason)+`"}`), fftypes.JSONAnyPtr(`{"error":"`+err.Error()+`"}`)) // We have some simple rules for handling reasons from the connector, which could be enhanced by extending the connector. switch reason { case ffcapi.ErrorKnownTransaction, ffcapi.ErrorReasonNonceTooLow: @@ -147,10 +146,11 @@ func (p *simplePolicyEngine) submitTX(ctx context.Context, cAPI ffcapi.API, mtx } } log.L(ctx).Infof("Transaction %s at nonce %s / %d submitted. Hash: %s", mtx.ID, mtx.TransactionHeaders.From, mtx.Nonce.Int64(), mtx.TransactionHash) + tk.TXHistory.SetSubStatus(ctx, mtx, apitypes.TxSubStatusTracking) return "", nil } -func (p *simplePolicyEngine) Execute(ctx context.Context, cAPI ffcapi.API, mtx *apitypes.ManagedTX) (update policyengine.UpdateType, reason ffcapi.ErrorReason, err error) { +func (p *simplePolicyEngine) Execute(ctx context.Context, tk *policyengine.ToolkitAPI, mtx *apitypes.ManagedTX) (update policyengine.UpdateType, reason ffcapi.ErrorReason, err error) { // Simply policy engine allows deletion of the transaction without additional checks ( ensuring the TX has not been submitted / gap filling the nonce etc. ) if mtx.DeleteRequested != nil { @@ -160,14 +160,14 @@ func (p *simplePolicyEngine) Execute(ctx context.Context, cAPI ffcapi.API, mtx * // Simple policy engine only submits once. if mtx.FirstSubmit == nil { // Only calculate gas price here in the simple policy engine - mtx.GasPrice, err = p.getGasPrice(ctx, cAPI) + mtx.GasPrice, err = p.getGasPrice(ctx, tk.Connector) if err != nil { - mtx.AddSubStatusAction(ctx, apitypes.TxActionRetrieveGasPrice, nil, fftypes.JSONAnyPtr(`{"error":"`+err.Error()+`"}`)) + tk.TXHistory.AddSubStatusAction(ctx, mtx, apitypes.TxActionRetrieveGasPrice, nil, fftypes.JSONAnyPtr(`{"error":"`+err.Error()+`"}`)) return policyengine.UpdateNo, "", err } - mtx.AddSubStatusAction(ctx, apitypes.TxActionRetrieveGasPrice, fftypes.JSONAnyPtr(`{"gasPrice":`+string(*mtx.GasPrice)+`}`), nil) + tk.TXHistory.AddSubStatusAction(ctx, mtx, apitypes.TxActionRetrieveGasPrice, fftypes.JSONAnyPtr(`{"gasPrice":`+string(*mtx.GasPrice)+`}`), nil) // Submit the first time - if reason, err := p.submitTX(ctx, cAPI, mtx); err != nil { + if reason, err := p.submitTX(ctx, tk, mtx); err != nil { return policyengine.UpdateYes, reason, err } mtx.FirstSubmit = mtx.LastSubmit @@ -189,20 +189,20 @@ func (p *simplePolicyEngine) Execute(ctx context.Context, cAPI ffcapi.API, mtx * log.L(ctx).Infof("Transaction %s at nonce %s / %d has not been mined after %.2fs", mtx.ID, mtx.TransactionHeaders.From, mtx.Nonce.Int64(), secsSinceSubmit) info.LastWarnTime = now // We do a resubmit at this point - as it might no longer be in the TX pool - mtx.AddSubStatusAction(ctx, apitypes.TxActionTimeout, nil, nil) - mtx.AddSubStatus(ctx, apitypes.TxSubStatusStale) - mtx.GasPrice, err = p.getGasPrice(ctx, cAPI) + tk.TXHistory.AddSubStatusAction(ctx, mtx, apitypes.TxActionTimeout, nil, nil) + tk.TXHistory.SetSubStatus(ctx, mtx, apitypes.TxSubStatusStale) + mtx.GasPrice, err = p.getGasPrice(ctx, tk.Connector) if err != nil { - mtx.AddSubStatusAction(ctx, apitypes.TxActionRetrieveGasPrice, nil, fftypes.JSONAnyPtr(`{"error":"`+err.Error()+`"}`)) + tk.TXHistory.AddSubStatusAction(ctx, mtx, apitypes.TxActionRetrieveGasPrice, nil, fftypes.JSONAnyPtr(`{"error":"`+err.Error()+`"}`)) return policyengine.UpdateNo, "", err } - mtx.AddSubStatusAction(ctx, apitypes.TxActionRetrieveGasPrice, fftypes.JSONAnyPtr(`{"gasPrice":`+string(*mtx.GasPrice)+`}`), nil) - if reason, err := p.submitTX(ctx, cAPI, mtx); err != nil { + tk.TXHistory.AddSubStatusAction(ctx, mtx, apitypes.TxActionRetrieveGasPrice, fftypes.JSONAnyPtr(`{"gasPrice":`+string(*mtx.GasPrice)+`}`), nil) + if reason, err := p.submitTX(ctx, tk, mtx); err != nil { if reason != ffcapi.ErrorKnownTransaction { return policyengine.UpdateYes, reason, err } } - mtx.AddSubStatus(ctx, apitypes.TxSubStatusTracking) + tk.TXHistory.SetSubStatus(ctx, mtx, apitypes.TxSubStatusTracking) return policyengine.UpdateYes, "", nil } return policyengine.UpdateNo, "", nil diff --git a/pkg/policyengines/simple/simple_policy_engine_test.go b/pkg/policyengines/simple/simple_policy_engine_test.go index b440486d..6e9c6c93 100644 --- a/pkg/policyengines/simple/simple_policy_engine_test.go +++ b/pkg/policyengines/simple/simple_policy_engine_test.go @@ -30,6 +30,7 @@ import ( "github.com/hyperledger/firefly-common/pkg/fftypes" "github.com/hyperledger/firefly-transaction-manager/internal/tmconfig" "github.com/hyperledger/firefly-transaction-manager/mocks/ffcapimocks" + "github.com/hyperledger/firefly-transaction-manager/mocks/txhistorymocks" "github.com/hyperledger/firefly-transaction-manager/pkg/apitypes" "github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi" "github.com/hyperledger/firefly-transaction-manager/pkg/policyengine" @@ -37,24 +38,34 @@ import ( "github.com/stretchr/testify/mock" ) -func newTestPolicyEngineFactory(t *testing.T) (*PolicyEngineFactory, config.Section) { +func newTestPolicyEngineFactory(t *testing.T) (*PolicyEngineFactory, *policyengine.ToolkitAPI, *ffcapimocks.API, *txhistorymocks.Manager, config.Section) { tmconfig.Reset() conf := config.RootSection("unittest.simple") f := &PolicyEngineFactory{} f.InitConfig(conf) assert.Equal(t, "simple", f.Name()) - return f, conf + + mockHistory := &txhistorymocks.Manager{} + mockHistory.On("SetSubStatus", mock.Anything, mock.Anything, mock.Anything).Maybe() + mockHistory.On("AddSubStatusAction", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Maybe() + + mockFFCAPI := &ffcapimocks.API{} + + return f, &policyengine.ToolkitAPI{ + Connector: mockFFCAPI, + TXHistory: mockHistory, + }, mockFFCAPI, mockHistory, conf } func TestMissingGasConfig(t *testing.T) { - f, conf := newTestPolicyEngineFactory(t) + f, _, _, _, conf := newTestPolicyEngineFactory(t) conf.SubSection(GasOracleConfig).Set(GasOracleMode, GasOracleModeDisabled) _, err := f.NewPolicyEngine(context.Background(), conf) assert.Regexp(t, "FF21020", err) } func TestFixedGasPriceOK(t *testing.T) { - f, conf := newTestPolicyEngineFactory(t) + f, tk, mockFFCAPI, _, conf := newTestPolicyEngineFactory(t) conf.SubSection(GasOracleConfig).Set(GasOracleMode, GasOracleModeDisabled) conf.Set(FixedGasPrice, `{ "maxPriorityFee":32.146027800733336, @@ -71,7 +82,6 @@ func TestFixedGasPriceOK(t *testing.T) { TransactionData: "SOME_RAW_TX_BYTES", } - mockFFCAPI := &ffcapimocks.API{} mockFFCAPI.On("TransactionSend", mock.Anything, mock.MatchedBy(func(req *ffcapi.TransactionSendRequest) bool { return req.GasPrice.JSONObject().GetString("maxPriorityFee") == "32.146027800733336" && req.GasPrice.JSONObject().GetString("maxFee") == "32.14602781673334" && @@ -82,7 +92,7 @@ func TestFixedGasPriceOK(t *testing.T) { }, ffcapi.ErrorReason(""), nil) ctx := context.Background() - updated, reason, err := p.Execute(ctx, mockFFCAPI, mtx) + updated, reason, err := p.Execute(ctx, tk, mtx) assert.NoError(t, err) assert.Equal(t, policyengine.UpdateYes, updated) assert.Empty(t, reason) @@ -120,7 +130,7 @@ func TestGasOracleSendOK(t *testing.T) { }`)) })) - f, conf := newTestPolicyEngineFactory(t) + f, tk, mockFFCAPI, _, conf := newTestPolicyEngineFactory(t) conf.SubSection(GasOracleConfig).Set(GasOracleMode, GasOracleModeRESTAPI) conf.SubSection(GasOracleConfig).Set(ffresty.HTTPConfigURL, fmt.Sprintf("http://%s", server.Listener.Addr())) conf.SubSection(GasOracleConfig).Set(GasOracleTemplate, `{ @@ -138,7 +148,6 @@ func TestGasOracleSendOK(t *testing.T) { TransactionData: "SOME_RAW_TX_BYTES", } - mockFFCAPI := &ffcapimocks.API{} mockFFCAPI.On("TransactionSend", mock.Anything, mock.MatchedBy(func(req *ffcapi.TransactionSendRequest) bool { return req.GasPrice.JSONObject().GetInteger("maxPriorityFeePerGas").Cmp(big.NewInt(32146027800)) == 0 && req.GasPrice.JSONObject().GetInteger("maxFeePerGas").Cmp(big.NewInt(32247127816)) == 0 && @@ -149,7 +158,7 @@ func TestGasOracleSendOK(t *testing.T) { }, ffcapi.ErrorReason(""), nil) ctx := context.Background() - updated, reason, err := p.Execute(ctx, mockFFCAPI, mtx) + updated, reason, err := p.Execute(ctx, tk, mtx) assert.NoError(t, err) assert.Empty(t, reason) assert.Equal(t, policyengine.UpdateYes, updated) @@ -171,7 +180,7 @@ func TestGasOracleSendOK(t *testing.T) { func TestConnectorGasOracleSendOK(t *testing.T) { - f, conf := newTestPolicyEngineFactory(t) + f, tk, mockFFCAPI, _, conf := newTestPolicyEngineFactory(t) conf.SubSection(GasOracleConfig).Set(GasOracleMode, GasOracleModeConnector) p, err := f.NewPolicyEngine(context.Background(), conf) assert.NoError(t, err) @@ -184,7 +193,6 @@ func TestConnectorGasOracleSendOK(t *testing.T) { TransactionData: "SOME_RAW_TX_BYTES", } - mockFFCAPI := &ffcapimocks.API{} mockFFCAPI.On("GasPriceEstimate", mock.Anything, mock.Anything).Return(&ffcapi.GasPriceEstimateResponse{ GasPrice: fftypes.JSONAnyPtr(`"12345"`), }, ffcapi.ErrorReason(""), nil).Once() @@ -196,7 +204,7 @@ func TestConnectorGasOracleSendOK(t *testing.T) { }, ffcapi.ErrorReason(""), nil) ctx := context.Background() - updated, reason, err := p.Execute(ctx, mockFFCAPI, mtx) + updated, reason, err := p.Execute(ctx, tk, mtx) assert.NoError(t, err) assert.Empty(t, reason) assert.Equal(t, policyengine.UpdateYes, updated) @@ -214,7 +222,7 @@ func TestConnectorGasOracleSendOK(t *testing.T) { func TestConnectorGasOracleFail(t *testing.T) { - f, conf := newTestPolicyEngineFactory(t) + f, tk, mockFFCAPI, _, conf := newTestPolicyEngineFactory(t) conf.SubSection(GasOracleConfig).Set(GasOracleMode, GasOracleModeConnector) p, err := f.NewPolicyEngine(context.Background(), conf) assert.NoError(t, err) @@ -227,13 +235,12 @@ func TestConnectorGasOracleFail(t *testing.T) { TransactionData: "SOME_RAW_TX_BYTES", } - mockFFCAPI := &ffcapimocks.API{} mockFFCAPI.On("GasPriceEstimate", mock.Anything, mock.Anything).Return(&ffcapi.GasPriceEstimateResponse{ GasPrice: fftypes.JSONAnyPtr(`"12345"`), }, ffcapi.ErrorReason(""), fmt.Errorf("pop")) ctx := context.Background() - _, reason, err := p.Execute(ctx, mockFFCAPI, mtx) + _, reason, err := p.Execute(ctx, tk, mtx) assert.Regexp(t, "pop", err) assert.Empty(t, reason) @@ -243,7 +250,7 @@ func TestConnectorGasOracleFail(t *testing.T) { func TestConnectorGasOracleFailStale(t *testing.T) { - f, conf := newTestPolicyEngineFactory(t) + f, tk, mockFFCAPI, _, conf := newTestPolicyEngineFactory(t) conf.SubSection(GasOracleConfig).Set(GasOracleMode, GasOracleModeConnector) p, err := f.NewPolicyEngine(context.Background(), conf) assert.NoError(t, err) @@ -259,13 +266,12 @@ func TestConnectorGasOracleFailStale(t *testing.T) { LastSubmit: (*fftypes.FFTime)(&longAgo), } - mockFFCAPI := &ffcapimocks.API{} mockFFCAPI.On("GasPriceEstimate", mock.Anything, mock.Anything).Return(&ffcapi.GasPriceEstimateResponse{ GasPrice: fftypes.JSONAnyPtr(`"12345"`), }, ffcapi.ErrorReason(""), fmt.Errorf("pop")) ctx := context.Background() - _, reason, err := p.Execute(ctx, mockFFCAPI, mtx) + _, reason, err := p.Execute(ctx, tk, mtx) assert.Regexp(t, "pop", err) assert.Empty(t, reason) @@ -281,7 +287,7 @@ func TestGasOracleSendFail(t *testing.T) { })) defer server.Close() - f, conf := newTestPolicyEngineFactory(t) + f, tk, _, _, conf := newTestPolicyEngineFactory(t) conf.SubSection(GasOracleConfig).Set(GasOracleMode, GasOracleModeRESTAPI) conf.SubSection(GasOracleConfig).Set(GasOracleTemplate, "{{ . }}") conf.SubSection(GasOracleConfig).Set(ffresty.HTTPConfigURL, fmt.Sprintf("http://%s", server.Listener.Addr())) @@ -295,9 +301,8 @@ func TestGasOracleSendFail(t *testing.T) { TransactionData: "SOME_RAW_TX_BYTES", } - mockFFCAPI := &ffcapimocks.API{} ctx := context.Background() - _, _, err = p.Execute(ctx, mockFFCAPI, mtx) + _, _, err = p.Execute(ctx, tk, mtx) assert.Regexp(t, "FF21021", err) } @@ -307,7 +312,7 @@ func TestGasOracleMissingTemplate(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})) server.Close() - f, conf := newTestPolicyEngineFactory(t) + f, _, _, _, conf := newTestPolicyEngineFactory(t) conf.SubSection(GasOracleConfig).Set(GasOracleMode, GasOracleModeRESTAPI) conf.SubSection(GasOracleConfig).Set(ffresty.HTTPConfigURL, fmt.Sprintf("http://%s", server.Listener.Addr())) _, err := f.NewPolicyEngine(context.Background(), conf) @@ -320,7 +325,7 @@ func TestGasOracleBadTemplate(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})) server.Close() - f, conf := newTestPolicyEngineFactory(t) + f, _, _, _, conf := newTestPolicyEngineFactory(t) conf.SubSection(GasOracleConfig).Set(GasOracleMode, GasOracleModeRESTAPI) conf.SubSection(GasOracleConfig).Set(GasOracleTemplate, "{{ !!! wrong") conf.SubSection(GasOracleConfig).Set(ffresty.HTTPConfigURL, fmt.Sprintf("http://%s", server.Listener.Addr())) @@ -337,7 +342,7 @@ func TestGasOracleTemplateExecuteFail(t *testing.T) { })) defer server.Close() - f, conf := newTestPolicyEngineFactory(t) + f, tk, _, _, conf := newTestPolicyEngineFactory(t) conf.SubSection(GasOracleConfig).Set(GasOracleMode, GasOracleModeRESTAPI) conf.SubSection(GasOracleConfig).Set(GasOracleTemplate, "{{ .wrong.thing | len }}") conf.SubSection(GasOracleConfig).Set(ffresty.HTTPConfigURL, fmt.Sprintf("http://%s", server.Listener.Addr())) @@ -351,9 +356,8 @@ func TestGasOracleTemplateExecuteFail(t *testing.T) { TransactionData: "SOME_RAW_TX_BYTES", } - mockFFCAPI := &ffcapimocks.API{} ctx := context.Background() - _, _, err = p.Execute(ctx, mockFFCAPI, mtx) + _, _, err = p.Execute(ctx, tk, mtx) assert.Regexp(t, "FF21026", err) } @@ -363,7 +367,7 @@ func TestGasOracleNonJSON(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})) server.Close() - f, conf := newTestPolicyEngineFactory(t) + f, tk, _, _, conf := newTestPolicyEngineFactory(t) conf.SubSection(GasOracleConfig).Set(GasOracleMode, GasOracleModeRESTAPI) conf.SubSection(GasOracleConfig).Set(GasOracleTemplate, "{{ . }}") conf.SubSection(GasOracleConfig).Set(ffresty.HTTPConfigURL, fmt.Sprintf("http://%s", server.Listener.Addr())) @@ -377,9 +381,8 @@ func TestGasOracleNonJSON(t *testing.T) { TransactionData: "SOME_RAW_TX_BYTES", } - mockFFCAPI := &ffcapimocks.API{} ctx := context.Background() - _, _, err = p.Execute(ctx, mockFFCAPI, mtx) + _, _, err = p.Execute(ctx, tk, mtx) assert.Regexp(t, "FF21021", err) } @@ -392,7 +395,7 @@ func TestTXSendFail(t *testing.T) { })) defer server.Close() - f, conf := newTestPolicyEngineFactory(t) + f, tk, mockFFCAPI, _, conf := newTestPolicyEngineFactory(t) conf.SubSection(GasOracleConfig).Set(GasOracleMode, GasOracleModeRESTAPI) conf.SubSection(GasOracleConfig).Set(GasOracleTemplate, "{{ . }}") conf.SubSection(GasOracleConfig).Set(ffresty.HTTPConfigURL, fmt.Sprintf("http://%s", server.Listener.Addr())) @@ -406,16 +409,15 @@ func TestTXSendFail(t *testing.T) { TransactionData: "SOME_RAW_TX_BYTES", } - mockFFCAPI := &ffcapimocks.API{} mockFFCAPI.On("TransactionSend", mock.Anything, mock.Anything).Return(nil, ffcapi.ErrorReasonInvalidInputs, fmt.Errorf("pop")) ctx := context.Background() - _, _, err = p.Execute(ctx, mockFFCAPI, mtx) + _, _, err = p.Execute(ctx, tk, mtx) assert.Regexp(t, "pop", err) } func TestWarnStaleWarningCannotParse(t *testing.T) { - f, conf := newTestPolicyEngineFactory(t) + f, tk, mockFFCAPI, _, conf := newTestPolicyEngineFactory(t) conf.Set(FixedGasPrice, `12345`) p, err := f.NewPolicyEngine(context.Background(), conf) assert.NoError(t, err) @@ -430,7 +432,6 @@ func TestWarnStaleWarningCannotParse(t *testing.T) { }, } - mockFFCAPI := &ffcapimocks.API{} mockFFCAPI.On("GasPriceEstimate", mock.Anything, mock.Anything).Return(&ffcapi.GasPriceEstimateResponse{ GasPrice: fftypes.JSONAnyPtr(`"12345"`), }, ffcapi.ErrorReason(""), nil).Once() @@ -438,7 +439,7 @@ func TestWarnStaleWarningCannotParse(t *testing.T) { Return(nil, ffcapi.ErrorKnownTransaction, fmt.Errorf("Known transaction")) ctx := context.Background() - updated, _, err := p.Execute(ctx, mockFFCAPI, mtx) + updated, _, err := p.Execute(ctx, tk, mtx) assert.NoError(t, err) assert.Equal(t, policyengine.UpdateYes, updated) assert.NotEmpty(t, mtx.PolicyInfo.JSONObject().GetString("lastWarnTime")) @@ -447,7 +448,7 @@ func TestWarnStaleWarningCannotParse(t *testing.T) { } func TestKnownTransactionHashKnown(t *testing.T) { - f, conf := newTestPolicyEngineFactory(t) + f, tk, mockFFCAPI, _, conf := newTestPolicyEngineFactory(t) conf.Set(FixedGasPrice, `12345`) conf.SubSection(GasOracleConfig).Set(GasOracleMode, GasOracleModeDisabled) p, err := f.NewPolicyEngine(context.Background(), conf) @@ -462,12 +463,11 @@ func TestKnownTransactionHashKnown(t *testing.T) { TransactionHash: "0x01020304", } - mockFFCAPI := &ffcapimocks.API{} mockFFCAPI.On("TransactionSend", mock.Anything, mock.Anything). Return(nil, ffcapi.ErrorKnownTransaction, fmt.Errorf("Known transaction")) ctx := context.Background() - updated, _, err := p.Execute(ctx, mockFFCAPI, mtx) + updated, _, err := p.Execute(ctx, tk, mtx) assert.NoError(t, err) assert.Equal(t, policyengine.UpdateYes, updated) @@ -475,7 +475,7 @@ func TestKnownTransactionHashKnown(t *testing.T) { } func TestWarnStaleAdditionalWarningResubmitFail(t *testing.T) { - f, conf := newTestPolicyEngineFactory(t) + f, tk, mockFFCAPI, _, conf := newTestPolicyEngineFactory(t) conf.Set(FixedGasPrice, `12345`) p, err := f.NewPolicyEngine(context.Background(), conf) assert.NoError(t, err) @@ -491,7 +491,6 @@ func TestWarnStaleAdditionalWarningResubmitFail(t *testing.T) { PolicyInfo: fftypes.JSONAnyPtr(fmt.Sprintf(`{"lastWarnTime": "%s"}`, lastWarning.String())), } - mockFFCAPI := &ffcapimocks.API{} mockFFCAPI.On("GasPriceEstimate", mock.Anything, mock.Anything).Return(&ffcapi.GasPriceEstimateResponse{ GasPrice: fftypes.JSONAnyPtr(`"12345"`), }, ffcapi.ErrorReason(""), nil).Once() @@ -499,7 +498,7 @@ func TestWarnStaleAdditionalWarningResubmitFail(t *testing.T) { Return(nil, ffcapi.ErrorReason(""), fmt.Errorf("pop")) ctx := context.Background() - updated, reason, err := p.Execute(ctx, mockFFCAPI, mtx) + updated, reason, err := p.Execute(ctx, tk, mtx) assert.Regexp(t, "pop", err) assert.Empty(t, reason) assert.Equal(t, policyengine.UpdateYes, updated) @@ -509,7 +508,7 @@ func TestWarnStaleAdditionalWarningResubmitFail(t *testing.T) { } func TestWarnStaleNoWarning(t *testing.T) { - f, conf := newTestPolicyEngineFactory(t) + f, tk, mockFFCAPI, _, conf := newTestPolicyEngineFactory(t) conf.Set(FixedGasPrice, `12345`) conf.Set(ResubmitInterval, "100s") p, err := f.NewPolicyEngine(context.Background(), conf) @@ -526,10 +525,8 @@ func TestWarnStaleNoWarning(t *testing.T) { PolicyInfo: fftypes.JSONAnyPtr(fmt.Sprintf(`{"lastWarnTime": "%s"}`, lastWarning.String())), } - mockFFCAPI := &ffcapimocks.API{} - ctx := context.Background() - updated, reason, err := p.Execute(ctx, mockFFCAPI, mtx) + updated, reason, err := p.Execute(ctx, tk, mtx) assert.Empty(t, reason) assert.NoError(t, err) assert.Equal(t, policyengine.UpdateNo, updated) @@ -538,7 +535,7 @@ func TestWarnStaleNoWarning(t *testing.T) { } func TestNoOpWithReceipt(t *testing.T) { - f, conf := newTestPolicyEngineFactory(t) + f, tk, mockFFCAPI, _, conf := newTestPolicyEngineFactory(t) conf.Set(FixedGasPrice, `12345`) conf.Set(ResubmitInterval, "100s") p, err := f.NewPolicyEngine(context.Background(), conf) @@ -556,10 +553,8 @@ func TestNoOpWithReceipt(t *testing.T) { }, } - mockFFCAPI := &ffcapimocks.API{} - ctx := context.Background() - updated, reason, err := p.Execute(ctx, mockFFCAPI, mtx) + updated, reason, err := p.Execute(ctx, tk, mtx) assert.Empty(t, reason) assert.NoError(t, err) assert.Equal(t, policyengine.UpdateNo, updated) @@ -568,7 +563,7 @@ func TestNoOpWithReceipt(t *testing.T) { } func TestAllowsDeleteRequest(t *testing.T) { - f, conf := newTestPolicyEngineFactory(t) + f, tk, mockFFCAPI, _, conf := newTestPolicyEngineFactory(t) conf.Set(FixedGasPrice, `12345`) conf.Set(ResubmitInterval, "100s") p, err := f.NewPolicyEngine(context.Background(), conf) @@ -578,10 +573,8 @@ func TestAllowsDeleteRequest(t *testing.T) { DeleteRequested: fftypes.Now(), } - mockFFCAPI := &ffcapimocks.API{} - ctx := context.Background() - updated, reason, err := p.Execute(ctx, mockFFCAPI, mtx) + updated, reason, err := p.Execute(ctx, tk, mtx) assert.Empty(t, reason) assert.NoError(t, err) assert.Equal(t, policyengine.UpdateDelete, updated) diff --git a/pkg/txhistory/txhistory.go b/pkg/txhistory/txhistory.go new file mode 100644 index 00000000..010866e7 --- /dev/null +++ b/pkg/txhistory/txhistory.go @@ -0,0 +1,182 @@ +// Copyright © 2023 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package txhistory + +import ( + "context" + "encoding/json" + + "github.com/hyperledger/firefly-common/pkg/config" + "github.com/hyperledger/firefly-common/pkg/fftypes" + "github.com/hyperledger/firefly-common/pkg/log" + "github.com/hyperledger/firefly-transaction-manager/internal/tmconfig" + "github.com/hyperledger/firefly-transaction-manager/pkg/apitypes" +) + +type Manager interface { + CurrentSubStatus(ctx context.Context, mtx *apitypes.ManagedTX) *apitypes.TxHistoryStateTransitionEntry + SetSubStatus(ctx context.Context, mtx *apitypes.ManagedTX, subStatus apitypes.TxSubStatus) + AddSubStatusAction(ctx context.Context, mtx *apitypes.ManagedTX, action apitypes.TxAction, info *fftypes.JSONAny, err *fftypes.JSONAny) +} + +type manager struct { + maxHistoryCount int +} + +func NewTxHistoryManager(ctx context.Context) Manager { + return &manager{ + maxHistoryCount: config.GetInt(tmconfig.TransactionsMaxHistoryCount), + } +} + +func (h *manager) CurrentSubStatus(ctx context.Context, mtx *apitypes.ManagedTX) *apitypes.TxHistoryStateTransitionEntry { + if len(mtx.History) > 0 { + return mtx.History[len(mtx.History)-1] + } + return nil +} + +// Transaction sub-status entries can be added for a given transaction so a caller +// can see discrete steps in a transaction moving to confirmation on the blockchain. +// For example a transaction might have a sub-status of "Stale" if a transaction has +// been in pending state for a given period of time. In order to progress the transaction +// while it's in a given sub-status, certain actions might be taken (such as retrieving +// the latest gas price for the chain). See AddSubStatusAction(). Since a transaction +// might go through many sub-status changes before being confirmed on chain the list of +// entries is capped at the configured number and FIFO approach used to keep within that cap. +func (h *manager) SetSubStatus(ctx context.Context, mtx *apitypes.ManagedTX, subStatus apitypes.TxSubStatus) { + // See if the status being transitioned to is the same as the current status. + // If so, there's nothing to do. + if len(mtx.History) > 0 { + if mtx.History[len(mtx.History)-1].Status == subStatus { + return + } + log.L(ctx).Debugf("State transition to sub-status %s", subStatus) + } + + // If this is a change in status add a new record + newStatus := &apitypes.TxHistoryStateTransitionEntry{ + Time: fftypes.Now(), + Status: subStatus, + Actions: make([]*apitypes.TxHistoryActionEntry, 0), + } + mtx.History = append(mtx.History, newStatus) + + if len(mtx.History) > h.maxHistoryCount { + // Need to trim the oldest record + mtx.History = mtx.History[1:] + } + + // As we have a possibly indefinite list of sub-status records (which might be a long list and early entries + // get purged at some point) we keep a separate list of all the discrete types of sub-status + // and action we've we've ever seen for this transaction along with a count of them. This means an early sub-status + // (e.g. "queued") followed by 100s of different sub-status types will still be recorded + for _, statusType := range mtx.HistorySummary { + if statusType.Status == subStatus { + // Just increment the counter and last timestamp + statusType.LastOccurrence = fftypes.Now() + statusType.Count++ + return + } + } + + mtx.HistorySummary = append(mtx.HistorySummary, &apitypes.TxHistorySummaryEntry{Status: subStatus, Count: 1, FirstOccurrence: fftypes.Now(), LastOccurrence: fftypes.Now()}) +} + +// Takes a string that might be valid JSON, and returns valid JSON that is either: +// a) The original JSON if it is valid +// b) An escaped string +func jsonOrString(value *fftypes.JSONAny) *fftypes.JSONAny { + if value == nil { + return nil + } + + if json.Valid([]byte(*value)) { + // Already valid + return value + } + + // Quote it as a string + b, _ := json.Marshal((string)(*value)) + return fftypes.JSONAnyPtrBytes(b) +} + +// When a transaction is in a given sub-status (e.g. "Stale") the blockchain connector +// may perform certain actions to move it out of the status. For example it might +// retrieve the current gas price for the chain. TxAction's represent an action taken +// while in a given sub-status. In order to limit the number of TxAction entries in +// a TxSubStatusEntry each action has a count of the number of occurrences and a +// latest timestamp to indicate when it was last executed. There is a last error field +// which can be used to indicate the most recent error that occurred, for example an +// HTTP 4xx return code from a gas oracle. There is also an information field to record +// arbitrary data about the action, for example the gas price retrieved from an oracle. +func (h *manager) AddSubStatusAction(ctx context.Context, mtx *apitypes.ManagedTX, action apitypes.TxAction, info *fftypes.JSONAny, err *fftypes.JSONAny) { + + // See if this action exists in the list already since we only want to update the single entry, not + // add a new one + currentSubStatus := mtx.History[len(mtx.History)-1] + for _, entry := range currentSubStatus.Actions { + if entry.Action == action { + entry.Count++ + entry.LastOccurrence = fftypes.Now() + + if err != nil { + entry.LastError = jsonOrString(err) + entry.LastErrorTime = fftypes.Now() + } + + if info != nil { + entry.LastInfo = jsonOrString(info) + } + + // As we have a possibly indefinite list of sub-status records (which might be a long list and early entries + // get purged at some point) we keep a separate list of all the discrete types of sub-status + // and action we've we've ever seen for this transaction along with a count of them. This means an early sub-status + // (e.g. "queued") with an action that never happens again (e.g. "assignNonce") followed by 100s of different sub-status + // types will still be recorded + for _, actionType := range mtx.HistorySummary { + if actionType.Action == action { + // Just increment the counter and last timestamp + actionType.LastOccurrence = fftypes.Now() + actionType.Count++ + break + } + } + return + } + } + + // If this is an entirely new status add it to the list + newAction := &apitypes.TxHistoryActionEntry{ + Time: fftypes.Now(), + Action: action, + LastOccurrence: fftypes.Now(), + Count: 1, + } + + if err != nil { + newAction.LastError = jsonOrString(err) + newAction.LastErrorTime = fftypes.Now() + } + + if info != nil { + newAction.LastInfo = jsonOrString(info) + } + + currentSubStatus.Actions = append(currentSubStatus.Actions, newAction) + mtx.HistorySummary = append(mtx.HistorySummary, &apitypes.TxHistorySummaryEntry{Action: action, Count: 1, FirstOccurrence: fftypes.Now(), LastOccurrence: fftypes.Now()}) +} diff --git a/pkg/apitypes/managed_tx_test.go b/pkg/txhistory/txhistory_test.go similarity index 50% rename from pkg/apitypes/managed_tx_test.go rename to pkg/txhistory/txhistory_test.go index 61c70571..042dba8d 100644 --- a/pkg/apitypes/managed_tx_test.go +++ b/pkg/txhistory/txhistory_test.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package apitypes +package txhistory import ( "context" @@ -23,161 +23,173 @@ import ( "testing" "github.com/hyperledger/firefly-common/pkg/fftypes" + "github.com/hyperledger/firefly-transaction-manager/internal/tmconfig" + "github.com/hyperledger/firefly-transaction-manager/pkg/apitypes" "github.com/stretchr/testify/assert" ) -func TestManagedTXSubStatus(t *testing.T) { +func newTestTxHistoryManager(t *testing.T) (context.Context, *manager, func()) { + tmconfig.Reset() ctx, cancelCtx := context.WithCancel(context.Background()) - mtx := ManagedTX{} + h := NewTxHistoryManager(ctx).(*manager) + return ctx, h, cancelCtx +} + +func TestManagedTXSubStatus(t *testing.T) { + mtx := &apitypes.ManagedTX{} + ctx, h, done := newTestTxHistoryManager(t) + defer done() // No sub-status entries initially - assert.Nil(t, mtx.CurrentSubStatus(ctx)) + assert.Nil(t, h.CurrentSubStatus(ctx, mtx)) // Adding the same sub-status lots of times in succession should only result // in a single entry for that instance for i := 0; i < 100; i++ { - mtx.AddSubStatus(ctx, TxSubStatusReceived) + h.SetSubStatus(ctx, mtx, apitypes.TxSubStatusReceived) } assert.Equal(t, 1, len(mtx.History)) - assert.Equal(t, "Received", string(mtx.CurrentSubStatus(ctx).Status)) + assert.Equal(t, "Received", string(h.CurrentSubStatus(ctx, mtx).Status)) // Adding a different type of sub-status should result in // a new entry in the list - mtx.AddSubStatus(ctx, TxSubStatusTracking) + h.SetSubStatus(ctx, mtx, apitypes.TxSubStatusTracking) assert.Equal(t, 2, len(mtx.History)) // Even if many new types are added we shouldn't go over the // configured upper limit for i := 0; i < 100; i++ { - mtx.AddSubStatus(ctx, TxSubStatusStale) - mtx.AddSubStatus(ctx, TxSubStatusTracking) + h.SetSubStatus(ctx, mtx, apitypes.TxSubStatusStale) + h.SetSubStatus(ctx, mtx, apitypes.TxSubStatusTracking) } assert.Equal(t, 50, len(mtx.History)) - cancelCtx() } func TestManagedTXSubStatusRepeat(t *testing.T) { - ctx, cancelCtx := context.WithCancel(context.Background()) - mtx := ManagedTX{} + ctx, h, done := newTestTxHistoryManager(t) + defer done() + mtx := &apitypes.ManagedTX{} // Add a sub-status - mtx.AddSubStatus(ctx, TxSubStatusReceived) + h.SetSubStatus(ctx, mtx, apitypes.TxSubStatusReceived) assert.Equal(t, 1, len(mtx.History)) assert.Equal(t, 1, len(mtx.HistorySummary)) // Add another sub-status - mtx.AddSubStatus(ctx, TxSubStatusTracking) + h.SetSubStatus(ctx, mtx, apitypes.TxSubStatusTracking) assert.Equal(t, 2, len(mtx.History)) assert.Equal(t, 2, len(mtx.HistorySummary)) // Add another that we've seen before - mtx.AddSubStatus(ctx, TxSubStatusReceived) + h.SetSubStatus(ctx, mtx, apitypes.TxSubStatusReceived) assert.Equal(t, 3, len(mtx.History)) // This goes up assert.Equal(t, 2, len(mtx.HistorySummary)) // This doesn't - - cancelCtx() } func TestManagedTXSubStatusAction(t *testing.T) { - ctx, cancelCtx := context.WithCancel(context.Background()) - mtx := ManagedTX{} + ctx, h, done := newTestTxHistoryManager(t) + defer done() + mtx := &apitypes.ManagedTX{} // Add at least 1 sub-status - mtx.AddSubStatus(ctx, TxSubStatusReceived) + h.SetSubStatus(ctx, mtx, apitypes.TxSubStatusReceived) // Add an action - mtx.AddSubStatusAction(ctx, TxActionAssignNonce, nil, nil) + h.AddSubStatusAction(ctx, mtx, apitypes.TxActionAssignNonce, nil, nil) assert.Equal(t, 1, len(mtx.History[0].Actions)) assert.Nil(t, mtx.History[0].Actions[0].LastErrorTime) // Add an action - mtx.AddSubStatusAction(ctx, TxActionRetrieveGasPrice, nil, fftypes.JSONAnyPtr(`{"gasError":"Acme Gas Oracle RC=12345"}`)) + h.AddSubStatusAction(ctx, mtx, apitypes.TxActionRetrieveGasPrice, nil, fftypes.JSONAnyPtr(`{"gasError":"Acme Gas Oracle RC=12345"}`)) assert.Equal(t, 2, len(mtx.History[0].Actions)) assert.Equal(t, (*mtx.History[0].Actions[1].LastError).String(), `{"gasError":"Acme Gas Oracle RC=12345"}`) // Add the same action which should cause the previous one to inc its counter - mtx.AddSubStatusAction(ctx, TxActionRetrieveGasPrice, fftypes.JSONAnyPtr(`{"info":"helloworld"}`), fftypes.JSONAnyPtr(`{"error":"nogood"}`)) + h.AddSubStatusAction(ctx, mtx, apitypes.TxActionRetrieveGasPrice, fftypes.JSONAnyPtr(`{"info":"helloworld"}`), fftypes.JSONAnyPtr(`{"error":"nogood"}`)) assert.Equal(t, 2, len(mtx.History[0].Actions)) - assert.Equal(t, mtx.History[0].Actions[1].Action, TxActionRetrieveGasPrice) + assert.Equal(t, mtx.History[0].Actions[1].Action, apitypes.TxActionRetrieveGasPrice) assert.Equal(t, 2, mtx.History[0].Actions[1].Count) // Add the same action but with new error information should update the last error field - mtx.AddSubStatusAction(ctx, TxActionRetrieveGasPrice, nil, fftypes.JSONAnyPtr(`{"gasError":"Acme Gas Oracle RC=67890"}`)) + h.AddSubStatusAction(ctx, mtx, apitypes.TxActionRetrieveGasPrice, nil, fftypes.JSONAnyPtr(`{"gasError":"Acme Gas Oracle RC=67890"}`)) assert.Equal(t, 2, len(mtx.History[0].Actions)) assert.NotNil(t, mtx.History[0].Actions[1].LastErrorTime) assert.Equal(t, (*mtx.History[0].Actions[1].LastError).String(), `{"gasError":"Acme Gas Oracle RC=67890"}`) // Add a new type of action reason := "known_transaction" - mtx.AddSubStatusAction(ctx, TxActionSubmitTransaction, fftypes.JSONAnyPtr(`{"reason":"`+reason+`"}`), nil) + h.AddSubStatusAction(ctx, mtx, apitypes.TxActionSubmitTransaction, fftypes.JSONAnyPtr(`{"reason":"`+reason+`"}`), nil) assert.Equal(t, 3, len(mtx.History[0].Actions)) - assert.Equal(t, mtx.History[0].Actions[2].Action, TxActionSubmitTransaction) + assert.Equal(t, mtx.History[0].Actions[2].Action, apitypes.TxActionSubmitTransaction) assert.Equal(t, 1, mtx.History[0].Actions[2].Count) assert.Nil(t, mtx.History[0].Actions[2].LastErrorTime) // Add one more type of action receiptId := "123456" - mtx.AddSubStatusAction(ctx, TxActionReceiveReceipt, fftypes.JSONAnyPtr(`{"receiptId":"`+receiptId+`"}`), nil) + h.AddSubStatusAction(ctx, mtx, apitypes.TxActionReceiveReceipt, fftypes.JSONAnyPtr(`{"receiptId":"`+receiptId+`"}`), nil) assert.Equal(t, 4, len(mtx.History[0].Actions)) - assert.Equal(t, mtx.History[0].Actions[3].Action, TxActionReceiveReceipt) + assert.Equal(t, mtx.History[0].Actions[3].Action, apitypes.TxActionReceiveReceipt) assert.Equal(t, 1, mtx.History[0].Actions[3].Count) assert.Nil(t, mtx.History[0].Actions[3].LastErrorTime) - cancelCtx() + // History is the complete list of unique sub-status types and actions + assert.Equal(t, 5, len(mtx.HistorySummary)) + + // Sanity check the history summary entries + for _, historyEntry := range mtx.HistorySummary { + assert.NotNil(t, historyEntry.FirstOccurrence) + assert.NotNil(t, historyEntry.LastOccurrence) + assert.GreaterOrEqual(t, historyEntry.Count, 1) + + if historyEntry.Action == apitypes.TxActionRetrieveGasPrice { + // The first and last occurrence timestamps shoudn't be the same + assert.NotEqual(t, historyEntry.FirstOccurrence, historyEntry.LastOccurrence) + + // We should have a count of 3 for this action + assert.Equal(t, historyEntry.Count, 3) + } + } } func TestManagedTXSubStatusInvalidJSON(t *testing.T) { - ctx, cancelCtx := context.WithCancel(context.Background()) - mtx := ManagedTX{} + ctx, h, done := newTestTxHistoryManager(t) + defer done() + mtx := &apitypes.ManagedTX{} reason := "\"cannot-marshall\"" // Add a new type of action - mtx.AddSubStatus(ctx, TxSubStatusReceived) - mtx.AddSubStatusAction(ctx, TxActionSubmitTransaction, fftypes.JSONAnyPtr(`{"reason":"`+reason+`"}`), nil) - val, err := json.Marshal(mtx.History) + h.SetSubStatus(ctx, mtx, apitypes.TxSubStatusReceived) + h.AddSubStatusAction(ctx, mtx, apitypes.TxActionSubmitTransaction, fftypes.JSONAnyPtr(`{"reason":"`+reason+`"}`), nil) + val, err := json.Marshal(mtx.History[0].Actions[0].LastInfo) // It should never be possible to cause the sub-status history to become un-marshallable assert.NoError(t, err) - assert.Contains(t, string(val), "invalidJson") + assert.Contains(t, string(val), "cannot-marshall") - cancelCtx() } func TestManagedTXSubStatusMaxEntries(t *testing.T) { - ctx, cancelCtx := context.WithCancel(context.Background()) - mtx := ManagedTX{} - var nextSubStatus TxSubStatus + ctx, h, done := newTestTxHistoryManager(t) + defer done() + mtx := &apitypes.ManagedTX{} + var nextSubStatus apitypes.TxSubStatus // Create 100 unique sub-status strings. We should only keep the // first 50 for i := 0; i < 100; i++ { - nextSubStatus = TxSubStatus(fmt.Sprint(i)) - mtx.AddSubStatus(ctx, nextSubStatus) + nextSubStatus = apitypes.TxSubStatus(fmt.Sprint(i)) + h.SetSubStatus(ctx, mtx, nextSubStatus) } assert.Equal(t, 50, len(mtx.History)) - cancelCtx() } -func TestManagedTXSubStatusMaxActions(t *testing.T) { - ctx, cancelCtx := context.WithCancel(context.Background()) - mtx := ManagedTX{} - - // Add a sub-status - mtx.AddSubStatus(ctx, TxSubStatusReceived) - - // Add lots of unique sub-status actions which we cap at the configured amount - for i := 0; i < 100; i++ { - newAction := fmt.Sprintf("action-%d", i) - mtx.AddSubStatusAction(ctx, TxAction(newAction), nil, nil) - } - assert.Equal(t, 50, len(mtx.History[0].Actions)) - - cancelCtx() +func TestJSONOrStringNull(t *testing.T) { + assert.Nil(t, jsonOrString(nil)) }