Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sentry): Add summary logs #395

Merged
merged 7 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/chuckpreslar/emission v0.0.0-20170206194824-a7ddd980baf9
github.com/creasty/defaults v1.7.0
github.com/ethereum/go-ethereum v1.14.10
github.com/ethpandaops/beacon v0.41.0
github.com/ethpandaops/beacon v0.42.0
github.com/ethpandaops/ethcore v0.0.0-20240422023000-2a5727b18756
github.com/ethpandaops/ethwallclock v0.3.0
github.com/go-co-op/gocron v1.27.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,8 @@ github.com/ethereum/go-ethereum v1.14.10 h1:kC24WjYeRjDy86LVo6MfF5Xs7nnUu+XG4Aja
github.com/ethereum/go-ethereum v1.14.10/go.mod h1:+l/fr42Mma+xBnhefL/+z11/hcmJ2egl+ScIVPjhc7E=
github.com/ethereum/go-verkle v0.1.1-0.20240829091221-dffa7562dbe9 h1:8NfxH2iXvJ60YRB8ChToFTUzl8awsc3cJ8CbLjGIl/A=
github.com/ethereum/go-verkle v0.1.1-0.20240829091221-dffa7562dbe9/go.mod h1:M3b90YRnzqKyyzBEWJGqj8Qff4IDeXnzFw0P9bFw3uk=
github.com/ethpandaops/beacon v0.41.0 h1:9CmgNeTZ6X+B1U7SOJzy3rf6WFtFb3CA2DTFEgGwLc8=
github.com/ethpandaops/beacon v0.41.0/go.mod h1:hKfalJGsF4BuWPwcGCX/4fdQR31zDJVaTLWwrkfNTzw=
github.com/ethpandaops/beacon v0.42.0 h1:5a3ld5wuAgX+N5KxEPuNfxDhdeiBG4gXlTAgCm0AuSE=
github.com/ethpandaops/beacon v0.42.0/go.mod h1:hKfalJGsF4BuWPwcGCX/4fdQR31zDJVaTLWwrkfNTzw=
github.com/ethpandaops/ethcore v0.0.0-20240422023000-2a5727b18756 h1:8JWjrRfP14m0oxOk03m11n/xgdY5ceyUf/ZxYdOs5gE=
github.com/ethpandaops/ethcore v0.0.0-20240422023000-2a5727b18756/go.mod h1:ZvKqL6CKxiraefdXPHeJurV2pDD/f2HF2uklDVdrry8=
github.com/ethpandaops/ethwallclock v0.3.0 h1:xF5fwtBf+bHFHZKBnwiPFEuelW3sMM7SD3ZNFq1lJY4=
Expand Down
25 changes: 22 additions & 3 deletions pkg/sentry/ethereum/beacon.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,13 @@ func (b *BeaconNode) Start(ctx context.Context) error {
s := gocron.NewScheduler(time.Local)

errs := make(chan error, 1)
healthyFirstTime := make(chan struct{})

b.beacon.OnFirstTimeHealthy(ctx, func(ctx context.Context, event *beacon.FirstTimeHealthyEvent) error {
b.log.Info("Upstream beacon node is healthy")

close(healthyFirstTime)

go func() {
wg := sync.WaitGroup{}

for _, service := range b.services {
Expand All @@ -88,6 +93,8 @@ func (b *BeaconNode) Start(ctx context.Context) error {
errs <- fmt.Errorf("failed to start service: %w", err)
}

b.log.WithField("service", service.Name()).Info("Waiting for service to be ready")

wg.Wait()
}

Expand All @@ -98,14 +105,26 @@ func (b *BeaconNode) Start(ctx context.Context) error {
errs <- fmt.Errorf("failed to run on ready callback: %w", err)
}
}
}()

return nil
})

s.StartAsync()

