Skip to content

Commit

Permalink
Merge pull request #3 from kilnfi/feat/add-pool-watcher
Browse files Browse the repository at this point in the history
feat: add a watcher to monitor pools
  • Loading branch information
rayanebel authored Jan 6, 2025
2 parents ce1d165 + e37b79c commit 5c40217
Show file tree
Hide file tree
Showing 26 changed files with 2,871 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
config.yaml
.codegpt
bin/
6 changes: 6 additions & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
with-expecter: true
dir: "{{.InterfaceDir}}/mocks/"
packages:
github.com/kilnfi/cardano-validator-watcher/internal/blockfrost:
interfaces:
Client:
29 changes: 29 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
.PHONY: generate
generate:
@mockery

.PHONY: build
build: generate
@go build -ldflags="-s -w" -o bin/cardano-validator-watcher cmd/watcher/main.go

.PHONY: run
run: generate
@go run cmd/watcher/main.go --config config.yaml

.PHONY: tests
tests:
@go test -v ./...

.PHONY: coverage
coverage:
@go test -coverprofile=coverage.out ./...
@go tool cover -html=coverage.out

.PHONY: lint
lint:
@golangci-lint run ./...

.PHONY: clean
clean:
@echo "cleaning up..."
@rm -rf *.db*
70 changes: 70 additions & 0 deletions cmd/watcher/app/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package config

import (
"errors"
"fmt"

"github.com/kilnfi/cardano-validator-watcher/internal/pools"
)

type Config struct {
Pools pools.Pools `mapstructure:"pools"`
HTTP HTTPConfig `mapstructure:"http"`
Network string `mapstructure:"network"`
Blockfrost BlockFrostConfig `mapstructure:"blockfrost"`
PoolWatcherConfig PoolWatcherConfig `mapstructure:"pool-watcher"`
}

type PoolWatcherConfig struct {
Enabled bool `mapstructure:"enabled"`
RefreshInterval int `mapstructure:"refresh-interval"`
}

type HTTPConfig struct {
Host string `mapstructure:"host"`
Port int `mapstructure:"port"`
}

type BlockFrostConfig struct {
ProjectID string `mapstructure:"project-id"`
Endpoint string `mapstructure:"endpoint"`
MaxRoutines int `mapstructure:"max-routines"`
Timeout int `mapstructure:"timeout"`
}

func (c *Config) Validate() error {
switch c.Network {
case "mainnet", "preprod":
default:
return fmt.Errorf("invalid network: %s. Network must be either %s or %s", c.Network, "mainnet", "preprod")
}

if len(c.Pools) == 0 {
return errors.New("at least one pool must be defined")
}
for _, pool := range c.Pools {
if pool.Instance == "" {
return errors.New("instance is required for all pools")
}
if pool.ID == "" {
return errors.New("id is required for all pools")
}
if pool.Name == "" {
return errors.New("name is required for all pools")
}
if pool.Key == "" {
return errors.New("key is required for all pools")
}
}

activePools := c.Pools.GetActivePools()
if len(activePools) == 0 {
return errors.New("at least one active pool must be defined")
}

if c.Blockfrost.ProjectID == "" || c.Blockfrost.Endpoint == "" {
return errors.New("blockfrost project-id and endpoint are required")
}

return nil
}
256 changes: 256 additions & 0 deletions cmd/watcher/app/watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
package app

import (
"context"
"errors"
"fmt"
"log/slog"
"os"
"os/signal"
"strings"
"syscall"
"time"

"github.com/kilnfi/cardano-validator-watcher/cmd/watcher/app/config"
"github.com/kilnfi/cardano-validator-watcher/internal/blockfrost"
"github.com/kilnfi/cardano-validator-watcher/internal/blockfrost/blockfrostapi"
"github.com/kilnfi/cardano-validator-watcher/internal/metrics"
"github.com/kilnfi/cardano-validator-watcher/internal/pools"
"github.com/kilnfi/cardano-validator-watcher/internal/server/http"
"github.com/kilnfi/cardano-validator-watcher/internal/watcher"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/sync/errgroup"

"github.com/spf13/cobra"
"github.com/spf13/viper"
)

var (
configFile string
server *http.Server
cfg *config.Config
logger *slog.Logger
)

