Skip to content

Commit

Permalink
feat(cannon): Add BEACON_API_ETH_V1_BEACON_VALIDATORS (#329)
Browse files Browse the repository at this point in the history
* feat(cannon): Add BEACON_API_ETH_V1_BEACON_VALIDATORS

* add clickhouse migrations

* Add vector

* add clickhouse migrations

* refactor: Remove unnecessary code and optimize event generation

---------

Co-authored-by: Andrew Davis <1709934+Savid@users.noreply.github.com>
  • Loading branch information
samcm and Savid authored Jun 7, 2024
1 parent 57d07e7 commit 734e643
Show file tree
Hide file tree
Showing 41 changed files with 4,011 additions and 2,661 deletions.
17 changes: 17 additions & 0 deletions deploy/local/docker-compose/vector-http-kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ transforms:
libp2p_trace_gossipsub_beacon_block: .event.name == "LIBP2P_TRACE_GOSSIPSUB_BEACON_BLOCK"
libp2p_trace_gossipsub_beacon_attestation: .event.name == "LIBP2P_TRACE_GOSSIPSUB_BEACON_ATTESTATION"
libp2p_trace_gossipsub_blob_sidecar: .event.name == "LIBP2P_TRACE_GOSSIPSUB_BLOB_SIDECAR"
beacon_api_eth_v1_beacon_validators: .event.name == "BEACON_API_ETH_V1_BEACON_VALIDATORS"
sinks:
metrics:
type: prometheus_exporter
Expand Down Expand Up @@ -873,3 +874,19 @@ sinks:
enabled: true
encoding:
codec: json
beacon_api_eth_v1_beacon_validators_kafka:
type: kafka
buffer:
max_events: 500000
batch:
timeout_secs: 0.5
inputs:
- xatu_server_events_router.beacon_api_eth_v1_beacon_validators
bootstrap_servers: "${KAFKA_BROKERS}"
key_field: "event.id"
topic: beacon-api-eth-v1-beacon-validators
compression: snappy
healthcheck:
enabled: true
encoding:
codec: json
99 changes: 99 additions & 0 deletions deploy/local/docker-compose/vector-kafka-clickhouse.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,18 @@ sources:
- "^beacon-api-eth-v1-proposer-.+"
librdkafka_options:
message.max.bytes: "10485760" # 10MB
beacon_api_eth_v1_beacon_validators_kafka:
type: kafka
bootstrap_servers: "${KAFKA_BROKERS}"
auto_offset_reset: earliest
group_id: xatu-vector-kafka-clickhouse-beacon-api-eth-v1-beacon-validators
key_field: "event.id"
decoding:
codec: json
topics:
- "beacon-api-eth-v1-beacon-validators"
librdkafka_options:
message.max.bytes: "10485760" # 10MB
transforms:
xatu_server_events_meta:
type: remap
Expand All @@ -122,6 +134,7 @@ transforms:
- beacon_api_eth_v1_beacon_blob_sidecar_kafka
- beacon_p2p_events_kafka
- beacon_api_eth_v1_proposer_kafka
- beacon_api_eth_v1_beacon_validators_kafka
source: |-
.meta_client_name = .meta.client.name
.meta_client_id = .meta.client.id
Expand Down Expand Up @@ -312,6 +325,7 @@ transforms:
canonical_beacon_block_execution_transaction: .event.name == "BEACON_API_ETH_V2_BEACON_BLOCK_EXECUTION_TRANSACTION"
canonical_beacon_block_proposer_slashing: .event.name == "BEACON_API_ETH_V2_BEACON_BLOCK_PROPOSER_SLASHING"
canonical_beacon_block_voluntary_exit: .event.name == "BEACON_API_ETH_V2_BEACON_BLOCK_VOLUNTARY_EXIT"
canonical_beacon_validators: .event.name == "BEACON_API_ETH_V1_BEACON_VALIDATORS"
canonical_beacon_block_withdrawal: .event.name == "BEACON_API_ETH_V2_BEACON_BLOCK_WITHDRAWAL"
canonical_beacon_block: .event.name == "BEACON_API_ETH_V2_BEACON_BLOCK_V2" && .meta.client.additional_data.finalized_when_requested == true
canonical_beacon_proposer_duty: .event.name == "BEACON_API_ETH_V1_PROPOSER_DUTY" && .meta.client.additional_data.state_id == "finalized"
Expand Down Expand Up @@ -343,6 +357,7 @@ transforms:
- xatu_server_events_router.beacon_p2p_attestation
- xatu_server_events_router.blockprint_block_classification
- xatu_server_events_router.canonical_beacon_blob_sidecar
- xatu_server_events_router.canonical_beacon_validators
- xatu_server_events_router.canonical_beacon_block
- xatu_server_events_router.canonical_beacon_block_attester_slashing
- xatu_server_events_router.canonical_beacon_block_elaborated_attestation
Expand Down Expand Up @@ -1455,6 +1470,70 @@ transforms:
del(.event)
del(.meta)
del(.data)
canonical_beacon_validators_formatted:
type: remap
inputs:
- xatu_server_events_router.canonical_beacon_validators
source: |-
event_date_time, err = parse_timestamp(.event.date_time, format: "%+");
if err == null {
.event_date_time = to_unix_timestamp(event_date_time, unit: "milliseconds")
} else {
.error = err
.error_description = "failed to parse event date time"
log(., level: "error", rate_limit_secs: 60)
}
epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.epoch.start_date_time, format: "%+");
if err == null {
.epoch_start_date_time = to_unix_timestamp(epoch_start_date_time)
} else {
.error = err
.error_description = "failed to parse epoch start date time"
log(., level: "error", rate_limit_secs: 60)
}
events = []
.updated_date_time = to_unix_timestamp(now())
for_each(array!(.data.validators)) -> |_index, validator| {
events = push(events, {
"key": .event.id,
"event_date_time": .event_date_time,
"updated_date_time": .updated_date_time,
"meta_client_name": .meta_client_name,
"meta_client_id": .meta_client_id,
"meta_client_version": .meta_client_version,
"meta_client_implementation": .meta_client_implementation,
"meta_client_os": .meta_client_os,
"meta_client_ip": .meta_client_ip,
"meta_network_id": .meta_network_id,
"meta_network_name": .meta_network_name,
"meta_client_geo_city": .meta_client_geo_city,
"meta_client_geo_country": .meta_client_geo_country,
"meta_client_geo_country_code": .meta_client_geo_country_code,
"meta_client_geo_continent_code": .meta_client_geo_continent_code,
"meta_client_geo_longitude": .meta_client_geo_longitude,
"meta_client_geo_latitude": .meta_client_geo_latitude,
"meta_client_geo_autonomous_system_number": .meta_client_geo_autonomous_system_number,
"meta_client_geo_autonomous_system_organization": .meta_client_geo_autonomous_system_organization,
"epoch": .meta.client.additional_data.epoch.number,
"epoch_start_date_time": .epoch_start_date_time,
"index": validator.index,
"balance": validator.balance,
"status": validator.status,
"activation_eligibility_epoch": validator.data.activation_eligibility_epoch,
"activation_epoch": validator.data.activation_epoch,
"effective_balance": validator.data.effective_balance,
"exit_epoch": validator.data.exit_epoch,
"pubkey": validator.data.pubkey,
"slashed": validator.data.slashed,
"withdrawable_epoch": validator.data.withdrawable_epoch,
"withdrawal_credentials": validator.data.withdrawal_credentials
})
}
. = events
blockprint_block_classification_formatted:
type: remap
inputs:
Expand Down Expand Up @@ -2288,3 +2367,23 @@ sinks:
healthcheck:
enabled: true
skip_unknown_fields: false
canonical_beacon_validators_clickhouse:
type: clickhouse
inputs:
- canonical_beacon_validators_formatted
database: default
endpoint: "${CLICKHOUSE_ENDPOINT}"
table: canonical_beacon_validators
auth:
strategy: basic
user: "${CLICKHOUSE_USER}"
password: "${CLICKHOUSE_PASSWORD}"
batch:
max_bytes: 52428800
max_events: 200000
timeout_secs: 1
buffer:
max_events: 200000
healthcheck:
enabled: true
skip_unknown_fields: false
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
DROP TABLE IF EXISTS canonical_beacon_validators on cluster '{cluster}' SYNC;
DROP TABLE IF EXISTS canonical_beacon_validators_local on cluster '{cluster}' SYNC;
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
CREATE TABLE default.canonical_beacon_validators_local on cluster '{cluster}'
(
updated_date_time DateTime CODEC(DoubleDelta, ZSTD(1)),
event_date_time DateTime64(3) CODEC(DoubleDelta, ZSTD(1)),
epoch UInt32 CODEC(DoubleDelta, ZSTD(1)),
epoch_start_date_time DateTime CODEC(DoubleDelta, ZSTD(1)),
`index` UInt32 CODEC(ZSTD(1)),
balance UInt64 CODEC(ZSTD(1)),
`status` LowCardinality(String),
pubkey String CODEC(ZSTD(1)),
withdrawal_credentials String CODEC(ZSTD(1)),
effective_balance UInt64 CODEC(ZSTD(1)),
slashed Bool,
activation_epoch UInt64 CODEC(ZSTD(1)),
activation_eligibility_epoch UInt64 CODEC(ZSTD(1)),
exit_epoch UInt64 CODEC(ZSTD(1)),
withdrawable_epoch UInt64 CODEC(ZSTD(1)),
meta_client_name LowCardinality(String),
meta_client_id String CODEC(ZSTD(1)),
meta_client_version LowCardinality(String),
meta_client_implementation LowCardinality(String),
meta_client_os LowCardinality(String),
meta_client_ip Nullable(IPv6) CODEC(ZSTD(1)),
meta_client_geo_city LowCardinality(String) CODEC(ZSTD(1)),
meta_client_geo_country LowCardinality(String) CODEC(ZSTD(1)),
meta_client_geo_country_code LowCardinality(String) CODEC(ZSTD(1)),
meta_client_geo_continent_code LowCardinality(String) CODEC(ZSTD(1)),
meta_client_geo_longitude Nullable(Float64) CODEC(ZSTD(1)),
meta_client_geo_latitude Nullable(Float64) CODEC(ZSTD(1)),
meta_client_geo_autonomous_system_number Nullable(UInt32) CODEC(ZSTD(1)),
meta_client_geo_autonomous_system_organization Nullable(String) CODEC(ZSTD(1)),
meta_network_id Int32 CODEC(DoubleDelta, ZSTD(1)),
meta_network_name LowCardinality(String),
meta_consensus_version LowCardinality(String),
meta_consensus_version_major LowCardinality(String),
meta_consensus_version_minor LowCardinality(String),
meta_consensus_version_patch LowCardinality(String),
meta_consensus_implementation LowCardinality(String),
meta_labels Map(String, String) CODEC(ZSTD(1))
) Engine = ReplicatedReplacingMergeTree('/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}', '{replica}', updated_date_time)
PARTITION BY toStartOfMonth(epoch_start_date_time)
ORDER BY (epoch_start_date_time, index, meta_network_name);

