Skip to content

Commit

Permalink
Merge pull request #9 from kilnfi/feat/add-status-watcher
Browse files Browse the repository at this point in the history
feat: add status watcher
  • Loading branch information
rayanebel authored Jan 6, 2025
2 parents 26d4976 + 3ead556 commit 2af75a4
Show file tree
Hide file tree
Showing 17 changed files with 595 additions and 54 deletions.
32 changes: 29 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,31 @@


# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib

# configuration resources that should be keepd locally
config.yaml
.codegpt
bin/
config/
*.db*
exp/

# Test binary, built with `go test -c`
*.test

# Output of the go coverage tool
*.out

# Release
dist/

# Dependency directories (remove the comment below to include it)
bin/
output/
testbin/

# Others
**/.DS_Store
*.db*
5 changes: 5 additions & 0 deletions cmd/watcher/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type Config struct {
BlockWatcherConfig BlockWatcherConfig `mapstructure:"block-watcher"`
PoolWatcherConfig PoolWatcherConfig `mapstructure:"pool-watcher"`
NetworkWatcherConfig NetworkWatcherConfig `mapstructure:"network-watcher"`
StatusWatcherConfig StatusWatcherConfig `mapstructure:"status-watcher"`
}

type BlockWatcherConfig struct {
Expand All @@ -34,6 +35,10 @@ type NetworkWatcherConfig struct {
RefreshInterval int `mapstructure:"refresh-interval"`
}

type StatusWatcherConfig struct {
RefreshInterval int `mapstructure:"refresh-interval"`
}

type HTTPConfig struct {
Host string `mapstructure:"host"`
Port int `mapstructure:"port"`
Expand Down
51 changes: 42 additions & 9 deletions cmd/watcher/app/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func NewWatcherCommand() *cobra.Command {
cmd.Flags().StringP("blockfrost-endpoint", "", "", "blockfrost API endpoint")
cmd.Flags().IntP("blockfrost-max-routines", "", 10, "number of routines used by blockfrost to perform concurrent actions")
cmd.Flags().IntP("blockfrost-timeout", "", 60, "Timeout for requests to the Blockfrost API (in seconds)")
cmd.Flags().IntP("status-watcher-refresh-interval", "", 15, "Interval at which the status watcher collects data about the network (in seconds)")
cmd.Flags().BoolP("network-watcher-enabled", "", true, "Enable network watcher")
cmd.Flags().IntP("network-watcher-refresh-interval", "", 60, "Interval at which the network watcher collects data about the network (in seconds)")
cmd.Flags().BoolP("pool-watcher-enabled", "", true, "Enable pool watcher")
Expand All @@ -112,10 +113,12 @@ func NewWatcherCommand() *cobra.Command {
checkError(viper.BindPFlag("blockfrost.timeout", cmd.Flag("blockfrost-timeout")), "unable to bind blockfrost-timeout flag")
checkError(viper.BindPFlag("network-watcher.enabled", cmd.Flag("network-watcher-enabled")), "unable to bind network-watcher-enabled flag")
checkError(viper.BindPFlag("network-watcher.refresh-interval", cmd.Flag("network-watcher-refresh-interval")), "unable to bind network-watcher-refresh-interval flag")
checkError(viper.BindPFlag("status-watcher.refresh-interval", cmd.Flag("status-watcher-refresh-interval")), "unable to bind status-watcher-refresh-interval flag")
checkError(viper.BindPFlag("pool-watcher.enabled", cmd.Flag("pool-watcher-enabled")), "unable to bind pool-watcher-enabled flag")
checkError(viper.BindPFlag("pool-watcher.refresh-interval", cmd.Flag("pool-watcher-refresh-interval")), "unable to bind pool-watcher-refresh-interval flag")
checkError(viper.BindPFlag("block-watcher.enabled", cmd.Flag("block-watcher-enabled")), "unable to bind block-watcher-enabled flag")
checkError(viper.BindPFlag("block-watcher.refresh-interval", cmd.Flag("block-watcher-refresh-interval")), "unable to bind block-watcher-refresh-interval flag")

return cmd
}

Expand Down Expand Up @@ -197,24 +200,29 @@ func run(_ *cobra.Command, _ []string) error {
return fmt.Errorf("unable to refresh slot leaders: %w", err)
}

healthStore := watcher.NewHealthStore()

// Start HTTP server
if err := startHTTPServer(eg, registry); err != nil {
if err := startHTTPServer(eg, registry, healthStore); err != nil {
return fmt.Errorf("unable to start http server: %w", err)
}

// Start Status Watcher
startStatusWatcher(ctx, eg, cardano, blockfrost, metrics, healthStore)

// Start Pool Watcher
if cfg.PoolWatcherConfig.Enabled {
startPoolWatcher(ctx, eg, blockfrost, metrics, cfg.Pools)
startPoolWatcher(ctx, eg, blockfrost, metrics, cfg.Pools, healthStore)
}

// Start Block Watcher
if cfg.BlockWatcherConfig.Enabled {
startBlockWatcher(ctx, eg, cardano, blockfrost, slotLeaderService, metrics, cfg.Pools, database.DB)
startBlockWatcher(ctx, eg, cardano, blockfrost, slotLeaderService, metrics, cfg.Pools, database.DB, healthStore)
}

// Start Network Watcher
if cfg.NetworkWatcherConfig.Enabled {
startNetworkWatcher(ctx, eg, blockfrost, metrics)
startNetworkWatcher(ctx, eg, blockfrost, metrics, healthStore)
}

<-ctx.Done()
Expand Down Expand Up @@ -259,11 +267,12 @@ func createCardanoClient(blockfrost blockfrost.Client) cardano.CardanoClient {
return cardanocli.NewClient(opts, blockfrost, &cardanocli.RealCommandExecutor{})
}

func startHTTPServer(eg *errgroup.Group, registry *prometheus.Registry) error {
func startHTTPServer(eg *errgroup.Group, registry *prometheus.Registry, healthStore *watcher.HealthStore) error {
var err error

server, err = http.New(
registry,
healthStore,
http.WithHost(cfg.HTTP.Host),
http.WithPort(cfg.HTTP.Port),
)
Expand All @@ -286,13 +295,36 @@ func startHTTPServer(eg *errgroup.Group, registry *prometheus.Registry) error {
return nil
}

// startStatusWatcher starts the status watcher service
func startStatusWatcher(
ctx context.Context,
eg *errgroup.Group,
cardano cardano.CardanoClient,
blockfrost blockfrost.Client,
metrics *metrics.Collection,
healthStore *watcher.HealthStore,
) {
eg.Go(func() error {
statusWatcher := watcher.NewStatusWatcher(blockfrost, cardano, metrics, healthStore)
logger.Info(
"starting watcher",
slog.String("component", "status-watcher"),
)
if err := statusWatcher.Start(ctx); err != nil {
return fmt.Errorf("unable to start status watcher: %w", err)
}
return nil
})
}

// startPoolWatcher starts the pool watcher service
func startPoolWatcher(
ctx context.Context,
eg *errgroup.Group,
blockfrost blockfrost.Client,
metrics *metrics.Collection,
pools pools.Pools,
healthStore *watcher.HealthStore,
) {
eg.Go(func() error {
options := watcher.PoolWatcherOptions{
Expand All @@ -303,7 +335,7 @@ func startPoolWatcher(
"starting watcher",
slog.String("component", "pool-watcher"),
)
poolWatcher, err := watcher.NewPoolWatcher(blockfrost, metrics, pools, options)
poolWatcher, err := watcher.NewPoolWatcher(blockfrost, metrics, pools, healthStore, options)
if err != nil {
return fmt.Errorf("unable to create pool watcher: %w", err)
}
Expand All @@ -314,12 +346,12 @@ func startPoolWatcher(
})
}

// startNetworkWatcher starts the network watcher service
func startNetworkWatcher(
ctx context.Context,
eg *errgroup.Group,
blockfrost blockfrost.Client,
metrics *metrics.Collection,
healthStore *watcher.HealthStore,
) {
eg.Go(func() error {
options := watcher.NetworkWatcherOptions{
Expand All @@ -331,7 +363,7 @@ func startNetworkWatcher(
"starting watcher",
slog.String("component", "network-watcher"),
)
networkWatcher := watcher.NewNetworkWatcher(blockfrost, metrics, options)
networkWatcher := watcher.NewNetworkWatcher(blockfrost, metrics, healthStore, options)
if err := networkWatcher.Start(ctx); err != nil {
return fmt.Errorf("unable to start network watcher: %w", err)
}
Expand All @@ -349,12 +381,13 @@ func startBlockWatcher(
metrics *metrics.Collection,
pools pools.Pools,
db *sqlx.DB,
healthStore *watcher.HealthStore,
) {
eg.Go(func() error {
options := watcher.BlockWatcherOptions{
RefreshInterval: time.Second * time.Duration(cfg.BlockWatcherConfig.RefreshInterval),
}
blockWatcher := watcher.NewBlockWatcher(cardano, blockfrost, sl, pools, metrics, db, options)
blockWatcher := watcher.NewBlockWatcher(cardano, blockfrost, sl, pools, metrics, db, healthStore, options)
logger.Info(
"starting watcher",
slog.String("component", "block-watcher"),
Expand Down
11 changes: 10 additions & 1 deletion internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Collection struct {
ExpectedBlocks *prometheus.GaugeVec
LatestSlotProcessedByBlockWatcher prometheus.Gauge
NextSlotLeader *prometheus.GaugeVec
HealthStatus prometheus.Gauge
}

func NewCollection() *Collection {
Expand Down Expand Up @@ -189,6 +190,13 @@ func NewCollection() *Collection {
},
[]string{"pool_name", "pool_id", "pool_instance", "epoch"},
),
HealthStatus: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: "cardano_validator_watcher",
Name: "health_status",
Help: "Health status of the Cardano validator watcher: 1 = healthy, 0 = unhealthy",
},
),
}
}

Expand All @@ -198,14 +206,14 @@ func (m *Collection) MustRegister(reg prometheus.Registerer) {
reg.MustRegister(m.ChainID)
reg.MustRegister(m.EpochDuration)
reg.MustRegister(m.NetworkEpoch)
reg.MustRegister(m.NextEpochStartTime)
reg.MustRegister(m.NetworkBlockHeight)
reg.MustRegister(m.NetworkSlot)
reg.MustRegister(m.NetworkEpochSlot)
reg.MustRegister(m.NetworkTotalPools)
reg.MustRegister(m.NetworkCurrentEpochProposedBlocks)
reg.MustRegister(m.NetworkActiveStake)
reg.MustRegister(m.RelaysPerPool)
reg.MustRegister(m.NextEpochStartTime)
reg.MustRegister(m.PoolsPledgeMet)
reg.MustRegister(m.PoolsSaturationLevel)
reg.MustRegister(m.MonitoredValidatorsCount)
Expand All @@ -216,4 +224,5 @@ func (m *Collection) MustRegister(reg prometheus.Registerer) {
reg.MustRegister(m.ExpectedBlocks)
reg.MustRegister(m.LatestSlotProcessedByBlockWatcher)
reg.MustRegister(m.NextSlotLeader)
reg.MustRegister(m.HealthStatus)
}
2 changes: 1 addition & 1 deletion internal/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestMustRegister(t *testing.T) {
metrics.MustRegister(registry)

// The expected number of metrics to be registered, based on the definitions provided in the Collection struct.
expectedMetricsCount := 21
expectedMetricsCount := 22

var totalRegisteredMetrics int
size, _ := registry.Gather()
Expand Down
14 changes: 11 additions & 3 deletions internal/server/http/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,21 @@ package http
import (
"log/slog"
"net/http"

"github.com/kilnfi/cardano-validator-watcher/internal/watcher"
)

// Handler represents the HTTP handlers for the server
type Handler struct {
logger *slog.Logger
logger *slog.Logger
healthStore *watcher.HealthStore
}

// NewHandler returns a new Handler
func NewHandler(logger *slog.Logger) *Handler {
func NewHandler(logger *slog.Logger, healthStore *watcher.HealthStore) *Handler {
return &Handler{
logger: logger,
logger: logger,
healthStore: healthStore,
}
}

Expand All @@ -38,6 +42,10 @@ func (h *Handler) LiveProbe(w http.ResponseWriter, _ *http.Request) {
// If the service is ready, it returns a 200 OK status
// If the service is not ready, it returns a 500 Internal Server Error status
func (h *Handler) ReadyProbe(w http.ResponseWriter, _ *http.Request) {
if !h.healthStore.GetHealth() {
http.Error(w, "Health KO", http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("Health OK"))
}
28 changes: 28 additions & 0 deletions internal/server/http/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"

"github.com/kilnfi/cardano-validator-watcher/internal/metrics"
"github.com/kilnfi/cardano-validator-watcher/internal/watcher"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -20,8 +21,10 @@ func TestDefaultHandler(t *testing.T) {
r := httptest.NewRequest(http.MethodGet, "/", nil)
w := httptest.NewRecorder()

healthStore := watcher.NewHealthStore()
server, err := New(
nil,
healthStore,
)

require.NoError(t, err)
Expand All @@ -35,8 +38,10 @@ func TestDefaultHandler(t *testing.T) {
r := httptest.NewRequest(http.MethodGet, "/fake", nil)
w := httptest.NewRecorder()

healthStore := watcher.NewHealthStore()
server, err := New(
nil,
healthStore,
)

require.NoError(t, err)
Expand All @@ -54,8 +59,10 @@ func TestLiveProbe(t *testing.T) {
r := httptest.NewRequest(http.MethodGet, "/livez", nil)
w := httptest.NewRecorder()

healthStore := watcher.NewHealthStore()
server, err := New(
nil,
healthStore,
)
require.NoError(t, err)
server.router.ServeHTTP(w, r)
Expand All @@ -73,14 +80,33 @@ func TestReadyProbe(t *testing.T) {
r := httptest.NewRequest(http.MethodGet, "/readyz", nil)
w := httptest.NewRecorder()

healthStore := watcher.NewHealthStore()
healthStore.SetHealth(true)
server, err := New(
nil,
healthStore,
)
require.NoError(t, err)
server.router.ServeHTTP(w, r)

assert.Equal(t, http.StatusOK, w.Code)
})

t.Run("SadPath_ReadyProbeIsNotReady", func(t *testing.T) {
r := httptest.NewRequest(http.MethodGet, "/readyz", nil)
w := httptest.NewRecorder()

healthStore := watcher.NewHealthStore()
healthStore.SetHealth(false)
server, err := New(
nil,
healthStore,
)
require.NoError(t, err)
server.router.ServeHTTP(w, r)

assert.Equal(t, http.StatusInternalServerError, w.Code)
})
}

func TestMetricsHandler(t *testing.T) {
Expand All @@ -96,8 +122,10 @@ func TestMetricsHandler(t *testing.T) {
metrics := metrics.NewCollection()
metrics.MustRegister(registry)

healthStore := watcher.NewHealthStore()
server, err := New(
registry,
healthStore,
)

require.NoError(t, err)
Expand Down
Loading

0 comments on commit 2af75a4

Please sign in to comment.