Skip to content

Commit

Permalink
Implement Execution Client metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
oleg-ssvlabs committed Nov 14, 2024
1 parent 225de97 commit 46a4f2b
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 34 deletions.
1 change: 0 additions & 1 deletion cli/operator/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,6 @@ var StartNodeCmd = &cobra.Command{
cfg.ExecutionClient.Addr,
ethcommon.HexToAddress(networkConfig.RegistryContractAddr),
executionclient.WithLogger(logger),
executionclient.WithMetrics(metricsReporter),
executionclient.WithFollowDistance(executionclient.DefaultFollowDistance),
executionclient.WithConnectionTimeout(cfg.ExecutionClient.ConnectionTimeout),
executionclient.WithReconnectionInitialInterval(executionclient.DefaultReconnectionInitialInterval),
Expand Down
39 changes: 30 additions & 9 deletions eth/executionclient/execution_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import (
"context"
"errors"
"fmt"
"math"
"math/big"
"time"

"github.com/ethereum/go-ethereum"
ethcommon "github.com/ethereum/go-ethereum/common"
ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"

"github.com/ssvlabs/ssv/eth/contract"
Expand All @@ -33,8 +35,7 @@ type ExecutionClient struct {
contractAddress ethcommon.Address

// optional
logger *zap.Logger
metrics metrics
logger *zap.Logger
// followDistance defines an offset into the past from the head block such that the block
// at this offset will be considered as very likely finalized.
followDistance uint64 // TODO: consider reading the finalized checkpoint from consensus layer
Expand All @@ -54,7 +55,6 @@ func New(ctx context.Context, nodeAddr string, contractAddr ethcommon.Address, o
nodeAddr: nodeAddr,
contractAddress: contractAddr,
logger: zap.NewNop(),
metrics: nopMetrics{},
followDistance: DefaultFollowDistance,
connectionTimeout: DefaultConnectionTimeout,
reconnectionInitialInterval: DefaultReconnectionInitialInterval,
Expand Down Expand Up @@ -169,8 +169,6 @@ func (ec *ExecutionClient) fetchLogsInBatches(ctx context.Context, startBlock, e
}
}
}

ec.metrics.ExecutionClientLastFetchedBlock(endBlock)
}()

return logs, errors
Expand Down Expand Up @@ -230,18 +228,39 @@ func (ec *ExecutionClient) Healthy(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, ec.connectionTimeout)
defer cancel()

start := time.Now()
sp, err := ec.client.SyncProgress(ctx)
if err != nil {
ec.metrics.ExecutionClientFailure()
clientStatusGauge.Record(ctx, 1,
metric.WithAttributes(executionClientAddrAttribute(ec.nodeAddr)),
metric.WithAttributes(executionClientStatusAttribute(statusFailure)),
)

return err
}
latencyHistogram.Record(ctx,
float64(time.Since(start).Seconds()),
metric.WithAttributes(executionClientAddrAttribute(ec.nodeAddr)),
)

if sp != nil {
ec.metrics.ExecutionClientSyncing()
clientStatusGauge.Record(ctx, 1,
metric.WithAttributes(executionClientAddrAttribute(ec.nodeAddr)),
metric.WithAttributes(executionClientStatusAttribute(statusSyncing)),
)
syncingDistance := sp.HighestBlock - sp.CurrentBlock
if syncingDistance <= math.MaxInt64 {
syncingDistanceGauge.Record(ctx, int64(syncingDistance), metric.WithAttributes(executionClientAddrAttribute(ec.nodeAddr)))
}

return fmt.Errorf("syncing")
}

ec.metrics.ExecutionClientReady()
clientStatusGauge.Record(ctx, 1,
metric.WithAttributes(executionClientAddrAttribute(ec.nodeAddr)),
metric.WithAttributes(executionClientStatusAttribute(statusReady)),
)
syncingDistanceGauge.Record(ctx, 0, metric.WithAttributes(executionClientAddrAttribute(ec.nodeAddr)))

return nil
}
Expand Down Expand Up @@ -303,7 +322,9 @@ func (ec *ExecutionClient) streamLogsToChan(ctx context.Context, logs chan<- Blo
return lastBlock, fmt.Errorf("fetch logs: %w", err)
}
fromBlock = toBlock + 1
ec.metrics.ExecutionClientLastFetchedBlock(fromBlock)
if fromBlock <= math.MaxInt64 {
lastProcessedBlockGauge.Record(ctx, int64(fromBlock), metric.WithAttributes(executionClientAddrAttribute(ec.nodeAddr)))
}
}
}
}
Expand Down
1 change: 0 additions & 1 deletion eth/executionclient/execution_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ func TestFetchHistoricalLogs(t *testing.T) {
contractAddr,
WithLogger(logger),
WithFollowDistance(followDistance),
WithMetrics(nopMetrics{}),
WithConnectionTimeout(2*time.Second),
WithReconnectionInitialInterval(2*time.Second),
)
Expand Down
16 changes: 0 additions & 16 deletions eth/executionclient/metrics.go

This file was deleted.

78 changes: 78 additions & 0 deletions eth/executionclient/observability.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package executionclient

