From 6e9b7c5d70bd89b539b87fe769101621e1fcf526 Mon Sep 17 00:00:00 2001 From: Aurora Gaffney Date: Mon, 3 Jun 2024 10:07:51 -0500 Subject: [PATCH] feat: event bus (#53) --- chainsync/chainsync.go | 51 ++++++++++++++++-- event/event.go | 113 ++++++++++++++++++++++++++++++++++++++++ event/event_test.go | 115 +++++++++++++++++++++++++++++++++++++++++ mempool/mempool.go | 43 ++++++++++++++- node.go | 8 ++- 5 files changed, 323 insertions(+), 7 deletions(-) create mode 100644 event/event.go create mode 100644 event/event_test.go diff --git a/chainsync/chainsync.go b/chainsync/chainsync.go index 16c4305..2149097 100644 --- a/chainsync/chainsync.go +++ b/chainsync/chainsync.go @@ -19,6 +19,8 @@ import ( "fmt" "sync" + "github.com/blinklabs-io/node/event" + ouroboros "github.com/blinklabs-io/gouroboros" "github.com/blinklabs-io/gouroboros/connection" ochainsync "github.com/blinklabs-io/gouroboros/protocol/chainsync" @@ -42,6 +44,19 @@ var ( }) ) +const ( + ChainsyncEventType event.EventType = "chainsync.event" +) + +// ChainsyncEvent represents either a RollForward or RollBackward chainsync event. +// We use a single event type for both to make synchronization easier. +type ChainsyncEvent struct { + Point ChainsyncPoint + Cbor []byte + Type uint + Rollback bool +} + type ChainsyncPoint struct { SlotNumber uint64 BlockHash string @@ -91,6 +106,7 @@ type ChainsyncClientState struct { type State struct { sync.Mutex + eventBus *event.EventBus tip ChainsyncPoint clients map[ouroboros.ConnectionId]*ChainsyncClientState recentBlocks []ChainsyncBlock // TODO: replace with hook(s) for block storage/retrieval @@ -98,9 +114,10 @@ type State struct { clientConnId *ouroboros.ConnectionId // TODO: replace with handling of multiple chainsync clients } -func NewState() *State { +func NewState(eventBus *event.EventBus) *State { return &State{ - clients: make(map[ouroboros.ConnectionId]*ChainsyncClientState), + eventBus: eventBus, + clients: make(map[ouroboros.ConnectionId]*ChainsyncClientState), } } @@ -196,10 +213,22 @@ func (s *State) AddBlock(block ChainsyncBlock) { if len(s.recentBlocks) > maxRecentBlocks { s.recentBlocks = s.recentBlocks[len(s.recentBlocks)-maxRecentBlocks:] } - // Publish new block to subscribers + // Publish new block to chainsync subscribers for _, pubChan := range s.subs { pubChan <- block } + // Generate event + s.eventBus.Publish( + ChainsyncEventType, + event.NewEvent( + ChainsyncEventType, + ChainsyncEvent{ + Point: block.Point, + Type: block.Type, + Cbor: block.Cbor[:], + }, + ), + ) } func (s *State) Rollback(slot uint64, hash string) { @@ -213,7 +242,7 @@ func (s *State) Rollback(slot uint64, hash string) { break } } - // Publish rollback to subscribers + // Publish rollback to chainsync subscribers for _, pubChan := range s.subs { pubChan <- ChainsyncBlock{ Rollback: true, @@ -223,4 +252,18 @@ func (s *State) Rollback(slot uint64, hash string) { }, } } + // Generate event + s.eventBus.Publish( + ChainsyncEventType, + event.NewEvent( + ChainsyncEventType, + ChainsyncEvent{ + Rollback: true, + Point: ChainsyncPoint{ + SlotNumber: slot, + BlockHash: hash, + }, + }, + ), + ) } diff --git a/event/event.go b/event/event.go new file mode 100644 index 0000000..25e3822 --- /dev/null +++ b/event/event.go @@ -0,0 +1,113 @@ +// Copyright 2024 Blink Labs Software +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package event + +import ( + "sync" + "time" +) + +const ( + EventQueueSize = 20 +) + +type EventType string + +type EventSubscriberId int + +type EventHandlerFunc func(Event) + +type Event struct { + Type EventType + Timestamp time.Time + Data any +} + +func NewEvent(eventType EventType, eventData any) Event { + return Event{ + Type: eventType, + Timestamp: time.Now(), + Data: eventData, + } +} + +type EventBus struct { + sync.Mutex + subscribers map[EventType]map[EventSubscriberId]chan Event + lastSubId EventSubscriberId +} + +// NewEventBus creates a new EventBus +func NewEventBus() *EventBus { + return &EventBus{ + subscribers: make(map[EventType]map[EventSubscriberId]chan Event), + } +} + +// Subscribe allows a consumer to receive events of a particular type via a channel +func (e *EventBus) Subscribe(eventType EventType) (EventSubscriberId, <-chan Event) { + e.Lock() + defer e.Unlock() + // Create event channel + evtCh := make(chan Event, EventQueueSize) + // Increment subscriber ID + subId := e.lastSubId + 1 + e.lastSubId = subId + // Add new subscriber + if _, ok := e.subscribers[eventType]; !ok { + e.subscribers[eventType] = make(map[EventSubscriberId]chan Event) + } + evtTypeSubs := e.subscribers[eventType] + evtTypeSubs[subId] = evtCh + return subId, evtCh +} + +// SubscribeFunc allows a consumer to receive events of a particular type via a callback function +func (e *EventBus) SubscribeFunc(eventType EventType, handlerFunc EventHandlerFunc) EventSubscriberId { + subId, evtCh := e.Subscribe(eventType) + go func(evtCh <-chan Event, handlerFunc EventHandlerFunc) { + for { + evt, ok := <-evtCh + if !ok { + return + } + handlerFunc(evt) + } + }(evtCh, handlerFunc) + return subId +} + +// Unsubscribe stops delivery of events for a particular type for an existing subscriber +func (e *EventBus) Unsubscribe(eventType EventType, subId EventSubscriberId) { + e.Lock() + defer e.Unlock() + if evtTypeSubs, ok := e.subscribers[eventType]; ok { + delete(evtTypeSubs, subId) + } +} + +// Publish allows a producer to send an event of a particular type to all subscribers +func (e *EventBus) Publish(eventType EventType, evt Event) { + e.Lock() + defer e.Unlock() + if subs, ok := e.subscribers[eventType]; ok { + for _, subCh := range subs { + // NOTE: this is purposely a blocking operation to prevent dropping data + // XXX: do we maybe want to detect a blocked channel and temporarily set it aside + // to get the event sent to the other subscribers? + subCh <- evt + } + } +} diff --git a/event/event_test.go b/event/event_test.go new file mode 100644 index 0000000..7522d34 --- /dev/null +++ b/event/event_test.go @@ -0,0 +1,115 @@ +// Copyright 2024 Blink Labs Software +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package event_test + +import ( + "testing" + "time" + + "github.com/blinklabs-io/node/event" +) + +func TestEventBusSingleSubscriber(t *testing.T) { + var testEvtData int = 999 + var testEvtType event.EventType = "test.event" + eb := event.NewEventBus() + _, subCh := eb.Subscribe(testEvtType) + eb.Publish(testEvtType, event.NewEvent(testEvtType, testEvtData)) + select { + case evt, ok := <-subCh: + if !ok { + t.Fatalf("event channel closed unexpectedly") + } + switch v := evt.Data.(type) { + case int: + if v != testEvtData { + t.Fatalf("did not get expected event") + } + default: + t.Fatalf("event data was not of expected type, expected int, got %T", evt.Data) + } + case <-time.After(1 * time.Second): + t.Fatalf("timeout waiting for event") + } +} + +func TestEventBusMultipleSubscribers(t *testing.T) { + var testEvtData int = 999 + var testEvtType event.EventType = "test.event" + eb := event.NewEventBus() + _, sub1Ch := eb.Subscribe(testEvtType) + _, sub2Ch := eb.Subscribe(testEvtType) + eb.Publish(testEvtType, event.NewEvent(testEvtType, testEvtData)) + var gotVal1, gotVal2 bool + for { + if gotVal1 && gotVal2 { + break + } + select { + case evt, ok := <-sub1Ch: + if !ok { + t.Fatalf("event channel closed unexpectedly") + } + if gotVal1 { + t.Fatalf("received unexpected event") + } + switch v := evt.Data.(type) { + case int: + if v != testEvtData { + t.Fatalf("did not get expected event") + } + default: + t.Fatalf("event data was not of expected type, expected int, got %T", evt.Data) + } + gotVal1 = true + case evt, ok := <-sub2Ch: + if !ok { + t.Fatalf("event channel closed unexpectedly") + } + if gotVal2 { + t.Fatalf("received unexpected event") + } + switch v := evt.Data.(type) { + case int: + if v != testEvtData { + t.Fatalf("did not get expected event") + } + default: + t.Fatalf("event data was not of expected type, expected int, got %T", evt.Data) + } + gotVal2 = true + case <-time.After(1 * time.Second): + t.Fatalf("timeout waiting for event") + } + } +} + +func TestEventBusUnsubscribe(t *testing.T) { + var testEvtData int = 999 + var testEvtType event.EventType = "test.event" + eb := event.NewEventBus() + subId, subCh := eb.Subscribe(testEvtType) + eb.Unsubscribe(testEvtType, subId) + eb.Publish(testEvtType, event.NewEvent(testEvtType, testEvtData)) + select { + case _, ok := <-subCh: + if !ok { + t.Fatalf("event channel closed unexpectedly") + } + t.Fatalf("received unexpected event") + case <-time.After(1 * time.Second): + // NOTE: this is the expected way for the test to end + } +} diff --git a/mempool/mempool.go b/mempool/mempool.go index ff10ee6..81eecdd 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -22,6 +22,8 @@ import ( "sync" "time" + "github.com/blinklabs-io/node/event" + ouroboros "github.com/blinklabs-io/gouroboros" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -32,6 +34,21 @@ const ( txSubmissionMempoolExpirationPeriod = 1 * time.Minute ) +const ( + AddTransactionEventType event.EventType = "mempool.add_tx" + RemoveTransactionEventType event.EventType = "mempool.remove_tx" +) + +type AddTransactionEvent struct { + Hash string + Body []byte + Type uint +} + +type RemoveTransactionEvent struct { + Hash string +} + var ( txsProcessedNum_int = promauto.NewCounter(prometheus.CounterOpts{ Name: "cardano_node_metrics_txsProcessedNum_int", @@ -57,6 +74,7 @@ type MempoolTransaction struct { type Mempool struct { sync.Mutex logger *slog.Logger + eventBus *event.EventBus consumers map[ouroboros.ConnectionId]*MempoolConsumer consumersMutex sync.Mutex consumerIndex map[ouroboros.ConnectionId]int @@ -64,8 +82,9 @@ type Mempool struct { transactions []*MempoolTransaction } -func NewMempool(logger *slog.Logger) *Mempool { +func NewMempool(logger *slog.Logger, eventBus *event.EventBus) *Mempool { m := &Mempool{ + eventBus: eventBus, consumers: make(map[ouroboros.ConnectionId]*MempoolConsumer), } if logger == nil { @@ -194,6 +213,18 @@ func (m *Mempool) AddTransaction(tx MempoolTransaction) error { } } } + // Generate event + m.eventBus.Publish( + AddTransactionEventType, + event.NewEvent( + AddTransactionEventType, + AddTransactionEvent{ + Hash: tx.Hash, + Type: tx.Type, + Body: tx.Cbor[:], + }, + ), + ) return nil } @@ -246,6 +277,16 @@ func (m *Mempool) removeTransaction(hash string) bool { m.consumerIndex[connId] = consumerIdx } m.consumerIndexMutex.Unlock() + // Generate event + m.eventBus.Publish( + RemoveTransactionEventType, + event.NewEvent( + RemoveTransactionEventType, + RemoveTransactionEvent{ + Hash: tx.Hash, + }, + ), + ) return true } } diff --git a/node.go b/node.go index 3d78889..0ace1a7 100644 --- a/node.go +++ b/node.go @@ -19,6 +19,7 @@ import ( "sync" "github.com/blinklabs-io/node/chainsync" + "github.com/blinklabs-io/node/event" "github.com/blinklabs-io/node/mempool" ouroboros "github.com/blinklabs-io/gouroboros" @@ -28,16 +29,19 @@ type Node struct { config Config connManager *ouroboros.ConnectionManager chainsyncState *chainsync.State + eventBus *event.EventBus outboundConns map[ouroboros.ConnectionId]outboundPeer outboundConnsMutex sync.Mutex mempool *mempool.Mempool } func New(cfg Config) (*Node, error) { + eventBus := event.NewEventBus() n := &Node{ config: cfg, - chainsyncState: chainsync.NewState(), - mempool: mempool.NewMempool(cfg.logger), + chainsyncState: chainsync.NewState(eventBus), + eventBus: eventBus, + mempool: mempool.NewMempool(cfg.logger, eventBus), outboundConns: make(map[ouroboros.ConnectionId]outboundPeer), } if err := n.configPopulateNetworkMagic(); err != nil {