Skip to content

Commit

Permalink
feat: make prometheus registry configurable (#132)
Browse files Browse the repository at this point in the history
Rather than implicitly use the default prometheus registry, we now
require the prometheus registry to be explicitly provided. This avoids
conflicts when used as a library, as well as providing more flexibility.
  • Loading branch information
agaffney authored Sep 19, 2024
1 parent 8cee599 commit 8e35681
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 92 deletions.
34 changes: 19 additions & 15 deletions chainsync/chainsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,6 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
blockNum_int = promauto.NewGauge(prometheus.GaugeOpts{
Name: "cardano_node_metrics_blockNum_int",
Help: "current block number",
})
slotNum_int = promauto.NewGauge(prometheus.GaugeOpts{
Name: "cardano_node_metrics_slotNum_int",
Help: "current slot number",
})
)

type ChainsyncClientState struct {
Cursor ocommon.Point
ChainIter *state.ChainIterator
Expand All @@ -53,14 +42,29 @@ type State struct {
ledgerState *state.LedgerState
clients map[ouroboros.ConnectionId]*ChainsyncClientState
clientConnId *ouroboros.ConnectionId // TODO: replace with handling of multiple chainsync clients
metrics struct {
blockNum prometheus.Gauge
slotNum prometheus.Gauge
}
}

func NewState(eventBus *event.EventBus, ledgerState *state.LedgerState) *State {
return &State{
func NewState(eventBus *event.EventBus, ledgerState *state.LedgerState, promRegistry prometheus.Registerer) *State {
s := &State{
eventBus: eventBus,
ledgerState: ledgerState,
clients: make(map[ouroboros.ConnectionId]*ChainsyncClientState),
}
// Init metrics
promautoFactory := promauto.With(promRegistry)
s.metrics.blockNum = promautoFactory.NewGauge(prometheus.GaugeOpts{
Name: "cardano_node_metrics_blockNum_int",
Help: "current block number",
})
s.metrics.slotNum = promautoFactory.NewGauge(prometheus.GaugeOpts{
Name: "cardano_node_metrics_slotNum_int",
Help: "current slot number",
})
return s
}

func (s *State) AddClient(
Expand Down Expand Up @@ -116,8 +120,8 @@ func (s *State) AddBlock(block ledger.Block, blockType uint) error {
// block number isn't stored in the block itself
blockNumber := block.BlockNumber()
// Uodate metrics
blockNum_int.Set(float64(blockNumber))
slotNum_int.Set(float64(slotNumber))
s.metrics.blockNum.Set(float64(blockNumber))
s.metrics.slotNum.Set(float64(slotNumber))
// Generate event
blkHash, err := hex.DecodeString(block.Hash())
if err != nil {
Expand Down
18 changes: 14 additions & 4 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

ouroboros "github.com/blinklabs-io/gouroboros"
ocommon "github.com/blinklabs-io/gouroboros/protocol/common"
"github.com/prometheus/client_golang/prometheus"
)

type Config struct {
Expand All @@ -31,8 +32,9 @@ type Config struct {
listeners []ListenerConfig
network string
networkMagic uint32
peerSharing bool
outboundSourcePort int
peerSharing bool
promRegistry prometheus.Registerer
topologyConfig *TopologyConfig
tracing bool
tracingStdout bool
Expand Down Expand Up @@ -143,17 +145,25 @@ func WithNetworkMagic(networkMagic uint32) ConfigOptionFunc {
}
}

// WithOutboundSourcePort specifies the source port to use for outbound connections. This defaults to dynamic source ports
func WithOutboundSourcePort(port int) ConfigOptionFunc {
return func(c *Config) {
c.outboundSourcePort = port
}
}

// WithPeerSharing specifies whether to enable peer sharing. This is disabled by default
func WithPeerSharing(peerSharing bool) ConfigOptionFunc {
return func(c *Config) {
c.peerSharing = peerSharing
}
}

// WithOutboundSourcePort specifies the source port to use for outbound connections. This defaults to dynamic source ports
func WithOutboundSourcePort(port int) ConfigOptionFunc {
// WithPrometheusRegistry specifies a prometheus.Registerer instance to add metrics to. In most cases, prometheus.DefaultRegistry would be
// a good choice to get metrics working
func WithPrometheusRegistry(registry prometheus.Registerer) ConfigOptionFunc {
return func(c *Config) {
c.outboundSourcePort = port
c.promRegistry = registry
}
}

Expand Down
15 changes: 10 additions & 5 deletions event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package event
import (
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
)

const (
Expand Down Expand Up @@ -47,13 +49,16 @@ type EventBus struct {
sync.Mutex
subscribers map[EventType]map[EventSubscriberId]chan Event
lastSubId EventSubscriberId
metrics eventMetrics
}

// NewEventBus creates a new EventBus
func NewEventBus() *EventBus {
return &EventBus{
func NewEventBus(promRegistry prometheus.Registerer) *EventBus {
e := &EventBus{
subscribers: make(map[EventType]map[EventSubscriberId]chan Event),
}
e.initMetrics(promRegistry)
return e
}

// Subscribe allows a consumer to receive events of a particular type via a channel
Expand All @@ -71,7 +76,7 @@ func (e *EventBus) Subscribe(eventType EventType) (EventSubscriberId, <-chan Eve
}
evtTypeSubs := e.subscribers[eventType]
evtTypeSubs[subId] = evtCh
metricSubscribers.WithLabelValues(string(eventType)).Inc()
e.metrics.subscribers.WithLabelValues(string(eventType)).Inc()
return subId, evtCh
}

Expand All @@ -97,7 +102,7 @@ func (e *EventBus) Unsubscribe(eventType EventType, subId EventSubscriberId) {
if evtTypeSubs, ok := e.subscribers[eventType]; ok {
delete(evtTypeSubs, subId)
}
metricSubscribers.WithLabelValues(string(eventType)).Dec()
e.metrics.subscribers.WithLabelValues(string(eventType)).Dec()
}

// Publish allows a producer to send an event of a particular type to all subscribers
Expand All @@ -112,5 +117,5 @@ func (e *EventBus) Publish(eventType EventType, evt Event) {
subCh <- evt
}
}
metricEventsTotal.WithLabelValues(string(eventType)).Inc()
e.metrics.eventsTotal.WithLabelValues(string(eventType)).Inc()
}
6 changes: 3 additions & 3 deletions event/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
func TestEventBusSingleSubscriber(t *testing.T) {
var testEvtData int = 999
var testEvtType event.EventType = "test.event"
eb := event.NewEventBus()
eb := event.NewEventBus(nil)
_, subCh := eb.Subscribe(testEvtType)
eb.Publish(testEvtType, event.NewEvent(testEvtType, testEvtData))
select {
Expand All @@ -48,7 +48,7 @@ func TestEventBusSingleSubscriber(t *testing.T) {
func TestEventBusMultipleSubscribers(t *testing.T) {
var testEvtData int = 999
var testEvtType event.EventType = "test.event"
eb := event.NewEventBus()
eb := event.NewEventBus(nil)
_, sub1Ch := eb.Subscribe(testEvtType)
_, sub2Ch := eb.Subscribe(testEvtType)
eb.Publish(testEvtType, event.NewEvent(testEvtType, testEvtData))
Expand Down Expand Up @@ -99,7 +99,7 @@ func TestEventBusMultipleSubscribers(t *testing.T) {
func TestEventBusUnsubscribe(t *testing.T) {
var testEvtData int = 999
var testEvtType event.EventType = "test.event"
eb := event.NewEventBus()
eb := event.NewEventBus(nil)
subId, subCh := eb.Subscribe(testEvtType)
eb.Unsubscribe(testEvtType, subId)
eb.Publish(testEvtType, event.NewEvent(testEvtType, testEvtData))
Expand Down
14 changes: 10 additions & 4 deletions event/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,25 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
metricEventsTotal = promauto.NewCounterVec(
type eventMetrics struct {
eventsTotal *prometheus.CounterVec
subscribers *prometheus.GaugeVec
}

func (e *EventBus) initMetrics(promRegistry prometheus.Registerer) {
promautoFactory := promauto.With(promRegistry)
e.metrics.eventsTotal = promautoFactory.NewCounterVec(
prometheus.CounterOpts{
Name: "event_total",
Help: "total events by type",
},
[]string{"type"},
)
metricSubscribers = promauto.NewGaugeVec(
e.metrics.subscribers = promautoFactory.NewGaugeVec(
prometheus.GaugeOpts{
Name: "event_subscribers",
Help: "subscribers by event type",
},
[]string{"type"},
)
)
}
22 changes: 17 additions & 5 deletions internal/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,13 @@ import (
"fmt"
"log/slog"
"net"
"net/http"
"os"

"github.com/blinklabs-io/node"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

func Run(logger *slog.Logger) error {
Expand All @@ -29,6 +34,16 @@ func Run(logger *slog.Logger) error {
return err
}
logger.Info("listening for ouroboros node-to-node connections on :3001")
// Metrics listener
http.Handle("/metrics", promhttp.Handler())
logger.Info("listening for prometheus metrics connections on :12798")
go func() {
// TODO: make this configurable
if err := http.ListenAndServe(":12798", nil); err != nil {
logger.Error(fmt.Sprintf("failed to start metrics listener: %s", err))
os.Exit(1)
}
}()
n, err := node.New(
node.NewConfig(
node.WithIntersectTip(true),
Expand All @@ -42,6 +57,8 @@ func Run(logger *slog.Logger) error {
Listener: l,
},
),
// Enable metrics with default prometheus registry
node.WithPrometheusRegistry(prometheus.DefaultRegisterer),
// TODO: make this configurable
//node.WithTracing(true),
// TODO: replace with parsing topology file
Expand All @@ -64,11 +81,6 @@ func Run(logger *slog.Logger) error {
if err != nil {
return err
}
go func() {
if err := n.StartMetrics(); err != nil {
logger.Error(fmt.Sprintf("failed to start metrics listener %v", err))
}
}()
if err := n.Run(); err != nil {
return err
}
Expand Down
46 changes: 25 additions & 21 deletions mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,6 @@ type RemoveTransactionEvent struct {
Hash string
}

var (
txsProcessedNum_int = promauto.NewCounter(prometheus.CounterOpts{
Name: "cardano_node_metrics_txsProcessedNum_int",
Help: "total transactions processed",
})
txsInMempool_int = promauto.NewGauge(prometheus.GaugeOpts{
Name: "cardano_node_metrics_txsInMempool_int",
Help: "current count of mempool transactions",
})
mempoolBytes_int = promauto.NewGauge(prometheus.GaugeOpts{
Name: "cardano_node_metrics_mempoolBytes_int",
Help: "current size of mempool transactions in bytes",
})
)

type MempoolTransaction struct {
Hash string
Type uint
Expand All @@ -80,9 +65,14 @@ type Mempool struct {
consumerIndex map[ouroboros.ConnectionId]int
consumerIndexMutex sync.Mutex
transactions []*MempoolTransaction
metrics struct {
txsProcessedNum prometheus.Counter
txsInMempool prometheus.Gauge
mempoolBytes prometheus.Gauge
}
}

func NewMempool(logger *slog.Logger, eventBus *event.EventBus) *Mempool {
func NewMempool(logger *slog.Logger, eventBus *event.EventBus, promRegistry prometheus.Registerer) *Mempool {
m := &Mempool{
eventBus: eventBus,
consumers: make(map[ouroboros.ConnectionId]*MempoolConsumer),
Expand All @@ -95,6 +85,20 @@ func NewMempool(logger *slog.Logger, eventBus *event.EventBus) *Mempool {
// TODO: replace this with purging based on on-chain TXs
// Schedule initial mempool expired cleanup
m.scheduleRemoveExpired()
// Init metrics
promautoFactory := promauto.With(promRegistry)
m.metrics.txsProcessedNum = promautoFactory.NewCounter(prometheus.CounterOpts{
Name: "cardano_node_metrics_txsProcessedNum_int",
Help: "total transactions processed",
})
m.metrics.txsInMempool = promautoFactory.NewGauge(prometheus.GaugeOpts{
Name: "cardano_node_metrics_txsInMempool_int",
Help: "current count of mempool transactions",
})
m.metrics.mempoolBytes = promautoFactory.NewGauge(prometheus.GaugeOpts{
Name: "cardano_node_metrics_mempoolBytes_int",
Help: "current size of mempool transactions in bytes",
})
return m
}

Expand Down Expand Up @@ -201,9 +205,9 @@ func (m *Mempool) AddTransaction(tx MempoolTransaction) error {
m.logger.Debug(
fmt.Sprintf("added transaction %s to mempool", tx.Hash),
)
txsProcessedNum_int.Inc()
txsInMempool_int.Inc()
mempoolBytes_int.Add(float64(len(tx.Cbor)))
m.metrics.txsProcessedNum.Inc()
m.metrics.txsInMempool.Inc()
m.metrics.mempoolBytes.Add(float64(len(tx.Cbor)))
// Send new TX to consumers that are ready for it
newTxIdx := len(m.transactions) - 1
for connId, consumerIdx := range m.consumerIndex {
Expand Down Expand Up @@ -268,8 +272,8 @@ func (m *Mempool) removeTransaction(hash string) bool {
txIdx,
txIdx+1,
)
txsInMempool_int.Dec()
mempoolBytes_int.Sub(float64(len(tx.Cbor)))
m.metrics.txsInMempool.Dec()
m.metrics.mempoolBytes.Sub(float64(len(tx.Cbor)))
// Update consumer indexes to reflect removed TX
for connId, consumerIdx := range m.consumerIndex {
// Decrement consumer index if the consumer has reached the removed TX
Expand Down
32 changes: 0 additions & 32 deletions metrics.go

This file was deleted.

Loading

0 comments on commit 8e35681

Please sign in to comment.