func init() {
cobra.OnInitialize(initLogger)
cobra.OnInitialize(loadConfig)
}

func initLogger() {
var logLevel slog.Level
switch viper.GetString("log-level") {
case "info":
logLevel = slog.LevelInfo
case "warn":
logLevel = slog.LevelWarn
case "error":
logLevel = slog.LevelError
case "debug":
logLevel = slog.LevelDebug
default:
logLevel = slog.LevelInfo
}

logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
Level: logLevel,
}))
slog.SetDefault(logger)
}

func NewWatcherCommand() *cobra.Command {
cmd := &cobra.Command{
TraverseChildren: true,
Use: "cardano-validator-watcher",
Short: "cardano validator watcher is used to monitor our cardano pools",
Long: `cardano validator watcher is a long-running program designed
to collect metrics for monitoring our Cardano validation nodes.
This tool helps us ensure the health and performance of our nodes in the Cardano network.`,
SilenceUsage: true,
SilenceErrors: true,
RunE: run,
}

cmd.Flags().StringVarP(&configFile, "config", "", "", "config file (default is config.yml)")
cmd.Flags().StringP("log-level", "", "info", "config file (default is config.yml)")
cmd.Flags().StringP("http-server-host", "", http.ServerDefaultHost, "host on which HTTP server should listen")
cmd.Flags().IntP("http-server-port", "", http.ServerDefaultPort, "port on which HTTP server should listen")
cmd.Flags().StringP("network", "", "preprod", "cardano network ID")
cmd.Flags().StringP("blockfrost-project-id", "", "", "blockfrost project id")
cmd.Flags().StringP("blockfrost-endpoint", "", "", "blockfrost API endpoint")
cmd.Flags().IntP("blockfrost-max-routines", "", 10, "number of routines used by blockfrost to perform concurrent actions")
cmd.Flags().IntP("blockfrost-timeout", "", 60, "Timeout for requests to the Blockfrost API (in seconds)")
cmd.Flags().BoolP("pool-watcher-enabled", "", true, "Enable pool watcher")
cmd.Flags().IntP("pool-watcher-refresh-interval", "", 60, "Interval at which the pool watcher collects data about the monitored pools (in seconds)")

// bind flag to viper
checkError(viper.BindPFlag("log-level", cmd.Flag("log-level")), "unable to bind log-level flag")
checkError(viper.BindPFlag("http.host", cmd.Flag("http-server-host")), "unable to bind http-server-host flag")
checkError(viper.BindPFlag("http.port", cmd.Flag("http-server-port")), "unable to bind http-server-port flag")
checkError(viper.BindPFlag("network", cmd.Flag("network")), "unable to bind network flag")
checkError(viper.BindPFlag("blockfrost.project-id", cmd.Flag("blockfrost-project-id")), "unable to bind blockfrost-project-id flag")
checkError(viper.BindPFlag("blockfrost.endpoint", cmd.Flag("blockfrost-endpoint")), "unable to bind blockfrost-endpoint flag")
checkError(viper.BindPFlag("blockfrost.max-routines", cmd.Flag("blockfrost-max-routines")), "unable to bind blockfrost-max-routines flag")
checkError(viper.BindPFlag("blockfrost.timeout", cmd.Flag("blockfrost-timeout")), "unable to bind blockfrost-timeout flag")
checkError(viper.BindPFlag("pool-watcher.enabled", cmd.Flag("pool-watcher-enabled")), "unable to bind pool-watcher-enabled flag")
checkError(viper.BindPFlag("pool-watcher.refresh-interval", cmd.Flag("pool-watcher-refresh-interval")), "unable to bind pool-watcher-refresh-interval flag")

return cmd
}

// loadConfig read the configuration and load it.
func loadConfig() {
if configFile != "" {
viper.SetConfigFile(configFile)
} else {
viper.SetConfigName("config")
viper.SetConfigType("yaml")
viper.AddConfigPath(".")
}

viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_", "-", "_"))
viper.AutomaticEnv()

// read the config file
if err := viper.ReadInConfig(); err != nil {
logger.Error("unable to read config file", slog.String("error", err.Error()))
os.Exit(1)
}

// unmarshal the config
cfg = &config.Config{}
if err := viper.Unmarshal(cfg); err != nil {
logger.Error("unable to unmarshal config", slog.String("error", err.Error()))
os.Exit(1)
}

// validate the config
if err := cfg.Validate(); err != nil {
logger.Error("invalid configuration", slog.String("error", err.Error()))
os.Exit(1)
}
}