ALTER TABLE default.canonical_beacon_validators_local ON CLUSTER '{cluster}'
MODIFY COMMENT 'Contains a validator state for an epoch.',
COMMENT COLUMN updated_date_time 'When this row was last updated',
COMMENT COLUMN event_date_time 'When the client fetched the beacon block from a beacon node',
COMMENT COLUMN epoch 'The epoch number from beacon block payload',
COMMENT COLUMN epoch_start_date_time 'The wall clock time when the epoch started',
COMMENT COLUMN `index` 'The index of the validator',
COMMENT COLUMN `balance` 'The balance of the validator',
COMMENT COLUMN `status` 'The status of the validator',
COMMENT COLUMN pubkey 'The public key of the validator',
COMMENT COLUMN withdrawal_credentials 'The withdrawal credentials of the validator',
COMMENT COLUMN effective_balance 'The effective balance of the validator',
COMMENT COLUMN slashed 'Whether the validator is slashed',
COMMENT COLUMN activation_epoch 'The epoch when the validator was activated',
COMMENT COLUMN activation_eligibility_epoch 'The epoch when the validator was activated',
COMMENT COLUMN exit_epoch 'The epoch when the validator exited',
COMMENT COLUMN withdrawable_epoch 'The epoch when the validator can withdraw',
COMMENT COLUMN meta_client_name 'Name of the client that generated the event',
COMMENT COLUMN meta_client_id 'Unique Session ID of the client that generated the event. This changes every time the client is restarted.',
COMMENT COLUMN meta_client_version 'Version of the client that generated the event',
COMMENT COLUMN meta_client_implementation 'Implementation of the client that generated the event',
COMMENT COLUMN meta_client_os 'Operating system of the client that generated the event',
COMMENT COLUMN meta_client_ip 'IP address of the client that generated the event',
COMMENT COLUMN meta_client_geo_city 'City of the client that generated the event',
COMMENT COLUMN meta_client_geo_country 'Country of the client that generated the event',
COMMENT COLUMN meta_client_geo_country_code 'Country code of the client that generated the event',
COMMENT COLUMN meta_client_geo_continent_code 'Continent code of the client that generated the event',
COMMENT COLUMN meta_client_geo_longitude 'Longitude of the client that generated the event',
COMMENT COLUMN meta_client_geo_latitude 'Latitude of the client that generated the event',
COMMENT COLUMN meta_client_geo_autonomous_system_number 'Autonomous system number of the client that generated the event',
COMMENT COLUMN meta_client_geo_autonomous_system_organization 'Autonomous system organization of the client that generated the event',
COMMENT COLUMN meta_network_id 'Ethereum network ID',
COMMENT COLUMN meta_network_name 'Ethereum network name',
COMMENT COLUMN meta_consensus_version 'Ethereum consensus client version that generated the event',
COMMENT COLUMN meta_consensus_version_major 'Ethereum consensus client major version that generated the event',
COMMENT COLUMN meta_consensus_version_minor 'Ethereum consensus client minor version that generated the event',
COMMENT COLUMN meta_consensus_version_patch 'Ethereum consensus client patch version that generated the event',
COMMENT COLUMN meta_consensus_implementation 'Ethereum consensus client implementation that generated the event',
COMMENT COLUMN meta_labels 'Labels associated with the event';

