Skip to content

Commit

Permalink
Merge pull request #36 from asymmetric-research/inflation-rewards
Browse files Browse the repository at this point in the history
Added inflation rewards
  • Loading branch information
johnstonematt authored Oct 7, 2024
2 parents 01db171 + ab227c7 commit d77b929
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 29 deletions.
59 changes: 42 additions & 17 deletions cmd/solana_exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,28 @@ import (
)

var (
httpTimeout = 60 * time.Second
rpcAddr = flag.String("rpcURI", "", "Solana RPC URI (including protocol and path)")
addr = flag.String("addr", ":8080", "Listen address")
votePubkey = flag.String("votepubkey", "", "Validator vote address (will only return results of this address)")
httpTimeoutSecs = flag.Int("http_timeout", 60, "HTTP timeout in seconds")
balanceAddresses = flag.String("balance-addresses", "", "Comma-separated list of addresses to monitor balances")
httpTimeout = 60 * time.Second
rpcAddr = flag.String("rpcURI", "", "Solana RPC URI (including protocol and path)")
addr = flag.String("addr", ":8080", "Listen address")
votePubkey = flag.String("votepubkey", "", "Validator vote address (will only return results of this address)")
httpTimeoutSecs = flag.Int("http_timeout", 60, "HTTP timeout in seconds")

// addresses:
balanceAddresses = flag.String(
"balance-addresses",
"",
"Comma-separated list of addresses to monitor SOL balances.",
)
leaderSlotAddresses = flag.String(
"leader-slot-addresses",
"",
"Comma-separated list of addresses to monitor leader slots by epoch for, leave nil to track by epoch for all validators (this creates a lot of Prometheus metrics with every new epoch).",
)
inflationRewardAddresses = flag.String(
"inflation-reward-addresses",
"",
"Comma-separated list of validator vote accounts to track inflationary rewards for",
)
)

