Skip to content

Commit

Permalink
added fee rewards
Browse files Browse the repository at this point in the history
  • Loading branch information
johnstonematt committed Oct 8, 2024
1 parent 328074e commit c65c347
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 57 deletions.
5 changes: 5 additions & 0 deletions cmd/solana_exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,15 +276,20 @@ func main() {
)
if *balanceAddresses != "" {
balAddresses = strings.Split(*balanceAddresses, ",")
klog.Infof("Monitoring balances for %v", balAddresses)
}
if *leaderSlotAddresses != "" {
lsAddresses = strings.Split(*leaderSlotAddresses, ",")
klog.Infof("Monitoring leader-slot by epoch for %v", lsAddresses)

}
if *inflationRewardAddresses != "" {
irAddresses = strings.Split(*inflationRewardAddresses, ",")
klog.Infof("Monitoring inflation reward by epoch for %v", irAddresses)
}
if *feeRewardAddresses != "" {
frAddresses = strings.Split(*feeRewardAddresses, ",")
klog.Infof("Monitoring fee reward by epoch for %v", frAddresses)
}

collector := NewSolanaCollector(*rpcAddr, balAddresses, lsAddresses, irAddresses, frAddresses)
Expand Down
34 changes: 14 additions & 20 deletions cmd/solana_exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ var (
identityVotes = map[string]string{"aaa": "AAA", "bbb": "BBB", "ccc": "CCC"}
nv = len(identities)
staticEpochInfo = rpc.EpochInfo{
AbsoluteSlot: 166598,
AbsoluteSlot: 166599,
BlockHeight: 166500,
Epoch: 27,
SlotIndex: 2790,
Expand All @@ -70,25 +70,19 @@ var (
staticVoteAccounts = rpc.VoteAccounts{
Current: []rpc.VoteAccount{
{
ActivatedStake: 42,
Commission: 0,
EpochCredits: [][]int{
{1, 64, 0},
{2, 192, 64},
},
ActivatedStake: 42,
Commission: 0,
EpochCredits: [][]int{{1, 64, 0}, {2, 192, 64}},
EpochVoteAccount: true,
LastVote: 147,
NodePubkey: "bbb",
RootSlot: 18,
VotePubkey: "BBB",
},
{
ActivatedStake: 43,
Commission: 1,
EpochCredits: [][]int{
{2, 65, 1},
{3, 193, 65},
},
ActivatedStake: 43,
Commission: 1,
EpochCredits: [][]int{{2, 65, 1}, {3, 193, 65}},
EpochVoteAccount: true,
LastVote: 148,
NodePubkey: "ccc",
Expand All @@ -98,12 +92,9 @@ var (
},
Delinquent: []rpc.VoteAccount{
{
ActivatedStake: 49,
Commission: 2,
EpochCredits: [][]int{
{10, 594, 6},
{9, 98, 4},
},
ActivatedStake: 49,
Commission: 2,
EpochCredits: [][]int{{10, 594, 6}, {9, 98, 4}},
EpochVoteAccount: true,
LastVote: 92,
NodePubkey: "aaa",
Expand All @@ -112,6 +103,9 @@ var (
},
},
}
staticLeaderSchedule = map[string][]int64{
"aaa": {0, 3, 6, 9, 12}, "bbb": {1, 4, 7, 10, 13}, "ccc": {2, 5, 8, 11, 14},
}
)

/*
Expand Down Expand Up @@ -164,7 +158,7 @@ func (c *staticRPCClient) GetInflationReward(
func (c *staticRPCClient) GetLeaderSchedule(
ctx context.Context, commitment rpc.Commitment, slot int64,
) (map[string][]int64, error) {
return nil, nil
return staticLeaderSchedule, nil
}

//goland:noinspection GoUnusedParameter
Expand Down
126 changes: 89 additions & 37 deletions cmd/solana_exporter/slots.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,15 +130,17 @@ func (c *SlotWatcher) WatchSlots(ctx context.Context, pace time.Duration) {
<-ticker.C

ctx_, cancel := context.WithTimeout(ctx, httpTimeout)
epochInfo, err := c.client.GetEpochInfo(ctx_, rpc.CommitmentConfirmed)
// TODO: separate fee-rewards watching from general slot watching, such that general slot watching commitment level can be dropped to confirmed
epochInfo, err := c.client.GetEpochInfo(ctx_, rpc.CommitmentFinalized)
cancel()
if err != nil {
klog.Warningf("Failed to get epoch info, bailing out: %v", err)
klog.Errorf("Failed to get epoch info, bailing out: %v", err)
continue
}
cancel()

// if we are running for the first time, then we need to set our tracking numbers:
if c.currentEpoch == 0 {
c.trackEpoch(epochInfo)
c.trackEpoch(ctx, epochInfo)
}

totalTransactionsTotal.Set(float64(epochInfo.TransactionCount))
Expand All @@ -163,14 +165,15 @@ func (c *SlotWatcher) WatchSlots(ctx context.Context, pace time.Duration) {
}

// update block production metrics up until the current slot:
c.fetchAndEmitBlockProduction(ctx, epochInfo.AbsoluteSlot)
c.moveSlotWatermark(ctx, epochInfo.AbsoluteSlot)
}
}
}

// trackEpoch takes in a new rpc.EpochInfo and sets the SlotWatcher tracking metrics accordingly,
// and updates the prometheus gauges associated with those metrics.
func (c *SlotWatcher) trackEpoch(epoch *rpc.EpochInfo) {
func (c *SlotWatcher) trackEpoch(ctx context.Context, epoch *rpc.EpochInfo) {
klog.Infof("Tracking epoch %v (from %v)", epoch.Epoch, c.currentEpoch)
firstSlot, lastSlot := getEpochBounds(epoch)
// if we haven't yet set c.currentEpoch, that (hopefully) means this is the initial setup,
// and so we can simply store the tracking numbers
Expand Down Expand Up @@ -207,16 +210,29 @@ func (c *SlotWatcher) trackEpoch(epoch *rpc.EpochInfo) {
}

// emit epoch bounds:
klog.Infof("Emitting epoch bounds: %v (slots %v -> %v)", c.currentEpoch, c.firstSlot, c.lastSlot)
currentEpochNumber.Set(float64(c.currentEpoch))
epochFirstSlot.Set(float64(c.firstSlot))
epochLastSlot.Set(float64(c.lastSlot))

// update leader schedule:
ctx, cancel := context.WithTimeout(ctx, httpTimeout)
defer cancel()
klog.Infof("Updating leader schedule for epoch %v ...", c.currentEpoch)
leaderSchedule, err := GetTrimmedLeaderSchedule(
ctx, c.client, c.feeRewardAddresses, epoch.AbsoluteSlot, c.firstSlot,
)
if err != nil {
klog.Errorf("Failed to get trimmed leader schedule, bailing out: %v", err)
}
c.leaderSchedule = leaderSchedule
}

// closeCurrentEpoch is called when an epoch change-over happens, and we need to make sure we track the last
// remaining slots in the "current" epoch before we start tracking the new one.
func (c *SlotWatcher) closeCurrentEpoch(ctx context.Context, newEpoch *rpc.EpochInfo) {
c.fetchAndEmitBlockProduction(ctx, c.lastSlot)
c.trackEpoch(newEpoch)
c.moveSlotWatermark(ctx, c.lastSlot)
c.trackEpoch(ctx, newEpoch)
}

// checkValidSlotRange makes sure that the slot range we are going to query is within the current epoch we are tracking.
Expand All @@ -234,6 +250,13 @@ func (c *SlotWatcher) checkValidSlotRange(from, to int64) error {
return nil
}

// moveSlotWatermark performs all the slot-watching tasks required to move the slotWatermark to the provided 'to' slot.
func (c *SlotWatcher) moveSlotWatermark(ctx context.Context, to int64) {
c.fetchAndEmitBlockProduction(ctx, to)
c.fetchAndEmitFeeRewards(ctx, to)
c.slotWatermark = to
}

// fetchAndEmitBlockProduction fetches block production up to the provided endSlot, emits the prometheus metrics,
// and updates the SlotWatcher.slotWatermark accordingly
func (c *SlotWatcher) fetchAndEmitBlockProduction(ctx context.Context, endSlot int64) {
Expand All @@ -249,9 +272,10 @@ func (c *SlotWatcher) fetchAndEmitBlockProduction(ctx context.Context, endSlot i
// fetch block production:
ctx, cancel := context.WithTimeout(ctx, httpTimeout)
defer cancel()
blockProduction, err := c.client.GetBlockProduction(ctx, rpc.CommitmentConfirmed, nil, &startSlot, &endSlot)
blockProduction, err := c.client.GetBlockProduction(ctx, rpc.CommitmentFinalized, nil, &startSlot, &endSlot)
if err != nil {
klog.Warningf("Failed to get block production, bailing out: %v", err)
klog.Errorf("Failed to get block production, bailing out: %v", err)
return
}

// emit the metrics:
Expand All @@ -270,41 +294,39 @@ func (c *SlotWatcher) fetchAndEmitBlockProduction(ctx context.Context, endSlot i
}

klog.Infof("Fetched block production in [%v -> %v]", startSlot, endSlot)
// update the slot watermark:
c.slotWatermark = endSlot
}

// getEpochBounds returns the first slot and last slot within an [inclusive] Epoch
func getEpochBounds(info *rpc.EpochInfo) (int64, int64) {
firstSlot := info.AbsoluteSlot - info.SlotIndex
return firstSlot, firstSlot + info.SlotsInEpoch - 1
}

// fetchAndEmitInflationRewards fetches and emits the inflation rewards for the configured inflationRewardAddresses
// at the provided epoch
func (c *SlotWatcher) fetchAndEmitInflationRewards(ctx context.Context, epoch int64) error {
klog.Infof("Fetching inflation reward for epoch %v ...", toString(epoch))

ctx, cancel := context.WithTimeout(ctx, httpTimeout)
defer cancel()
// fetchAndEmitFeeRewards fetches and emits all the fee rewards for the tracked addresses between the
// slotWatermark and endSlot
func (c *SlotWatcher) fetchAndEmitFeeRewards(ctx context.Context, endSlot int64) {
startSlot := c.slotWatermark + 1
klog.Infof("Fetching fee rewards in [%v -> %v]", startSlot, endSlot)

rewardInfos, err := c.client.GetInflationReward(
ctx, rpc.CommitmentConfirmed, c.inflationRewardAddresses, &epoch, nil,
)
if err != nil {
return err
if err := c.checkValidSlotRange(startSlot, endSlot); err != nil {
klog.Fatalf("invalid slot range: %v", err)
}
scheduleToFetch := SelectFromSchedule(c.leaderSchedule, startSlot, endSlot)
for identity, leaderSlots := range scheduleToFetch {
if len(leaderSlots) == 0 {
continue
}

for i, rewardInfo := range rewardInfos {
address := c.inflationRewardAddresses[i]
reward := float64(rewardInfo.Amount) / float64(rpc.LamportsInSol)
inflationRewards.WithLabelValues(address, toString(epoch)).Set(reward)
klog.Infof("Fetching fee rewards for %v in [%v -> %v]: %v ...", identity, startSlot, endSlot, leaderSlots)
for _, slot := range leaderSlots {
ctx, cancel := context.WithTimeout(ctx, httpTimeout)
err := c.fetchAndEmitSingleFeeReward(ctx, identity, c.currentEpoch, slot)
cancel()
if err != nil {
klog.Errorf("Failed to fetch fee rewards for %v at %v: %v", identity, slot, err)
}
}
}
klog.Infof("Fetched inflation reward for epoch %v.", epoch)
return nil

klog.Infof("Fetched fee rewards in [%v -> %v]", startSlot, endSlot)
}

func (c *SlotWatcher) fetchAndEmitFeeReward(
// fetchAndEmitSingleFeeReward fetches and emits the fee reward for a single block.
func (c *SlotWatcher) fetchAndEmitSingleFeeReward(
ctx context.Context, identity string, epoch int64, slot int64,
) error {
block, err := c.client.GetBlock(ctx, rpc.CommitmentConfirmed, slot)
Expand All @@ -328,3 +350,33 @@ func (c *SlotWatcher) fetchAndEmitFeeReward(

return nil
}

// getEpochBounds returns the first slot and last slot within an [inclusive] Epoch
func getEpochBounds(info *rpc.EpochInfo) (int64, int64) {
firstSlot := info.AbsoluteSlot - info.SlotIndex
return firstSlot, firstSlot + info.SlotsInEpoch - 1
}

// fetchAndEmitInflationRewards fetches and emits the inflation rewards for the configured inflationRewardAddresses
// at the provided epoch
func (c *SlotWatcher) fetchAndEmitInflationRewards(ctx context.Context, epoch int64) error {
klog.Infof("Fetching inflation reward for epoch %v ...", toString(epoch))

ctx, cancel := context.WithTimeout(ctx, httpTimeout)
defer cancel()

rewardInfos, err := c.client.GetInflationReward(
ctx, rpc.CommitmentConfirmed, c.inflationRewardAddresses, &epoch, nil,
)
if err != nil {
return err
}

for i, rewardInfo := range rewardInfos {
address := c.inflationRewardAddresses[i]
reward := float64(rewardInfo.Amount) / float64(rpc.LamportsInSol)
inflationRewards.WithLabelValues(address, toString(epoch)).Set(reward)
}
klog.Infof("Fetched inflation reward for epoch %v.", epoch)
return nil
}
46 changes: 46 additions & 0 deletions cmd/solana_exporter/utils.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package main

import (
"context"
"fmt"
"github.com/asymmetric-research/solana_exporter/pkg/rpc"
"k8s.io/klog/v2"
)

Expand All @@ -11,6 +13,50 @@ func assertf(condition bool, format string, args ...any) {
}
}

// toString is just a simple utility function for converting int -> string
func toString(i int64) string {
return fmt.Sprintf("%v", i)
}

// SelectFromSchedule takes a leader-schedule and returns a trimmed leader-schedule
// containing only the slots within the provided range
func SelectFromSchedule(schedule map[string][]int64, startSlot, endSlot int64) map[string][]int64 {
selected := make(map[string][]int64)
for key, values := range schedule {
var selectedValues []int64
for _, value := range values {
if value >= startSlot && value <= endSlot {
selectedValues = append(selectedValues, value)
}
}
selected[key] = selectedValues
}
return selected
}

// GetTrimmedLeaderSchedule fetches the leader schedule, but only for the validators we are interested in.
// Additionally, it adjusts the leader schedule to the current epoch offset.
func GetTrimmedLeaderSchedule(
ctx context.Context, client rpc.Provider, identities []string, slot, epochFirstSlot int64,
) (map[string][]int64, error) {
leaderSchedule, err := client.GetLeaderSchedule(ctx, rpc.CommitmentConfirmed, slot)
if err != nil {
return nil, fmt.Errorf("failed to get leader schedule: %w", err)
}

trimmedLeaderSchedule := make(map[string][]int64)
for _, id := range identities {
if leaderSlots, ok := leaderSchedule[id]; ok {
// when you fetch the leader schedule, it gives you slot indexes, we want absolute slots:
absoluteSlots := make([]int64, len(leaderSlots))
for i, slotIndex := range leaderSlots {
absoluteSlots[i] = slotIndex + epochFirstSlot
}
trimmedLeaderSchedule[id] = absoluteSlots
} else {
klog.Warningf("failed to find leader slots for %v", id)
}
}

return trimmedLeaderSchedule, nil
}
24 changes: 24 additions & 0 deletions cmd/solana_exporter/utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package main

import (
"context"
"github.com/stretchr/testify/assert"
"testing"
)

func TestSelectFromSchedule(t *testing.T) {
selected := SelectFromSchedule(staticLeaderSchedule, 5, 10)
assert.Equal(t,
map[string][]int64{"aaa": {6, 9}, "bbb": {7, 10}, "ccc": {5, 8}},
selected,
)
}

func TestGetTrimmedLeaderSchedule(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
schedule, err := GetTrimmedLeaderSchedule(ctx, &staticRPCClient{}, []string{"aaa", "bbb"}, 10, 10)
assert.NoError(t, err)

assert.Equal(t, map[string][]int64{"aaa": {10, 13, 16, 19, 22}, "bbb": {11, 14, 17, 20, 23}}, schedule)
}

0 comments on commit c65c347

Please sign in to comment.