Skip to content

Commit

Permalink
feat: event bus (#53)
Browse files Browse the repository at this point in the history
  • Loading branch information
agaffney committed Jun 3, 2024
1 parent 947539d commit 6e9b7c5
Show file tree
Hide file tree
Showing 5 changed files with 323 additions and 7 deletions.
51 changes: 47 additions & 4 deletions chainsync/chainsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"fmt"
"sync"

"github.com/blinklabs-io/node/event"

ouroboros "github.com/blinklabs-io/gouroboros"
"github.com/blinklabs-io/gouroboros/connection"
ochainsync "github.com/blinklabs-io/gouroboros/protocol/chainsync"
Expand All @@ -42,6 +44,19 @@ var (
})
)

const (
ChainsyncEventType event.EventType = "chainsync.event"
)

// ChainsyncEvent represents either a RollForward or RollBackward chainsync event.
// We use a single event type for both to make synchronization easier.
type ChainsyncEvent struct {
Point ChainsyncPoint
Cbor []byte
Type uint
Rollback bool
}

type ChainsyncPoint struct {
SlotNumber uint64
BlockHash string
Expand Down Expand Up @@ -91,16 +106,18 @@ type ChainsyncClientState struct {

type State struct {
sync.Mutex
eventBus *event.EventBus
tip ChainsyncPoint
clients map[ouroboros.ConnectionId]*ChainsyncClientState
recentBlocks []ChainsyncBlock // TODO: replace with hook(s) for block storage/retrieval
subs map[ouroboros.ConnectionId]chan ChainsyncBlock
clientConnId *ouroboros.ConnectionId // TODO: replace with handling of multiple chainsync clients
}

func NewState() *State {
func NewState(eventBus *event.EventBus) *State {
return &State{
clients: make(map[ouroboros.ConnectionId]*ChainsyncClientState),
eventBus: eventBus,
clients: make(map[ouroboros.ConnectionId]*ChainsyncClientState),
}
}

Expand Down Expand Up @@ -196,10 +213,22 @@ func (s *State) AddBlock(block ChainsyncBlock) {
if len(s.recentBlocks) > maxRecentBlocks {
s.recentBlocks = s.recentBlocks[len(s.recentBlocks)-maxRecentBlocks:]
}
// Publish new block to subscribers
// Publish new block to chainsync subscribers
for _, pubChan := range s.subs {
pubChan <- block
}
// Generate event
s.eventBus.Publish(
ChainsyncEventType,
event.NewEvent(
ChainsyncEventType,
ChainsyncEvent{
Point: block.Point,
Type: block.Type,
Cbor: block.Cbor[:],
},
),
)
}

func (s *State) Rollback(slot uint64, hash string) {
Expand All @@ -213,7 +242,7 @@ func (s *State) Rollback(slot uint64, hash string) {
break
}
}
// Publish rollback to subscribers
// Publish rollback to chainsync subscribers
for _, pubChan := range s.subs {
pubChan <- ChainsyncBlock{
Rollback: true,
Expand All @@ -223,4 +252,18 @@ func (s *State) Rollback(slot uint64, hash string) {
},
}
}
// Generate event
s.eventBus.Publish(
ChainsyncEventType,
event.NewEvent(
ChainsyncEventType,
ChainsyncEvent{
Rollback: true,
Point: ChainsyncPoint{
SlotNumber: slot,
BlockHash: hash,
},
},
),
)
}
113 changes: 113 additions & 0 deletions event/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright 2024 Blink Labs Software
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package event

import (
"sync"
"time"
)

const (
EventQueueSize = 20
)

type EventType string

type EventSubscriberId int

type EventHandlerFunc func(Event)

type Event struct {
Type EventType
Timestamp time.Time
Data any
}

func NewEvent(eventType EventType, eventData any) Event {
return Event{
Type: eventType,
Timestamp: time.Now(),
Data: eventData,
}
}

type EventBus struct {
sync.Mutex
subscribers map[EventType]map[EventSubscriberId]chan Event
lastSubId EventSubscriberId
}

// NewEventBus creates a new EventBus
func NewEventBus() *EventBus {
return &EventBus{
subscribers: make(map[EventType]map[EventSubscriberId]chan Event),
}
}

// Subscribe allows a consumer to receive events of a particular type via a channel
func (e *EventBus) Subscribe(eventType EventType) (EventSubscriberId, <-chan Event) {
e.Lock()
defer e.Unlock()
// Create event channel
evtCh := make(chan Event, EventQueueSize)
// Increment subscriber ID
subId := e.lastSubId + 1
e.lastSubId = subId
// Add new subscriber
if _, ok := e.subscribers[eventType]; !ok {
e.subscribers[eventType] = make(map[EventSubscriberId]chan Event)
}
evtTypeSubs := e.subscribers[eventType]
evtTypeSubs[subId] = evtCh
return subId, evtCh
}

// SubscribeFunc allows a consumer to receive events of a particular type via a callback function
func (e *EventBus) SubscribeFunc(eventType EventType, handlerFunc EventHandlerFunc) EventSubscriberId {
subId, evtCh := e.Subscribe(eventType)
go func(evtCh <-chan Event, handlerFunc EventHandlerFunc) {
for {
evt, ok := <-evtCh
if !ok {
return
}
handlerFunc(evt)
}
}(evtCh, handlerFunc)
return subId
}

// Unsubscribe stops delivery of events for a particular type for an existing subscriber
func (e *EventBus) Unsubscribe(eventType EventType, subId EventSubscriberId) {
e.Lock()
defer e.Unlock()
if evtTypeSubs, ok := e.subscribers[eventType]; ok {
delete(evtTypeSubs, subId)
}
}

// Publish allows a producer to send an event of a particular type to all subscribers
func (e *EventBus) Publish(eventType EventType, evt Event) {
e.Lock()
defer e.Unlock()
if subs, ok := e.subscribers[eventType]; ok {
for _, subCh := range subs {
// NOTE: this is purposely a blocking operation to prevent dropping data
// XXX: do we maybe want to detect a blocked channel and temporarily set it aside
// to get the event sent to the other subscribers?
subCh <- evt
}
}
}
115 changes: 115 additions & 0 deletions event/event_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright 2024 Blink Labs Software
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package event_test

import (
"testing"
"time"

"github.com/blinklabs-io/node/event"
)

func TestEventBusSingleSubscriber(t *testing.T) {
var testEvtData int = 999
var testEvtType event.EventType = "test.event"
eb := event.NewEventBus()
_, subCh := eb.Subscribe(testEvtType)
eb.Publish(testEvtType, event.NewEvent(testEvtType, testEvtData))
select {
case evt, ok := <-subCh:
if !ok {
t.Fatalf("event channel closed unexpectedly")
}
switch v := evt.Data.(type) {
case int:
if v != testEvtData {
t.Fatalf("did not get expected event")
}
default:
t.Fatalf("event data was not of expected type, expected int, got %T", evt.Data)
}
case <-time.After(1 * time.Second):
t.Fatalf("timeout waiting for event")
}
}

func TestEventBusMultipleSubscribers(t *testing.T) {
var testEvtData int = 999
var testEvtType event.EventType = "test.event"
eb := event.NewEventBus()
_, sub1Ch := eb.Subscribe(testEvtType)
_, sub2Ch := eb.Subscribe(testEvtType)
eb.Publish(testEvtType, event.NewEvent(testEvtType, testEvtData))
var gotVal1, gotVal2 bool
for {
if gotVal1 && gotVal2 {
break
}
select {
case evt, ok := <-sub1Ch:
if !ok {
t.Fatalf("event channel closed unexpectedly")
}
if gotVal1 {
t.Fatalf("received unexpected event")
}
switch v := evt.Data.(type) {
case int:
if v != testEvtData {
t.Fatalf("did not get expected event")
}
default:
t.Fatalf("event data was not of expected type, expected int, got %T", evt.Data)
}
gotVal1 = true
case evt, ok := <-sub2Ch:
if !ok {
t.Fatalf("event channel closed unexpectedly")
}
if gotVal2 {
t.Fatalf("received unexpected event")
}
switch v := evt.Data.(type) {
case int:
if v != testEvtData {
t.Fatalf("did not get expected event")
}
default:
t.Fatalf("event data was not of expected type, expected int, got %T", evt.Data)
}
gotVal2 = true
case <-time.After(1 * time.Second):
t.Fatalf("timeout waiting for event")
}
}
}

func TestEventBusUnsubscribe(t *testing.T) {
var testEvtData int = 999
var testEvtType event.EventType = "test.event"
eb := event.NewEventBus()
subId, subCh := eb.Subscribe(testEvtType)
eb.Unsubscribe(testEvtType, subId)
eb.Publish(testEvtType, event.NewEvent(testEvtType, testEvtData))
select {
case _, ok := <-subCh:
if !ok {
t.Fatalf("event channel closed unexpectedly")
}
t.Fatalf("received unexpected event")
case <-time.After(1 * time.Second):
// NOTE: this is the expected way for the test to end
}
}
Loading

0 comments on commit 6e9b7c5

Please sign in to comment.