Skip to content

Commit

Permalink
feat(cl-mimicry): Add LIBP2P_GOSSIPSUB_BLOB_SIDECAR (#323)
Browse files Browse the repository at this point in the history
* Finish coding

* Finish configurations

* bug fixes
  • Loading branch information
0x00101010 authored Jun 3, 2024
1 parent 0b54e39 commit d0a85d4
Show file tree
Hide file tree
Showing 14 changed files with 2,412 additions and 1,625 deletions.
21 changes: 19 additions & 2 deletions deploy/local/docker-compose/vector-http-kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ transforms:
libp2p_trace_join: .event.name == "LIBP2P_TRACE_JOIN"
libp2p_trace_handle_metadata: .event.name == "LIBP2P_TRACE_HANDLE_METADATA"
libp2p_trace_handle_status: .event.name == "LIBP2P_TRACE_HANDLE_STATUS"
libp2p_trace_gossipsub_beacon_block : .event.name == "LIBP2P_TRACE_GOSSIPSUB_BEACON_BLOCK"
libp2p_trace_gossipsub_beacon_attestation : .event.name == "LIBP2P_TRACE_GOSSIPSUB_BEACON_ATTESTATION"
libp2p_trace_gossipsub_beacon_block: .event.name == "LIBP2P_TRACE_GOSSIPSUB_BEACON_BLOCK"
libp2p_trace_gossipsub_beacon_attestation: .event.name == "LIBP2P_TRACE_GOSSIPSUB_BEACON_ATTESTATION"
libp2p_trace_gossipsub_blob_sidecar: .event.name == "LIBP2P_TRACE_GOSSIPSUB_BLOB_SIDECAR"
sinks:
metrics:
type: prometheus_exporter
Expand Down Expand Up @@ -856,3 +857,19 @@ sinks:
enabled: true
encoding:
codec: json
# Kafka sink for LIBP2P_TRACE_GOSSIPSUB_BLOB_SIDECAR events coming out of
# xatu_server_events_router; mirrors the sibling libp2p-trace-* kafka sinks
# defined above (same buffer/batch/encoding settings).
libp2p_trace_gossipsub_blob_sidecar_kafka:
  type: kafka
  buffer:
    # In-memory buffer of up to 500k events before applying backpressure.
    max_events: 500000
  batch:
    # Flush partial batches after 0.5s to keep delivery latency low.
    timeout_secs: 0.5
  inputs:
    - xatu_server_events_router.libp2p_trace_gossipsub_blob_sidecar
  bootstrap_servers: "${KAFKA_BROKERS}"
  # The Kafka message key is taken from the event id field.
  key_field: "event.id"
  topic: libp2p-trace-gossipsub-blob-sidecar
  compression: snappy
  healthcheck:
    enabled: true
  encoding:
    codec: json
126 changes: 119 additions & 7 deletions deploy/local/docker-compose/vector-kafka-clickhouse-libp2p.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ transforms:
libp2p_trace_handle_status: .event.name == "LIBP2P_TRACE_HANDLE_STATUS"
libp2p_trace_gossipsub_beacon_block: .event.name == "LIBP2P_TRACE_GOSSIPSUB_BEACON_BLOCK"
libp2p_trace_gossipsub_beacon_attestation: .event.name == "LIBP2P_TRACE_GOSSIPSUB_BEACON_ATTESTATION"
libp2p_trace_gossipsub_blob_sidecar: .event.name == "LIBP2P_TRACE_GOSSIPSUB_BLOB_SIDECAR"
xatu_server_events_router_matched:
type: log_to_metric
inputs:
Expand All @@ -229,6 +230,7 @@ transforms:
- xatu_server_events_router.libp2p_trace_handle_status
- xatu_server_events_router.libp2p_trace_gossipsub_beacon_block
- xatu_server_events_router.libp2p_trace_gossipsub_beacon_attestation
- xatu_server_events_router.libp2p_trace_gossipsub_blob_sidecar
metrics:
- type: counter
field: event.name
Expand Down Expand Up @@ -652,7 +654,6 @@ transforms:
.message_size = .meta.client.additional_data.message_size
.message_id = .meta.client.additional_data.message_id
.updated_date_time = to_unix_timestamp(now())
del(.event)
Expand Down Expand Up @@ -769,13 +770,106 @@ transforms:
.message_size = .meta.client.additional_data.message_size
.message_id = .meta.client.additional_data.message_id
.updated_date_time = to_unix_timestamp(now())
del(.event)
del(.meta)
del(.data)
del(.path)
# Flattens LIBP2P_TRACE_GOSSIPSUB_BLOB_SIDECAR events into the row shape
# expected by the libp2p_gossipsub_blob_sidecar ClickHouse table: timestamps
# are converted to unix time, slot/epoch metadata is hoisted to top level,
# the gossipsub topic string is split into its components, and the nested
# event/meta/data envelopes are dropped.
libp2p_trace_gossipsub_blob_sidecar_formatted:
  type: remap
  inputs:
    - xatu_server_events_router.libp2p_trace_gossipsub_blob_sidecar
  source: |-
    .unique_key = seahash(.event.id)
    .updated_date_time = to_unix_timestamp(now())

    event_date_time, err = parse_timestamp(.event.date_time, format: "%+");
    if err == null {
      .event_date_time = to_unix_timestamp(event_date_time, unit: "milliseconds")
    } else {
      .error = err
      .error_description = "failed to parse event date time"
      log(., level: "error", rate_limit_secs: 60)
    }

    .slot = .data.slot
    slot_start_date_time, err = parse_timestamp(.meta.client.additional_data.slot.start_date_time, format: "%+");
    if err == null {
      .slot_start_date_time = to_unix_timestamp(slot_start_date_time)
    } else {
      .error = err
      .error_description = "failed to parse slot start date time"
      log(., level: "error", rate_limit_secs: 60)
    }

    .epoch = .meta.client.additional_data.epoch.number
    epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.epoch.start_date_time, format: "%+");
    if err == null {
      .epoch_start_date_time = to_unix_timestamp(epoch_start_date_time)
    } else {
      .error = err
      .error_description = "failed to parse epoch start date time"
      log(., level: "error", rate_limit_secs: 60)
    }

    .wallclock_slot = .meta.client.additional_data.wallclock_slot.number
    wallclock_slot_start_date_time, err = parse_timestamp(.meta.client.additional_data.wallclock_slot.start_date_time, format: "%+");
    if err == null {
      .wallclock_slot_start_date_time = to_unix_timestamp(wallclock_slot_start_date_time)
    } else {
      .error = err
      .error_description = "failed to parse wallclock slot start date time"
      log(., level: "error", rate_limit_secs: 60)
    }

    # Fix: wallclock_epoch was previously copied from ...epoch.number, which
    # is the event's epoch, not the wall clock epoch. Read it from
    # ...wallclock_epoch.number to match the timestamp parsed just below
    # (and the wallclock_slot handling above).
    .wallclock_epoch = .meta.client.additional_data.wallclock_epoch.number
    wallclock_epoch_start_date_time, err = parse_timestamp(.meta.client.additional_data.wallclock_epoch.start_date_time, format: "%+");
    if err == null {
      .wallclock_epoch_start_date_time = to_unix_timestamp(wallclock_epoch_start_date_time)
    } else {
      .error = err
      .error_description = "failed to parse wallclock epoch start date time"
      log(., level: "error", rate_limit_secs: 60)
    }

    .propagation_slot_start_diff = .meta.client.additional_data.propagation.slot_start_diff
    .proposer_index = .data.proposer_index
    .blob_index = .data.index

    # Stable per-peer key: hash of peer id + network name, so the same peer
    # on different networks gets distinct keys.
    peer_id_key, err = .meta.client.additional_data.metadata.peer_id + .meta.ethereum.network.name
    if err != null {
      .error = err
      .error_description = "failed to generate peer id unique key"
      log(., level: "error", rate_limit_secs: 60)
    }
    .peer_id_unique_key = seahash(peer_id_key)

    .message_id = .meta.client.additional_data.message_id
    .message_size = .meta.client.additional_data.message_size

    # Gossipsub topics look like /eth2/<fork_digest>/<name>/<encoding>, which
    # splits into 5 parts (index 0 is the empty string before the leading /).
    topicParts, err = split(.meta.client.additional_data.topic, "/")
    if err != null {
      .error = err
      .error_description = "failed to split topic"
      # Fix: this branch previously recorded the error but never logged it,
      # unlike every other error branch in this transform.
      log(., level: "error", rate_limit_secs: 60)
    } else {
      if length(topicParts) != 5 {
        .errDebug = {
          "topic": .meta.client.additional_data.topic,
        }
        .error_description = "failed to split topic"
        log(., level: "error", rate_limit_secs: 60)
      }
    }
    .topic_layer = topicParts[1]
    .topic_fork_digest_value = topicParts[2]
    .topic_name = topicParts[3]
    .topic_encoding = topicParts[4]

    # Drop the raw envelopes now that everything is flattened.
    del(.event)
    del(.meta)
    del(.data)
    del(.path)
