diff --git a/internal/persistence/persistence.go b/internal/persistence/persistence.go index bc81fac9..3324b2a5 100644 --- a/internal/persistence/persistence.go +++ b/internal/persistence/persistence.go @@ -145,6 +145,12 @@ type RichQuery interface { ListTransactionConfirmations(ctx context.Context, txID string, filter ffapi.AndFilter) ([]*apitypes.ConfirmationRecord, *ffapi.FilterResult, error) ListTransactionHistory(ctx context.Context, txID string, filter ffapi.AndFilter) ([]*apitypes.TXHistoryRecord, *ffapi.FilterResult, error) ListStreamListeners(ctx context.Context, streamID *fftypes.UUID, filter ffapi.AndFilter) ([]*apitypes.Listener, *ffapi.FilterResult, error) + + NewStreamFilter(ctx context.Context) ffapi.FilterBuilder + NewListenerFilter(ctx context.Context) ffapi.FilterBuilder + NewTransactionFilter(ctx context.Context) ffapi.FilterBuilder + NewConfirmationFilter(ctx context.Context) ffapi.FilterBuilder + NewTxHistoryFilter(ctx context.Context) ffapi.FilterBuilder } type CheckpointPersistence interface { diff --git a/internal/persistence/postgres/confirmations.go b/internal/persistence/postgres/confirmations.go index 72669d27..48894402 100644 --- a/internal/persistence/postgres/confirmations.go +++ b/internal/persistence/postgres/confirmations.go @@ -77,6 +77,14 @@ func (p *sqlPersistence) newConfirmationsCollection() *dbsql.CrudBase[*apitypes. return collection } +func (p *sqlPersistence) NewConfirmationFilter(ctx context.Context) ffapi.FilterBuilder { + return persistence.ConfirmationFilters.NewFilter(ctx) +} + +func (p *sqlPersistence) ListTransactionConfirmations(ctx context.Context, txID string, filter ffapi.AndFilter) ([]*apitypes.ConfirmationRecord, *ffapi.FilterResult, error) { + return p.confirmations.GetMany(ctx, filter.Condition(filter.Builder().Eq("transaction", txID))) +} + func (p *sqlPersistence) GetTransactionConfirmations(ctx context.Context, txID string) ([]*apitypes.Confirmation, error) { // We query in increasing insertion order filter := persistence.ConfirmationFilters.NewFilter(ctx).Eq("transaction", txID).Sort("sequence").Ascending() @@ -110,7 +118,3 @@ func (p *sqlPersistence) AddTransactionConfirmations(ctx context.Context, txID s } return nil // we do this async for performance } - -func (p *sqlPersistence) ListTransactionConfirmations(ctx context.Context, txID string, filter ffapi.AndFilter) ([]*apitypes.ConfirmationRecord, *ffapi.FilterResult, error) { - return p.confirmations.GetMany(ctx, filter.Condition(filter.Builder().Eq("transaction", txID))) -} diff --git a/internal/persistence/postgres/confirmations_test.go b/internal/persistence/postgres/confirmations_test.go index 03775886..60cdacb8 100644 --- a/internal/persistence/postgres/confirmations_test.go +++ b/internal/persistence/postgres/confirmations_test.go @@ -23,7 +23,6 @@ import ( "github.com/hyperledger/firefly-common/pkg/config" "github.com/hyperledger/firefly-common/pkg/fftypes" - "github.com/hyperledger/firefly-transaction-manager/internal/persistence" "github.com/hyperledger/firefly-transaction-manager/pkg/apitypes" "github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi" "github.com/sirupsen/logrus" @@ -162,7 +161,7 @@ func TestTransactionConfirmationsOrderPSQL(t *testing.T) { }, confirmations) // Filter just one - fb := persistence.ConfirmationFilters.NewFilter(ctx) + fb := p.NewConfirmationFilter(ctx) crs, _, err := p.ListTransactionConfirmations(ctx, tx2ID, fb.And(fb.Eq("blocknumber", 1))) assert.Len(t, crs, 1) assert.Equal(t, crs[0].BlockNumber.Uint64(), uint64(1)) diff --git a/internal/persistence/postgres/eventstreams.go b/internal/persistence/postgres/eventstreams.go index 542df9aa..f37b7177 100644 --- a/internal/persistence/postgres/eventstreams.go +++ b/internal/persistence/postgres/eventstreams.go @@ -94,6 +94,10 @@ func (p *sqlPersistence) newEventStreamsCollection() *dbsql.CrudBase[*apitypes.E return collection } +func (p *sqlPersistence) NewStreamFilter(ctx context.Context) ffapi.FilterBuilder { + return persistence.EventStreamFilters.NewFilter(ctx) +} + func (p *sqlPersistence) ListStreams(ctx context.Context, filter ffapi.AndFilter) ([]*apitypes.EventStream, *ffapi.FilterResult, error) { return p.eventStreams.GetMany(ctx, filter) } diff --git a/internal/persistence/postgres/eventstreams_test.go b/internal/persistence/postgres/eventstreams_test.go index b03a71c9..d98c6425 100644 --- a/internal/persistence/postgres/eventstreams_test.go +++ b/internal/persistence/postgres/eventstreams_test.go @@ -157,7 +157,7 @@ func TestEventStreamAfterPaginatePSQL(t *testing.T) { assert.Regexp(t, "FF00164", err) // Find just one - fb := persistence.EventStreamFilters.NewFilter(ctx) + fb := p.NewStreamFilter(ctx) list4, _, err := p.ListStreams(ctx, fb.And(fb.Eq("name", *eventStreams[15].Name))) assert.NoError(t, err) assert.Len(t, list4, 1) diff --git a/internal/persistence/postgres/listeners.go b/internal/persistence/postgres/listeners.go index de1ef605..307fba85 100644 --- a/internal/persistence/postgres/listeners.go +++ b/internal/persistence/postgres/listeners.go @@ -76,6 +76,10 @@ func (p *sqlPersistence) newListenersCollection() *dbsql.CrudBase[*apitypes.List return collection } +func (p *sqlPersistence) NewListenerFilter(ctx context.Context) ffapi.FilterBuilder { + return persistence.ListenerFilters.NewFilter(ctx) +} + func (p *sqlPersistence) ListListeners(ctx context.Context, filter ffapi.AndFilter) ([]*apitypes.Listener, *ffapi.FilterResult, error) { return p.listeners.GetMany(ctx, filter) } diff --git a/internal/persistence/postgres/listeners_test.go b/internal/persistence/postgres/listeners_test.go index a8646db5..22078fd2 100644 --- a/internal/persistence/postgres/listeners_test.go +++ b/internal/persistence/postgres/listeners_test.go @@ -159,14 +159,14 @@ func TestListenerAfterPaginatePSQL(t *testing.T) { assert.Regexp(t, "FF00164", err) // Find just one - fb := persistence.ListenerFilters.NewFilter(ctx) + fb := p.NewListenerFilter(ctx) list4, _, err := p.ListListeners(ctx, fb.And(fb.Eq("name", *listeners[15].Name))) assert.NoError(t, err) assert.Len(t, list4, 1) assert.Equal(t, *listeners[15].Name, *list4[0].Name) // Check search is scoped - fb = persistence.ListenerFilters.NewFilter(ctx) + fb = p.NewListenerFilter(ctx) list5, _, err := p.ListStreamListeners(ctx, stream1, fb.And(fb.Eq("name", *listeners[15].Name))) assert.NoError(t, err) assert.Empty(t, list5) diff --git a/internal/persistence/postgres/transactions.go b/internal/persistence/postgres/transactions.go index 4a3ee8ce..82fb42b3 100644 --- a/internal/persistence/postgres/transactions.go +++ b/internal/persistence/postgres/transactions.go @@ -114,6 +114,10 @@ func (p *sqlPersistence) newTransactionCollection() *dbsql.CrudBase[*apitypes.Ma return collection } +func (p *sqlPersistence) NewTransactionFilter(ctx context.Context) ffapi.FilterBuilder { + return persistence.TransactionFilters.NewFilter(ctx) +} + func (p *sqlPersistence) ListTransactions(ctx context.Context, filter ffapi.AndFilter) ([]*apitypes.ManagedTX, *ffapi.FilterResult, error) { return p.transactions.GetMany(ctx, filter) } diff --git a/internal/persistence/postgres/transactions_test.go b/internal/persistence/postgres/transactions_test.go index 07a63c1e..6c1c44f3 100644 --- a/internal/persistence/postgres/transactions_test.go +++ b/internal/persistence/postgres/transactions_test.go @@ -257,8 +257,10 @@ func TestTransactionListByCreateTimePSQL(t *testing.T) { txs = append(txs, tx) } + fb := p.NewTransactionFilter(ctx) + // List all the transactions - default is created time descending on the standard filter - list1, _, err := p.ListTransactions(ctx, persistence.TransactionFilters.NewFilter(ctx).And()) + list1, _, err := p.ListTransactions(ctx, fb.And()) assert.NoError(t, err) assert.Len(t, list1, 10) for i := 0; i < 10; i++ { diff --git a/internal/persistence/postgres/txhistory.go b/internal/persistence/postgres/txhistory.go index e2163d57..e791b747 100644 --- a/internal/persistence/postgres/txhistory.go +++ b/internal/persistence/postgres/txhistory.go @@ -87,6 +87,10 @@ func (p *sqlPersistence) newTXHistoryCollection() *dbsql.CrudBase[*apitypes.TXHi return collection } +func (p *sqlPersistence) NewTxHistoryFilter(ctx context.Context) ffapi.FilterBuilder { + return persistence.TXHistoryFilters.NewFilter(ctx) +} + func (p *sqlPersistence) ListTransactionHistory(ctx context.Context, txID string, filter ffapi.AndFilter) ([]*apitypes.TXHistoryRecord, *ffapi.FilterResult, error) { return p.txHistory.GetMany(ctx, filter.Condition(filter.Builder().Eq("transaction", txID))) } diff --git a/internal/persistence/postgres/txhistory_test.go b/internal/persistence/postgres/txhistory_test.go index 125ef59d..77cc7cc4 100644 --- a/internal/persistence/postgres/txhistory_test.go +++ b/internal/persistence/postgres/txhistory_test.go @@ -25,7 +25,6 @@ import ( "github.com/DATA-DOG/go-sqlmock" "github.com/hyperledger/firefly-common/pkg/config" "github.com/hyperledger/firefly-common/pkg/fftypes" - "github.com/hyperledger/firefly-transaction-manager/internal/persistence" "github.com/hyperledger/firefly-transaction-manager/pkg/apitypes" "github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi" "github.com/sirupsen/logrus" @@ -106,7 +105,8 @@ func TestTXHistoryCompressionPSQL(t *testing.T) { assert.NoError(t, err) // Get the history - history, _, err := p.ListTransactionHistory(ctx, txID, persistence.TXHistoryFilters.NewFilter(ctx).And()) + fb := p.NewTxHistoryFilter(ctx) + history, _, err := p.ListTransactionHistory(ctx, txID, fb.And()) assert.NoError(t, err) // Time strip the history for compare for _, h := range history { diff --git a/mocks/persistencemocks/rich_query.go b/mocks/persistencemocks/rich_query.go index eabe6e03..ab233956 100644 --- a/mocks/persistencemocks/rich_query.go +++ b/mocks/persistencemocks/rich_query.go @@ -229,6 +229,86 @@ func (_m *RichQuery) ListTransactions(ctx context.Context, filter ffapi.AndFilte return r0, r1, r2 } +// NewConfirmationFilter provides a mock function with given fields: ctx +func (_m *RichQuery) NewConfirmationFilter(ctx context.Context) ffapi.FilterBuilder { + ret := _m.Called(ctx) + + var r0 ffapi.FilterBuilder + if rf, ok := ret.Get(0).(func(context.Context) ffapi.FilterBuilder); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(ffapi.FilterBuilder) + } + } + + return r0 +} + +// NewListenerFilter provides a mock function with given fields: ctx +func (_m *RichQuery) NewListenerFilter(ctx context.Context) ffapi.FilterBuilder { + ret := _m.Called(ctx) + + var r0 ffapi.FilterBuilder + if rf, ok := ret.Get(0).(func(context.Context) ffapi.FilterBuilder); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(ffapi.FilterBuilder) + } + } + + return r0 +} + +// NewStreamFilter provides a mock function with given fields: ctx +func (_m *RichQuery) NewStreamFilter(ctx context.Context) ffapi.FilterBuilder { + ret := _m.Called(ctx) + + var r0 ffapi.FilterBuilder + if rf, ok := ret.Get(0).(func(context.Context) ffapi.FilterBuilder); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(ffapi.FilterBuilder) + } + } + + return r0 +} + +// NewTransactionFilter provides a mock function with given fields: ctx +func (_m *RichQuery) NewTransactionFilter(ctx context.Context) ffapi.FilterBuilder { + ret := _m.Called(ctx) + + var r0 ffapi.FilterBuilder + if rf, ok := ret.Get(0).(func(context.Context) ffapi.FilterBuilder); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(ffapi.FilterBuilder) + } + } + + return r0 +} + +// NewTxHistoryFilter provides a mock function with given fields: ctx +func (_m *RichQuery) NewTxHistoryFilter(ctx context.Context) ffapi.FilterBuilder { + ret := _m.Called(ctx) + + var r0 ffapi.FilterBuilder + if rf, ok := ret.Get(0).(func(context.Context) ffapi.FilterBuilder); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(ffapi.FilterBuilder) + } + } + + return r0 +} + type mockConstructorTestingTNewRichQuery interface { mock.TestingT Cleanup(func()) diff --git a/pkg/fftm/transaction_events_handler.go b/pkg/fftm/transaction_events_handler.go index 899bfcb6..70bc0592 100644 --- a/pkg/fftm/transaction_events_handler.go +++ b/pkg/fftm/transaction_events_handler.go @@ -53,17 +53,18 @@ func (eh *ManagedTransactionEventHandler) HandleEvent(_ context.Context, e apity case apitypes.ManagedTXDeleted: eh.sendWSReply(e.Tx, nil /* receipt never sent with delete */) case apitypes.ManagedTXTransactionHashAdded: + txID := e.Tx.ID return eh.ConfirmationManager.Notify(&confirmations.Notification{ NotificationType: confirmations.NewTransaction, Transaction: &confirmations.TransactionInfo{ TransactionHash: e.Tx.TransactionHash, Receipt: func(ctx context.Context, receipt *ffcapi.TransactionReceiptResponse) { - if err := eh.TxHandler.HandleTransactionReceiptReceived(ctx, e.Tx.ID, receipt); err != nil { + if err := eh.TxHandler.HandleTransactionReceiptReceived(ctx, txID, receipt); err != nil { log.L(ctx).Errorf("Receipt for transaction %s at nonce %s / %d - hash: %s was not handled due to %s", e.Tx.ID, e.Tx.TransactionHeaders.From, e.Tx.Nonce.Int64(), e.Tx.TransactionHash, err.Error()) } }, Confirmations: func(ctx context.Context, notification *apitypes.ConfirmationsNotification) { - if err := eh.TxHandler.HandleTransactionConfirmations(ctx, e.Tx.ID, notification); err != nil { + if err := eh.TxHandler.HandleTransactionConfirmations(ctx, txID, notification); err != nil { log.L(ctx).Errorf("Confirmation for transaction %s at nonce %s / %d - hash: %s was not handled due to %s", e.Tx.ID, e.Tx.TransactionHeaders.From, e.Tx.Nonce.Int64(), e.Tx.TransactionHash, err.Error()) } },