if err := b.beacon.Start(ctx); err != nil {
b.beacon.StartAsync(ctx)

select {
case err := <-errs:
return err
case <-ctx.Done():
return ctx.Err()
case <-healthyFirstTime:
// Beacon node is healthy, continue with normal operation
case <-time.After(10 * time.Minute):
return errors.New("upstream beacon node is not healthy. check your configuration.")
}

// Wait for any errors after the first healthy event
select {
case err := <-errs:
return err
Expand Down
6 changes: 5 additions & 1 deletion pkg/sentry/ethereum/services/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,14 @@ func (m *MetadataService) Start(ctx context.Context) error {
return nil
}

if err := backoff.Retry(operation, backoff.NewExponentialBackOff()); err != nil {
if err := backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), func(err error, duration time.Duration) {
m.log.WithError(err).Warnf("Failed to refresh metadata, retrying in %s", duration)
}); err != nil {
m.log.WithError(err).Warn("Failed to refresh metadata")
}

m.log.Info("Metadata service is ready")

for _, cb := range m.onReadyCallbacks {
if err := cb(ctx); err != nil {
m.log.WithError(err).Warn("Failed to execute onReady callback")
Expand Down
20 changes: 19 additions & 1 deletion pkg/sentry/sentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ type Sentry struct {
preset *Preset

shutdownFuncs []func(context.Context) error

summary *Summary
}

func New(ctx context.Context, log logrus.FieldLogger, config *Config, overrides *Override) (*Sentry, error) {
Expand Down Expand Up @@ -179,6 +181,7 @@ func New(ctx context.Context, log logrus.FieldLogger, config *Config, overrides
latestForkChoiceMu: sync.RWMutex{},
shutdownFuncs: []func(context.Context) error{},
preset: preset,
summary: NewSummary(log, time.Duration(60)*time.Second, b),
}

// If the output authorization override is set, use it
Expand Down Expand Up @@ -255,6 +258,12 @@ func (s *Sentry) Start(ctx context.Context) error {
return err
}

s.beacon.Node().OnEvent(ctx, func(ctx context.Context, event *eth2v1.Event) error {
s.summary.AddEventStreamEvents(event.Topic, 1)

return nil
})

s.beacon.OnReady(ctx, func(ctx context.Context) error {
s.log.Info("Internal beacon node is ready, subscribing to events")

Expand Down Expand Up @@ -554,6 +563,8 @@ func (s *Sentry) Start(ctx context.Context) error {
}
}

go s.summary.Start(ctx)

if s.Config.Ethereum.OverrideNetworkName != "" {
s.log.WithField("network", s.Config.Ethereum.OverrideNetworkName).Info("Overriding network name")
}
Expand Down Expand Up @@ -700,7 +711,12 @@ func (s *Sentry) syncClockDrift(_ context.Context) error {
}

s.clockDrift = response.ClockOffset
s.log.WithField("drift", s.clockDrift).Info("Updated clock drift")

s.log.WithField("drift", s.clockDrift).Debug("Updated clock drift")

if s.clockDrift > 2*time.Second || s.clockDrift < -2*time.Second {
s.log.WithField("drift", s.clockDrift).Warn("Large clock drift detected, consider configuring an NTP server on your instance")
}

return err
}
Expand All @@ -724,6 +740,8 @@ func (s *Sentry) handleNewDecoratedEvent(ctx context.Context, event *xatu.Decora

s.metrics.AddDecoratedEvent(1, eventType, networkStr)

s.summary.AddEventsExported(1)

for _, sink := range s.sinks {
if err := sink.HandleNewDecoratedEvent(ctx, event); err != nil {
s.log.
Expand Down
134 changes: 134 additions & 0 deletions pkg/sentry/summary.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package sentry

import (
"context"
"fmt"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/ethpandaops/xatu/pkg/sentry/ethereum"
"github.com/sirupsen/logrus"
)

// Summary is a struct that holds the summary of the sentry.
type Summary struct {
log logrus.FieldLogger
printInterval time.Duration

beacon *ethereum.BeaconNode

eventStreamEvents sync.Map
eventsExported atomic.Uint64
failedEvents atomic.Uint64
}

// NewSummary creates a new summary with the given print interval.
func NewSummary(log logrus.FieldLogger, printInterval time.Duration, beacon *ethereum.BeaconNode) *Summary {
return &Summary{
log: log,
printInterval: printInterval,
beacon: beacon,
}
}

func (s *Summary) Start(ctx context.Context) {
s.log.WithField("interval", s.printInterval).Info("Starting summary")
ticker := time.NewTicker(s.printInterval)

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s.Print()
}
}
}

func (s *Summary) Print() {
isSyncing := "unknown"
status := s.beacon.Node().Status()

if status != nil {
isSyncing = strconv.FormatBool(status.Syncing())
}

events := s.GetEventStreamEvents()

// Build a sorted slice of event stream topics and counts
type topicCount struct {
topic string
count uint64
}

sortedEvents := make([]topicCount, 0, len(events))
for topic, count := range events {
sortedEvents = append(sortedEvents, topicCount{topic, count})
}

sort.Slice(sortedEvents, func(i, j int) bool {
return sortedEvents[i].count > sortedEvents[j].count
})

// Create formatted strings for each topic and count
eventTopics := make([]string, len(sortedEvents))
for i, tc := range sortedEvents {
eventTopics[i] = fmt.Sprintf("%s: %d", tc.topic, tc.count)
}

eventStream := strings.Join(eventTopics, ", ")

s.log.WithFields(logrus.Fields{
"events_exported": s.GetEventsExported(),
"events_failed": s.GetFailedEvents(),
"node_is_healthy": s.beacon.Node().Healthy(),
"node_is_syncing": isSyncing,
"event_stream_events": eventStream,
}).Infof("Summary of the last %s", s.printInterval)

s.Reset()
}

func (s *Summary) AddEventsExported(count uint64) {
s.eventsExported.Add(count)
}

func (s *Summary) GetEventsExported() uint64 {
return s.eventsExported.Load()
}

func (s *Summary) AddFailedEvents(count uint64) {
s.failedEvents.Add(count)
}

func (s *Summary) GetFailedEvents() uint64 {
return s.failedEvents.Load()
}

func (s *Summary) AddEventStreamEvents(topic string, count uint64) {
current, _ := s.eventStreamEvents.LoadOrStore(topic, count)

s.eventStreamEvents.Store(topic, current.(uint64)+count)
}

func (s *Summary) GetEventStreamEvents() map[string]uint64 {
events := make(map[string]uint64)

s.eventStreamEvents.Range(func(key, value any) bool {
events[key.(string)], _ = value.(uint64)

return true
})

return events
}

func (s *Summary) Reset() {
s.eventsExported.Store(0)
s.failedEvents.Store(0)
s.eventStreamEvents = sync.Map{}
}
Loading