Skip to content

Commit

Permalink
Merge pull request #91 from kaleido-io/expose-filters
Browse files Browse the repository at this point in the history
Expose filters for rich query
  • Loading branch information
peterbroadhurst authored Jul 3, 2023
2 parents 7b4c815 + 2175fff commit e6fe782
Show file tree
Hide file tree
Showing 13 changed files with 122 additions and 14 deletions.
6 changes: 6 additions & 0 deletions internal/persistence/persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,12 @@ type RichQuery interface {
ListTransactionConfirmations(ctx context.Context, txID string, filter ffapi.AndFilter) ([]*apitypes.ConfirmationRecord, *ffapi.FilterResult, error)
ListTransactionHistory(ctx context.Context, txID string, filter ffapi.AndFilter) ([]*apitypes.TXHistoryRecord, *ffapi.FilterResult, error)
ListStreamListeners(ctx context.Context, streamID *fftypes.UUID, filter ffapi.AndFilter) ([]*apitypes.Listener, *ffapi.FilterResult, error)

NewStreamFilter(ctx context.Context) ffapi.FilterBuilder
NewListenerFilter(ctx context.Context) ffapi.FilterBuilder
NewTransactionFilter(ctx context.Context) ffapi.FilterBuilder
NewConfirmationFilter(ctx context.Context) ffapi.FilterBuilder
NewTxHistoryFilter(ctx context.Context) ffapi.FilterBuilder
}

type CheckpointPersistence interface {
Expand Down
12 changes: 8 additions & 4 deletions internal/persistence/postgres/confirmations.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ func (p *sqlPersistence) newConfirmationsCollection() *dbsql.CrudBase[*apitypes.
return collection
}

func (p *sqlPersistence) NewConfirmationFilter(ctx context.Context) ffapi.FilterBuilder {
return persistence.ConfirmationFilters.NewFilter(ctx)
}

func (p *sqlPersistence) ListTransactionConfirmations(ctx context.Context, txID string, filter ffapi.AndFilter) ([]*apitypes.ConfirmationRecord, *ffapi.FilterResult, error) {
return p.confirmations.GetMany(ctx, filter.Condition(filter.Builder().Eq("transaction", txID)))
}

func (p *sqlPersistence) GetTransactionConfirmations(ctx context.Context, txID string) ([]*apitypes.Confirmation, error) {
// We query in increasing insertion order
filter := persistence.ConfirmationFilters.NewFilter(ctx).Eq("transaction", txID).Sort("sequence").Ascending()
Expand Down Expand Up @@ -110,7 +118,3 @@ func (p *sqlPersistence) AddTransactionConfirmations(ctx context.Context, txID s
}
return nil // we do this async for performance
}

func (p *sqlPersistence) ListTransactionConfirmations(ctx context.Context, txID string, filter ffapi.AndFilter) ([]*apitypes.ConfirmationRecord, *ffapi.FilterResult, error) {
return p.confirmations.GetMany(ctx, filter.Condition(filter.Builder().Eq("transaction", txID)))
}
3 changes: 1 addition & 2 deletions internal/persistence/postgres/confirmations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"github.com/hyperledger/firefly-common/pkg/config"
"github.com/hyperledger/firefly-common/pkg/fftypes"
"github.com/hyperledger/firefly-transaction-manager/internal/persistence"
"github.com/hyperledger/firefly-transaction-manager/pkg/apitypes"
"github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -162,7 +161,7 @@ func TestTransactionConfirmationsOrderPSQL(t *testing.T) {
}, confirmations)

// Filter just one
fb := persistence.ConfirmationFilters.NewFilter(ctx)
fb := p.NewConfirmationFilter(ctx)
crs, _, err := p.ListTransactionConfirmations(ctx, tx2ID, fb.And(fb.Eq("blocknumber", 1)))
assert.Len(t, crs, 1)
assert.Equal(t, crs[0].BlockNumber.Uint64(), uint64(1))
Expand Down
4 changes: 4 additions & 0 deletions internal/persistence/postgres/eventstreams.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ func (p *sqlPersistence) newEventStreamsCollection() *dbsql.CrudBase[*apitypes.E
return collection
}

func (p *sqlPersistence) NewStreamFilter(ctx context.Context) ffapi.FilterBuilder {
return persistence.EventStreamFilters.NewFilter(ctx)
}