libp2p_trace_rpc_exploder:
type: remap
inputs:
Expand All @@ -796,7 +890,6 @@ transforms:
# Emit the main RPC event record
events = []
peer_id_key, err = .data.meta.peer_id + .meta_network_name
if err != null {
.error = err
Expand Down Expand Up @@ -1344,9 +1437,8 @@ transforms:
.error_description = "failed to convert latency to millseconds"
log(., level: "error", rate_limit_secs: 60)
}
.protocol = .data.protocol_id
.protocol = .data.protocol_id
.updated_date_time = to_unix_timestamp(now())
del(.event)
Expand Down Expand Up @@ -1380,7 +1472,7 @@ sinks:
max_events: 200000
healthcheck:
enabled: true
skip_unknown_fields: false
skip_unknown_fields: false
libp2p_trace_connected_clickhouse:
type: clickhouse
inputs:
Expand All @@ -1400,7 +1492,7 @@ sinks:
max_events: 200000
healthcheck:
enabled: true
skip_unknown_fields: false
skip_unknown_fields: false
libp2p_trace_disconnected_clickhouse:
type: clickhouse
inputs:
Expand Down Expand Up @@ -1721,3 +1813,23 @@ sinks:
healthcheck:
enabled: true
skip_unknown_fields: false
# ClickHouse sink for blob sidecar rows produced by
# libp2p_trace_gossipsub_blob_sidecar_formatted; mirrors the sibling
# libp2p_trace_*_clickhouse sinks above.
libp2p_trace_gossipsub_blob_sidecar_clickhouse:
  type: clickhouse
  inputs:
    - libp2p_trace_gossipsub_blob_sidecar_formatted
  database: default
  endpoint: "${CLICKHOUSE_ENDPOINT}"
  # Distributed table created by the libp2p_gossipsub_blob_sidecar migration.
  table: libp2p_gossipsub_blob_sidecar
  auth:
    strategy: basic
    user: "${CLICKHOUSE_USER}"
    password: "${CLICKHOUSE_PASSWORD}"
  batch:
    # Flush at 50 MiB, 200k events, or 1s — whichever comes first.
    max_bytes: 52428800
    max_events: 200000
    timeout_secs: 1
  buffer:
    max_events: 200000
  healthcheck:
    enabled: true
  # Reject inserts carrying fields the table schema does not declare, so
  # schema drift between the remap transform and the table surfaces loudly.
  skip_unknown_fields: false
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Down migration: remove the cluster-wide Distributed wrapper first, then the
-- underlying replicated per-shard table.
DROP TABLE IF EXISTS libp2p_gossipsub_blob_sidecar ON CLUSTER '{cluster}';
DROP TABLE IF EXISTS libp2p_gossipsub_blob_sidecar_local ON CLUSTER '{cluster}';
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
-- Per-shard storage table for libp2p gossipsub blob sidecar events.
-- ReplicatedReplacingMergeTree deduplicates rows sharing the same ORDER BY
-- key, keeping the one with the highest updated_date_time.
-- Consistency fix: the original mixed `CODEC` and `Codec` spellings within
-- one statement; normalized to `CODEC` throughout (keywords are
-- case-insensitive in ClickHouse, so this is purely cosmetic).
CREATE TABLE libp2p_gossipsub_blob_sidecar_local
ON CLUSTER '{cluster}' (
    unique_key Int64,
    updated_date_time DateTime CODEC(DoubleDelta, ZSTD(1)),
    event_date_time DateTime64(3) CODEC(DoubleDelta, ZSTD(1)),
    slot UInt32 CODEC(DoubleDelta, ZSTD(1)),
    slot_start_date_time DateTime CODEC(DoubleDelta, ZSTD(1)),
    epoch UInt32 CODEC(DoubleDelta, ZSTD(1)),
    epoch_start_date_time DateTime CODEC(DoubleDelta, ZSTD(1)),
    wallclock_slot UInt32 CODEC(DoubleDelta, ZSTD(1)),
    wallclock_slot_start_date_time DateTime CODEC(DoubleDelta, ZSTD(1)),
    wallclock_epoch UInt32 CODEC(DoubleDelta, ZSTD(1)),
    wallclock_epoch_start_date_time DateTime CODEC(DoubleDelta, ZSTD(1)),
    propagation_slot_start_diff UInt32 CODEC(ZSTD(1)),
    proposer_index UInt32 CODEC(ZSTD(1)),
    blob_index UInt32 CODEC(ZSTD(1)),
    peer_id_unique_key Int64,
    message_id String CODEC(ZSTD(1)),
    message_size UInt32 CODEC(ZSTD(1)),
    topic_layer LowCardinality(String),
    topic_fork_digest_value LowCardinality(String),
    topic_name LowCardinality(String),
    topic_encoding LowCardinality(String),
    meta_client_name LowCardinality(String),
    meta_client_id String CODEC(ZSTD(1)),
    meta_client_version LowCardinality(String),
    meta_client_implementation LowCardinality(String),
    meta_client_os LowCardinality(String),
    meta_client_ip Nullable(IPv6) CODEC(ZSTD(1)),
    meta_client_geo_city LowCardinality(String) CODEC(ZSTD(1)),
    meta_client_geo_country LowCardinality(String) CODEC(ZSTD(1)),
    meta_client_geo_country_code LowCardinality(String) CODEC(ZSTD(1)),
    meta_client_geo_continent_code LowCardinality(String) CODEC(ZSTD(1)),
    meta_client_geo_longitude Nullable(Float64) CODEC(ZSTD(1)),
    meta_client_geo_latitude Nullable(Float64) CODEC(ZSTD(1)),
    meta_client_geo_autonomous_system_number Nullable(UInt32) CODEC(ZSTD(1)),
    meta_client_geo_autonomous_system_organization Nullable(String) CODEC(ZSTD(1)),
    meta_network_id Int32 CODEC(DoubleDelta, ZSTD(1)),
    meta_network_name LowCardinality(String)
) Engine = ReplicatedReplacingMergeTree('/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}', '{replica}', updated_date_time)
PARTITION BY toStartOfMonth(slot_start_date_time)
ORDER BY (slot_start_date_time, unique_key, meta_network_name, meta_client_name);

