Skip to content

Commit

Permalink
Merge pull request #32 from asymmetric-research/address-configurable
Browse files Browse the repository at this point in the history
Address configurable leader slots tracking (3)
  • Loading branch information
johnstonematt authored Oct 4, 2024
2 parents 43ffbb9 + 4004494 commit b331072
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 34 deletions.
59 changes: 41 additions & 18 deletions cmd/solana_exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,28 @@ import (
)

var (
httpTimeout = 60 * time.Second
rpcAddr = flag.String("rpcURI", "", "Solana RPC URI (including protocol and path)")
addr = flag.String("addr", ":8080", "Listen address")
votePubkey = flag.String("votepubkey", "", "Validator vote address (will only return results of this address)")
httpTimeoutSecs = flag.Int("http_timeout", 60, "HTTP timeout in seconds")
balanceAddresses = flag.String("balance-addresses", "", "Comma-separated list of addresses to monitor balances")
httpTimeout = 60 * time.Second
rpcAddr = flag.String("rpcURI", "", "Solana RPC URI (including protocol and path)")
addr = flag.String("addr", ":8080", "Listen address")
votePubkey = flag.String("votepubkey", "", "Validator vote address (will only return results of this address)")
httpTimeoutSecs = flag.Int("http_timeout", 60, "HTTP timeout in seconds")
balanceAddresses = flag.String("balance-addresses", "", "Comma-separated list of addresses to monitor balances")
leaderSlotAddresses = flag.String(
"leader-slot-addresses",
"",
"Comma-separated list of addresses to monitor leader slots by epoch for, leave nil to track by epoch for all validators (this creates a lot of Prometheus metrics with every new epoch).",
)
)

func init() {
klog.InitFlags(nil)
}

