diff --git a/go.mod b/go.mod index e482e1ad..79b66af1 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/ethpandaops/beacon v0.42.0 github.com/ethpandaops/ethcore v0.0.0-20240422023000-2a5727b18756 github.com/ethpandaops/ethwallclock v0.3.0 + github.com/ferranbt/fastssz v0.1.3 github.com/go-co-op/gocron v1.27.1 github.com/golang/protobuf v1.5.4 github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb @@ -101,7 +102,6 @@ require ( github.com/ethereum/c-kzg-4844 v1.0.0 // indirect github.com/ethereum/go-verkle v0.1.1-0.20240829091221-dffa7562dbe9 // indirect github.com/fatih/color v1.16.0 // indirect - github.com/ferranbt/fastssz v0.1.3 // indirect github.com/flynn/noise v1.1.0 // indirect github.com/francoispqt/gojay v1.2.13 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect diff --git a/pkg/cannon/deriver/beacon/eth/v2/beacon_block.go b/pkg/cannon/deriver/beacon/eth/v2/beacon_block.go index 6d6011af..8b56b3f6 100644 --- a/pkg/cannon/deriver/beacon/eth/v2/beacon_block.go +++ b/pkg/cannon/deriver/beacon/eth/v2/beacon_block.go @@ -2,7 +2,6 @@ package v2 import ( "context" - "encoding/json" "fmt" "time" @@ -14,8 +13,8 @@ import ( "github.com/ethpandaops/xatu/pkg/observability" "github.com/ethpandaops/xatu/pkg/proto/eth" xatuethv1 "github.com/ethpandaops/xatu/pkg/proto/eth/v1" - xatuethv2 "github.com/ethpandaops/xatu/pkg/proto/eth/v2" "github.com/ethpandaops/xatu/pkg/proto/xatu" + ssz "github.com/ferranbt/fastssz" "github.com/golang/snappy" "github.com/google/uuid" "github.com/pkg/errors" @@ -280,7 +279,7 @@ func (b *BeaconBlockDeriver) createEventFromBlock(ctx context.Context, block *sp }, } - additionalData, err := b.getAdditionalData(ctx, block, data) + additionalData, err := b.getAdditionalData(ctx, block) if err != nil { b.log.WithError(err).Error("Failed to get extra beacon block data") @@ -294,7 +293,7 @@ func (b *BeaconBlockDeriver) createEventFromBlock(ctx context.Context, block *sp return decoratedEvent, nil } -func (b *BeaconBlockDeriver) getAdditionalData(_ context.Context, block *spec.VersionedSignedBeaconBlock, data *xatuethv2.EventBlockV2) (*xatu.ClientMeta_AdditionalEthV2BeaconBlockV2Data, error) { +func (b *BeaconBlockDeriver) getAdditionalData(_ context.Context, block *spec.VersionedSignedBeaconBlock) (*xatu.ClientMeta_AdditionalEthV2BeaconBlockV2Data, error) { extra := &xatu.ClientMeta_AdditionalEthV2BeaconBlockV2Data{} slotI, err := block.Slot() @@ -332,13 +331,18 @@ func (b *BeaconBlockDeriver) getAdditionalData(_ context.Context, block *spec.Ve } } - dataAsJSON, err := json.Marshal(block) + blockMessage, err := getBlockMessage(block) if err != nil { return nil, err } - dataSize := len(dataAsJSON) - compressedData := snappy.Encode(nil, dataAsJSON) + sszData, err := ssz.MarshalSSZ(blockMessage) + if err != nil { + return nil, err + } + + dataSize := len(sszData) + compressedData := snappy.Encode(nil, sszData) compressedDataSize := len(compressedData) blockRoot, err := block.Root() @@ -387,3 +391,18 @@ func (b *BeaconBlockDeriver) getAdditionalData(_ context.Context, block *spec.Ve return extra, nil } + +func getBlockMessage(block *spec.VersionedSignedBeaconBlock) (ssz.Marshaler, error) { + switch block.Version { + case spec.DataVersionAltair: + return block.Altair.Message, nil + case spec.DataVersionBellatrix: + return block.Bellatrix.Message, nil + case spec.DataVersionCapella: + return block.Capella.Message, nil + case spec.DataVersionDeneb: + return block.Deneb.Message, nil + default: + return nil, fmt.Errorf("unsupported block version: %s", block.Version) + } +} diff --git a/pkg/sentry/event/beacon/eth/v2/beacon_block.go b/pkg/sentry/event/beacon/eth/v2/beacon_block.go index b23dacbe..f926ef29 100644 --- a/pkg/sentry/event/beacon/eth/v2/beacon_block.go +++ b/pkg/sentry/event/beacon/eth/v2/beacon_block.go @@ -7,15 +7,14 @@ import ( "github.com/attestantio/go-eth2-client/spec" "github.com/ethpandaops/xatu/pkg/proto/eth" - xatuethv2 "github.com/ethpandaops/xatu/pkg/proto/eth/v2" "github.com/ethpandaops/xatu/pkg/proto/xatu" "github.com/ethpandaops/xatu/pkg/sentry/ethereum" + ssz "github.com/ferranbt/fastssz" "github.com/golang/snappy" "github.com/google/uuid" ttlcache "github.com/jellydator/ttlcache/v3" hashstructure "github.com/mitchellh/hashstructure/v2" "github.com/sirupsen/logrus" - "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/wrapperspb" ) @@ -66,7 +65,7 @@ func (e *BeaconBlock) Decorate(ctx context.Context) (*xatu.DecoratedEvent, error }, } - additionalData, err := e.getAdditionalData(ctx, data) + additionalData, err := e.getAdditionalData(ctx) if err != nil { e.log.WithError(err).Error("Failed to get extra beacon block data") } else { @@ -123,7 +122,7 @@ func (e *BeaconBlock) ShouldIgnore(ctx context.Context) (bool, error) { return false, nil } -func (e *BeaconBlock) getAdditionalData(_ context.Context, data *xatuethv2.EventBlockV2) (*xatu.ClientMeta_AdditionalEthV2BeaconBlockV2Data, error) { +func (e *BeaconBlock) getAdditionalData(_ context.Context) (*xatu.ClientMeta_AdditionalEthV2BeaconBlockV2Data, error) { extra := &xatu.ClientMeta_AdditionalEthV2BeaconBlockV2Data{} slotI, err := e.event.Slot() @@ -164,15 +163,23 @@ func (e *BeaconBlock) getAdditionalData(_ context.Context, data *xatuethv2.Event } } - dataAsJSON, err := protojson.Marshal(data) + blockMessage, err := getBlockMessage(e.event) if err != nil { - return nil, err + e.log.WithError(err).Warn("Failed to get block message to compute block size. Missing fork version?") + } else { + sszData, err := ssz.MarshalSSZ(blockMessage) + if err != nil { + e.log.WithError(err).Warn("Failed to marshal (SSZ) block message to compute block size") + } else { + dataSize := len(sszData) + compressedData := snappy.Encode(nil, sszData) + compressedDataSize := len(compressedData) + + extra.TotalBytes = wrapperspb.UInt64(uint64(dataSize)) + extra.TotalBytesCompressed = wrapperspb.UInt64(uint64(compressedDataSize)) + } } - dataSize := len(dataAsJSON) - compressedData := snappy.Encode(nil, dataAsJSON) - compressedDataSize := len(compressedData) - switch e.event.Version { case spec.DataVersionBellatrix: bellatrixTxs := make([][]byte, len(e.event.Bellatrix.Message.Body.ExecutionPayload.Transactions)) @@ -200,11 +207,24 @@ func (e *BeaconBlock) getAdditionalData(_ context.Context, data *xatuethv2.Event compressedTransactions := snappy.Encode(nil, transactionsBytes) compressedTxSize := len(compressedTransactions) - extra.TotalBytes = wrapperspb.UInt64(uint64(dataSize)) - extra.TotalBytesCompressed = wrapperspb.UInt64(uint64(compressedDataSize)) extra.TransactionsCount = wrapperspb.UInt64(uint64(txCount)) extra.TransactionsTotalBytes = wrapperspb.UInt64(uint64(txSize)) extra.TransactionsTotalBytesCompressed = wrapperspb.UInt64(uint64(compressedTxSize)) return extra, nil } + +func getBlockMessage(block *spec.VersionedSignedBeaconBlock) (ssz.Marshaler, error) { + switch block.Version { + case spec.DataVersionAltair: + return block.Altair.Message, nil + case spec.DataVersionBellatrix: + return block.Bellatrix.Message, nil + case spec.DataVersionCapella: + return block.Capella.Message, nil + case spec.DataVersionDeneb: + return block.Deneb.Message, nil + default: + return nil, fmt.Errorf("unsupported block version: %s", block.Version) + } +}