CREATE TABLE canonical_beacon_validators on cluster '{cluster}' AS canonical_beacon_validators_local
ENGINE = Distributed('{cluster}', default, canonical_beacon_validators_local, cityHash64('epoch_start_date_time', 'index', 'meta_network_name'));
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ services:
"libp2p-trace-gossipsub-beacon-block"
"libp2p-trace-gossipsub-beacon-attestation"
"libp2p-trace-gossipsub-blob-sidecar"
"beacon-api-eth-v1-beacon-validators"
)
for topic in "$${topics[@]}"; do
echo "Creating topic: $$topic";
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/chuckpreslar/emission v0.0.0-20170206194824-a7ddd980baf9
github.com/creasty/defaults v1.7.0
github.com/ethereum/go-ethereum v1.13.15
github.com/ethpandaops/beacon v0.35.0
github.com/ethpandaops/beacon v0.37.0
github.com/ethpandaops/ethcore v0.0.0-20240422023000-2a5727b18756
github.com/ethpandaops/ethwallclock v0.3.0
github.com/go-co-op/gocron v1.27.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,8 @@ github.com/ethereum/c-kzg-4844 v0.4.0 h1:3MS1s4JtA868KpJxroZoepdV0ZKBp3u/O5HcZ7R
github.com/ethereum/c-kzg-4844 v0.4.0/go.mod h1:VewdlzQmpT5QSrVhbBuGoCdFJkpaJlO1aQputP83wc0=
github.com/ethereum/go-ethereum v1.13.15 h1:U7sSGYGo4SPjP6iNIifNoyIAiNjrmQkz6EwQG+/EZWo=
github.com/ethereum/go-ethereum v1.13.15/go.mod h1:TN8ZiHrdJwSe8Cb6x+p0hs5CxhJZPbqB7hHkaUXcmIU=
github.com/ethpandaops/beacon v0.35.0 h1:ZkHfxm41N0wkv503Xdb6rFxLuEnIonClUQWUPFHS5VU=
github.com/ethpandaops/beacon v0.35.0/go.mod h1:B+SLxj1gnDd/Ia7cl/uuhzo1wyVf2p2puL6lmzPdPro=
github.com/ethpandaops/beacon v0.37.0 h1:T+F0IEjkSrevAbGA4zsqvqjnm4IRp+JKLsd8DyAO8ZQ=
github.com/ethpandaops/beacon v0.37.0/go.mod h1:B+SLxj1gnDd/Ia7cl/uuhzo1wyVf2p2puL6lmzPdPro=
github.com/ethpandaops/ethcore v0.0.0-20240422023000-2a5727b18756 h1:8JWjrRfP14m0oxOk03m11n/xgdY5ceyUf/ZxYdOs5gE=
github.com/ethpandaops/ethcore v0.0.0-20240422023000-2a5727b18756/go.mod h1:ZvKqL6CKxiraefdXPHeJurV2pDD/f2HF2uklDVdrry8=
github.com/ethpandaops/ethwallclock v0.3.0 h1:xF5fwtBf+bHFHZKBnwiPFEuelW3sMM7SD3ZNFq1lJY4=
Expand Down
17 changes: 17 additions & 0 deletions pkg/cannon/cannon.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,23 @@ func (c *Cannon) startBeaconBlockProcessor(ctx context.Context) error {
c.beacon,
clientMeta,
),
v1.NewBeaconValidatorsDeriver(
c.log,
&c.Config.Derivers.BeaconValidatorsConfig,
iterator.NewBackfillingCheckpoint(
c.log,
networkName,
networkID,
xatu.CannonType_BEACON_API_ETH_V1_BEACON_VALIDATORS,
c.coordinatorClient,
wallclock,
&backfillingCheckpointIteratorMetrics,
c.beacon,
finalizedCheckpoint,
),
c.beacon,
clientMeta,
),
}

c.eventDerivers = eventDerivers
Expand Down
Loading

0 comments on commit 734e643

Please sign in to comment.