func (p *sqlPersistence) ListStreams(ctx context.Context, filter ffapi.AndFilter) ([]*apitypes.EventStream, *ffapi.FilterResult, error) {
return p.eventStreams.GetMany(ctx, filter)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/persistence/postgres/eventstreams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func TestEventStreamAfterPaginatePSQL(t *testing.T) {
assert.Regexp(t, "FF00164", err)

// Find just one
fb := persistence.EventStreamFilters.NewFilter(ctx)
fb := p.NewStreamFilter(ctx)
list4, _, err := p.ListStreams(ctx, fb.And(fb.Eq("name", *eventStreams[15].Name)))
assert.NoError(t, err)
assert.Len(t, list4, 1)
Expand Down
4 changes: 4 additions & 0 deletions internal/persistence/postgres/listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ func (p *sqlPersistence) newListenersCollection() *dbsql.CrudBase[*apitypes.List
return collection
}

func (p *sqlPersistence) NewListenerFilter(ctx context.Context) ffapi.FilterBuilder {
return persistence.ListenerFilters.NewFilter(ctx)
}

func (p *sqlPersistence) ListListeners(ctx context.Context, filter ffapi.AndFilter) ([]*apitypes.Listener, *ffapi.FilterResult, error) {
return p.listeners.GetMany(ctx, filter)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/persistence/postgres/listeners_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,14 +159,14 @@ func TestListenerAfterPaginatePSQL(t *testing.T) {
assert.Regexp(t, "FF00164", err)

// Find just one
fb := persistence.ListenerFilters.NewFilter(ctx)
fb := p.NewListenerFilter(ctx)
list4, _, err := p.ListListeners(ctx, fb.And(fb.Eq("name", *listeners[15].Name)))
assert.NoError(t, err)
assert.Len(t, list4, 1)
assert.Equal(t, *listeners[15].Name, *list4[0].Name)

// Check search is scoped
fb = persistence.ListenerFilters.NewFilter(ctx)
fb = p.NewListenerFilter(ctx)
list5, _, err := p.ListStreamListeners(ctx, stream1, fb.And(fb.Eq("name", *listeners[15].Name)))
assert.NoError(t, err)
assert.Empty(t, list5)
Expand Down
4 changes: 4 additions & 0 deletions internal/persistence/postgres/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ func (p *sqlPersistence) newTransactionCollection() *dbsql.CrudBase[*apitypes.Ma
return collection
}

func (p *sqlPersistence) NewTransactionFilter(ctx context.Context) ffapi.FilterBuilder {
return persistence.TransactionFilters.NewFilter(ctx)
}

func (p *sqlPersistence) ListTransactions(ctx context.Context, filter ffapi.AndFilter) ([]*apitypes.ManagedTX, *ffapi.FilterResult, error) {
return p.transactions.GetMany(ctx, filter)
}
Expand Down
4 changes: 3 additions & 1 deletion internal/persistence/postgres/transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,10 @@ func TestTransactionListByCreateTimePSQL(t *testing.T) {
txs = append(txs, tx)
}

fb := p.NewTransactionFilter(ctx)

// List all the transactions - default is created time descending on the standard filter
list1, _, err := p.ListTransactions(ctx, persistence.TransactionFilters.NewFilter(ctx).And())
list1, _, err := p.ListTransactions(ctx, fb.And())
assert.NoError(t, err)
assert.Len(t, list1, 10)
for i := 0; i < 10; i++ {
Expand Down
4 changes: 4 additions & 0 deletions internal/persistence/postgres/txhistory.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ func (p *sqlPersistence) newTXHistoryCollection() *dbsql.CrudBase[*apitypes.TXHi
return collection
}

func (p *sqlPersistence) NewTxHistoryFilter(ctx context.Context) ffapi.FilterBuilder {
return persistence.TXHistoryFilters.NewFilter(ctx)
}

func (p *sqlPersistence) ListTransactionHistory(ctx context.Context, txID string, filter ffapi.AndFilter) ([]*apitypes.TXHistoryRecord, *ffapi.FilterResult, error) {
return p.txHistory.GetMany(ctx, filter.Condition(filter.Builder().Eq("transaction", txID)))
}
Expand Down
4 changes: 2 additions & 2 deletions internal/persistence/postgres/txhistory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/DATA-DOG/go-sqlmock"
"github.com/hyperledger/firefly-common/pkg/config"
"github.com/hyperledger/firefly-common/pkg/fftypes"
"github.com/hyperledger/firefly-transaction-manager/internal/persistence"
"github.com/hyperledger/firefly-transaction-manager/pkg/apitypes"
"github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -106,7 +105,8 @@ func TestTXHistoryCompressionPSQL(t *testing.T) {
assert.NoError(t, err)

// Get the history
history, _, err := p.ListTransactionHistory(ctx, txID, persistence.TXHistoryFilters.NewFilter(ctx).And())
fb := p.NewTxHistoryFilter(ctx)
history, _, err := p.ListTransactionHistory(ctx, txID, fb.And())
assert.NoError(t, err)
// Time strip the history for compare
for _, h := range history {
Expand Down
80 changes: 80 additions & 0 deletions mocks/persistencemocks/rich_query.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions pkg/fftm/transaction_events_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,18 @@ func (eh *ManagedTransactionEventHandler) HandleEvent(_ context.Context, e apity
case apitypes.ManagedTXDeleted:
eh.sendWSReply(e.Tx, nil /* receipt never sent with delete */)
case apitypes.ManagedTXTransactionHashAdded:
txID := e.Tx.ID
return eh.ConfirmationManager.Notify(&confirmations.Notification{
NotificationType: confirmations.NewTransaction,
Transaction: &confirmations.TransactionInfo{
TransactionHash: e.Tx.TransactionHash,
Receipt: func(ctx context.Context, receipt *ffcapi.TransactionReceiptResponse) {
if err := eh.TxHandler.HandleTransactionReceiptReceived(ctx, e.Tx.ID, receipt); err != nil {
if err := eh.TxHandler.HandleTransactionReceiptReceived(ctx, txID, receipt); err != nil {
log.L(ctx).Errorf("Receipt for transaction %s at nonce %s / %d - hash: %s was not handled due to %s", e.Tx.ID, e.Tx.TransactionHeaders.From, e.Tx.Nonce.Int64(), e.Tx.TransactionHash, err.Error())
}
},
Confirmations: func(ctx context.Context, notification *apitypes.ConfirmationsNotification) {
if err := eh.TxHandler.HandleTransactionConfirmations(ctx, e.Tx.ID, notification); err != nil {
if err := eh.TxHandler.HandleTransactionConfirmations(ctx, txID, notification); err != nil {
log.L(ctx).Errorf("Confirmation for transaction %s at nonce %s / %d - hash: %s was not handled due to %s", e.Tx.ID, e.Tx.TransactionHeaders.From, e.Tx.Nonce.Int64(), e.Tx.TransactionHash, err.Error())
}
},
Expand Down

0 comments on commit e6fe782

Please sign in to comment.