Skip to content

Commit

Permalink
Observability: Duty Scheduler instrumentation
Browse files Browse the repository at this point in the history
  • Loading branch information
oleg-ssvlabs committed Nov 13, 2024
1 parent bafae73 commit 0c69041
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 18 deletions.
49 changes: 49 additions & 0 deletions operator/duties/observability.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package duties

import (
"fmt"

"github.com/ssvlabs/ssv/observability"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
)

const (
// observabilityComponentName is the name passed to otel.Meter when the
// package-level meter for the duty scheduler is created.
observabilityComponentName = "github.com/ssvlabs/ssv/operator/duties"
// observabilityComponentNamespace is the dot-separated prefix prepended to
// every metric name declared in this package.
observabilityComponentNamespace = "ssv.operator.duty_scheduler"
)

// Package-level OpenTelemetry instruments for the duty scheduler.
//
// Every instrument is obtained through observability.GetMetric, which is given
// the fully qualified metric name and a constructor that builds the instrument
// on the package meter.
// NOTE(review): error handling for a failed constructor is presumably done
// inside observability.GetMetric — confirm against that helper.
var (
	// meter is the OpenTelemetry meter all duty scheduler instruments are
	// created from.
	meter = otel.Meter(observabilityComponentName)

	// slotDelayHistogram records how late the slot ticker fired, in seconds.
	//
	// Call sites record slotDelay.Seconds(), so the bucket boundaries must be
	// expressed in seconds as well. They cover 5ms, 10ms, 20ms, 100ms, 500ms
	// and 5s, the same thresholds the previous millisecond-based histogram
	// used. Leaving the raw values 5..5000 here would put every realistic
	// observation into the first bucket.
	slotDelayHistogram = observability.GetMetric(
		fmt.Sprintf("%s.slot_ticker_delay.duration", observabilityComponentNamespace),
		func(metricName string) (metric.Float64Histogram, error) {
			return meter.Float64Histogram(
				metricName,
				metric.WithUnit("s"),
				metric.WithDescription("delay of the slot ticker"),
				metric.WithExplicitBucketBoundaries(0.005, 0.01, 0.02, 0.1, 0.5, 5))
		},
	)

	// dutiesExecutedCounter counts validator duties handed to the duty
	// executor. Call sites attach a "duty.type" attribute.
	dutiesExecutedCounter = observability.GetMetric(
		fmt.Sprintf("%s.executions", observabilityComponentNamespace),
		func(metricName string) (metric.Int64Counter, error) {
			return meter.Int64Counter(
				metricName,
				metric.WithUnit("{duty}"),
				metric.WithDescription("total number of duties executed by scheduler"))
		},
	)

	// committeeDutiesExecutedCounter counts committee duties handed to the
	// duty executor. Call sites attach a "committee.runner.role" attribute.
	committeeDutiesExecutedCounter = observability.GetMetric(
		fmt.Sprintf("%s.committee_executions", observabilityComponentNamespace),
		func(metricName string) (metric.Int64Counter, error) {
			return meter.Int64Counter(
				metricName,
				metric.WithUnit("{committee_duty}"),
				metric.WithDescription("total number of committee duties executed by scheduler"))
		},
	)
)
26 changes: 8 additions & 18 deletions operator/duties/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import (
eth2apiv1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/attestantio/go-eth2-client/spec/phase0"
ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prysmaticlabs/prysm/v4/async/event"
"github.com/sourcegraph/conc/pool"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"

genesisspectypes "github.com/ssvlabs/ssv-spec-pre-cc/types"
Expand All @@ -34,19 +34,6 @@ import (

//go:generate mockgen -package=duties -destination=./scheduler_mock.go -source=./scheduler.go

var slotDelayHistogram = promauto.NewHistogram(prometheus.HistogramOpts{
Name: "slot_ticker_delay_milliseconds",
Help: "The delay in milliseconds of the slot ticker",
Buckets: []float64{5, 10, 20, 100, 500, 5000}, // Buckets in milliseconds. Adjust as per your needs.
})

func init() {
logger := zap.L()
if err := prometheus.Register(slotDelayHistogram); err != nil {
logger.Debug("could not register prometheus collector")
}
}

const (
// blockPropagationDelay time to propagate around the nodes
// before kicking off duties for the block's slot.
Expand Down Expand Up @@ -383,11 +370,12 @@ func (s *Scheduler) ExecuteGenesisDuties(logger *zap.Logger, duties []*genesissp
if slotDelay >= 100*time.Millisecond {
logger.Debug("⚠️ late duty execution", zap.Int64("slot_delay", slotDelay.Milliseconds()))
}
slotDelayHistogram.Observe(float64(slotDelay.Milliseconds()))
slotDelayHistogram.Record(context.TODO(), slotDelay.Seconds())
go func() {
if duty.Type == genesisspectypes.BNRoleAttester || duty.Type == genesisspectypes.BNRoleSyncCommittee {
s.waitOneThirdOrValidBlock(duty.Slot)
}
dutiesExecutedCounter.Add(context.TODO(), 1, metric.WithAttributes(attribute.String("duty.type", duty.Type.String())))
s.dutyExecutor.ExecuteGenesisDuty(logger, duty)
}()
}
Expand All @@ -402,11 +390,12 @@ func (s *Scheduler) ExecuteDuties(logger *zap.Logger, duties []*spectypes.Valida
if slotDelay >= 100*time.Millisecond {
logger.Debug("⚠️ late duty execution", zap.Int64("slot_delay", slotDelay.Milliseconds()))
}
slotDelayHistogram.Observe(float64(slotDelay.Milliseconds()))
slotDelayHistogram.Record(context.TODO(), slotDelay.Seconds())
go func() {
if duty.Type == spectypes.BNRoleAttester || duty.Type == spectypes.BNRoleSyncCommittee {
s.waitOneThirdOrValidBlock(duty.Slot)
}
dutiesExecutedCounter.Add(context.TODO(), 1, metric.WithAttributes(attribute.String("duty.type", duty.Type.String())))
s.dutyExecutor.ExecuteDuty(logger, duty)
}()
}
Expand All @@ -424,9 +413,10 @@ func (s *Scheduler) ExecuteCommitteeDuties(logger *zap.Logger, duties committeeD
if slotDelay >= 100*time.Millisecond {
logger.Debug("⚠️ late duty execution", zap.Int64("slot_delay", slotDelay.Milliseconds()))
}
slotDelayHistogram.Observe(float64(slotDelay.Milliseconds()))
slotDelayHistogram.Record(context.TODO(), slotDelay.Seconds())
go func() {
s.waitOneThirdOrValidBlock(duty.Slot)
committeeDutiesExecutedCounter.Add(context.TODO(), 1, metric.WithAttributes(attribute.String("committee.runner.role", committee.duty.RunnerRole().String())))
s.dutyExecutor.ExecuteCommitteeDuty(logger, committee.id, duty)
}()
}
Expand Down

0 comments on commit 0c69041

Please sign in to comment.