func run(_ *cobra.Command, _ []string) error {
// Initialize context and cancel function
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Initialize signal channel for handling interrupts
ctx, cancel = signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM)
defer cancel()

eg, ctx := errgroup.WithContext(ctx)

// Initialize blockfrost and cardano clients with options
blockfrost := createBlockfrostClient()

// Initialize prometheus metrics
registry := prometheus.NewRegistry()
metrics := metrics.NewCollection()
metrics.MustRegister(registry)

// Start HTTP server
if err := startHTTPServer(eg, registry); err != nil {
return fmt.Errorf("unable to start http server: %w", err)
}

// Start Pool Watcher
if cfg.PoolWatcherConfig.Enabled {
startPoolWatcher(ctx, eg, blockfrost, metrics, cfg.Pools)
}

<-ctx.Done()
logger.Info("shutting down")

// shutting down HTTP server
ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
logger.Info("stopping http server")
if err := server.Stop(ctx); err != nil {
logger.Error("unable to stop http service", slog.String("error", err.Error()))
}

if err := eg.Wait(); err != nil {
if errors.Is(err, context.Canceled) {
logger.Info("Program interrupted by user")
return nil
}
return fmt.Errorf("error during execution: %w", err)
}
return nil
}

func createBlockfrostClient() blockfrost.Client {
opts := blockfrostapi.ClientOptions{
ProjectID: cfg.Blockfrost.ProjectID,
Server: cfg.Blockfrost.Endpoint,
MaxRoutines: cfg.Blockfrost.MaxRoutines,
Timeout: time.Second * time.Duration(cfg.Blockfrost.Timeout),
}
return blockfrostapi.NewClient(opts)
}

func startHTTPServer(eg *errgroup.Group, registry *prometheus.Registry) error {
var err error

server, err = http.New(
registry,
http.WithHost(cfg.HTTP.Host),
http.WithPort(cfg.HTTP.Port),
)
if err != nil {
return fmt.Errorf("unable to create http server: %w", err)
}

eg.Go(func() error {
logger.Info(
"starting http server",
slog.String("component", "http-server"),
slog.String("addr", fmt.Sprintf("%s:%d", cfg.HTTP.Host, cfg.HTTP.Port)),
)
if err := server.Start(); err != nil {
return fmt.Errorf("unable to start http server: %w", err)
}
return nil
})

return nil
}

// startPoolWatcher starts the pool watcher service
func startPoolWatcher(
ctx context.Context,
eg *errgroup.Group,
blockfrost blockfrost.Client,
metrics *metrics.Collection,
pools pools.Pools,
) {
eg.Go(func() error {
options := watcher.PoolWatcherOptions{
RefreshInterval: time.Second * time.Duration(cfg.PoolWatcherConfig.RefreshInterval),
Network: cfg.Network,
}
logger.Info(
"starting watcher",
slog.String("component", "pool-watcher"),
)
poolWatcher, err := watcher.NewPoolWatcher(blockfrost, metrics, pools, options)
if err != nil {
return fmt.Errorf("unable to create pool watcher: %w", err)
}
if err := poolWatcher.Start(ctx); err != nil {
return fmt.Errorf("unable to start pool watcher: %w", err)
}
return nil
})
}

// checkError is a helper function to log an error and exit the program
// used for the flag parsing
func checkError(err error, msg string) {
if err != nil {
logger.Error(msg, slog.String("error", err.Error()))
os.Exit(1)
}
}
20 changes: 20 additions & 0 deletions cmd/watcher/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package main

import (
"log/slog"
"os"

"github.com/kilnfi/cardano-validator-watcher/cmd/watcher/app"
)

func main() {
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelInfo,
}))

command := app.NewWatcherCommand()
if err := command.Execute(); err != nil {
logger.Error("command execution failed", slog.String("error", err.Error()))
os.Exit(1)
}
}
Loading

0 comments on commit 5c40217

Please sign in to comment.