Skip to content

Commit

Permalink
refactor(climimicry): handle singleattestation event
Browse files Browse the repository at this point in the history
  • Loading branch information
mattevans committed Feb 25, 2025
1 parent 0848689 commit c7cc224
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 10 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module github.com/ethpandaops/xatu

go 1.23.5

replace github.com/probe-lab/hermes => github.com/ethpandaops/hermes v0.0.3-0.20250224103242-ac07eb24a0b6
replace github.com/probe-lab/hermes => github.com/ethpandaops/hermes v0.0.3-0.20250225010021-6b34b483f83c

require (
github.com/IBM/sarama v1.45.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,8 @@ github.com/ethpandaops/ethcore v0.0.0-20240422023000-2a5727b18756 h1:8JWjrRfP14m
github.com/ethpandaops/ethcore v0.0.0-20240422023000-2a5727b18756/go.mod h1:ZvKqL6CKxiraefdXPHeJurV2pDD/f2HF2uklDVdrry8=
github.com/ethpandaops/ethwallclock v0.3.0 h1:xF5fwtBf+bHFHZKBnwiPFEuelW3sMM7SD3ZNFq1lJY4=
github.com/ethpandaops/ethwallclock v0.3.0/go.mod h1:y0Cu+mhGLlem19vnAV2x0hpFS5KZ7oOi2SWYayv9l24=
github.com/ethpandaops/hermes v0.0.3-0.20250224103242-ac07eb24a0b6 h1:Q+IsO41NIha1+B8TAwoIm9nsenRarbv2KTxoAwGIX+g=
github.com/ethpandaops/hermes v0.0.3-0.20250224103242-ac07eb24a0b6/go.mod h1:Am+tzMyU2kxy0NG/2/49rx66mVjgxJgTseJJFwZDb7Q=
github.com/ethpandaops/hermes v0.0.3-0.20250225010021-6b34b483f83c h1:YhbCCKzernmugunOEedigfndFkL8MFg+BqPCDxbCza8=
github.com/ethpandaops/hermes v0.0.3-0.20250225010021-6b34b483f83c/go.mod h1:Am+tzMyU2kxy0NG/2/49rx66mVjgxJgTseJJFwZDb7Q=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM=
Expand Down
18 changes: 11 additions & 7 deletions pkg/clmimicry/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,13 +451,17 @@ func (m *Mimicry) handleHandleMessageEvent(
return nil
}

payload, ok := event.Payload.(*eth.TraceEventAttestation)
if !ok {
return errors.New("invalid payload type for HandleMessage event")
}

if err := m.handleGossipAttestation(ctx, clientMeta, event, payload); err != nil {
return errors.Wrap(err, "failed to handle gossipsub beacon attestation")
switch payload := event.Payload.(type) {
case *eth.TraceEventAttestation:
if err := m.handleGossipAttestation(ctx, clientMeta, event, payload); err != nil {
return errors.Wrap(err, "failed to handle gossipsub beacon attestation")
}
case *eth.TraceEventSingleAttestation:
if err := m.handleGossipSingleAttestation(ctx, clientMeta, event, payload); err != nil {
return errors.Wrap(err, "failed to handle gossipsub single beacon attestation")
}
default:
return fmt.Errorf("invalid payload type for HandleMessage event: %T", event.Payload)
}
case strings.Contains(topic, p2p.GossipBlockMessage):
if !m.Config.Events.GossipSubBeaconBlockEnabled {
Expand Down
142 changes: 142 additions & 0 deletions pkg/clmimicry/gossipsub_single_attestation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package clmimicry

import (
"context"
"fmt"

v1 "github.com/ethpandaops/xatu/pkg/proto/eth/v1"
"github.com/ethpandaops/xatu/pkg/proto/libp2p"
"github.com/ethpandaops/xatu/pkg/proto/xatu"
"github.com/google/uuid"
"github.com/probe-lab/hermes/eth"
"github.com/probe-lab/hermes/host"
ethtypes "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
"google.golang.org/protobuf/types/known/wrapperspb"
)

func (m *Mimicry) handleGossipSingleAttestation(
ctx context.Context,
clientMeta *xatu.ClientMeta,
event *host.TraceEvent,
payload *eth.TraceEventSingleAttestation,
) error {
if payload.SingleAttestation == nil || payload.SingleAttestation.GetData() == nil {
return fmt.Errorf("handleGossipSingleAttestation() called with nil attestation")
}

attestationData := payload.SingleAttestation.GetData()

attestation := &v1.Attestation{
AggregationBits: string(""),
Data: &v1.AttestationData{
Slot: uint64(attestationData.GetSlot()),
BeaconBlockRoot: fmt.Sprintf("0x%x", attestationData.GetBeaconBlockRoot()),
Source: &v1.Checkpoint{
Epoch: uint64(attestationData.GetSource().GetEpoch()),
Root: fmt.Sprintf("0x%x", attestationData.GetSource().GetRoot()),
},
Target: &v1.Checkpoint{
Epoch: uint64(attestationData.GetTarget().GetEpoch()),
Root: fmt.Sprintf("0x%x", attestationData.GetTarget().GetRoot()),
},
Index: uint64(attestationData.GetCommitteeIndex()),
},
}

metadata, ok := proto.Clone(clientMeta).(*xatu.ClientMeta)
if !ok {
return fmt.Errorf("failed to clone client metadata")
}

additionalData, err := m.createAdditionalGossipSubSingleAttestationData(payload, attestationData, event)
if err != nil {
return fmt.Errorf("failed to create additional data: %w", err)
}

metadata.AdditionalData = &xatu.ClientMeta_Libp2PTraceGossipsubBeaconAttestation{
Libp2PTraceGossipsubBeaconAttestation: additionalData,
}

decoratedEvent := &xatu.DecoratedEvent{
Event: &xatu.Event{
Name: xatu.Event_LIBP2P_TRACE_GOSSIPSUB_BEACON_ATTESTATION,
DateTime: timestamppb.New(event.Timestamp.Add(m.clockDrift)),
Id: uuid.New().String(),
},
Meta: &xatu.Meta{
Client: metadata,
},
Data: &xatu.DecoratedEvent_Libp2PTraceGossipsubBeaconAttestation{
Libp2PTraceGossipsubBeaconAttestation: attestation,
},
}

return m.handleNewDecoratedEvent(ctx, decoratedEvent)
}

//nolint:gosec // int -> uint32 common conversion pattern in xatu.
func (m *Mimicry) createAdditionalGossipSubSingleAttestationData(
payload *eth.TraceEventSingleAttestation,
attestationData *ethtypes.AttestationData,
event *host.TraceEvent,
) (*xatu.ClientMeta_AdditionalLibP2PTraceGossipSubBeaconAttestationData, error) {
wallclockSlot, wallclockEpoch, err := m.ethereum.Metadata().Wallclock().Now()
if err != nil {
return nil, fmt.Errorf("failed to get wallclock time: %w", err)
}

// Add Clock Drift
timestampAdjusted := event.Timestamp.Add(m.clockDrift)

attestionSlot := m.ethereum.Metadata().Wallclock().Slots().FromNumber(uint64(attestationData.GetSlot()))
epoch := m.ethereum.Metadata().Wallclock().Epochs().FromSlot(uint64(attestationData.GetSlot()))

extra := &xatu.ClientMeta_AdditionalLibP2PTraceGossipSubBeaconAttestationData{
WallclockSlot: &xatu.SlotV2{
Number: &wrapperspb.UInt64Value{Value: wallclockSlot.Number()},
StartDateTime: timestamppb.New(wallclockSlot.TimeWindow().Start()),
},
WallclockEpoch: &xatu.EpochV2{
Number: &wrapperspb.UInt64Value{Value: wallclockEpoch.Number()},
StartDateTime: timestamppb.New(wallclockEpoch.TimeWindow().Start()),
},
Slot: &xatu.SlotV2{
Number: &wrapperspb.UInt64Value{Value: attestionSlot.Number()},
StartDateTime: timestamppb.New(attestionSlot.TimeWindow().Start()),
},
Epoch: &xatu.EpochV2{
Number: &wrapperspb.UInt64Value{Value: epoch.Number()},
StartDateTime: timestamppb.New(epoch.TimeWindow().Start()),
},
Propagation: &xatu.PropagationV2{
SlotStartDiff: &wrapperspb.UInt64Value{
Value: uint64(timestampAdjusted.Sub(attestionSlot.TimeWindow().Start()).Milliseconds()),
},
},
}

targetEpoch := m.ethereum.Metadata().Wallclock().Epochs().FromNumber(uint64(attestationData.GetTarget().GetEpoch()))
extra.Target = &xatu.ClientMeta_AdditionalLibP2PTraceGossipSubBeaconAttestationTargetData{
Epoch: &xatu.EpochV2{
Number: &wrapperspb.UInt64Value{Value: targetEpoch.Number()},
StartDateTime: timestamppb.New(targetEpoch.TimeWindow().Start()),
},
}

sourceEpoch := m.ethereum.Metadata().Wallclock().Epochs().FromNumber(uint64(attestationData.GetSource().GetEpoch()))
extra.Source = &xatu.ClientMeta_AdditionalLibP2PTraceGossipSubBeaconAttestationSourceData{
Epoch: &xatu.EpochV2{
Number: &wrapperspb.UInt64Value{Value: sourceEpoch.Number()},
StartDateTime: timestamppb.New(sourceEpoch.TimeWindow().Start()),
},
}

extra.Metadata = &libp2p.TraceEventMetadata{PeerId: wrapperspb.String(payload.PeerID)}
extra.Topic = wrapperspb.String(payload.Topic)
extra.MessageId = wrapperspb.String(payload.MsgID)
extra.MessageSize = wrapperspb.UInt32(uint32(payload.MsgSize))

return extra, nil
}

0 comments on commit c7cc224

Please sign in to comment.