Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FFTM new listener type for block notifications and receipt decoding functions #123

Merged
merged 15 commits into from
Jun 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ linters-settings:
values:
regexp:
COMPANY: .*
YEAR_FUZZY: '\d\d\d\d(,\d\d\d\d)?'
template: |-
Copyright © {{ YEAR }} {{ COMPANY }}
Copyright © {{ YEAR_FUZZY }} {{ COMPANY }}

SPDX-License-Identifier: Apache-2.0

Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
DROP INDEX IF EXISTS transactions_status;
BEGIN;
DROP INDEX IF EXISTS transactions_status;
COMMIT;
Comment on lines +1 to +3
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for fixing thoses

Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
CREATE INDEX transactions_status ON transactions(status);
BEGIN;
CREATE INDEX transactions_status ON transactions(status);
COMMIT;
5 changes: 5 additions & 0 deletions db/migrations/postgres/000010_add_listeners_type.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
BEGIN;

ALTER TABLE listeners DROP COLUMN "type";

COMMIT;
7 changes: 7 additions & 0 deletions db/migrations/postgres/000010_add_listeners_type.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
BEGIN;

ALTER TABLE listeners ADD COLUMN "type" VARCHAR(64);
UPDATE listeners SET "type" = 'events';
ALTER TABLE listeners ALTER COLUMN "type" SET NOT NULL;

COMMIT;
41 changes: 38 additions & 3 deletions internal/confirmations/confirmations.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type Manager interface {
Stop()
NewBlockHashes() chan<- *ffcapi.BlockHashEvent
CheckInFlight(listenerID *fftypes.UUID) bool
StartConfirmedBlockListener(ctx context.Context, id *fftypes.UUID, fromBlock string, checkpoint *ffcapi.BlockListenerCheckpoint, eventStream chan<- *ffcapi.ListenerEvent) error
StopConfirmedBlockListener(ctx context.Context, id *fftypes.UUID) error
}

type NotificationType string
Expand Down Expand Up @@ -99,6 +101,8 @@ type blockConfirmationManager struct {
pendingMux sync.Mutex
receiptChecker *receiptChecker
retry *retry.Retry
cblLock sync.Mutex
cbls map[fftypes.UUID]*confirmedBlockListener
fetchReceiptUponEntry bool
done chan struct{}
}
Expand All @@ -108,6 +112,7 @@ func NewBlockConfirmationManager(baseContext context.Context, connector ffcapi.A
bcm := &blockConfirmationManager{
baseContext: baseContext,
connector: connector,
cbls: make(map[fftypes.UUID]*confirmedBlockListener),
blockListenerStale: true,
requiredConfirmations: config.GetInt(tmconfig.ConfirmationsRequired),
staleReceiptTimeout: config.GetDuration(tmconfig.ConfirmationsStaleReceiptTimeout),
Expand Down Expand Up @@ -233,6 +238,9 @@ func (bcm *blockConfirmationManager) Stop() {
// Reset context ready for restart
bcm.ctx, bcm.cancelFunc = context.WithCancel(bcm.baseContext)
}
for _, cbl := range bcm.copyCBLsList() {
_ = bcm.StopConfirmedBlockListener(bcm.ctx, cbl.id)
}
}

func (bcm *blockConfirmationManager) NewBlockHashes() chan<- *ffcapi.BlockHashEvent {
Expand Down Expand Up @@ -301,9 +309,10 @@ func (bcm *blockConfirmationManager) getBlockByHash(blockHash string) (*apitypes
return blockInfo, nil
}

func (bcm *blockConfirmationManager) getBlockByNumber(blockNumber uint64, expectedParentHash string) (*apitypes.BlockInfo, error) {
func (bcm *blockConfirmationManager) getBlockByNumber(blockNumber uint64, allowCache bool, expectedParentHash string) (*apitypes.BlockInfo, error) {
res, reason, err := bcm.connector.BlockInfoByNumber(bcm.ctx, &ffcapi.BlockInfoByNumberRequest{
BlockNumber: fftypes.NewFFBigInt(int64(blockNumber)),
AllowCache: allowCache,
ExpectedParentHash: expectedParentHash,
})
if err != nil {
Expand All @@ -326,6 +335,27 @@ func transformBlockInfo(res *ffcapi.BlockInfo) *apitypes.BlockInfo {
}
}

func (bcm *blockConfirmationManager) copyCBLsList() []*confirmedBlockListener {
bcm.cblLock.Lock()
defer bcm.cblLock.Unlock()
cbls := make([]*confirmedBlockListener, 0, len(bcm.cbls))
for _, cbl := range bcm.cbls {
cbls = append(cbls, cbl)
}
return cbls
}

func (bcm *blockConfirmationManager) propagateBlockHashToCBLs(bhe *ffcapi.BlockHashEvent) {
bcm.cblLock.Lock()
defer bcm.cblLock.Unlock()
for _, cbl := range bcm.cbls {
select {
case cbl.newBlockHashes <- bhe:
case <-cbl.processorDone:
}
}
}

func (bcm *blockConfirmationManager) confirmationsListener() {
defer close(bcm.done)
notifications := make([]*Notification, 0)
Expand All @@ -340,6 +370,11 @@ func (bcm *blockConfirmationManager) confirmationsListener() {
}
blockHashes = append(blockHashes, bhe.BlockHashes...)

// Need to also pass this event to any confirmed block listeners
// (they promise to always be efficient in handling these, having a go-routine
// dedicated to spinning fast just processing those separate to dispatching them)
bcm.propagateBlockHashToCBLs(bhe)

if bhe.Created != nil {
for i := 0; i < len(bhe.BlockHashes); i++ {
bcm.metricsEmitter.RecordBlockHashQueueingMetrics(bcm.ctx, time.Since(*bhe.Created.Time()).Seconds())
Expand Down Expand Up @@ -371,7 +406,7 @@ func (bcm *blockConfirmationManager) confirmationsListener() {

if bcm.blockListenerStale {
if err := bcm.walkChain(blocks); err != nil {
log.L(bcm.ctx).Errorf("Failed to create walk chain after restoring blockListener: %s", err)
log.L(bcm.ctx).Errorf("Failed to walk chain after restoring blockListener: %s", err)
continue
}
bcm.blockListenerStale = false
Expand Down Expand Up @@ -704,7 +739,7 @@ func (bs *blockState) getByNumber(blockNumber uint64, expectedParentHash string)
if block != nil {
return block, nil
}
block, err := bs.bcm.getBlockByNumber(blockNumber, expectedParentHash)
block, err := bs.bcm.getBlockByNumber(blockNumber, true, expectedParentHash)
if err != nil {
return nil, err
}
Expand Down
Loading