-- Column-level documentation surfaced via system.columns.
ALTER TABLE libp2p_gossipsub_blob_sidecar_local
ON CLUSTER '{cluster}'
MODIFY COMMENT 'Table for libp2p gossipsub blob sidecar data',
COMMENT COLUMN unique_key 'Unique identifier for each record',
COMMENT COLUMN updated_date_time 'Timestamp when the record was last updated',
COMMENT COLUMN event_date_time 'Timestamp of the event with millisecond precision',
COMMENT COLUMN slot 'Slot number associated with the event',
COMMENT COLUMN slot_start_date_time 'Start date and time of the slot',
COMMENT COLUMN epoch 'Epoch number associated with the event',
COMMENT COLUMN epoch_start_date_time 'Start date and time of the epoch',
COMMENT COLUMN wallclock_slot 'Slot number of the wall clock when the event was received',
COMMENT COLUMN wallclock_slot_start_date_time 'Start date and time of the wall clock slot when the event was received',
COMMENT COLUMN wallclock_epoch 'Epoch number of the wall clock when the event was received',
COMMENT COLUMN wallclock_epoch_start_date_time 'Start date and time of the wall clock epoch when the event was received',
COMMENT COLUMN propagation_slot_start_diff 'Difference in slot start time for propagation',
COMMENT COLUMN proposer_index 'The proposer index of the beacon block',
COMMENT COLUMN blob_index 'Blob index associated with the record',
COMMENT COLUMN peer_id_unique_key 'Unique key associated with the identifier of the peer',
COMMENT COLUMN message_id 'Identifier of the message',
COMMENT COLUMN message_size 'Size of the message in bytes',
COMMENT COLUMN topic_layer 'Layer of the topic in the gossipsub protocol',
COMMENT COLUMN topic_fork_digest_value 'Fork digest value of the topic',
COMMENT COLUMN topic_name 'Name of the topic',
COMMENT COLUMN topic_encoding 'Encoding used for the topic',
COMMENT COLUMN meta_client_name 'Name of the client that generated the event',
COMMENT COLUMN meta_client_id 'Unique Session ID of the client that generated the event. This changes every time the client is restarted.',
COMMENT COLUMN meta_client_version 'Version of the client that generated the event',
COMMENT COLUMN meta_client_implementation 'Implementation of the client that generated the event',
COMMENT COLUMN meta_client_os 'Operating system of the client that generated the event',
COMMENT COLUMN meta_client_ip 'IP address of the client that generated the event',
COMMENT COLUMN meta_client_geo_city 'City of the client that generated the event',
COMMENT COLUMN meta_client_geo_country 'Country of the client that generated the event',
COMMENT COLUMN meta_client_geo_country_code 'Country code of the client that generated the event',
COMMENT COLUMN meta_client_geo_continent_code 'Continent code of the client that generated the event',
COMMENT COLUMN meta_client_geo_longitude 'Longitude of the client that generated the event',
COMMENT COLUMN meta_client_geo_latitude 'Latitude of the client that generated the event',
COMMENT COLUMN meta_client_geo_autonomous_system_number 'Autonomous system number of the client that generated the event',
COMMENT COLUMN meta_client_geo_autonomous_system_organization 'Autonomous system organization of the client that generated the event',
COMMENT COLUMN meta_network_id 'Network ID associated with the client',
COMMENT COLUMN meta_network_name 'Name of the network associated with the client';