import (
"fmt"

"github.com/ssvlabs/ssv/observability"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

// Identifiers shared by the meter, instruments and attributes declared in this file.
const (
// observabilityComponentName is the name passed to otel.Meter when the
// package-level meter is created.
observabilityComponentName = "github.com/ssvlabs/ssv/eth/executionclient"
// observabilityComponentNamespace is the dot-separated prefix used when
// building every metric name and attribute key in this file.
observabilityComponentNamespace = "ssv.el"
)

// executionClientStatus is the value carried by the status attribute that is
// attached to execution client status measurements.
type executionClientStatus string

// Status values accepted by executionClientStatusAttribute.
//
// They are declared with the executionClientStatus type explicitly: as
// untyped string constants they would default to plain string whenever their
// type is inferred (e.g. `s := statusReady`), which defeats the purpose of
// having a dedicated status type.
const (
	// statusSyncing marks a client that is still syncing.
	statusSyncing executionClientStatus = "syncing"
	// statusFailure marks a client whose health probe returned an error.
	statusFailure executionClientStatus = "failure"
	// statusReady marks a client that is healthy and not syncing.
	statusReady executionClientStatus = "ready"
)

// Package-level OpenTelemetry meter and the instruments created from it.
// Every instrument is obtained through observability.GetMetric, which receives
// the fully qualified metric name and a constructor callback.
// NOTE(review): how GetMetric treats an error returned by the constructor is
// not visible from this file — confirm against the observability package.
var (
// meter is the otel.Meter scoped to observabilityComponentName.
meter = otel.Meter(observabilityComponentName)

// latencyHistogram is a float64 histogram named "<namespace>.latency.duration",
// with unit "s" and explicit bucket boundaries taken from
// observability.SecondsHistogramBuckets.
latencyHistogram = observability.GetMetric(
fmt.Sprintf("%s.latency.duration", observabilityComponentNamespace),
func(metricName string) (metric.Float64Histogram, error) {
return meter.Float64Histogram(
metricName,
metric.WithUnit("s"),
metric.WithDescription("execution client latency in seconds"),
metric.WithExplicitBucketBoundaries(observability.SecondsHistogramBuckets...),
)
},
)

// syncingDistanceGauge is an int64 gauge named "<namespace>.syncing.distance",
// measured in blocks; per its description it holds the delta between the
// highest and the current block.
syncingDistanceGauge = observability.GetMetric(
fmt.Sprintf("%s.syncing.distance", observabilityComponentNamespace),
func(metricName string) (metric.Int64Gauge, error) {
return meter.Int64Gauge(
metricName,
metric.WithUnit("{block}"),
metric.WithDescription("execution client syncing distance which is a delta between highest and current blocks"))
},
)

// clientStatusGauge is an int64 gauge named "<namespace>.syncing.status".
// No unit is set for this instrument.
clientStatusGauge = observability.GetMetric(
fmt.Sprintf("%s.syncing.status", observabilityComponentNamespace),
func(metricName string) (metric.Int64Gauge, error) {
return meter.Int64Gauge(
metricName,
metric.WithDescription("execution client syncing status"))
},
)

// lastProcessedBlockGauge is an int64 gauge named
// "<namespace>.syncing.last_processed_block" holding a block number.
lastProcessedBlockGauge = observability.GetMetric(
fmt.Sprintf("%s.syncing.last_processed_block", observabilityComponentNamespace),
func(metricName string) (metric.Int64Gauge, error) {
return meter.Int64Gauge(
metricName,
metric.WithUnit("{block_number}"),
metric.WithDescription("last processed block by execution client"))
},
)
)

// executionClientAddrAttribute returns the string attribute that tags a
// measurement with an execution client address. The attribute key is
// observabilityComponentNamespace followed by ".addr"; value is stored as-is.
func executionClientAddrAttribute(value string) attribute.KeyValue {
	// The key never changes, so it is assembled as a compile-time constant
	// instead of being formatted again on every recorded measurement.
	const attrName = observabilityComponentNamespace + ".addr"
	return attribute.String(attrName, value)
}

// executionClientStatusAttribute returns the string attribute that tags a
// measurement with an execution client status. The attribute key is
// observabilityComponentNamespace followed by ".status"; value is converted
// to its underlying string.
func executionClientStatusAttribute(value executionClientStatus) attribute.KeyValue {
	// The key never changes, so it is assembled as a compile-time constant
	// instead of being formatted again on every recorded measurement.
	const attrName = observabilityComponentNamespace + ".status"
	return attribute.String(attrName, string(value))
}
7 changes: 0 additions & 7 deletions eth/executionclient/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,6 @@ func WithLogger(logger *zap.Logger) Option {
}
}

// WithMetrics enables reporting metrics.
func WithMetrics(metrics metrics) Option {
return func(s *ExecutionClient) {
s.metrics = metrics
}
}

// WithFollowDistance sets finalization offset (a block at this offset into the past
// from the head block will be considered as very likely finalized).
func WithFollowDistance(offset uint64) Option {
Expand Down

0 comments on commit 46a4f2b

Please sign in to comment.