Skip to content

Commit

Permalink
Merge pull request #4 from kilnfi/feat/add-network-watcher
Browse files Browse the repository at this point in the history
feat: add a watcher to collect data about the network
  • Loading branch information
rayanebel authored Jan 6, 2025
2 parents 5c40217 + 42879c3 commit a63df6f
Show file tree
Hide file tree
Showing 8 changed files with 516 additions and 26 deletions.
6 changes: 6 additions & 0 deletions cmd/watcher/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,19 @@ type Config struct {
Network string `mapstructure:"network"`
Blockfrost BlockFrostConfig `mapstructure:"blockfrost"`
PoolWatcherConfig PoolWatcherConfig `mapstructure:"pool-watcher"`
NetworkWatcherConfig NetworkWatcherConfig `mapstructure:"network-watcher"`
}

type PoolWatcherConfig struct {
Enabled bool `mapstructure:"enabled"`
RefreshInterval int `mapstructure:"refresh-interval"`
}

type NetworkWatcherConfig struct {
Enabled bool `mapstructure:"enabled"`
RefreshInterval int `mapstructure:"refresh-interval"`
}

type HTTPConfig struct {
Host string `mapstructure:"host"`
Port int `mapstructure:"port"`
Expand Down
33 changes: 33 additions & 0 deletions cmd/watcher/app/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ func NewWatcherCommand() *cobra.Command {
cmd.Flags().IntP("blockfrost-timeout", "", 60, "Timeout for requests to the Blockfrost API (in seconds)")
cmd.Flags().BoolP("pool-watcher-enabled", "", true, "Enable pool watcher")
cmd.Flags().IntP("pool-watcher-refresh-interval", "", 60, "Interval at which the pool watcher collects data about the monitored pools (in seconds)")
cmd.Flags().BoolP("network-watcher-enabled", "", true, "Enable network watcher")
cmd.Flags().IntP("network-watcher-refresh-interval", "", 60, "Interval at which the network watcher collects data about the network (in seconds)")

// bind flag to viper
checkError(viper.BindPFlag("log-level", cmd.Flag("log-level")), "unable to bind log-level flag")
Expand All @@ -94,6 +96,8 @@ func NewWatcherCommand() *cobra.Command {
checkError(viper.BindPFlag("blockfrost.timeout", cmd.Flag("blockfrost-timeout")), "unable to bind blockfrost-timeout flag")
checkError(viper.BindPFlag("pool-watcher.enabled", cmd.Flag("pool-watcher-enabled")), "unable to bind pool-watcher-enabled flag")
checkError(viper.BindPFlag("pool-watcher.refresh-interval", cmd.Flag("pool-watcher-refresh-interval")), "unable to bind pool-watcher-refresh-interval flag")
checkError(viper.BindPFlag("network-watcher.enabled", cmd.Flag("network-watcher-enabled")), "unable to bind network-watcher-enabled flag")
checkError(viper.BindPFlag("network-watcher.refresh-interval", cmd.Flag("network-watcher-refresh-interval")), "unable to bind network-watcher-refresh-interval flag")

return cmd
}
Expand Down Expand Up @@ -155,6 +159,11 @@ func run(_ *cobra.Command, _ []string) error {
return fmt.Errorf("unable to start http server: %w", err)
}

// Start Network Watcher
if cfg.NetworkWatcherConfig.Enabled {
startNetworkWatcher(ctx, eg, blockfrost, metrics)
}

// Start Pool Watcher
if cfg.PoolWatcherConfig.Enabled {
startPoolWatcher(ctx, eg, blockfrost, metrics, cfg.Pools)
Expand Down Expand Up @@ -246,6 +255,30 @@ func startPoolWatcher(
})
}

func startNetworkWatcher(
ctx context.Context,
eg *errgroup.Group,
blockfrost blockfrost.Client,
metrics *metrics.Collection,
) {
eg.Go(func() error {
options := watcher.NetworkWatcherOptions{
// to change
RefreshInterval: time.Second * time.Duration(cfg.PoolWatcherConfig.RefreshInterval),
Network: cfg.Network,
}
logger.Info(
"starting watcher",
slog.String("component", "network-watcher"),
)
networkWatcher := watcher.NewNetworkWatcher(blockfrost, metrics, options)
if err := networkWatcher.Start(ctx); err != nil {
return fmt.Errorf("unable to start network watcher: %w", err)
}
return nil
})
}

// checkError is a helper function to log an error and exit the program
// used for the flag parsing
func checkError(err error, msg string) {
Expand Down
99 changes: 95 additions & 4 deletions internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,95 @@ import (
)

type Collection struct {
RelaysPerPool *prometheus.GaugeVec
PoolsPledgeMet *prometheus.GaugeVec
PoolsSaturationLevel *prometheus.GaugeVec
MonitoredValidatorsCount *prometheus.GaugeVec
ChainID *prometheus.GaugeVec
EpochDuration prometheus.Gauge
NetworkEpoch prometheus.Gauge
NextEpochStartTime prometheus.Gauge
NetworkBlockHeight prometheus.Gauge
NetworkSlot prometheus.Gauge
NetworkEpochSlot prometheus.Gauge
NetworkTotalPools prometheus.Gauge
NetworkCurrentEpochProposedBlocks prometheus.Gauge
NetworkActiveStake prometheus.Gauge
RelaysPerPool *prometheus.GaugeVec
PoolsPledgeMet *prometheus.GaugeVec
PoolsSaturationLevel *prometheus.GaugeVec
MonitoredValidatorsCount *prometheus.GaugeVec
}

func NewCollection() *Collection {
return &Collection{
ChainID: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "cardano_validator_watcher",
Name: "chain_id",
Help: "Chain ID",
},
[]string{"chain_id"},
),
EpochDuration: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: "cardano_validator_watcher",
Name: "epoch_duration",
Help: "Duration of an epoch in days",
},
),
NetworkEpoch: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: "cardano_validator_watcher",
Name: "network_epoch",
Help: "Current epoch number",
},
),
NextEpochStartTime: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: "cardano_validator_watcher",
Name: "next_epoch_start_time",
Help: "start time of the next epoch in seconds",
},
),
NetworkBlockHeight: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: "cardano_validator_watcher",
Name: "network_block_height",
Help: "Latest known block height",
},
),
NetworkSlot: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: "cardano_validator_watcher",
Name: "network_slot",
Help: "Latest known slot",
},
),
NetworkEpochSlot: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: "cardano_validator_watcher",
Name: "network_epoch_slot",
Help: "Latest known epoch slot",
},
),
NetworkTotalPools: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: "cardano_validator_watcher",
Name: "network_pools",
Help: "Total number of pools in the network",
},
),
NetworkCurrentEpochProposedBlocks: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: "cardano_validator_watcher",
Name: "network_blocks_proposed_current_epoch",
Help: "Number of blocks proposed in the current epoch by the network",
},
),
NetworkActiveStake: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: "cardano_validator_watcher",
Name: "network_active_stake",
Help: "Total active stake in the network",
},
),
RelaysPerPool: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "cardano_validator_watcher",
Expand Down Expand Up @@ -52,6 +133,16 @@ func NewCollection() *Collection {
func (m *Collection) MustRegister(reg prometheus.Registerer) {
reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
reg.MustRegister(collectors.NewGoCollector())
reg.MustRegister(m.ChainID)
reg.MustRegister(m.EpochDuration)
reg.MustRegister(m.NetworkEpoch)
reg.MustRegister(m.NextEpochStartTime)
reg.MustRegister(m.NetworkBlockHeight)
reg.MustRegister(m.NetworkSlot)
reg.MustRegister(m.NetworkEpochSlot)
reg.MustRegister(m.NetworkTotalPools)
reg.MustRegister(m.NetworkCurrentEpochProposedBlocks)
reg.MustRegister(m.NetworkActiveStake)
reg.MustRegister(m.RelaysPerPool)
reg.MustRegister(m.PoolsPledgeMet)
reg.MustRegister(m.PoolsSaturationLevel)
Expand Down
3 changes: 2 additions & 1 deletion internal/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ func TestMustRegister(t *testing.T) {
metrics := NewCollection()

// register metrics that need labels
metrics.ChainID.WithLabelValues("test_chain").Set(1)
metrics.RelaysPerPool.WithLabelValues("pool_name", "pool_id", "pool_instance").Set(5)
metrics.PoolsPledgeMet.WithLabelValues("pool_name", "pool_id", "pool_instance").Set(1)
metrics.PoolsSaturationLevel.WithLabelValues("pool_name", "pool_id", "pool_instance").Set(85)
Expand All @@ -26,7 +27,7 @@ func TestMustRegister(t *testing.T) {
metrics.MustRegister(registry)

// The expected number of metrics to be registered, based on the definitions provided in the Collection struct.
expectedMetricsCount := 4
expectedMetricsCount := 14

var totalRegisteredMetrics int
size, _ := registry.Gather()
Expand Down
79 changes: 79 additions & 0 deletions internal/watcher/helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package watcher

import (
"context"
"testing"
"time"

blockfrostmocks "github.com/kilnfi/cardano-validator-watcher/internal/blockfrost/mocks"
"github.com/kilnfi/cardano-validator-watcher/internal/metrics"
"github.com/kilnfi/cardano-validator-watcher/internal/pools"
"github.com/prometheus/client_golang/prometheus"
)

type clients struct {
bf *blockfrostmocks.MockClient
}

func setupPools(t *testing.T) pools.Pools {
t.Helper()
return pools.Pools{
{
ID: "pool-0",
Instance: "pool-0",
Key: "key",
Name: "pool-0",
Exclude: false,
},
{
ID: "pool-1",
Instance: "pool-1",
Key: "key",
Name: "pool-1",
Exclude: true,
},
}
}

func setupClients(t *testing.T) *clients {
t.Helper()

return &clients{
bf: blockfrostmocks.NewMockClient(t),
}
}

func setupRegistry(t *testing.T) *struct {
registry *prometheus.Registry
metrics *metrics.Collection
metricsExpectedOutput string
metricsUnderTest []string
} {
t.Helper()

registry := prometheus.NewRegistry()
Collection := metrics.NewCollection()
Collection.MustRegister(registry)

return &struct {
registry *prometheus.Registry
metrics *metrics.Collection
metricsExpectedOutput string
metricsUnderTest []string
}{
registry: registry,
metrics: Collection,
metricsExpectedOutput: "",
metricsUnderTest: []string{},
}
}

func setupContextWithTimeout(t *testing.T, d time.Duration) context.Context {
t.Helper()

ctx, cancel := context.WithCancel(context.Background())
go func() {
time.AfterFunc(d, cancel)
}()
return ctx
}
Loading

0 comments on commit a63df6f

Please sign in to comment.