-- Cluster-wide entry point: Distributed wrapper over the local table,
-- sharding rows by unique_key.
CREATE TABLE libp2p_gossipsub_blob_sidecar
ON CLUSTER '{cluster}'
AS libp2p_gossipsub_blob_sidecar_local
ENGINE = Distributed('{cluster}', default, libp2p_gossipsub_blob_sidecar_local, unique_key);
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ services:
"libp2p-trace-handle-status"
"libp2p-trace-gossipsub-beacon-block"
"libp2p-trace-gossipsub-beacon-attestation"
"libp2p-trace-gossipsub-blob-sidecar"
)
for topic in "$${topics[@]}"; do
echo "Creating topic: $$topic";
Expand Down
6 changes: 4 additions & 2 deletions pkg/clmimicry/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import (
"fmt"
"time"

hermes "github.com/probe-lab/hermes/eth"
"github.com/sirupsen/logrus"

"github.com/ethpandaops/xatu/pkg/clmimicry/ethereum"
"github.com/ethpandaops/xatu/pkg/output"
"github.com/ethpandaops/xatu/pkg/processor"
hermes "github.com/probe-lab/hermes/eth"
"github.com/sirupsen/logrus"
)

type Config struct {
Expand Down Expand Up @@ -142,6 +143,7 @@ type EventConfig struct {
HandleStatusEnabled bool `yaml:"handleStatusEnabled" default:"true"`
GossipSubBeaconBlockEnabled bool `yaml:"gossipSubBeaconBlockEnabled" default:"true"`
GossipSubAttestationEnabled bool `yaml:"gossipSubAttestationEnabled" default:"true"`
GossipSubBlobSidecarEnabled bool `yaml:"gossipSubBlobSidecarEnabled" default:"true"`
}

func (e *EventConfig) Validate() error {
Expand Down
Loading

0 comments on commit d0a85d4

Please sign in to comment.