func init() {
Expand All @@ -35,9 +46,10 @@ type solanaCollector struct {
rpcClient rpc.Provider

// config:
slotPace time.Duration
balanceAddresses []string
leaderSlotAddresses []string
slotPace time.Duration
balanceAddresses []string
leaderSlotAddresses []string
inflationRewardAddresses []string

/// descriptors:
totalValidatorsDesc *prometheus.Desc
Expand All @@ -50,13 +62,18 @@ type solanaCollector struct {
}

func createSolanaCollector(
provider rpc.Provider, slotPace time.Duration, balanceAddresses []string, leaderSlotAddresses []string,
provider rpc.Provider,
slotPace time.Duration,
balanceAddresses []string,
leaderSlotAddresses []string,
inflationRewardAddresses []string,
) *solanaCollector {
return &solanaCollector{
rpcClient: provider,
slotPace: slotPace,
balanceAddresses: balanceAddresses,
leaderSlotAddresses: leaderSlotAddresses,
rpcClient: provider,
slotPace: slotPace,
balanceAddresses: balanceAddresses,
leaderSlotAddresses: leaderSlotAddresses,
inflationRewardAddresses: inflationRewardAddresses,
totalValidatorsDesc: prometheus.NewDesc(
"solana_active_validators",
"Total number of active validators by state",
Expand Down Expand Up @@ -102,8 +119,12 @@ func createSolanaCollector(
}
}

func NewSolanaCollector(rpcAddr string, balanceAddresses []string, leaderSlotAddresses []string) *solanaCollector {
return createSolanaCollector(rpc.NewRPCClient(rpcAddr), slotPacerSchedule, balanceAddresses, leaderSlotAddresses)
func NewSolanaCollector(
rpcAddr string, balanceAddresses []string, leaderSlotAddresses []string, inflationRewardAddresses []string,
) *solanaCollector {
return createSolanaCollector(
rpc.NewRPCClient(rpcAddr), slotPacerSchedule, balanceAddresses, leaderSlotAddresses, inflationRewardAddresses,
)
}

func (c *solanaCollector) Describe(ch chan<- *prometheus.Desc) {
Expand Down Expand Up @@ -233,15 +254,19 @@ func main() {
var (
balAddresses []string
lsAddresses []string
irAddresses []string
)
if *balanceAddresses != "" {
balAddresses = strings.Split(*balanceAddresses, ",")
}
if *leaderSlotAddresses != "" {
lsAddresses = strings.Split(*leaderSlotAddresses, ",")
}
if *inflationRewardAddresses != "" {
irAddresses = strings.Split(*inflationRewardAddresses, ",")
}

collector := NewSolanaCollector(*rpcAddr, balAddresses, lsAddresses)
collector := NewSolanaCollector(*rpcAddr, balAddresses, lsAddresses, irAddresses)

slotWatcher := NewCollectorSlotWatcher(collector)
go slotWatcher.WatchSlots(context.Background(), collector.slotPace)
Expand Down
24 changes: 22 additions & 2 deletions cmd/solana_exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type (

var (
identities = []string{"aaa", "bbb", "ccc"}
votekeys = []string{"AAA", "BBB", "CCC"}
balances = map[string]float64{"aaa": 1, "bbb": 2, "ccc": 3}
identityVotes = map[string]string{"aaa": "AAA", "bbb": "BBB", "ccc": "CCC"}
nv = len(identities)
Expand All @@ -61,6 +62,11 @@ var (
},
Range: rpc.BlockProductionRange{FirstSlot: 1000, LastSlot: 2000},
}
staticInflationRewards = []rpc.InflationReward{
{Amount: 1000, EffectiveSlot: 166598, Epoch: 27, PostBalance: 2000},
{Amount: 2000, EffectiveSlot: 166598, Epoch: 27, PostBalance: 4000},
{Amount: 3000, EffectiveSlot: 166598, Epoch: 27, PostBalance: 6000},
}
staticVoteAccounts = rpc.VoteAccounts{
Current: []rpc.VoteAccount{
{
Expand Down Expand Up @@ -147,6 +153,13 @@ func (c *staticRPCClient) GetBalance(ctx context.Context, address string) (float
return balances[address], nil
}

//goland:noinspection GoUnusedParameter
func (c *staticRPCClient) GetInflationReward(
ctx context.Context, addresses []string, commitment rpc.Commitment, epoch *int64, minContextSlot *int64,
) ([]rpc.InflationReward, error) {
return staticInflationRewards, nil
}

/*
===== DYNAMIC CLIENT =====:
*/
Expand Down Expand Up @@ -321,6 +334,13 @@ func (c *dynamicRPCClient) GetBalance(ctx context.Context, address string) (floa
return balances[address], nil
}

//goland:noinspection GoUnusedParameter
func (c *dynamicRPCClient) GetInflationReward(
ctx context.Context, addresses []string, commitment rpc.Commitment, epoch *int64, minContextSlot *int64,
) ([]rpc.InflationReward, error) {
return staticInflationRewards, nil
}

/*
===== OTHER TEST UTILITIES =====:
*/
Expand Down Expand Up @@ -356,7 +376,7 @@ func runCollectionTests(t *testing.T, collector prometheus.Collector, testCases
}

func TestSolanaCollector_Collect_Static(t *testing.T) {
collector := createSolanaCollector(&staticRPCClient{}, slotPacerSchedule, identities, []string{})
collector := createSolanaCollector(&staticRPCClient{}, slotPacerSchedule, identities, []string{}, votekeys)
prometheus.NewPedanticRegistry().MustRegister(collector)

testCases := []collectionTest{
Expand Down Expand Up @@ -434,7 +454,7 @@ solana_account_balance{address="ccc"} 3

func TestSolanaCollector_Collect_Dynamic(t *testing.T) {
client := newDynamicRPCClient()
collector := createSolanaCollector(client, slotPacerSchedule, identities, []string{})
collector := createSolanaCollector(client, slotPacerSchedule, identities, []string{}, votekeys)
prometheus.NewPedanticRegistry().MustRegister(collector)

// start off by testing initial state:
Expand Down
51 changes: 49 additions & 2 deletions cmd/solana_exporter/slots.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ const (
type SlotWatcher struct {
client rpc.Provider

leaderSlotAddresses []string
// config:
leaderSlotAddresses []string
inflationRewardAddresses []string

// currentEpoch is the current epoch we are watching
currentEpoch int64
Expand Down Expand Up @@ -71,10 +73,22 @@ var (
},
[]string{"status", "nodekey", "epoch"},
)

inflationRewards = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "solana_inflation_rewards",
Help: "Inflation reward earned per validator vote account, per epoch",
},
[]string{"votekey", "epoch"},
)
)

func NewCollectorSlotWatcher(collector *solanaCollector) *SlotWatcher {
return &SlotWatcher{client: collector.rpcClient, leaderSlotAddresses: collector.leaderSlotAddresses}
return &SlotWatcher{
client: collector.rpcClient,
leaderSlotAddresses: collector.leaderSlotAddresses,
inflationRewardAddresses: collector.inflationRewardAddresses,
}
}

func init() {
Expand All @@ -85,6 +99,7 @@ func init() {
prometheus.MustRegister(epochLastSlot)
prometheus.MustRegister(leaderSlotsTotal)
prometheus.MustRegister(leaderSlotsByEpoch)
prometheus.MustRegister(inflationRewards)
}

func (c *SlotWatcher) WatchSlots(ctx context.Context, pace time.Duration) {
Expand Down Expand Up @@ -124,6 +139,13 @@ func (c *SlotWatcher) WatchSlots(ctx context.Context, pace time.Duration) {
}

if epochInfo.Epoch > c.currentEpoch {
// if we have configured inflation reward addresses, fetch em
if len(c.inflationRewardAddresses) > 0 {
err = c.fetchAndEmitInflationRewards(ctx, c.currentEpoch)
if err != nil {
klog.Errorf("Failed to emit inflation rewards, bailing out: %v", err)
}
}
c.closeCurrentEpoch(ctx, epochInfo)
}

Expand Down Expand Up @@ -245,3 +267,28 @@ func getEpochBounds(info *rpc.EpochInfo) (int64, int64) {
firstSlot := info.AbsoluteSlot - info.SlotIndex
return firstSlot, firstSlot + info.SlotsInEpoch - 1
}

// fetchAndEmitInflationRewards fetches and emits the inflation rewards for the configured inflationRewardAddresses
// at the provided epoch
func (c *SlotWatcher) fetchAndEmitInflationRewards(ctx context.Context, epoch int64) error {
epochStr := fmt.Sprintf("%d", epoch)
klog.Infof("Fetching inflation reward for epoch %v ...", epochStr)

ctx, cancel := context.WithTimeout(ctx, httpTimeout)
defer cancel()

rewardInfos, err := c.client.GetInflationReward(
ctx, c.inflationRewardAddresses, rpc.CommitmentFinalized, &epoch, nil,
)
if err != nil {
return fmt.Errorf("error fetching inflation rewards: %w", err)
}

for i, rewardInfo := range rewardInfos {
address := c.inflationRewardAddresses[i]
reward := float64(rewardInfo.Amount) / float64(rpc.LamportsInSol)
inflationRewards.WithLabelValues(address, epochStr).Set(reward)
}
klog.Infof("Fetched inflation reward for epoch %v.", epochStr)
return nil
}
26 changes: 21 additions & 5 deletions cmd/solana_exporter/slots_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"github.com/asymmetric-research/solana_exporter/pkg/rpc"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -91,26 +92,41 @@ func TestSolanaCollector_WatchSlots_Static(t *testing.T) {
leaderSlotsTotal.Reset()
leaderSlotsByEpoch.Reset()

collector := createSolanaCollector(&staticRPCClient{}, 100*time.Millisecond, identities, []string{})
collector := createSolanaCollector(&staticRPCClient{}, 100*time.Millisecond, identities, []string{}, votekeys)
watcher := NewCollectorSlotWatcher(collector)
prometheus.NewPedanticRegistry().MustRegister(collector)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go watcher.WatchSlots(ctx, collector.slotPace)

// make sure inflation rewards are collected:
err := watcher.fetchAndEmitInflationRewards(ctx, staticEpochInfo.Epoch)
assert.NoError(t, err)
time.Sleep(1 * time.Second)

firstSlot, lastSlot := getEpochBounds(&staticEpochInfo)
tests := []struct {
type testCase struct {
expectedValue float64
metric prometheus.Gauge
}{
}
tests := []testCase{
{expectedValue: float64(staticEpochInfo.AbsoluteSlot), metric: confirmedSlotHeight},
{expectedValue: float64(staticEpochInfo.TransactionCount), metric: totalTransactionsTotal},
{expectedValue: float64(staticEpochInfo.Epoch), metric: currentEpochNumber},
{expectedValue: float64(firstSlot), metric: epochFirstSlot},
{expectedValue: float64(lastSlot), metric: epochLastSlot},
}

// add inflation reward tests:
for i, rewardInfo := range staticInflationRewards {
epoch := fmt.Sprintf("%v", staticEpochInfo.Epoch)
test := testCase{
expectedValue: float64(rewardInfo.Amount) / float64(rpc.LamportsInSol),
metric: inflationRewards.WithLabelValues(votekeys[i], epoch),
}
tests = append(tests, test)
}

for _, testCase := range tests {
name := extractName(testCase.metric.Desc())
t.Run(name, func(t *testing.T) {
Expand Down Expand Up @@ -145,8 +161,8 @@ func TestSolanaCollector_WatchSlots_Dynamic(t *testing.T) {

// create clients:
client := newDynamicRPCClient()
collector := createSolanaCollector(client, 300*time.Millisecond, identities, []string{})
watcher := SlotWatcher{client: client}
collector := createSolanaCollector(client, 300*time.Millisecond, identities, []string{}, votekeys)
watcher := NewCollectorSlotWatcher(collector)
prometheus.NewPedanticRegistry().MustRegister(collector)

// start client/collector and wait a bit:
Expand Down
29 changes: 28 additions & 1 deletion pkg/rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,21 @@ type Provider interface {

// GetBalance returns the SOL balance of the account at the provided address
GetBalance(ctx context.Context, address string) (float64, error)

// GetInflationReward returns the inflation rewards (in lamports) awarded to the given addresses (vote accounts)
// during the given epoch.
GetInflationReward(
ctx context.Context, addresses []string, commitment Commitment, epoch *int64, minContextSlot *int64,
) ([]InflationReward, error)
}

func (c Commitment) MarshalJSON() ([]byte, error) {
return json.Marshal(map[string]string{"commitment": string(c)})
}

const (
// LamportsInSol is the number of lamports in 1 SOL (a billion)
LamportsInSol = 1_000_000_000
// CommitmentFinalized level offers the highest level of certainty for a transaction on the Solana blockchain.
// A transaction is considered “Finalized” when it is included in a block that has been confirmed by a
// supermajority of the stake, and at least 31 additional confirmed blocks have been built on top of it.
Expand Down Expand Up @@ -216,5 +224,24 @@ func (c *Client) GetBalance(ctx context.Context, address string) (float64, error
if err := c.getResponse(ctx, "getBalance", []any{address}, &resp); err != nil {
return 0, err
}
return float64(resp.Result.Value / 1_000_000_000), nil
return float64(resp.Result.Value) / float64(LamportsInSol), nil
}

func (c *Client) GetInflationReward(
ctx context.Context, addresses []string, commitment Commitment, epoch *int64, minContextSlot *int64,
) ([]InflationReward, error) {
// format params:
config := map[string]any{"commitment": string(commitment)}
if epoch != nil {
config["epoch"] = *epoch
}
if minContextSlot != nil {
config["minContextSlot"] = *minContextSlot
}

var resp response[[]InflationReward]
if err := c.getResponse(ctx, "getInflationReward", []any{addresses, config}, &resp); err != nil {
return nil, err
}
return resp.Result, nil
}
13 changes: 11 additions & 2 deletions pkg/rpc/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import (

type (
response[T any] struct {
Result T `json:"result"`
Error rpcError `json:"error"`
jsonrpc string
Result T `json:"result"`
Error rpcError `json:"error"`
Id int `json:"id"`
}

contextualResult[T any] struct {
Expand Down Expand Up @@ -63,6 +65,13 @@ type (
ByIdentity map[string]HostProduction `json:"byIdentity"`
Range BlockProductionRange `json:"range"`
}

InflationReward struct {
Amount int64 `json:"amount"`
EffectiveSlot int64 `json:"effectiveSlot"`
Epoch int64 `json:"epoch"`
PostBalance int64 `json:"postBalance"`
}
)

func (hp *HostProduction) UnmarshalJSON(data []byte) error {
Expand Down

0 comments on commit d77b929

Please sign in to comment.