type solanaCollector struct {
rpcClient rpc.Provider
slotPace time.Duration
balanceAddresses []string
rpcClient rpc.Provider
slotPace time.Duration
balanceAddresses []string
leaderSlotAddresses []string

totalValidatorsDesc *prometheus.Desc
validatorActivatedStake *prometheus.Desc
Expand All @@ -40,11 +46,14 @@ type solanaCollector struct {
balances *prometheus.Desc
}

func createSolanaCollector(provider rpc.Provider, slotPace time.Duration, balanceAddresses []string) *solanaCollector {
func createSolanaCollector(
provider rpc.Provider, slotPace time.Duration, balanceAddresses []string, leaderSlotAddresses []string,
) *solanaCollector {
return &solanaCollector{
rpcClient: provider,
slotPace: slotPace,
balanceAddresses: balanceAddresses,
rpcClient: provider,
slotPace: slotPace,
balanceAddresses: balanceAddresses,
leaderSlotAddresses: leaderSlotAddresses,
totalValidatorsDesc: prometheus.NewDesc(
"solana_active_validators",
"Total number of active validators by state",
Expand Down Expand Up @@ -90,8 +99,8 @@ func createSolanaCollector(provider rpc.Provider, slotPace time.Duration, balanc
}
}

func NewSolanaCollector(rpcAddr string, balanceAddresses []string) *solanaCollector {
return createSolanaCollector(rpc.NewRPCClient(rpcAddr), slotPacerSchedule, balanceAddresses)
func NewSolanaCollector(rpcAddr string, balanceAddresses []string, leaderSlotAddresses []string) *solanaCollector {
return createSolanaCollector(rpc.NewRPCClient(rpcAddr), slotPacerSchedule, balanceAddresses, leaderSlotAddresses)
}

func (c *solanaCollector) Describe(ch chan<- *prometheus.Desc) {
Expand Down Expand Up @@ -214,13 +223,27 @@ func main() {
klog.Fatal("Please specify -rpcURI")
}

if *leaderSlotAddresses == "" {
klog.Warning(
"Not specifying leader-slot-addresses will lead to potentially thousands of new " +
"Prometheus metrics being created every epoch.",
)
}

httpTimeout = time.Duration(*httpTimeoutSecs) * time.Second

var monitoredAddresses []string
var (
balAddresses []string
lsAddresses []string
)
if *balanceAddresses != "" {
monitoredAddresses = strings.Split(*balanceAddresses, ",")
balAddresses = strings.Split(*balanceAddresses, ",")
}
if *leaderSlotAddresses != "" {
lsAddresses = strings.Split(*leaderSlotAddresses, ",")
}
collector := NewSolanaCollector(*rpcAddr, monitoredAddresses)

collector := NewSolanaCollector(*rpcAddr, balAddresses, lsAddresses)

go collector.WatchSlots(context.Background())

Expand Down
4 changes: 2 additions & 2 deletions cmd/solana_exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ func runCollectionTests(t *testing.T, collector prometheus.Collector, testCases
}

func TestSolanaCollector_Collect_Static(t *testing.T) {
collector := createSolanaCollector(&staticRPCClient{}, slotPacerSchedule, identities)
collector := createSolanaCollector(&staticRPCClient{}, slotPacerSchedule, identities, []string{})
prometheus.NewPedanticRegistry().MustRegister(collector)

testCases := []collectionTest{
Expand Down Expand Up @@ -429,7 +429,7 @@ solana_validator_delinquent{nodekey="ccc",pubkey="CCC"} 0

func TestSolanaCollector_Collect_Dynamic(t *testing.T) {
client := newDynamicRPCClient()
collector := createSolanaCollector(client, slotPacerSchedule, identities)
collector := createSolanaCollector(client, slotPacerSchedule, identities, []string{})
prometheus.NewPedanticRegistry().MustRegister(collector)

// start off by testing initial state:
Expand Down
19 changes: 11 additions & 8 deletions cmd/solana_exporter/slots.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"slices"
"time"

"github.com/asymmetric-research/solana_exporter/pkg/rpc"
Expand Down Expand Up @@ -87,7 +88,7 @@ func (c *solanaCollector) WatchSlots(ctx context.Context) {
epochLastSlot.Set(float64(lastSlot))

klog.Infof("Starting at slot %d in epoch %d (%d-%d)", firstSlot, currentEpoch, firstSlot, lastSlot)
_, err = updateCounters(c.rpcClient, currentEpoch, watermark, &lastSlot)
_, err = c.updateCounters(currentEpoch, watermark, &lastSlot)
if err != nil {
klog.Error(err)
}
Expand Down Expand Up @@ -126,7 +127,7 @@ func (c *solanaCollector) WatchSlots(ctx context.Context) {
lastSlot,
)

last, err := updateCounters(c.rpcClient, currentEpoch, watermark, &lastSlot)
last, err := c.updateCounters(currentEpoch, watermark, &lastSlot)
if err != nil {
klog.Error(err)
continue
Expand All @@ -153,7 +154,7 @@ func (c *solanaCollector) WatchSlots(ctx context.Context) {
totalTransactionsTotal.Set(float64(info.TransactionCount))
confirmedSlotHeight.Set(float64(info.AbsoluteSlot))

last, err := updateCounters(c.rpcClient, currentEpoch, watermark, nil)
last, err := c.updateCounters(currentEpoch, watermark, nil)
if err != nil {
klog.Info(err)
continue
Expand Down Expand Up @@ -183,15 +184,15 @@ func getEpochBounds(info *rpc.EpochInfo) (int64, int64, int64) {
return info.Epoch, firstSlot, lastSlot
}

func updateCounters(c rpc.Provider, epoch, firstSlot int64, lastSlotOpt *int64) (int64, error) {
func (c *solanaCollector) updateCounters(epoch, firstSlot int64, lastSlotOpt *int64) (int64, error) {
ctx, cancel := context.WithTimeout(context.Background(), httpTimeout)
defer cancel()

var lastSlot int64
var err error

if lastSlotOpt == nil {
lastSlot, err = c.GetSlot(ctx)
lastSlot, err = c.rpcClient.GetSlot(ctx)

if err != nil {
return 0, fmt.Errorf("error while getting the last slot: %v", err)
Expand All @@ -214,7 +215,7 @@ func updateCounters(c rpc.Provider, epoch, firstSlot int64, lastSlotOpt *int64)
ctx, cancel = context.WithTimeout(context.Background(), httpTimeout)
defer cancel()

blockProduction, err := c.GetBlockProduction(ctx, &firstSlot, &lastSlot)
blockProduction, err := c.rpcClient.GetBlockProduction(ctx, &firstSlot, &lastSlot)
if err != nil {
return 0, fmt.Errorf("failed to fetch block production, retrying: %v", err)
}
Expand All @@ -228,8 +229,10 @@ func updateCounters(c rpc.Provider, epoch, firstSlot int64, lastSlotOpt *int64)
leaderSlotsTotal.WithLabelValues("valid", host).Add(valid)
leaderSlotsTotal.WithLabelValues("skipped", host).Add(skipped)

leaderSlotsByEpoch.WithLabelValues("valid", host, epochStr).Add(valid)
leaderSlotsByEpoch.WithLabelValues("skipped", host, epochStr).Add(skipped)
if len(c.leaderSlotAddresses) == 0 || slices.Contains(c.leaderSlotAddresses, host) {
leaderSlotsByEpoch.WithLabelValues("valid", host, epochStr).Add(valid)
leaderSlotsByEpoch.WithLabelValues("skipped", host, epochStr).Add(skipped)
}

klog.V(1).Infof(
"Epoch %s, slots %d-%d, node %s: Added %d valid and %d skipped slots",
Expand Down
8 changes: 2 additions & 6 deletions cmd/solana_exporter/slots_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,7 @@ func TestSolanaCollector_WatchSlots_Static(t *testing.T) {
leaderSlotsTotal.Reset()
leaderSlotsByEpoch.Reset()

collector := createSolanaCollector(
&staticRPCClient{},
100*time.Millisecond,
identities,
)
collector := createSolanaCollector(&staticRPCClient{}, 100*time.Millisecond, identities, []string{})
prometheus.NewPedanticRegistry().MustRegister(collector)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -149,7 +145,7 @@ func TestSolanaCollector_WatchSlots_Dynamic(t *testing.T) {

// create clients:
client := newDynamicRPCClient()
collector := createSolanaCollector(client, 300*time.Millisecond, identities)
collector := createSolanaCollector(client, 300*time.Millisecond, identities, []string{})
prometheus.NewPedanticRegistry().MustRegister(collector)

// start client/collector and wait a bit:
Expand Down

0 comments on commit b331072

Please sign in to comment.