diff --git a/.hack/devnet/run.sh b/.hack/devnet/run.sh
index 09193fe5..b1a6cd3b 100755
--- a/.hack/devnet/run.sh
+++ b/.hack/devnet/run.sh
@@ -9,10 +9,11 @@ fi
## Run devnet using kurtosis
ENCLAVE_NAME="${ENCLAVE_NAME:-dora}"
+ETHEREUM_PACKAGE="${ETHEREUM_PACKAGE:-github.com/ethpandaops/ethereum-package}"
if kurtosis enclave inspect "$ENCLAVE_NAME" > /dev/null; then
echo "Kurtosis enclave '$ENCLAVE_NAME' is already up."
else
- kurtosis run github.com/ethpandaops/ethereum-package \
+ kurtosis run "$ETHEREUM_PACKAGE" \
--image-download always \
--enclave "$ENCLAVE_NAME" \
--args-file "${config_file}"
diff --git a/clients/consensus/chainspec.go b/clients/consensus/chainspec.go
index 43311d65..644df0fc 100644
--- a/clients/consensus/chainspec.go
+++ b/clients/consensus/chainspec.go
@@ -16,40 +16,42 @@ type ForkVersion struct {
// https://github.com/ethereum/consensus-specs/blob/dev/configs/mainnet.yaml
type ChainSpec struct {
- PresetBase string `yaml:"PRESET_BASE"`
- ConfigName string `yaml:"CONFIG_NAME"`
- MinGenesisTime time.Time `yaml:"MIN_GENESIS_TIME"`
- GenesisForkVersion phase0.Version `yaml:"GENESIS_FORK_VERSION"`
- AltairForkVersion phase0.Version `yaml:"ALTAIR_FORK_VERSION"`
- AltairForkEpoch *uint64 `yaml:"ALTAIR_FORK_EPOCH"`
- BellatrixForkVersion phase0.Version `yaml:"BELLATRIX_FORK_VERSION"`
- BellatrixForkEpoch *uint64 `yaml:"BELLATRIX_FORK_EPOCH"`
- CapellaForkVersion phase0.Version `yaml:"CAPELLA_FORK_VERSION"`
- CapellaForkEpoch *uint64 `yaml:"CAPELLA_FORK_EPOCH"`
- DenebForkVersion phase0.Version `yaml:"DENEB_FORK_VERSION"`
- DenebForkEpoch *uint64 `yaml:"DENEB_FORK_EPOCH"`
- ElectraForkVersion phase0.Version `yaml:"ELECTRA_FORK_VERSION"`
- ElectraForkEpoch *uint64 `yaml:"ELECTRA_FORK_EPOCH"`
- Eip7594ForkVersion phase0.Version `yaml:"EIP7594_FORK_VERSION"`
- Eip7594ForkEpoch *uint64 `yaml:"EIP7594_FORK_EPOCH"`
- SecondsPerSlot time.Duration `yaml:"SECONDS_PER_SLOT"`
- SlotsPerEpoch uint64 `yaml:"SLOTS_PER_EPOCH"`
- EpochsPerHistoricalVector uint64 `yaml:"EPOCHS_PER_HISTORICAL_VECTOR"`
- EpochsPerSlashingVector uint64 `yaml:"EPOCHS_PER_SLASHINGS_VECTOR"`
- EpochsPerSyncCommitteePeriod uint64 `yaml:"EPOCHS_PER_SYNC_COMMITTEE_PERIOD"`
- MinSeedLookahead uint64 `yaml:"MIN_SEED_LOOKAHEAD"`
- ShuffleRoundCount uint64 `yaml:"SHUFFLE_ROUND_COUNT"`
- MaxEffectiveBalance uint64 `yaml:"MAX_EFFECTIVE_BALANCE"`
- MaxEffectiveBalanceElectra uint64 `yaml:"MAX_EFFECTIVE_BALANCE_ELECTRA"`
- TargetCommitteeSize uint64 `yaml:"TARGET_COMMITTEE_SIZE"`
- MaxCommitteesPerSlot uint64 `yaml:"MAX_COMMITTEES_PER_SLOT"`
- MinPerEpochChurnLimit uint64 `yaml:"MIN_PER_EPOCH_CHURN_LIMIT"`
- ChurnLimitQuotient uint64 `yaml:"CHURN_LIMIT_QUOTIENT"`
- DomainBeaconProposer phase0.DomainType `yaml:"DOMAIN_BEACON_PROPOSER"`
- DomainBeaconAttester phase0.DomainType `yaml:"DOMAIN_BEACON_ATTESTER"`
- DomainSyncCommittee phase0.DomainType `yaml:"DOMAIN_SYNC_COMMITTEE"`
- SyncCommitteeSize uint64 `yaml:"SYNC_COMMITTEE_SIZE"`
- DepositContractAddress []byte `yaml:"DEPOSIT_CONTRACT_ADDRESS"`
+ PresetBase string `yaml:"PRESET_BASE"`
+ ConfigName string `yaml:"CONFIG_NAME"`
+ MinGenesisTime time.Time `yaml:"MIN_GENESIS_TIME"`
+ GenesisForkVersion phase0.Version `yaml:"GENESIS_FORK_VERSION"`
+ AltairForkVersion phase0.Version `yaml:"ALTAIR_FORK_VERSION"`
+ AltairForkEpoch *uint64 `yaml:"ALTAIR_FORK_EPOCH"`
+ BellatrixForkVersion phase0.Version `yaml:"BELLATRIX_FORK_VERSION"`
+ BellatrixForkEpoch *uint64 `yaml:"BELLATRIX_FORK_EPOCH"`
+ CapellaForkVersion phase0.Version `yaml:"CAPELLA_FORK_VERSION"`
+ CapellaForkEpoch *uint64 `yaml:"CAPELLA_FORK_EPOCH"`
+ DenebForkVersion phase0.Version `yaml:"DENEB_FORK_VERSION"`
+ DenebForkEpoch *uint64 `yaml:"DENEB_FORK_EPOCH"`
+ ElectraForkVersion phase0.Version `yaml:"ELECTRA_FORK_VERSION"`
+ ElectraForkEpoch *uint64 `yaml:"ELECTRA_FORK_EPOCH"`
+ Eip7594ForkVersion phase0.Version `yaml:"EIP7594_FORK_VERSION"`
+ Eip7594ForkEpoch *uint64 `yaml:"EIP7594_FORK_EPOCH"`
+ SecondsPerSlot time.Duration `yaml:"SECONDS_PER_SLOT"`
+ SlotsPerEpoch uint64 `yaml:"SLOTS_PER_EPOCH"`
+ EpochsPerHistoricalVector uint64 `yaml:"EPOCHS_PER_HISTORICAL_VECTOR"`
+ EpochsPerSlashingVector uint64 `yaml:"EPOCHS_PER_SLASHINGS_VECTOR"`
+ EpochsPerSyncCommitteePeriod uint64 `yaml:"EPOCHS_PER_SYNC_COMMITTEE_PERIOD"`
+ MinSeedLookahead uint64 `yaml:"MIN_SEED_LOOKAHEAD"`
+ ShuffleRoundCount uint64 `yaml:"SHUFFLE_ROUND_COUNT"`
+ MaxEffectiveBalance uint64 `yaml:"MAX_EFFECTIVE_BALANCE"`
+ MaxEffectiveBalanceElectra uint64 `yaml:"MAX_EFFECTIVE_BALANCE_ELECTRA"`
+ TargetCommitteeSize uint64 `yaml:"TARGET_COMMITTEE_SIZE"`
+ MaxCommitteesPerSlot uint64 `yaml:"MAX_COMMITTEES_PER_SLOT"`
+ MinPerEpochChurnLimit uint64 `yaml:"MIN_PER_EPOCH_CHURN_LIMIT"`
+ ChurnLimitQuotient uint64 `yaml:"CHURN_LIMIT_QUOTIENT"`
+ DomainBeaconProposer phase0.DomainType `yaml:"DOMAIN_BEACON_PROPOSER"`
+ DomainBeaconAttester phase0.DomainType `yaml:"DOMAIN_BEACON_ATTESTER"`
+ DomainSyncCommittee phase0.DomainType `yaml:"DOMAIN_SYNC_COMMITTEE"`
+ SyncCommitteeSize uint64 `yaml:"SYNC_COMMITTEE_SIZE"`
+ DepositContractAddress []byte `yaml:"DEPOSIT_CONTRACT_ADDRESS"`
+ MaxConsolidationRequestsPerPayload uint64 `yaml:"MAX_CONSOLIDATION_REQUESTS_PER_PAYLOAD"`
+ MaxWithdrawalRequestsPerPayload uint64 `yaml:"MAX_WITHDRAWAL_REQUESTS_PER_PAYLOAD"`
// EIP7594: PeerDAS
NumberOfColumns *uint64 `yaml:"NUMBER_OF_COLUMNS"`
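
The two new spec values are plain uppercase YAML keys mapped onto ChainSpec struct fields, just like the existing entries, so they are filled in wherever the chain config is unmarshaled. A minimal standalone sketch of that mapping (using gopkg.in/yaml.v3 and illustrative values, not dora's actual spec loading):

    package main

    import (
        "fmt"

        "gopkg.in/yaml.v3"
    )

    // cut-down stand-in for ChainSpec showing only the two new fields
    type chainSpec struct {
        MaxConsolidationRequestsPerPayload uint64 `yaml:"MAX_CONSOLIDATION_REQUESTS_PER_PAYLOAD"`
        MaxWithdrawalRequestsPerPayload    uint64 `yaml:"MAX_WITHDRAWAL_REQUESTS_PER_PAYLOAD"`
    }

    func main() {
        // illustrative values, not a claim about any preset
        raw := []byte("MAX_CONSOLIDATION_REQUESTS_PER_PAYLOAD: 2\nMAX_WITHDRAWAL_REQUESTS_PER_PAYLOAD: 16\n")

        var spec chainSpec
        if err := yaml.Unmarshal(raw, &spec); err != nil {
            panic(err)
        }
        fmt.Println(spec.MaxConsolidationRequestsPerPayload, spec.MaxWithdrawalRequestsPerPayload)
    }

These per-payload limits are what the new contract indexers further down use as their dequeueRate.
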
diff --git a/config/default.config.yml b/config/default.config.yml
index 0e9fed70..2cc2643e 100644
--- a/config/default.config.yml
+++ b/config/default.config.yml
@@ -50,7 +50,9 @@ executionapi:
- name: "local"
url: "http://127.0.0.1:8545"
- depositLogBatchSize: 1000
+ logBatchSize: 1000
+ depositDeployBlock: 0 # EL block number from which to start crawling the deposit contract (should be <= the deposit contract deployment block, but close to it)
+ electraDeployBlock: 0 # EL block number from which to start crawling the electra system contracts (should be <= the electra fork activation block, but close to it)
# indexer keeps track of the latest epochs in memory.
indexer:
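
On the Go side these options surface as fields on the execution API config (the consolidation indexer below reads utils.Config.ExecutionApi.LogBatchSize and utils.Config.ExecutionApi.ElectraDeployBlock). A rough sketch of the corresponding struct section; field names and types are inferred from their usage in this patch, not taken from the actual config package:

    package config

    // ExecutionApiConfig: sketch of the relevant fields only; names and types
    // are inferred from utils.Config.ExecutionApi usage in this patch (assumption)
    type ExecutionApiConfig struct {
        LogBatchSize       int `yaml:"logBatchSize"`       // block range per EL log request, defaults to 1000 when 0
        DepositDeployBlock int `yaml:"depositDeployBlock"` // EL block to start crawling the deposit contract from
        ElectraDeployBlock int `yaml:"electraDeployBlock"` // EL block to start crawling the electra system contracts from
    }
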
diff --git a/db/consolidation_request_txs.go b/db/consolidation_request_txs.go
new file mode 100644
index 00000000..05666781
--- /dev/null
+++ b/db/consolidation_request_txs.go
@@ -0,0 +1,243 @@
+package db
+
+import (
+ "fmt"
+ "strings"
+
+ "github.com/ethpandaops/dora/dbtypes"
+ "github.com/jmoiron/sqlx"
+)
+
+func InsertConsolidationRequestTxs(consolidationTxs []*dbtypes.ConsolidationRequestTx, tx *sqlx.Tx) error {
+ var sql strings.Builder
+ fmt.Fprint(&sql,
+ EngineQuery(map[dbtypes.DBEngineType]string{
+ dbtypes.DBEnginePgsql: "INSERT INTO consolidation_request_txs ",
+ dbtypes.DBEngineSqlite: "INSERT OR REPLACE INTO consolidation_request_txs ",
+ }),
+ "(block_number, block_index, block_time, block_root, fork_id, source_address, source_pubkey, source_index, target_pubkey, target_index, tx_hash, tx_sender, tx_target, dequeue_block)",
+ " VALUES ",
+ )
+ argIdx := 0
+ fieldCount := 14
+
+ args := make([]any, len(consolidationTxs)*fieldCount)
+ for i, consolidationTx := range consolidationTxs {
+ if i > 0 {
+ fmt.Fprintf(&sql, ", ")
+ }
+ fmt.Fprintf(&sql, "(")
+ for f := 0; f < fieldCount; f++ {
+ if f > 0 {
+ fmt.Fprintf(&sql, ", ")
+ }
+ fmt.Fprintf(&sql, "$%v", argIdx+f+1)
+
+ }
+ fmt.Fprintf(&sql, ")")
+
+ args[argIdx+0] = consolidationTx.BlockNumber
+ args[argIdx+1] = consolidationTx.BlockIndex
+ args[argIdx+2] = consolidationTx.BlockTime
+ args[argIdx+3] = consolidationTx.BlockRoot
+ args[argIdx+4] = consolidationTx.ForkId
+ args[argIdx+5] = consolidationTx.SourceAddress
+ args[argIdx+6] = consolidationTx.SourcePubkey
+ args[argIdx+7] = consolidationTx.SourceIndex
+ args[argIdx+8] = consolidationTx.TargetPubkey
+ args[argIdx+9] = consolidationTx.TargetIndex
+ args[argIdx+10] = consolidationTx.TxHash
+ args[argIdx+11] = consolidationTx.TxSender
+ args[argIdx+12] = consolidationTx.TxTarget
+ args[argIdx+13] = consolidationTx.DequeueBlock
+ argIdx += fieldCount
+ }
+ fmt.Fprint(&sql, EngineQuery(map[dbtypes.DBEngineType]string{
+ dbtypes.DBEnginePgsql: " ON CONFLICT (block_number, block_index) DO UPDATE SET source_index = excluded.source_index, target_index = excluded.target_index, fork_id = excluded.fork_id",
+ dbtypes.DBEngineSqlite: "",
+ }))
+
+ _, err := tx.Exec(sql.String(), args...)
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+func GetConsolidationRequestTxsByDequeueRange(dequeueFirst uint64, dequeueLast uint64) []*dbtypes.ConsolidationRequestTx {
+ consolidationTxs := []*dbtypes.ConsolidationRequestTx{}
+
+ err := ReaderDb.Select(&consolidationTxs, `SELECT consolidation_request_txs.*
+ FROM consolidation_request_txs
+ WHERE dequeue_block >= $1 AND dequeue_block <= $2
+ ORDER BY dequeue_block ASC, block_number ASC, block_index ASC
+ `, dequeueFirst, dequeueLast)
+ if err != nil {
+ logger.Errorf("Error while fetching consolidation request txs: %v", err)
+ return nil
+ }
+
+ return consolidationTxs
+}
+
+func GetConsolidationRequestTxsByTxHashes(txHashes [][]byte) []*dbtypes.ConsolidationRequestTx {
+ var sql strings.Builder
+ args := []interface{}{}
+
+ fmt.Fprint(&sql, `SELECT consolidation_request_txs.*
+ FROM consolidation_request_txs
+ WHERE tx_hash IN (
+ `)
+
+ for idx, txHash := range txHashes {
+ if idx > 0 {
+ fmt.Fprintf(&sql, ", ")
+ }
+ args = append(args, txHash)
+ fmt.Fprintf(&sql, "$%v", len(args))
+ }
+ fmt.Fprintf(&sql, ")")
+
+ consolidationTxs := []*dbtypes.ConsolidationRequestTx{}
+ err := ReaderDb.Select(&consolidationTxs, sql.String(), args...)
+ if err != nil {
+ logger.Errorf("Error while fetching consolidation request txs: %v", err)
+ return nil
+ }
+
+ return consolidationTxs
+}
+
+func GetConsolidationRequestTxsFiltered(offset uint64, limit uint32, canonicalForkIds []uint64, filter *dbtypes.ConsolidationRequestTxFilter) ([]*dbtypes.ConsolidationRequestTx, uint64, error) {
+ var sql strings.Builder
+ args := []interface{}{}
+ fmt.Fprint(&sql, `
+ WITH cte AS (
+ SELECT
+ block_number, block_index, block_time, block_root, fork_id, source_address, source_pubkey, source_index, target_pubkey, target_index, tx_hash, tx_sender, tx_target, dequeue_block
+ FROM consolidation_request_txs
+ `)
+
+ if filter.SrcValidatorName != "" {
+ fmt.Fprint(&sql, `
+ LEFT JOIN validator_names AS source_names ON source_names."index" = consolidation_request_txs.source_index
+ `)
+ }
+ if filter.TgtValidatorName != "" {
+ fmt.Fprint(&sql, `
+ LEFT JOIN validator_names AS target_names ON target_names."index" = consolidation_request_txs.target_index
+ `)
+ }
+
+ filterOp := "WHERE"
+ if filter.MinDequeue > 0 {
+ args = append(args, filter.MinDequeue)
+ fmt.Fprintf(&sql, " %v dequeue_block >= $%v", filterOp, len(args))
+ filterOp = "AND"
+ }
+ if filter.MaxDequeue > 0 {
+ args = append(args, filter.MaxDequeue)
+ fmt.Fprintf(&sql, " %v dequeue_block <= $%v", filterOp, len(args))
+ filterOp = "AND"
+ }
+ if len(filter.SourceAddress) > 0 {
+ args = append(args, filter.SourceAddress)
+ fmt.Fprintf(&sql, " %v source_address = $%v", filterOp, len(args))
+ filterOp = "AND"
+ }
+ if filter.MinSrcIndex > 0 {
+ args = append(args, filter.MinSrcIndex)
+ fmt.Fprintf(&sql, " %v source_index >= $%v", filterOp, len(args))
+ filterOp = "AND"
+ }
+ if filter.MaxSrcIndex > 0 {
+ args = append(args, filter.MaxSrcIndex)
+ fmt.Fprintf(&sql, " %v source_index <= $%v", filterOp, len(args))
+ filterOp = "AND"
+ }
+ if filter.MinTgtIndex > 0 {
+ args = append(args, filter.MinTgtIndex)
+ fmt.Fprintf(&sql, " %v target_index >= $%v", filterOp, len(args))
+ filterOp = "AND"
+ }
+ if filter.MaxTgtIndex > 0 {
+ args = append(args, filter.MaxTgtIndex)
+ fmt.Fprintf(&sql, " %v target_index <= $%v", filterOp, len(args))
+ filterOp = "AND"
+ }
+ if filter.SrcValidatorName != "" {
+ args = append(args, "%"+filter.SrcValidatorName+"%")
+ fmt.Fprintf(&sql, " %v ", filterOp)
+ fmt.Fprintf(&sql, EngineQuery(map[dbtypes.DBEngineType]string{
+ dbtypes.DBEnginePgsql: ` source_names.name ilike $%v `,
+ dbtypes.DBEngineSqlite: ` source_names.name LIKE $%v `,
+ }), len(args))
+ filterOp = "AND"
+ }
+ if filter.TgtValidatorName != "" {
+ args = append(args, "%"+filter.TgtValidatorName+"%")
+ fmt.Fprintf(&sql, " %v ", filterOp)
+ fmt.Fprintf(&sql, EngineQuery(map[dbtypes.DBEngineType]string{
+ dbtypes.DBEnginePgsql: ` target_names.name ilike $%v `,
+ dbtypes.DBEngineSqlite: ` target_names.name LIKE $%v `,
+ }), len(args))
+ filterOp = "AND"
+ }
+
+ if filter.WithOrphaned != 1 {
+ forkIdStr := make([]string, len(canonicalForkIds))
+ for i, forkId := range canonicalForkIds {
+ forkIdStr[i] = fmt.Sprintf("%v", forkId)
+ }
+ if len(forkIdStr) == 0 {
+ forkIdStr = append(forkIdStr, "0")
+ }
+
+ if filter.WithOrphaned == 0 {
+ fmt.Fprintf(&sql, " %v fork_id IN (%v)", filterOp, strings.Join(forkIdStr, ","))
+ filterOp = "AND"
+ } else if filter.WithOrphaned == 2 {
+ fmt.Fprintf(&sql, " %v fork_id NOT IN (%v)", filterOp, strings.Join(forkIdStr, ","))
+ filterOp = "AND"
+ }
+ }
+
+ args = append(args, limit)
+ fmt.Fprintf(&sql, `)
+ SELECT
+ count(*) AS block_number,
+ 0 AS block_index,
+ 0 AS block_time,
+ null AS block_root,
+ 0 AS fork_id,
+ null AS source_address,
+ null AS source_pubkey,
+ 0 AS source_index,
+ null AS target_pubkey,
+ 0 AS target_index,
+ null AS tx_hash,
+ null AS tx_sender,
+ null AS tx_target,
+ 0 AS dequeue_block
+ FROM cte
+ UNION ALL SELECT * FROM (
+ SELECT * FROM cte
+ ORDER BY block_time DESC
+ LIMIT $%v
+ `, len(args))
+
+ if offset > 0 {
+ args = append(args, offset)
+ fmt.Fprintf(&sql, " OFFSET $%v ", len(args))
+ }
+ fmt.Fprintf(&sql, ") AS t1")
+
+ consolidationRequestTxs := []*dbtypes.ConsolidationRequestTx{}
+ err := ReaderDb.Select(&consolidationRequestTxs, sql.String(), args...)
+ if err != nil {
+ logger.Errorf("Error while fetching filtered consolidation request txs: %v", err)
+ return nil, 0, err
+ }
+
+ return consolidationRequestTxs[1:], consolidationRequestTxs[0].BlockNumber, nil
+}
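
GetConsolidationRequestTxsFiltered uses the same pagination trick as the other filtered queries in this package: the first row of the UNION carries count(*) in block_number, which is why the function returns rows[1:] together with rows[0].BlockNumber as the total. A small, hypothetical caller showing how that pair is consumed (the db function and filter type exist in this patch; the wrapper itself is illustrative):

    package example

    import (
        "github.com/ethpandaops/dora/db"
        "github.com/ethpandaops/dora/dbtypes"
    )

    // listConsolidationRequestTxs is an illustrative wrapper, not part of this patch.
    // It fetches one page of canonical consolidation request txs plus the total row count.
    func listConsolidationRequestTxs(canonicalForkIds []uint64, page uint64, pageSize uint32) ([]*dbtypes.ConsolidationRequestTx, uint64, error) {
        filter := &dbtypes.ConsolidationRequestTxFilter{
            WithOrphaned: 0, // 0 = canonical forks only, 1 = all, 2 = orphaned only
        }
        return db.GetConsolidationRequestTxsFiltered(page*uint64(pageSize), pageSize, canonicalForkIds, filter)
    }
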
diff --git a/db/consolidation_requests.go b/db/consolidation_requests.go
index 574c762c..c5f12470 100644
--- a/db/consolidation_requests.go
+++ b/db/consolidation_requests.go
@@ -15,11 +15,11 @@ func InsertConsolidationRequests(consolidations []*dbtypes.ConsolidationRequest,
dbtypes.DBEnginePgsql: "INSERT INTO consolidation_requests ",
dbtypes.DBEngineSqlite: "INSERT OR REPLACE INTO consolidation_requests ",
}),
- "(slot_number, slot_root, slot_index, orphaned, fork_id, source_address, source_index, source_pubkey, target_index, target_pubkey, tx_hash)",
+ "(slot_number, slot_root, slot_index, orphaned, fork_id, source_address, source_index, source_pubkey, target_index, target_pubkey, tx_hash, block_number)",
" VALUES ",
)
argIdx := 0
- fieldCount := 11
+ fieldCount := 12
args := make([]interface{}, len(consolidations)*fieldCount)
for i, consolidation := range consolidations {
@@ -46,6 +46,7 @@ func InsertConsolidationRequests(consolidations []*dbtypes.ConsolidationRequest,
args[argIdx+8] = consolidation.TargetIndex
args[argIdx+9] = consolidation.TargetPubkey[:]
args[argIdx+10] = consolidation.TxHash[:]
+ args[argIdx+11] = consolidation.BlockNumber
argIdx += fieldCount
}
fmt.Fprint(&sql, EngineQuery(map[dbtypes.DBEngineType]string{
@@ -65,7 +66,7 @@ func GetConsolidationRequestsFiltered(offset uint64, limit uint32, finalizedBloc
fmt.Fprint(&sql, `
WITH cte AS (
SELECT
- slot_number, slot_root, slot_index, orphaned, fork_id, source_address, source_index, source_pubkey, target_index, target_pubkey, tx_hash
+ slot_number, slot_root, slot_index, orphaned, fork_id, source_address, source_index, source_pubkey, target_index, target_pubkey, tx_hash, block_number
FROM consolidation_requests
`)
@@ -158,7 +159,8 @@ func GetConsolidationRequestsFiltered(offset uint64, limit uint32, finalizedBloc
null AS source_pubkey,
0 AS target_index,
null AS target_pubkey,
- null AS tx_hash
+ null AS tx_hash,
+ 0 AS block_number
FROM cte
UNION ALL SELECT * FROM (
SELECT * FROM cte
@@ -181,3 +183,28 @@ func GetConsolidationRequestsFiltered(offset uint64, limit uint32, finalizedBloc
return consolidationRequests[1:], consolidationRequests[0].SlotNumber, nil
}
+
+func GetConsolidationRequestsByElBlockRange(firstSlot uint64, lastSlot uint64) []*dbtypes.ConsolidationRequest {
+ consolidationRequests := []*dbtypes.ConsolidationRequest{}
+
+ err := ReaderDb.Select(&consolidationRequests, `
+ SELECT consolidation_requests.*
+ FROM consolidation_requests
+ WHERE block_number >= $1 AND block_number <= $2
+ ORDER BY block_number ASC, slot_index ASC
+ `, firstSlot, lastSlot)
+ if err != nil {
+ logger.Errorf("Error while fetching consolidation requests: %v", err)
+ return nil
+ }
+
+ return consolidationRequests
+}
+
+func UpdateConsolidationRequestTxHash(slotRoot []byte, slotIndex uint64, txHash []byte, tx *sqlx.Tx) error {
+ _, err := tx.Exec(`UPDATE consolidation_requests SET tx_hash = $1 WHERE slot_root = $2 AND slot_index = $3`, txHash, slotRoot, slotIndex)
+ if err != nil {
+ return err
+ }
+ return nil
+}
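
GetConsolidationRequestsByElBlockRange and UpdateConsolidationRequestTxHash are the hooks for linking CL-side consolidation requests (keyed by slot_root/slot_index) to the EL-side request transactions stored above. A simplified, hypothetical matching pass, assuming source address plus dequeue block identify a pair and that SlotRoot/SlotIndex have the types used elsewhere in this patch; dora's actual matcher (indexer/execution, only partially visible in this diff) also accounts for forks and request ordering:

    package example

    import (
        "bytes"

        "github.com/ethpandaops/dora/db"
        "github.com/jmoiron/sqlx"
    )

    // matchConsolidationRequests is an illustrative sketch, not dora's matcher:
    // for each CL-side request in an EL block range, find a request tx dequeued
    // in that block and persist the link via UpdateConsolidationRequestTxHash.
    func matchConsolidationRequests(firstBlock, lastBlock uint64, tx *sqlx.Tx) error {
        for _, request := range db.GetConsolidationRequestsByElBlockRange(firstBlock, lastBlock) {
            for _, requestTx := range db.GetConsolidationRequestTxsByDequeueRange(request.BlockNumber, request.BlockNumber) {
                if !bytes.Equal(requestTx.SourceAddress, request.SourceAddress) {
                    continue // simplistic match criterion (assumption)
                }
                if err := db.UpdateConsolidationRequestTxHash(request.SlotRoot, request.SlotIndex, requestTx.TxHash, tx); err != nil {
                    return err
                }
                break
            }
        }
        return nil
    }
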
diff --git a/db/forks.go b/db/forks.go
index 0d21bc73..74cb3318 100644
--- a/db/forks.go
+++ b/db/forks.go
@@ -137,3 +137,17 @@ func UpdateForkParent(parentRoot []byte, parentForkId uint64, tx *sqlx.Tx) error
return nil
}
+
+func GetForkById(forkId uint64) *dbtypes.Fork {
+ var fork dbtypes.Fork
+
+ err := ReaderDb.Get(&fork, `SELECT fork_id, base_slot, base_root, leaf_slot, leaf_root, parent_fork
+ FROM forks
+ WHERE fork_id = $1
+ `, forkId)
+ if err != nil {
+ return nil
+ }
+
+ return &fork
+}
diff --git a/db/schema/pgsql/20240805095505_pectra-updates2.sql b/db/schema/pgsql/20240805095505_pectra-updates2.sql
index 947b0fa9..0f67fd1f 100644
--- a/db/schema/pgsql/20240805095505_pectra-updates2.sql
+++ b/db/schema/pgsql/20240805095505_pectra-updates2.sql
@@ -4,7 +4,7 @@
DROP TABLE IF EXISTS public."consolidations";
CREATE TABLE IF NOT EXISTS public."consolidation_requests" (
- slot_number INT NOT NULL,
+ slot_number BIGINT NOT NULL,
slot_root bytea NOT NULL,
slot_index INT NOT NULL,
orphaned bool NOT NULL DEFAULT FALSE,
@@ -42,7 +42,7 @@ CREATE INDEX IF NOT EXISTS "consolidation_requests_fork_idx"
DROP TABLE IF EXISTS public."el_requests";
CREATE TABLE IF NOT EXISTS public."withdrawal_requests" (
- slot_number INT NOT NULL,
+ slot_number BIGINT NOT NULL,
slot_root bytea NOT NULL,
slot_index INT NOT NULL,
orphaned bool NOT NULL DEFAULT FALSE,
diff --git a/db/schema/pgsql/20241006182734_pectra-updates3.sql b/db/schema/pgsql/20241006182734_pectra-updates3.sql
new file mode 100644
index 00000000..3a853240
--- /dev/null
+++ b/db/schema/pgsql/20241006182734_pectra-updates3.sql
@@ -0,0 +1,130 @@
+-- +goose Up
+-- +goose StatementBegin
+
+CREATE TABLE IF NOT EXISTS public."consolidation_request_txs" (
+ block_number BIGINT NOT NULL,
+ block_index INT NOT NULL,
+ block_time BIGINT NOT NULL,
+ block_root bytea NOT NULL,
+ fork_id BIGINT NOT NULL DEFAULT 0,
+ source_address bytea NOT NULL,
+ source_pubkey bytea NULL,
+ source_index BIGINT NULL,
+ target_pubkey bytea NULL,
+ target_index BIGINT NULL,
+ tx_hash bytea NULL,
+ tx_sender bytea NOT NULL,
+ tx_target bytea NOT NULL,
+ dequeue_block BIGINT NOT NULL,
+ CONSTRAINT consolidation_pkey PRIMARY KEY (block_root, block_index)
+);
+
+CREATE INDEX IF NOT EXISTS "consolidation_request_txs_block_number_idx"
+ ON public."consolidation_request_txs"
+ ("block_number" ASC NULLS FIRST);
+
+CREATE INDEX IF NOT EXISTS "consolidation_request_txs_source_addr_idx"
+ ON public."consolidation_request_txs"
+ ("source_address" ASC NULLS FIRST);
+
+CREATE INDEX IF NOT EXISTS "consolidation_request_txs_source_idx"
+ ON public."consolidation_request_txs"
+ ("source_index" ASC NULLS FIRST);
+
+CREATE INDEX IF NOT EXISTS "consolidation_request_txs_target_idx"
+ ON public."consolidation_request_txs"
+ ("target_index" ASC NULLS FIRST);
+
+CREATE INDEX IF NOT EXISTS "consolidation_request_txs_tx_hash_idx"
+ ON public."consolidation_request_txs"
+ ("tx_hash" ASC);
+
+CREATE INDEX IF NOT EXISTS "consolidation_request_txs_fork_idx"
+ ON public."consolidation_request_txs"
+ ("fork_id" ASC NULLS FIRST);
+
+CREATE INDEX IF NOT EXISTS "consolidation_request_txs_dequeue_block_idx"
+ ON public."consolidation_request_txs"
+ ("dequeue_block" ASC NULLS FIRST);
+
+-- add block_number to consolidation_requests
+ALTER TABLE public."consolidation_requests"
+ ADD "block_number" BIGINT NOT NULL DEFAULT 0;
+
+UPDATE public."consolidation_requests"
+ SET "block_number" = (
+ SELECT eth_block_number
+ FROM public."slots"
+ WHERE public."slots".root = public."consolidation_requests".slot_root
+ );
+
+CREATE INDEX IF NOT EXISTS "consolidation_requests_block_number_idx"
+ ON public."consolidation_requests"
+ ("block_number" ASC NULLS FIRST);
+
+CREATE TABLE IF NOT EXISTS public."withdrawal_request_txs" (
+ block_number BIGINT NOT NULL,
+ block_index INT NOT NULL,
+ block_time BIGINT NOT NULL,
+ block_root bytea NOT NULL,
+ fork_id BIGINT NOT NULL DEFAULT 0,
+ source_address bytea NOT NULL,
+ validator_pubkey bytea NOT NULL,
+ validator_index BIGINT NULL,
+ amount BIGINT NOT NULL,
+ tx_hash bytea NULL,
+ tx_sender bytea NOT NULL,
+ tx_target bytea NOT NULL,
+ dequeue_block BIGINT NOT NULL,
+ CONSTRAINT withdrawal_request_txs_pkey PRIMARY KEY (block_root, block_index)
+);
+
+CREATE INDEX IF NOT EXISTS "withdrawal_request_txs_block_number_idx"
+ ON public."withdrawal_request_txs"
+ ("block_number" ASC NULLS FIRST);
+
+CREATE INDEX IF NOT EXISTS "withdrawal_request_txs_source_addr_idx"
+ ON public."withdrawal_request_txs"
+ ("source_address" ASC NULLS FIRST);
+
+CREATE INDEX IF NOT EXISTS "withdrawal_request_txs_validator_index_idx"
+ ON public."withdrawal_request_txs"
+ ("validator_index" ASC NULLS FIRST);
+
+CREATE INDEX IF NOT EXISTS "withdrawal_request_txs_amount_idx"
+ ON public."withdrawal_request_txs"
+ ("amount" ASC NULLS FIRST);
+
+CREATE INDEX IF NOT EXISTS "withdrawal_request_txs_tx_hash_idx"
+ ON public."withdrawal_request_txs"
+ ("tx_hash" ASC);
+
+CREATE INDEX IF NOT EXISTS "withdrawal_request_txs_fork_idx"
+ ON public."withdrawal_request_txs"
+ ("fork_id" ASC NULLS FIRST);
+
+CREATE INDEX IF NOT EXISTS "withdrawal_request_txs_dequeue_block_idx"
+ ON public."withdrawal_request_txs"
+ ("dequeue_block" ASC NULLS FIRST);
+
+-- add block_number to withdrawal_requests
+ALTER TABLE public."withdrawal_requests"
+ ADD "block_number" BIGINT NOT NULL DEFAULT 0;
+
+UPDATE public."withdrawal_requests"
+ SET "block_number" = (
+ SELECT eth_block_number
+ FROM public."slots"
+ WHERE public."slots".root = public."withdrawal_requests".slot_root
+ );
+
+CREATE INDEX IF NOT EXISTS "withdrawal_requests_block_number_idx"
+ ON public."withdrawal_requests"
+ ("block_number" ASC NULLS FIRST);
+
+
+-- +goose StatementEnd
+-- +goose Down
+-- +goose StatementBegin
+SELECT 'NOT SUPPORTED';
+-- +goose StatementEnd
diff --git a/db/schema/sqlite/20240805095505_pectra-updates2.sql b/db/schema/sqlite/20240805095505_pectra-updates2.sql
index e2e4d6da..dc926161 100644
--- a/db/schema/sqlite/20240805095505_pectra-updates2.sql
+++ b/db/schema/sqlite/20240805095505_pectra-updates2.sql
@@ -4,7 +4,7 @@
DROP TABLE IF EXISTS "consolidations";
CREATE TABLE IF NOT EXISTS "consolidation_requests" (
- slot_number INT NOT NULL,
+ slot_number BIGINT NOT NULL,
slot_root BLOB NOT NULL,
slot_index INT NOT NULL,
orphaned bool NOT NULL DEFAULT FALSE,
@@ -42,7 +42,7 @@ CREATE INDEX IF NOT EXISTS "consolidation_requests_fork_idx"
DROP TABLE IF EXISTS "el_requests";
CREATE TABLE IF NOT EXISTS "withdrawal_requests" (
- slot_number INT NOT NULL,
+ slot_number BIGINT NOT NULL,
slot_root BLOB NOT NULL,
slot_index INT NOT NULL,
orphaned bool NOT NULL DEFAULT FALSE,
diff --git a/db/schema/sqlite/20241006182734_pectra-updates3.sql b/db/schema/sqlite/20241006182734_pectra-updates3.sql
new file mode 100644
index 00000000..65753830
--- /dev/null
+++ b/db/schema/sqlite/20241006182734_pectra-updates3.sql
@@ -0,0 +1,129 @@
+-- +goose Up
+-- +goose StatementBegin
+
+CREATE TABLE IF NOT EXISTS "consolidation_request_txs" (
+ block_number BIGINT NOT NULL,
+ block_index INT NOT NULL,
+ block_time BIGINT NOT NULL,
+ block_root BLOB NOT NULL,
+ fork_id BIGINT NOT NULL DEFAULT 0,
+ source_address BLOB NOT NULL,
+ source_pubkey BLOB NULL,
+ source_index BIGINT NULL,
+ target_pubkey BLOB NULL,
+ target_index BIGINT NULL,
+ tx_hash BLOB NULL,
+ tx_sender BLOB NOT NULL,
+ tx_target BLOB NOT NULL,
+ dequeue_block BIGINT NOT NULL,
+ CONSTRAINT consolidation_pkey PRIMARY KEY (block_root, block_index)
+);
+
+CREATE INDEX IF NOT EXISTS "consolidation_request_txs_block_number_idx"
+ ON "consolidation_request_txs"
+ ("block_number" ASC);
+
+CREATE INDEX IF NOT EXISTS "consolidation_request_txs_source_addr_idx"
+ ON "consolidation_request_txs"
+ ("source_address" ASC);
+
+CREATE INDEX IF NOT EXISTS "consolidation_request_txs_source_idx"
+ ON "consolidation_request_txs"
+ ("source_index" ASC);
+
+CREATE INDEX IF NOT EXISTS "consolidation_request_txs_target_idx"
+ ON "consolidation_request_txs"
+ ("target_index" ASC);
+
+CREATE INDEX IF NOT EXISTS "consolidation_request_txs_tx_hash_idx"
+ ON "consolidation_request_txs"
+ ("tx_hash" ASC);
+
+CREATE INDEX IF NOT EXISTS "consolidation_request_txs_fork_idx"
+ ON "consolidation_request_txs"
+ ("fork_id" ASC);
+
+CREATE INDEX IF NOT EXISTS "consolidation_request_txs_dequeue_block_idx"
+ ON "consolidation_request_txs"
+ ("dequeue_block" ASC);
+
+-- add block_number to consolidation_requests
+ALTER TABLE "consolidation_requests"
+ ADD "block_number" BIGINT NOT NULL DEFAULT 0;
+
+UPDATE "consolidation_requests"
+ SET "block_number" = (
+ SELECT eth_block_number
+ FROM "slots"
+ WHERE "slots".root = "consolidation_requests".slot_root
+ );
+
+CREATE INDEX IF NOT EXISTS "consolidation_requests_block_number_idx"
+ ON "consolidation_requests"
+ ("block_number" ASC);
+
+CREATE TABLE IF NOT EXISTS "withdrawal_request_txs" (
+ block_number BIGINT NOT NULL,
+ block_index INT NOT NULL,
+ block_time BIGINT NOT NULL,
+ block_root BLOB NOT NULL,
+ fork_id BIGINT NOT NULL DEFAULT 0,
+ source_address BLOB NOT NULL,
+ validator_pubkey BLOB NOT NULL,
+ validator_index BIGINT NULL,
+ amount BIGINT NOT NULL,
+ tx_hash BLOB NULL,
+ tx_sender BLOB NOT NULL,
+ tx_target BLOB NOT NULL,
+ dequeue_block BIGINT NOT NULL,
+ CONSTRAINT withdrawal_request_txs_pkey PRIMARY KEY (block_root, block_index)
+);
+
+CREATE INDEX IF NOT EXISTS "withdrawal_request_txs_block_number_idx"
+ ON "withdrawal_request_txs"
+ ("block_number" ASC);
+
+CREATE INDEX IF NOT EXISTS "withdrawal_request_txs_source_addr_idx"
+ ON "withdrawal_request_txs"
+ ("source_address" ASC);
+
+CREATE INDEX IF NOT EXISTS "withdrawal_request_txs_validator_index_idx"
+ ON "withdrawal_request_txs"
+ ("validator_index" ASC);
+
+CREATE INDEX IF NOT EXISTS "withdrawal_request_txs_amount_idx"
+ ON "withdrawal_request_txs"
+ ("amount" ASC);
+
+CREATE INDEX IF NOT EXISTS "withdrawal_request_txs_tx_hash_idx"
+ ON "withdrawal_request_txs"
+ ("tx_hash" ASC);
+
+CREATE INDEX IF NOT EXISTS "withdrawal_request_txs_fork_idx"
+ ON "withdrawal_request_txs"
+ ("fork_id" ASC);
+
+CREATE INDEX IF NOT EXISTS "withdrawal_request_txs_dequeue_block_idx"
+ ON "withdrawal_request_txs"
+ ("dequeue_block" ASC);
+
+-- add block_number to withdrawal_requests
+ALTER TABLE "withdrawal_requests"
+ ADD "block_number" BIGINT NOT NULL DEFAULT 0;
+
+UPDATE "withdrawal_requests"
+ SET "block_number" = (
+ SELECT eth_block_number
+ FROM "slots"
+ WHERE "slots".root = "withdrawal_requests".slot_root
+ );
+
+CREATE INDEX IF NOT EXISTS "withdrawal_requests_block_number_idx"
+ ON "withdrawal_requests"
+ ("block_number" ASC);
+
+-- +goose StatementEnd
+-- +goose Down
+-- +goose StatementBegin
+SELECT 'NOT SUPPORTED';
+-- +goose StatementEnd
diff --git a/db/slots.go b/db/slots.go
index 3e2f8064..fb6ec06e 100644
--- a/db/slots.go
+++ b/db/slots.go
@@ -385,7 +385,6 @@ func GetHighestRootBeforeSlot(slot uint64, withOrphaned bool) []byte {
SELECT root FROM slots WHERE slot < $1 `+statusFilter+` AND status != 0 ORDER BY slot DESC LIMIT 1
`, slot)
if err != nil {
- logger.Errorf("Error while fetching highest root before %v: %v", slot, err)
return nil
}
return result
diff --git a/db/withdrawal_request_txs.go b/db/withdrawal_request_txs.go
new file mode 100644
index 00000000..5aaeb050
--- /dev/null
+++ b/db/withdrawal_request_txs.go
@@ -0,0 +1,227 @@
+package db
+
+import (
+ "fmt"
+ "strings"
+
+ "github.com/ethpandaops/dora/dbtypes"
+ "github.com/jmoiron/sqlx"
+)
+
+func InsertWithdrawalRequestTxs(withdrawalTxs []*dbtypes.WithdrawalRequestTx, tx *sqlx.Tx) error {
+ var sql strings.Builder
+ fmt.Fprint(&sql,
+ EngineQuery(map[dbtypes.DBEngineType]string{
+ dbtypes.DBEnginePgsql: "INSERT INTO withdrawal_request_txs ",
+ dbtypes.DBEngineSqlite: "INSERT OR REPLACE INTO withdrawal_request_txs ",
+ }),
+ "(block_number, block_index, block_time, block_root, fork_id, source_address, validator_pubkey, validator_index, amount, tx_hash, tx_sender, tx_target, dequeue_block)",
+ " VALUES ",
+ )
+ argIdx := 0
+ fieldCount := 13
+
+ args := make([]any, len(withdrawalTxs)*fieldCount)
+ for i, withdrawalTx := range withdrawalTxs {
+ if i > 0 {
+ fmt.Fprintf(&sql, ", ")
+ }
+ fmt.Fprintf(&sql, "(")
+ for f := 0; f < fieldCount; f++ {
+ if f > 0 {
+ fmt.Fprintf(&sql, ", ")
+ }
+ fmt.Fprintf(&sql, "$%v", argIdx+f+1)
+
+ }
+ fmt.Fprintf(&sql, ")")
+
+ args[argIdx+0] = withdrawalTx.BlockNumber
+ args[argIdx+1] = withdrawalTx.BlockIndex
+ args[argIdx+2] = withdrawalTx.BlockTime
+ args[argIdx+3] = withdrawalTx.BlockRoot
+ args[argIdx+4] = withdrawalTx.ForkId
+ args[argIdx+5] = withdrawalTx.SourceAddress
+ args[argIdx+6] = withdrawalTx.ValidatorPubkey
+ args[argIdx+7] = withdrawalTx.ValidatorIndex
+ args[argIdx+8] = withdrawalTx.Amount
+ args[argIdx+9] = withdrawalTx.TxHash
+ args[argIdx+10] = withdrawalTx.TxSender
+ args[argIdx+11] = withdrawalTx.TxTarget
+ args[argIdx+12] = withdrawalTx.DequeueBlock
+ argIdx += fieldCount
+ }
+ fmt.Fprint(&sql, EngineQuery(map[dbtypes.DBEngineType]string{
+ dbtypes.DBEnginePgsql: " ON CONFLICT (block_root, block_index) DO UPDATE SET fork_id = excluded.fork_id",
+ dbtypes.DBEngineSqlite: "",
+ }))
+
+ _, err := tx.Exec(sql.String(), args...)
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+func GetWithdrawalRequestTxsByDequeueRange(dequeueFirst uint64, dequeueLast uint64) []*dbtypes.WithdrawalRequestTx {
+ withdrawalTxs := []*dbtypes.WithdrawalRequestTx{}
+
+ err := ReaderDb.Select(&withdrawalTxs, `SELECT withdrawal_request_txs.*
+ FROM withdrawal_request_txs
+ WHERE dequeue_block >= $1 AND dequeue_block <= $2
+ ORDER BY dequeue_block ASC, block_number ASC, block_index ASC
+ `, dequeueFirst, dequeueLast)
+ if err != nil {
+ logger.Errorf("Error while fetching withdrawal request transactions: %v", err)
+ return nil
+ }
+
+ return withdrawalTxs
+}
+
+func GetWithdrawalRequestTxsByTxHashes(txHashes [][]byte) []*dbtypes.WithdrawalRequestTx {
+ var sql strings.Builder
+ args := []interface{}{}
+
+ fmt.Fprint(&sql, `SELECT withdrawal_request_txs.*
+ FROM withdrawal_request_txs
+ WHERE tx_hash IN (
+ `)
+
+ for idx, txHash := range txHashes {
+ if idx > 0 {
+ fmt.Fprintf(&sql, ", ")
+ }
+ args = append(args, txHash)
+ fmt.Fprintf(&sql, "$%v", len(args))
+ }
+ fmt.Fprintf(&sql, ")")
+
+ withdrawalTxs := []*dbtypes.WithdrawalRequestTx{}
+ err := ReaderDb.Select(&withdrawalTxs, sql.String(), args...)
+ if err != nil {
+ logger.Errorf("Error while fetching withdrawal request txs: %v", err)
+ return nil
+ }
+
+ return withdrawalTxs
+}
+
+func GetWithdrawalRequestTxsFiltered(offset uint64, limit uint32, canonicalForkIds []uint64, filter *dbtypes.WithdrawalRequestTxFilter) ([]*dbtypes.WithdrawalRequestTx, uint64, error) {
+ var sql strings.Builder
+ args := []interface{}{}
+ fmt.Fprint(&sql, `
+ WITH cte AS (
+ SELECT
+ block_number, block_index, block_time, block_root, fork_id, source_address, validator_pubkey, validator_index, amount, tx_hash, tx_sender, tx_target, dequeue_block
+ FROM withdrawal_request_txs
+ `)
+
+ if filter.ValidatorName != "" {
+ fmt.Fprint(&sql, `
+ LEFT JOIN validator_names AS source_names ON source_names."index" = withdrawal_request_txs.validator_index
+ `)
+ }
+
+ filterOp := "WHERE"
+ if filter.MinDequeue > 0 {
+ args = append(args, filter.MinDequeue)
+ fmt.Fprintf(&sql, " %v dequeue_block >= $%v", filterOp, len(args))
+ filterOp = "AND"
+ }
+ if filter.MaxDequeue > 0 {
+ args = append(args, filter.MaxDequeue)
+ fmt.Fprintf(&sql, " %v dequeue_block <= $%v", filterOp, len(args))
+ filterOp = "AND"
+ }
+ if len(filter.SourceAddress) > 0 {
+ args = append(args, filter.SourceAddress)
+ fmt.Fprintf(&sql, " %v source_address = $%v", filterOp, len(args))
+ filterOp = "AND"
+ }
+ if filter.MinIndex > 0 {
+ args = append(args, filter.MinIndex)
+ fmt.Fprintf(&sql, " %v validator_index >= $%v", filterOp, len(args))
+ filterOp = "AND"
+ }
+ if filter.MaxIndex > 0 {
+ args = append(args, filter.MaxIndex)
+ fmt.Fprintf(&sql, " %v validator_index <= $%v", filterOp, len(args))
+ filterOp = "AND"
+ }
+ if filter.ValidatorName != "" {
+ args = append(args, "%"+filter.ValidatorName+"%")
+ fmt.Fprintf(&sql, " %v ", filterOp)
+ fmt.Fprintf(&sql, EngineQuery(map[dbtypes.DBEngineType]string{
+ dbtypes.DBEnginePgsql: ` source_names.name ilike $%v `,
+ dbtypes.DBEngineSqlite: ` source_names.name LIKE $%v `,
+ }), len(args))
+ filterOp = "AND"
+ }
+ if filter.MinAmount != nil {
+ args = append(args, *filter.MinAmount)
+ fmt.Fprintf(&sql, " %v amount >= $%v", filterOp, len(args))
+ filterOp = "AND"
+ }
+ if filter.MaxAmount != nil {
+ args = append(args, *filter.MaxAmount)
+ fmt.Fprintf(&sql, " %v amount <= $%v", filterOp, len(args))
+ filterOp = "AND"
+ }
+
+ if filter.WithOrphaned != 1 {
+ forkIdStr := make([]string, len(canonicalForkIds))
+ for i, forkId := range canonicalForkIds {
+ forkIdStr[i] = fmt.Sprintf("%v", forkId)
+ }
+ if len(forkIdStr) == 0 {
+ forkIdStr = append(forkIdStr, "0")
+ }
+
+ if filter.WithOrphaned == 0 {
+ fmt.Fprintf(&sql, " %v fork_id IN (%v)", filterOp, strings.Join(forkIdStr, ","))
+ filterOp = "AND"
+ } else if filter.WithOrphaned == 2 {
+ fmt.Fprintf(&sql, " %v fork_id NOT IN (%v)", filterOp, strings.Join(forkIdStr, ","))
+ filterOp = "AND"
+ }
+ }
+
+ args = append(args, limit)
+ fmt.Fprintf(&sql, `)
+ SELECT
+ count(*) AS block_number,
+ 0 AS block_index,
+ 0 AS block_time,
+ null AS block_root,
+ 0 AS fork_id,
+ null AS source_address,
+ null AS validator_pubkey,
+ 0 AS validator_index,
+ 0 AS amount,
+ null AS tx_hash,
+ null AS tx_sender,
+ null AS tx_target,
+ 0 AS dequeue_block
+ FROM cte
+ UNION ALL SELECT * FROM (
+ SELECT * FROM cte
+ ORDER BY block_time DESC
+ LIMIT $%v
+ `, len(args))
+
+ if offset > 0 {
+ args = append(args, offset)
+ fmt.Fprintf(&sql, " OFFSET $%v ", len(args))
+ }
+ fmt.Fprintf(&sql, ") AS t1")
+
+ withdrawalRequestTxs := []*dbtypes.WithdrawalRequestTx{}
+ err := ReaderDb.Select(&withdrawalRequestTxs, sql.String(), args...)
+ if err != nil {
+ logger.Errorf("Error while fetching filtered withdrawal request txs: %v", err)
+ return nil, 0, err
+ }
+
+ return withdrawalRequestTxs[1:], withdrawalRequestTxs[0].BlockNumber, nil
+}
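
Both insert helpers build a single multi-row INSERT with numbered placeholders, 13 per withdrawal request tx (14 for consolidations), and bind the values positionally. An isolated, runnable sketch of that placeholder generation, to make the shape of the generated statement explicit:

    package main

    import (
        "fmt"
        "strings"
    )

    // buildPlaceholders mirrors the loop in InsertWithdrawalRequestTxs: one
    // "($1, $2, ...)" group per row, with numbering continuing across rows.
    func buildPlaceholders(rows, fieldCount int) string {
        var sql strings.Builder
        argIdx := 0
        for i := 0; i < rows; i++ {
            if i > 0 {
                sql.WriteString(", ")
            }
            sql.WriteString("(")
            for f := 0; f < fieldCount; f++ {
                if f > 0 {
                    sql.WriteString(", ")
                }
                fmt.Fprintf(&sql, "$%v", argIdx+f+1)
            }
            sql.WriteString(")")
            argIdx += fieldCount
        }
        return sql.String()
    }

    func main() {
        // two rows with 13 fields each -> ($1, ..., $13), ($14, ..., $26)
        fmt.Println(buildPlaceholders(2, 13))
    }
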
diff --git a/db/withdrawal_requests.go b/db/withdrawal_requests.go
index 04d6768a..88062b9f 100644
--- a/db/withdrawal_requests.go
+++ b/db/withdrawal_requests.go
@@ -15,11 +15,11 @@ func InsertWithdrawalRequests(elRequests []*dbtypes.WithdrawalRequest, tx *sqlx.
dbtypes.DBEnginePgsql: "INSERT INTO withdrawal_requests ",
dbtypes.DBEngineSqlite: "INSERT OR REPLACE INTO withdrawal_requests ",
}),
- "(slot_number, slot_root, slot_index, orphaned, fork_id, source_address, validator_index, validator_pubkey, amount, tx_hash)",
+ "(slot_number, slot_root, slot_index, orphaned, fork_id, source_address, validator_index, validator_pubkey, amount, tx_hash, block_number)",
" VALUES ",
)
argIdx := 0
- fieldCount := 10
+ fieldCount := 11
args := make([]any, len(elRequests)*fieldCount)
for i, elRequest := range elRequests {
@@ -46,6 +46,7 @@ func InsertWithdrawalRequests(elRequests []*dbtypes.WithdrawalRequest, tx *sqlx.
args[argIdx+7] = elRequest.ValidatorPubkey
args[argIdx+8] = elRequest.Amount
args[argIdx+9] = elRequest.TxHash
+ args[argIdx+10] = elRequest.BlockNumber
argIdx += fieldCount
}
fmt.Fprint(&sql, EngineQuery(map[dbtypes.DBEngineType]string{
@@ -66,7 +67,7 @@ func GetWithdrawalRequestsFiltered(offset uint64, limit uint32, finalizedBlock u
fmt.Fprint(&sql, `
WITH cte AS (
SELECT
- slot_number, slot_index, slot_root, orphaned, fork_id, source_address, validator_index, validator_pubkey, amount
+ slot_number, slot_index, slot_root, orphaned, fork_id, source_address, validator_index, validator_pubkey, amount, tx_hash, block_number
FROM withdrawal_requests
`)
@@ -143,7 +144,9 @@ func GetWithdrawalRequestsFiltered(offset uint64, limit uint32, finalizedBlock u
null AS source_address,
0 AS validator_index,
null AS validator_pubkey,
- 0 AS amount
+ 0 AS amount,
+ null AS tx_hash,
+ 0 AS block_number
FROM cte
UNION ALL SELECT * FROM (
SELECT * FROM cte
@@ -166,3 +169,28 @@ func GetWithdrawalRequestsFiltered(offset uint64, limit uint32, finalizedBlock u
return withdrawalRequests[1:], withdrawalRequests[0].SlotNumber, nil
}
+
+func GetWithdrawalRequestsByElBlockRange(firstSlot uint64, lastSlot uint64) []*dbtypes.WithdrawalRequest {
+ withdrawalRequests := []*dbtypes.WithdrawalRequest{}
+
+ err := ReaderDb.Select(&withdrawalRequests, `
+ SELECT withdrawal_requests.*
+ FROM withdrawal_requests
+ WHERE block_number >= $1 AND block_number <= $2
+ ORDER BY block_number ASC, slot_index ASC
+ `, firstSlot, lastSlot)
+ if err != nil {
+ logger.Errorf("Error while fetching withdrawal requests: %v", err)
+ return nil
+ }
+
+ return withdrawalRequests
+}
+
+func UpdateWithdrawalRequestTxHash(slotRoot []byte, slotIndex uint64, txHash []byte, tx *sqlx.Tx) error {
+ _, err := tx.Exec(`UPDATE withdrawal_requests SET tx_hash = $1 WHERE slot_root = $2 AND slot_index = $3`, txHash, slotRoot, slotIndex)
+ if err != nil {
+ return err
+ }
+ return nil
+}
diff --git a/dbtypes/dbtypes.go b/dbtypes/dbtypes.go
index 5e015950..a55b23ef 100644
--- a/dbtypes/dbtypes.go
+++ b/dbtypes/dbtypes.go
@@ -263,6 +263,24 @@ type ConsolidationRequest struct {
TargetIndex *uint64 `db:"target_index"`
TargetPubkey []byte `db:"target_pubkey"`
TxHash []byte `db:"tx_hash"`
+ BlockNumber uint64 `db:"block_number"`
+}
+
+type ConsolidationRequestTx struct {
+ BlockNumber uint64 `db:"block_number"`
+ BlockIndex uint64 `db:"block_index"`
+ BlockTime uint64 `db:"block_time"`
+ BlockRoot []byte `db:"block_root"`
+ ForkId uint64 `db:"fork_id"`
+ SourceAddress []byte `db:"source_address"`
+ SourcePubkey []byte `db:"source_pubkey"`
+ SourceIndex *uint64 `db:"source_index"`
+ TargetPubkey []byte `db:"target_pubkey"`
+ TargetIndex *uint64 `db:"target_index"`
+ TxHash []byte `db:"tx_hash"`
+ TxSender []byte `db:"tx_sender"`
+ TxTarget []byte `db:"tx_target"`
+ DequeueBlock uint64 `db:"dequeue_block"`
}
type WithdrawalRequest struct {
@@ -276,4 +294,21 @@ type WithdrawalRequest struct {
ValidatorPubkey []byte `db:"validator_pubkey"`
Amount uint64 `db:"amount"`
TxHash []byte `db:"tx_hash"`
+ BlockNumber uint64 `db:"block_number"`
+}
+
+type WithdrawalRequestTx struct {
+ BlockNumber uint64 `db:"block_number"`
+ BlockIndex uint64 `db:"block_index"`
+ BlockTime uint64 `db:"block_time"`
+ BlockRoot []byte `db:"block_root"`
+ ForkId uint64 `db:"fork_id"`
+ SourceAddress []byte `db:"source_address"`
+ ValidatorPubkey []byte `db:"validator_pubkey"`
+ ValidatorIndex *uint64 `db:"validator_index"`
+ Amount uint64 `db:"amount"`
+ TxHash []byte `db:"tx_hash"`
+ TxSender []byte `db:"tx_sender"`
+ TxTarget []byte `db:"tx_target"`
+ DequeueBlock uint64 `db:"dequeue_block"`
}
diff --git a/dbtypes/other.go b/dbtypes/other.go
index 1442936b..d8842cdc 100644
--- a/dbtypes/other.go
+++ b/dbtypes/other.go
@@ -104,6 +104,18 @@ type WithdrawalRequestFilter struct {
WithOrphaned uint8
}
+type WithdrawalRequestTxFilter struct {
+ MinDequeue uint64
+ MaxDequeue uint64
+ SourceAddress []byte
+ MinIndex uint64
+ MaxIndex uint64
+ ValidatorName string
+ MinAmount *uint64
+ MaxAmount *uint64
+ WithOrphaned uint8
+}
+
type ConsolidationRequestFilter struct {
MinSlot uint64
MaxSlot uint64
@@ -116,3 +128,16 @@ type ConsolidationRequestFilter struct {
TgtValidatorName string
WithOrphaned uint8
}
+
+type ConsolidationRequestTxFilter struct {
+ MinDequeue uint64
+ MaxDequeue uint64
+ SourceAddress []byte
+ MinSrcIndex uint64
+ MaxSrcIndex uint64
+ SrcValidatorName string
+ MinTgtIndex uint64
+ MaxTgtIndex uint64
+ TgtValidatorName string
+ WithOrphaned uint8
+}
diff --git a/handlers/el_consolidations.go b/handlers/el_consolidations.go
index 3b2e17fd..18d4e497 100644
--- a/handlers/el_consolidations.go
+++ b/handlers/el_consolidations.go
@@ -1,6 +1,7 @@
package handlers
import (
+ "bytes"
"fmt"
"net/http"
"net/url"
@@ -8,7 +9,9 @@ import (
"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethpandaops/dora/db"
"github.com/ethpandaops/dora/dbtypes"
+ "github.com/ethpandaops/dora/indexer/beacon"
"github.com/ethpandaops/dora/services"
"github.com/ethpandaops/dora/templates"
"github.com/ethpandaops/dora/types/models"
@@ -174,7 +177,7 @@ func buildFilteredElConsolidationsPageData(pageIdx uint64, pageSize uint64, minS
pageData.PrevPageIndex = pageIdx - 1
}
- // load voluntary exits
+ // load consolidation requests
consolidationRequestFilter := &dbtypes.ConsolidationRequestFilter{
MinSlot: minSlot,
MaxSlot: maxSlot,
@@ -190,11 +193,28 @@ func buildFilteredElConsolidationsPageData(pageIdx uint64, pageSize uint64, minS
dbElConsolidations, totalRows := services.GlobalBeaconService.GetConsolidationRequestsByFilter(consolidationRequestFilter, pageIdx-1, uint32(pageSize))
+ // helper to load tx details for consolidation requests
+ buildTxDetails := func(consolidationTx *dbtypes.ConsolidationRequestTx) *models.ElConsolidationsPageDataConsolidationTxDetails {
+ txDetails := &models.ElConsolidationsPageDataConsolidationTxDetails{
+ BlockNumber: consolidationTx.BlockNumber,
+ BlockHash: fmt.Sprintf("%#x", consolidationTx.BlockRoot),
+ BlockTime: consolidationTx.BlockTime,
+ TxOrigin: common.Address(consolidationTx.TxSender).Hex(),
+ TxTarget: common.Address(consolidationTx.TxTarget).Hex(),
+ TxHash: fmt.Sprintf("%#x", consolidationTx.TxHash),
+ }
+
+ return txDetails
+ }
+
chainState := services.GlobalBeaconService.GetChainState()
validatorSetRsp := services.GlobalBeaconService.GetCachedValidatorSet()
+ matcherHeight := services.GlobalBeaconService.GetConsolidationIndexer().GetMatcherHeight()
+
+ requestTxDetailsFor := [][]byte{}
for _, elConsolidation := range dbElConsolidations {
- elWithdrawalData := &models.ElConsolidationsPageDataConsolidation{
+ elConsolidationData := &models.ElConsolidationsPageDataConsolidation{
SlotNumber: elConsolidation.SlotNumber,
SlotRoot: elConsolidation.SlotRoot,
Time: chainState.SlotToTime(phase0.Slot(elConsolidation.SlotNumber)),
@@ -202,30 +222,79 @@ func buildFilteredElConsolidationsPageData(pageIdx uint64, pageSize uint64, minS
SourceAddr: elConsolidation.SourceAddress,
SourcePublicKey: elConsolidation.SourcePubkey,
TargetPublicKey: elConsolidation.TargetPubkey,
+ TransactionHash: elConsolidation.TxHash,
}
if elConsolidation.SourceIndex != nil {
- elWithdrawalData.SourceValidatorIndex = *elConsolidation.SourceIndex
- elWithdrawalData.SourceValidatorName = services.GlobalBeaconService.GetValidatorName(*elConsolidation.SourceIndex)
+ elConsolidationData.SourceValidatorIndex = *elConsolidation.SourceIndex
+ elConsolidationData.SourceValidatorName = services.GlobalBeaconService.GetValidatorName(*elConsolidation.SourceIndex)
- if uint64(len(validatorSetRsp)) > elWithdrawalData.SourceValidatorIndex && validatorSetRsp[elWithdrawalData.SourceValidatorIndex] != nil {
- elWithdrawalData.SourceValidatorValid = true
+ if uint64(len(validatorSetRsp)) > elConsolidationData.SourceValidatorIndex && validatorSetRsp[elConsolidationData.SourceValidatorIndex] != nil {
+ elConsolidationData.SourceValidatorValid = true
}
}
if elConsolidation.TargetIndex != nil {
- elWithdrawalData.TargetValidatorIndex = *elConsolidation.TargetIndex
- elWithdrawalData.TargetValidatorName = services.GlobalBeaconService.GetValidatorName(*elConsolidation.TargetIndex)
+ elConsolidationData.TargetValidatorIndex = *elConsolidation.TargetIndex
+ elConsolidationData.TargetValidatorName = services.GlobalBeaconService.GetValidatorName(*elConsolidation.TargetIndex)
- if uint64(len(validatorSetRsp)) > elWithdrawalData.TargetValidatorIndex && validatorSetRsp[elWithdrawalData.TargetValidatorIndex] != nil {
- elWithdrawalData.TargetValidatorValid = true
+ if uint64(len(validatorSetRsp)) > elConsolidationData.TargetValidatorIndex && validatorSetRsp[elConsolidationData.TargetValidatorIndex] != nil {
+ elConsolidationData.TargetValidatorValid = true
}
}
- pageData.ElRequests = append(pageData.ElRequests, elWithdrawalData)
+ if len(elConsolidationData.TransactionHash) > 0 {
+ elConsolidationData.LinkedTransaction = true
+ requestTxDetailsFor = append(requestTxDetailsFor, elConsolidationData.TransactionHash)
+ } else if elConsolidation.BlockNumber > matcherHeight {
+ // consolidation request has not been matched with a tx yet, try to find the tx on the fly
+ consolidationRequestTx := db.GetConsolidationRequestTxsByDequeueRange(elConsolidation.BlockNumber, elConsolidation.BlockNumber)
+ if len(consolidationRequestTx) > 1 {
+ forkIds := services.GlobalBeaconService.GetParentForkIds(beacon.ForkKey(elConsolidation.ForkId))
+ isParentFork := func(forkId uint64) bool {
+ for _, parentForkId := range forkIds {
+ if uint64(parentForkId) == forkId {
+ return true
+ }
+ }
+ return false
+ }
+
+ matchingTxs := []*dbtypes.ConsolidationRequestTx{}
+ for _, tx := range consolidationRequestTx {
+ if isParentFork(tx.ForkId) {
+ matchingTxs = append(matchingTxs, tx)
+ }
+ }
+
+ if len(matchingTxs) >= int(elConsolidation.SlotIndex)+1 {
+ elConsolidationData.TransactionHash = matchingTxs[elConsolidation.SlotIndex].TxHash
+ elConsolidationData.LinkedTransaction = true
+ elConsolidationData.TransactionDetails = buildTxDetails(matchingTxs[elConsolidation.SlotIndex])
+ }
+
+ } else if len(consolidationRequestTx) == 1 {
+ elConsolidationData.TransactionHash = consolidationRequestTx[0].TxHash
+ elConsolidationData.LinkedTransaction = true
+ elConsolidationData.TransactionDetails = buildTxDetails(consolidationRequestTx[0])
+ }
+ }
+
+ pageData.ElRequests = append(pageData.ElRequests, elConsolidationData)
}
pageData.RequestCount = uint64(len(pageData.ElRequests))
+ // load tx details for consolidation requests
+ if len(requestTxDetailsFor) > 0 {
+ for _, txDetails := range db.GetConsolidationRequestTxsByTxHashes(requestTxDetailsFor) {
+ for _, elConsolidation := range pageData.ElRequests {
+ if elConsolidation.TransactionHash != nil && bytes.Equal(elConsolidation.TransactionHash, txDetails.TxHash) {
+ elConsolidation.TransactionDetails = buildTxDetails(txDetails)
+ }
+ }
+ }
+ }
+
if pageData.RequestCount > 0 {
pageData.FirstIndex = pageData.ElRequests[0].SlotNumber
pageData.LastIndex = pageData.ElRequests[pageData.RequestCount-1].SlotNumber
diff --git a/handlers/el_withdrawals.go b/handlers/el_withdrawals.go
index 5577ce88..50201d07 100644
--- a/handlers/el_withdrawals.go
+++ b/handlers/el_withdrawals.go
@@ -1,6 +1,7 @@
package handlers
import (
+ "bytes"
"fmt"
"net/http"
"net/url"
@@ -8,7 +9,9 @@ import (
"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethpandaops/dora/db"
"github.com/ethpandaops/dora/dbtypes"
+ "github.com/ethpandaops/dora/indexer/beacon"
"github.com/ethpandaops/dora/services"
"github.com/ethpandaops/dora/templates"
"github.com/ethpandaops/dora/types/models"
@@ -180,18 +183,36 @@ func buildFilteredElWithdrawalsPageData(pageIdx uint64, pageSize uint64, minSlot
dbElWithdrawals, totalRows := services.GlobalBeaconService.GetWithdrawalRequestsByFilter(withdrawalRequestFilter, pageIdx-1, uint32(pageSize))
+ // helper to load tx details for withdrawal requests
+ buildTxDetails := func(withdrawalTx *dbtypes.WithdrawalRequestTx) *models.ElWithdrawalsPageDataWithdrawalTxDetails {
+ txDetails := &models.ElWithdrawalsPageDataWithdrawalTxDetails{
+ BlockNumber: withdrawalTx.BlockNumber,
+ BlockHash: fmt.Sprintf("%#x", withdrawalTx.BlockRoot),
+ BlockTime: withdrawalTx.BlockTime,
+ TxOrigin: common.Address(withdrawalTx.TxSender).Hex(),
+ TxTarget: common.Address(withdrawalTx.TxTarget).Hex(),
+ TxHash: fmt.Sprintf("%#x", withdrawalTx.TxHash),
+ }
+
+ return txDetails
+ }
+
chainState := services.GlobalBeaconService.GetChainState()
validatorSetRsp := services.GlobalBeaconService.GetCachedValidatorSet()
+ matcherHeight := services.GlobalBeaconService.GetWithdrawalIndexer().GetMatcherHeight()
+
+ requestTxDetailsFor := [][]byte{}
for _, elWithdrawal := range dbElWithdrawals {
elWithdrawalData := &models.ElWithdrawalsPageDataWithdrawal{
- SlotNumber: elWithdrawal.SlotNumber,
- SlotRoot: elWithdrawal.SlotRoot,
- Time: chainState.SlotToTime(phase0.Slot(elWithdrawal.SlotNumber)),
- Orphaned: elWithdrawal.Orphaned,
- SourceAddr: elWithdrawal.SourceAddress,
- Amount: elWithdrawal.Amount,
- PublicKey: elWithdrawal.ValidatorPubkey,
+ SlotNumber: elWithdrawal.SlotNumber,
+ SlotRoot: elWithdrawal.SlotRoot,
+ Time: chainState.SlotToTime(phase0.Slot(elWithdrawal.SlotNumber)),
+ Orphaned: elWithdrawal.Orphaned,
+ SourceAddr: elWithdrawal.SourceAddress,
+ Amount: elWithdrawal.Amount,
+ PublicKey: elWithdrawal.ValidatorPubkey,
+ TransactionHash: elWithdrawal.TxHash,
}
if elWithdrawal.ValidatorIndex != nil {
@@ -203,10 +224,57 @@ func buildFilteredElWithdrawalsPageData(pageIdx uint64, pageSize uint64, minSlot
}
}
+ if len(elWithdrawalData.TransactionHash) > 0 {
+ elWithdrawalData.LinkedTransaction = true
+ requestTxDetailsFor = append(requestTxDetailsFor, elWithdrawalData.TransactionHash)
+ } else if elWithdrawal.BlockNumber > matcherHeight {
+ // withdrawal request has not been matched with a tx yet, try to find the tx on the fly
+ withdrawalRequestTx := db.GetWithdrawalRequestTxsByDequeueRange(elWithdrawal.BlockNumber, elWithdrawal.BlockNumber)
+ if len(withdrawalRequestTx) > 1 {
+ forkIds := services.GlobalBeaconService.GetParentForkIds(beacon.ForkKey(elWithdrawal.ForkId))
+ isParentFork := func(forkId uint64) bool {
+ for _, parentForkId := range forkIds {
+ if uint64(parentForkId) == forkId {
+ return true
+ }
+ }
+ return false
+ }
+
+ matchingTxs := []*dbtypes.WithdrawalRequestTx{}
+ for _, tx := range withdrawalRequestTx {
+ if isParentFork(tx.ForkId) {
+ matchingTxs = append(matchingTxs, tx)
+ }
+ }
+
+ if len(matchingTxs) >= int(elWithdrawal.SlotIndex)+1 {
+ elWithdrawalData.TransactionHash = matchingTxs[elWithdrawal.SlotIndex].TxHash
+ elWithdrawalData.LinkedTransaction = true
+ elWithdrawalData.TransactionDetails = buildTxDetails(matchingTxs[elWithdrawal.SlotIndex])
+ }
+ } else if len(withdrawalRequestTx) == 1 {
+ elWithdrawalData.TransactionHash = withdrawalRequestTx[0].TxHash
+ elWithdrawalData.LinkedTransaction = true
+ elWithdrawalData.TransactionDetails = buildTxDetails(withdrawalRequestTx[0])
+ }
+ }
+
pageData.ElRequests = append(pageData.ElRequests, elWithdrawalData)
}
pageData.RequestCount = uint64(len(pageData.ElRequests))
+ // load tx details for withdrawal requests
+ if len(requestTxDetailsFor) > 0 {
+ for _, txDetails := range db.GetWithdrawalRequestTxsByTxHashes(requestTxDetailsFor) {
+ for _, elWithdrawal := range pageData.ElRequests {
+ if elWithdrawal.TransactionHash != nil && bytes.Equal(elWithdrawal.TransactionHash, txDetails.TxHash) {
+ elWithdrawal.TransactionDetails = buildTxDetails(txDetails)
+ }
+ }
+ }
+ }
+
if pageData.RequestCount > 0 {
pageData.FirstIndex = pageData.ElRequests[0].SlotNumber
pageData.LastIndex = pageData.ElRequests[pageData.RequestCount-1].SlotNumber
diff --git a/indexer/beacon/forkcache.go b/indexer/beacon/forkcache.go
index 347aa68a..b1325e14 100644
--- a/indexer/beacon/forkcache.go
+++ b/indexer/beacon/forkcache.go
@@ -7,6 +7,7 @@ import (
"sync"
"github.com/attestantio/go-eth2-client/spec/phase0"
+ "github.com/ethereum/go-ethereum/common/lru"
"github.com/ethpandaops/dora/db"
"github.com/ethpandaops/dora/dbtypes"
"github.com/jmoiron/sqlx"
@@ -19,6 +20,7 @@ type forkCache struct {
forkMap map[ForkKey]*Fork
finalizedForkId ForkKey
lastForkId ForkKey
+ parentIdCache *lru.Cache[ForkKey, ForkKey]
forkProcessLock sync.Mutex
}
@@ -26,8 +28,9 @@ type forkCache struct {
// newForkCache creates a new instance of the forkCache struct.
func newForkCache(indexer *Indexer) *forkCache {
return &forkCache{
- indexer: indexer,
- forkMap: make(map[ForkKey]*Fork),
+ indexer: indexer,
+ forkMap: make(map[ForkKey]*Fork),
+ parentIdCache: lru.NewCache[ForkKey, ForkKey](1000),
}
}
@@ -117,11 +120,23 @@ func (cache *forkCache) removeFork(forkId ForkKey) {
// getParentForkIds returns the parent fork ids of the given fork.
func (cache *forkCache) getParentForkIds(forkId ForkKey) []ForkKey {
parentForks := []ForkKey{forkId}
-
+ parentForkId := forkId
thisFork := cache.getForkById(forkId)
- for thisFork != nil && thisFork.parentFork != 0 {
- parentForks = append(parentForks, thisFork.parentFork)
- thisFork = cache.getForkById(thisFork.parentFork)
+
+ for parentForkId > 1 {
+ if thisFork != nil {
+ parentForkId = thisFork.parentFork
+ } else if cachedParent, isCached := cache.parentIdCache.Get(parentForkId); isCached {
+ parentForkId = cachedParent
+ } else if dbFork := db.GetForkById(uint64(parentForkId)); dbFork != nil {
+ parentForkId = ForkKey(dbFork.ParentFork)
+ cache.parentIdCache.Add(ForkKey(dbFork.ForkId), ForkKey(dbFork.ParentFork))
+ } else {
+ break
+ }
+
+ thisFork = cache.getForkById(parentForkId)
+ parentForks = append(parentForks, parentForkId)
}
return parentForks
@@ -200,6 +215,8 @@ func (cache *forkCache) setFinalizedEpoch(finalizedSlot phase0.Slot, justifiedRo
continue
}
+ cache.parentIdCache.Add(fork.forkId, fork.parentFork)
+
delete(cache.forkMap, fork.forkId)
}
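
The new parentIdCache is a bounded LRU (go-ethereum's generic common/lru cache), so getParentForkIds can walk parent links of forks that have already been finalized and dropped from forkMap without a DB lookup per step. A tiny standalone sketch of the cache API this relies on, using plain uint64 keys instead of ForkKey:

    package main

    import (
        "fmt"

        "github.com/ethereum/go-ethereum/common/lru"
    )

    func main() {
        // same generic LRU type as forkCache.parentIdCache, capacity 1000
        parentIds := lru.NewCache[uint64, uint64](1000)
        parentIds.Add(42, 7) // fork 42 -> parent fork 7

        if parent, ok := parentIds.Get(42); ok {
            fmt.Println("parent fork:", parent)
        }
    }
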
diff --git a/indexer/beacon/forkdetection.go b/indexer/beacon/forkdetection.go
index 4db82f3d..b1d80793 100644
--- a/indexer/beacon/forkdetection.go
+++ b/indexer/beacon/forkdetection.go
@@ -113,6 +113,7 @@ func (cache *forkCache) processBlock(block *Block) error {
newFork := &newForkInfo{
fork: fork,
}
+ cache.parentIdCache.Add(fork.forkId, fork.parentFork)
newForks = append(newForks, newFork)
fmt.Fprintf(&logbuf, ", head1(%v): %v [%v]", fork.forkId, block.Slot, block.Root.String())
@@ -142,6 +143,7 @@ func (cache *forkCache) processBlock(block *Block) error {
fork: otherFork,
updateRoots: updatedRoots,
}
+ cache.parentIdCache.Add(otherFork.forkId, otherFork.parentFork)
newForks = append(newForks, newFork)
if updatedFork != nil {
@@ -185,6 +187,7 @@ func (cache *forkCache) processBlock(block *Block) error {
fork: fork,
updateRoots: updatedRoots,
}
+ cache.parentIdCache.Add(fork.forkId, fork.parentFork)
newForks = append(newForks, newFork)
if updatedFork != nil {
@@ -321,6 +324,7 @@ func (cache *forkCache) updateForkBlocks(startBlock *Block, forkId ForkKey, skip
if forks := cache.getForkByBase(startBlock.Root); len(forks) > 0 && forks[0].parentFork != forkId {
for _, fork := range forks {
fork.parentFork = forkId
+ cache.parentIdCache.Add(fork.forkId, fork.parentFork)
}
updatedFork = &updateForkInfo{
diff --git a/indexer/beacon/indexer_getter.go b/indexer/beacon/indexer_getter.go
index 7ae1ef2b..6bf34284 100644
--- a/indexer/beacon/indexer_getter.go
+++ b/indexer/beacon/indexer_getter.go
@@ -116,6 +116,15 @@ func (indexer *Indexer) GetBlockCacheState() (finalizedEpoch phase0.Epoch, prune
return indexer.lastFinalizedEpoch, indexer.lastPrunedEpoch
}
+// GetSynchronizerState returns the state of the synchronizer, including whether it is running and the current epoch.
+func (indexer *Indexer) GetSynchronizerState() (running bool, syncHead phase0.Epoch) {
+ if indexer.synchronizer == nil {
+ return false, 0
+ }
+
+ return indexer.synchronizer.running, indexer.synchronizer.currentEpoch
+}
+
// GetForkHeads returns a slice of fork heads in the indexer.
func (indexer *Indexer) GetForkHeads() []*ForkHead {
return indexer.forkCache.getForkHeads()
@@ -236,3 +245,8 @@ func (indexer *Indexer) GetEpochStats(epoch phase0.Epoch, overrideForkId *ForkKe
return bestEpochStats
}
+
+// GetParentForkIds returns the parent fork ids of the given fork.
+func (indexer *Indexer) GetParentForkIds(forkId ForkKey) []ForkKey {
+ return indexer.forkCache.getParentForkIds(forkId)
+}
diff --git a/indexer/beacon/writedb.go b/indexer/beacon/writedb.go
index 9aa2d32f..3563d61e 100644
--- a/indexer/beacon/writedb.go
+++ b/indexer/beacon/writedb.go
@@ -707,6 +707,8 @@ func (dbw *dbWriter) buildDbConsolidationRequests(block *Block, orphaned bool, o
validatorSetMap[validator.Validator.PublicKey] = uint64(idx)
}
+ blockNumber, _ := blockBody.ExecutionBlockNumber()
+
dbConsolidations := make([]*dbtypes.ConsolidationRequest, len(consolidations))
for idx, consolidation := range consolidations {
dbConsolidation := &dbtypes.ConsolidationRequest{
@@ -718,6 +720,7 @@ func (dbw *dbWriter) buildDbConsolidationRequests(block *Block, orphaned bool, o
SourceAddress: consolidation.SourceAddress[:],
SourcePubkey: consolidation.SourcePubkey[:],
TargetPubkey: consolidation.TargetPubkey[:],
+ BlockNumber: blockNumber,
}
if overrideForkId != nil {
dbConsolidation.ForkId = uint64(*overrideForkId)
@@ -772,6 +775,8 @@ func (dbw *dbWriter) buildDbWithdrawalRequests(block *Block, orphaned bool, over
validatorSetMap[validator.Validator.PublicKey] = uint64(idx)
}
+ blockNumber, _ := blockBody.ExecutionBlockNumber()
+
dbWithdrawalRequests := make([]*dbtypes.WithdrawalRequest, len(withdrawalRequests))
for idx, withdrawalRequest := range withdrawalRequests {
dbWithdrawalRequest := &dbtypes.WithdrawalRequest{
@@ -783,6 +788,7 @@ func (dbw *dbWriter) buildDbWithdrawalRequests(block *Block, orphaned bool, over
SourceAddress: withdrawalRequest.SourceAddress[:],
ValidatorPubkey: withdrawalRequest.ValidatorPubkey[:],
Amount: uint64(withdrawalRequest.Amount),
+ BlockNumber: blockNumber,
}
if overrideForkId != nil {
dbWithdrawalRequest.ForkId = uint64(*overrideForkId)
diff --git a/indexer/execution/consolidation_indexer.go b/indexer/execution/consolidation_indexer.go
new file mode 100644
index 00000000..a467534c
--- /dev/null
+++ b/indexer/execution/consolidation_indexer.go
@@ -0,0 +1,297 @@
+package execution
+
+import (
+ "bytes"
+ "fmt"
+ "time"
+
+ "github.com/attestantio/go-eth2-client/spec/phase0"
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/jmoiron/sqlx"
+ "github.com/sirupsen/logrus"
+
+ "github.com/ethpandaops/dora/db"
+ "github.com/ethpandaops/dora/dbtypes"
+ "github.com/ethpandaops/dora/indexer/beacon"
+ "github.com/ethpandaops/dora/utils"
+)
+
+const consolidationContractAddr = "0x01aBEa29659e5e97C95107F20bb753cD3e09bBBb"
+
+// ConsolidationIndexer is the indexer for the EIP-7251 consolidation system contract
+type ConsolidationIndexer struct {
+ indexerCtx *IndexerCtx
+ logger logrus.FieldLogger
+ indexer *contractIndexer[dbtypes.ConsolidationRequestTx]
+ matcher *transactionMatcher[consolidationRequestMatch]
+}
+
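+// consolidationRequestMatch links a consolidation request from a beacon block (identified by slot root and index)
+// to the EL transaction that triggered it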
+type consolidationRequestMatch struct {
+ slotRoot []byte
+ slotIndex uint64
+ txHash []byte
+}
+
+// NewConsolidationIndexer creates a new consolidation system contract indexer
+func NewConsolidationIndexer(indexer *IndexerCtx) *ConsolidationIndexer {
+ batchSize := utils.Config.ExecutionApi.LogBatchSize
+ if batchSize == 0 {
+ batchSize = 1000
+ }
+
+ ci := &ConsolidationIndexer{
+ indexerCtx: indexer,
+ logger: indexer.logger.WithField("indexer", "consolidations"),
+ }
+
+ specs := indexer.chainState.GetSpecs()
+
+ // create contract indexer for the consolidation contract
+ ci.indexer = newContractIndexer(
+ indexer,
+ indexer.logger.WithField("contract-indexer", "consolidations"),
+ &contractIndexerOptions[dbtypes.ConsolidationRequestTx]{
+ stateKey: "indexer.consolidationindexer",
+ batchSize: batchSize,
+ contractAddress: common.HexToAddress(consolidationContractAddr),
+ deployBlock: uint64(utils.Config.ExecutionApi.ElectraDeployBlock),
+ dequeueRate: specs.MaxConsolidationRequestsPerPayload,
+
+ processFinalTx: ci.processFinalTx,
+ processRecentTx: ci.processRecentTx,
+ persistTxs: ci.persistConsolidationTxs,
+ },
+ )
+
+ // create transaction matcher for the consolidation contract
+ ci.matcher = newTransactionMatcher(
+ indexer,
+ indexer.logger.WithField("contract-matcher", "consolidations"),
+ &transactionMatcherOptions[consolidationRequestMatch]{
+ stateKey: "indexer.consolidationmatcher",
+ deployBlock: uint64(utils.Config.ExecutionApi.ElectraDeployBlock),
+ timeLimit: 2 * time.Second,
+
+ matchBlockRange: ci.matchBlockRange,
+ persistMatches: ci.persistMatches,
+ },
+ )
+
+ go ci.runConsolidationIndexerLoop()
+
+ return ci
+}
+
+// GetMatcherHeight returns the last processed el block number from the transaction matcher
+func (ci *ConsolidationIndexer) GetMatcherHeight() uint64 {
+ return ci.matcher.GetMatcherHeight()
+}
+
+// runConsolidationIndexerLoop is the main loop for the consolidation indexer
+func (ci *ConsolidationIndexer) runConsolidationIndexerLoop() {
+ defer utils.HandleSubroutinePanic("ConsolidationIndexer.runConsolidationIndexerLoop")
+
+ for {
+ time.Sleep(30 * time.Second)
+ ci.logger.Debugf("run consolidation indexer logic")
+
+ err := ci.indexer.runContractIndexer()
+ if err != nil {
+ ci.logger.Errorf("indexer error: %v", err)
+ }
+
+ err = ci.matcher.runTransactionMatcher(ci.indexer.state.FinalBlock)
+ if err != nil {
+ ci.logger.Errorf("matcher error: %v", err)
+ }
+ }
+}
+
+// processFinalTx is the callback for the contract indexer for finalized transactions
+// it parses the request log and returns the corresponding consolidation request transaction
+func (ci *ConsolidationIndexer) processFinalTx(log *types.Log, tx *types.Transaction, header *types.Header, txFrom common.Address, dequeueBlock uint64) (*dbtypes.ConsolidationRequestTx, error) {
+ requestTx := ci.parseRequestLog(log, nil)
+ if requestTx == nil {
+ return nil, fmt.Errorf("invalid consolidation log")
+ }
+
+ txTo := *tx.To()
+
+ requestTx.BlockTime = header.Time
+ requestTx.TxSender = txFrom[:]
+ requestTx.TxTarget = txTo[:]
+ requestTx.DequeueBlock = dequeueBlock
+
+ return requestTx, nil
+}
+
+// processRecentTx is the callback for the contract indexer for recent transactions
+// it parses the request log and returns the corresponding consolidation request transaction
+func (ci *ConsolidationIndexer) processRecentTx(log *types.Log, tx *types.Transaction, header *types.Header, txFrom common.Address, dequeueBlock uint64, fork *forkWithClients) (*dbtypes.ConsolidationRequestTx, error) {
+ requestTx := ci.parseRequestLog(log, &fork.forkId)
+ if requestTx == nil {
+ return nil, fmt.Errorf("invalid consolidation log")
+ }
+
+ txTo := *tx.To()
+
+ requestTx.BlockTime = header.Time
+ requestTx.TxSender = txFrom[:]
+ requestTx.TxTarget = txTo[:]
+ requestTx.DequeueBlock = dequeueBlock
+
+ clBlock := ci.indexerCtx.beaconIndexer.GetBlocksByExecutionBlockHash(phase0.Hash32(log.BlockHash))
+ if len(clBlock) > 0 {
+ requestTx.ForkId = uint64(clBlock[0].GetForkId())
+ } else {
+ requestTx.ForkId = uint64(fork.forkId)
+ }
+
+ return requestTx, nil
+}
+
+// parseRequestLog parses a consolidation request log and returns the corresponding consolidation request transaction
+func (ci *ConsolidationIndexer) parseRequestLog(log *types.Log, forkId *beacon.ForkKey) *dbtypes.ConsolidationRequestTx {
+ // data layout:
+ // 0-20: sender address (20 bytes)
+ // 20-68: source pubkey (48 bytes)
+ // 68-116: target pubkey (48 bytes)
+
+ if len(log.Data) < 116 {
+ ci.logger.Warnf("invalid consolidation log data length: %v", len(log.Data))
+ return nil
+ }
+
+ senderAddr := log.Data[:20]
+ sourcePubkey := log.Data[20:68]
+ targetPubkey := log.Data[68:116]
+
+ // get the validator indices for the source and target pubkeys
+ var sourceIndex, targetIndex *uint64
+ for _, validator := range ci.indexerCtx.beaconIndexer.GetCanonicalValidatorSet(forkId) {
+ if sourceIndex == nil && bytes.Equal(validator.Validator.PublicKey[:], sourcePubkey) {
+ index := uint64(validator.Index)
+ sourceIndex = &index
+ if targetIndex != nil {
+ break
+ }
+ }
+ if targetIndex == nil && bytes.Equal(validator.Validator.PublicKey[:], targetPubkey) {
+ index := uint64(validator.Index)
+ targetIndex = &index
+ if sourceIndex != nil {
+ break
+ }
+ }
+ }
+
+ requestTx := &dbtypes.ConsolidationRequestTx{
+ BlockNumber: log.BlockNumber,
+ BlockIndex: uint64(log.Index),
+ BlockRoot: log.BlockHash[:],
+ SourceAddress: senderAddr,
+ SourcePubkey: sourcePubkey,
+ SourceIndex: sourceIndex,
+ TargetPubkey: targetPubkey,
+ TargetIndex: targetIndex,
+ TxHash: log.TxHash[:],
+ }
+
+ return requestTx
+}
+
+// persistConsolidationTxs is the callback for the contract indexer to persist consolidation request transactions to the database
+func (ci *ConsolidationIndexer) persistConsolidationTxs(tx *sqlx.Tx, requests []*dbtypes.ConsolidationRequestTx) error {
+ requestCount := len(requests)
+ for requestIdx := 0; requestIdx < requestCount; requestIdx += 500 {
+ endIdx := requestIdx + 500
+ if endIdx > requestCount {
+ endIdx = requestCount
+ }
+
+ err := db.InsertConsolidationRequestTxs(requests[requestIdx:endIdx], tx)
+ if err != nil {
+ return fmt.Errorf("error while inserting consolidation txs: %v", err)
+ }
+ }
+
+ return nil
+}
+
+// matchBlockRange is the callback for the transaction matcher to match consolidation requests from beacon blocks with their initiating transactions in the database
+func (ds *ConsolidationIndexer) matchBlockRange(fromBlock uint64, toBlock uint64) ([]*consolidationRequestMatch, error) {
+ requestMatches := []*consolidationRequestMatch{}
+
+ // get all consolidation request transactions that are dequeued in the block range
+ dequeueConsolidationTxs := db.GetConsolidationRequestTxsByDequeueRange(fromBlock, toBlock)
+ if len(dequeueConsolidationTxs) > 0 {
+ firstBlock := dequeueConsolidationTxs[0].DequeueBlock
+ lastBlock := dequeueConsolidationTxs[len(dequeueConsolidationTxs)-1].DequeueBlock
+
+ // get all consolidation requests that are in the block range
+ for _, consolidationRequest := range db.GetConsolidationRequestsByElBlockRange(firstBlock, lastBlock) {
+ if len(consolidationRequest.TxHash) > 0 {
+ continue
+ }
+
+			// helper to check if a fork id matches the request's fork or one of its parent forks
+ parentForkIds := ds.indexerCtx.beaconIndexer.GetParentForkIds(beacon.ForkKey(consolidationRequest.ForkId))
+ isParentFork := func(forkId uint64) bool {
+ if forkId == consolidationRequest.ForkId {
+ return true
+ }
+ for _, parentForkId := range parentForkIds {
+ if uint64(parentForkId) == forkId {
+ return true
+ }
+ }
+ return false
+ }
+
+			// get all transactions that dequeue at the request's block and belong to the request's fork or one of its parents
+ matchingTxs := []*dbtypes.ConsolidationRequestTx{}
+ for _, tx := range dequeueConsolidationTxs {
+ if tx.DequeueBlock == consolidationRequest.BlockNumber && isParentFork(tx.ForkId) {
+ matchingTxs = append(matchingTxs, tx)
+ }
+ }
+
+			// if no transactions from the request's fork were found, fall back to all transactions that dequeue at the request's block number
+ if len(matchingTxs) == 0 {
+ for _, tx := range dequeueConsolidationTxs {
+ if tx.DequeueBlock == consolidationRequest.BlockNumber {
+ matchingTxs = append(matchingTxs, tx)
+ }
+ }
+ }
+
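+			// the request with slot index n within a block corresponds to the n-th transaction dequeued for that block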
+ if len(matchingTxs) < int(consolidationRequest.SlotIndex)+1 {
+ continue // no transaction found for this consolidation request
+ }
+
+ txHash := matchingTxs[consolidationRequest.SlotIndex].TxHash
+ ds.logger.Debugf("Matched consolidation request %d:%v with tx 0x%x", consolidationRequest.SlotNumber, consolidationRequest.SlotIndex, txHash)
+
+ requestMatches = append(requestMatches, &consolidationRequestMatch{
+ slotRoot: consolidationRequest.SlotRoot,
+ slotIndex: consolidationRequest.SlotIndex,
+ txHash: txHash,
+ })
+ }
+ }
+
+ return requestMatches, nil
+}
+
+// persistMatches is the callback for the transaction matcher to persist matches to the database
+func (ds *ConsolidationIndexer) persistMatches(tx *sqlx.Tx, matches []*consolidationRequestMatch) error {
+ for _, match := range matches {
+ err := db.UpdateConsolidationRequestTxHash(match.slotRoot, match.slotIndex, match.txHash, tx)
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
diff --git a/indexer/execution/contract_indexer.go b/indexer/execution/contract_indexer.go
new file mode 100644
index 00000000..aa9322c7
--- /dev/null
+++ b/indexer/execution/contract_indexer.go
@@ -0,0 +1,608 @@
+package execution
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "math"
+ "math/big"
+ "time"
+
+ "github.com/ethereum/go-ethereum"
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/jmoiron/sqlx"
+ "github.com/sirupsen/logrus"
+
+ "github.com/ethpandaops/dora/clients/execution"
+ "github.com/ethpandaops/dora/db"
+ "github.com/ethpandaops/dora/indexer/beacon"
+)
+
+// contractIndexer handles the indexing of contract events for a specific system contract
+// it crawls logs in order and tracks the queue length to precalculate the dequeue block number where the request will be sent to the beacon chain
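+// system contracts like the consolidation and withdrawal request contracts queue incoming requests and only a
+// limited number of them is dequeued per block, so the dequeue block can be derived from the current queue length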
+type contractIndexer[TxType any] struct {
+ indexer *IndexerCtx
+ logger logrus.FieldLogger
+ options *contractIndexerOptions[TxType]
+ state *contractIndexerState
+}
+
+// contractIndexerOptions defines the configuration for the contract indexer
+type contractIndexerOptions[TxType any] struct {
+ stateKey string // key to identify the indexer state in the database
+	batchSize       int            // number of blocks to fetch logs for per request
+ contractAddress common.Address // address of the contract to index
+ deployBlock uint64 // block number from where to start crawling logs
+ dequeueRate uint64 // number of logs to dequeue per block, 0 for no queue
+
+ // processFinalTx processes a finalized transaction log
+ processFinalTx func(log *types.Log, tx *types.Transaction, header *types.Header, txFrom common.Address, dequeueBlock uint64) (*TxType, error)
+
+ // processRecentTx processes a recent (non-finalized) transaction log
+ processRecentTx func(log *types.Log, tx *types.Transaction, header *types.Header, txFrom common.Address, dequeueBlock uint64, fork *forkWithClients) (*TxType, error)
+
+ // persistTxs persists processed transactions to the database
+ persistTxs func(tx *sqlx.Tx, txs []*TxType) error
+}
+
+// contractIndexerState represents the current state of the contract indexer
+type contractIndexerState struct {
+ FinalBlock uint64 `json:"final_block"`
+ FinalQueueLen uint64 `json:"final_queue"`
+ ForkStates map[beacon.ForkKey]*contractIndexerForkState `json:"fork_states"`
+}
+
+// contractIndexerForkState represents the state of the contract indexer for a specific unfinalized fork
+type contractIndexerForkState struct {
+ Block uint64 `json:"b"`
+ QueueLen uint64 `json:"q"`
+}
+
+// newContractIndexer creates a new contract indexer with the given options
+func newContractIndexer[TxType any](indexer *IndexerCtx, logger logrus.FieldLogger, options *contractIndexerOptions[TxType]) *contractIndexer[TxType] {
+ ci := &contractIndexer[TxType]{
+ indexer: indexer,
+ logger: logger,
+ options: options,
+ }
+
+ return ci
+}
+
+// loadState loads the contract indexer state from the database
+func (ci *contractIndexer[_]) loadState() {
+ syncState := contractIndexerState{}
+ db.GetExplorerState(ci.options.stateKey, &syncState)
+ ci.state = &syncState
+
+ if ci.state.ForkStates == nil {
+ ci.state.ForkStates = make(map[beacon.ForkKey]*contractIndexerForkState)
+ }
+
+ if ci.state.FinalBlock == 0 {
+ ci.state.FinalBlock = ci.options.deployBlock
+ }
+}
+
+// persistState saves the current contract indexer state to the database
+func (ci *contractIndexer[_]) persistState(tx *sqlx.Tx) error {
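+	// drop cached fork states that are already behind the finalized block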
+ finalizedBlockNumber := ci.getFinalizedBlockNumber()
+ for forkId, forkState := range ci.state.ForkStates {
+ if forkState.Block < finalizedBlockNumber {
+ delete(ci.state.ForkStates, forkId)
+ }
+ }
+
+ err := db.SetExplorerState(ci.options.stateKey, ci.state, tx)
+ if err != nil {
+ return fmt.Errorf("error while updating contract indexer state: %v", err)
+ }
+
+ return nil
+}
+
+// runContractIndexer is the main entry point for running the contract indexer
+// It processes finalized and recent block ranges in order
+func (ci *contractIndexer[_]) runContractIndexer() error {
+ if ci.state == nil {
+ ci.loadState()
+ }
+
+ finalizedEpoch, _ := ci.indexer.chainState.GetFinalizedCheckpoint()
+ if finalizedEpoch > 0 {
+ finalizedBlockNumber := ci.getFinalizedBlockNumber()
+
+ if finalizedBlockNumber == 0 {
+ return fmt.Errorf("finalized block not found in cache or db")
+ }
+
+ if finalizedBlockNumber < ci.state.FinalBlock {
+ return fmt.Errorf("finalized block number (%v) smaller than index state (%v)", finalizedBlockNumber, ci.state.FinalBlock)
+ }
+
+ if finalizedBlockNumber > ci.state.FinalBlock {
+ err := ci.processFinalizedBlocks(finalizedBlockNumber)
+ if err != nil {
+ return err
+ }
+ }
+ }
+
+ ci.processRecentBlocks()
+
+ return nil
+}
+
+// getFinalizedBlockNumber retrieves the latest finalized el block number
+func (ci *contractIndexer[_]) getFinalizedBlockNumber() uint64 {
+ var finalizedBlockNumber uint64
+
+ _, finalizedRoot := ci.indexer.chainState.GetFinalizedCheckpoint()
+ if finalizedBlock := ci.indexer.beaconIndexer.GetBlockByRoot(finalizedRoot); finalizedBlock != nil {
+ if indexVals := finalizedBlock.GetBlockIndex(); indexVals != nil {
+ finalizedBlockNumber = indexVals.ExecutionNumber
+ }
+ }
+
+ if finalizedBlockNumber == 0 {
+ // load from db
+ if finalizedBlock := db.GetSlotByRoot(finalizedRoot[:]); finalizedBlock != nil && finalizedBlock.EthBlockNumber != nil {
+ finalizedBlockNumber = *finalizedBlock.EthBlockNumber
+ }
+ }
+
+ return finalizedBlockNumber
+}
+
+// loadFilteredLogs fetches filtered logs from the execution client
+func (ci *contractIndexer[_]) loadFilteredLogs(ctx context.Context, client *execution.Client, query ethereum.FilterQuery) ([]types.Log, error) {
+ ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
+ defer cancel()
+
+ return client.GetRPCClient().GetEthClient().FilterLogs(ctx, query)
+}
+
+// loadTransactionByHash fetches a transaction by its hash from the execution client
+func (ci *contractIndexer[_]) loadTransactionByHash(ctx context.Context, client *execution.Client, hash common.Hash) (*types.Transaction, error) {
+ ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
+ defer cancel()
+
+ tx, _, err := client.GetRPCClient().GetEthClient().TransactionByHash(ctx, hash)
+ return tx, err
+}
+
+// loadHeaderByHash fetches a block header by its hash from the execution client
+func (ci *contractIndexer[_]) loadHeaderByHash(ctx context.Context, client *execution.Client, hash common.Hash) (*types.Header, error) {
+ ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
+ defer cancel()
+
+ return client.GetRPCClient().GetHeaderByHash(ctx, hash)
+}
+
+// processFinalizedBlocks processes contract events from finalized block ranges
+// it fetches logs in batches and calls the provided processFinalTx function to process each log
+func (ci *contractIndexer[TxType]) processFinalizedBlocks(finalizedBlockNumber uint64) error {
+ clients := ci.indexer.getFinalizedClients(execution.AnyClient)
+ if len(clients) == 0 {
+ return fmt.Errorf("no ready execution client found")
+ }
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ retryCount := 0
+
+ // process blocks in range until the finalized block is reached
+ for ci.state.FinalBlock < finalizedBlockNumber {
+ client := clients[retryCount%len(clients)]
+
+ batchSize := uint64(ci.options.batchSize)
+ if retryCount > 0 {
+ // reduce batch size on retries to avoid response limit errors for block ranges with many logs
+ batchSize /= uint64(math.Pow(2, float64(retryCount)))
+ if batchSize < 10 {
+ batchSize = 10
+ }
+ }
+
+		toBlock := ci.state.FinalBlock + batchSize
+ if toBlock > finalizedBlockNumber {
+ toBlock = finalizedBlockNumber
+ }
+
+ // fetch logs from the execution client
+ query := ethereum.FilterQuery{
+ FromBlock: big.NewInt(0).SetUint64(ci.state.FinalBlock + 1),
+ ToBlock: big.NewInt(0).SetUint64(toBlock),
+ Addresses: []common.Address{
+ ci.options.contractAddress,
+ },
+ }
+
+ logs, err := ci.loadFilteredLogs(ctx, client, query)
+ if err != nil {
+ if retryCount < 3 {
+ retryCount++
+ continue
+ }
+
+ return fmt.Errorf("error fetching contract logs: %v", err)
+ }
+
+ ci.logger.Debugf("received contract logs for block %v - %v: %v events", ci.state.FinalBlock, toBlock, len(logs))
+
+ retryCount = 0
+
+ // parse logs and load tx/block details
+ var txHash, txHeaderHash []byte
+ var txDetails *types.Transaction
+ var txBlockHeader *types.Header
+
+ requestTxs := []*TxType{}
+ queueBlock := ci.state.FinalBlock
+ queueLength := ci.state.FinalQueueLen
+
+ // we start crawling from the next block, so we need to decrease the queue length for the current block
+ if queueLength > ci.options.dequeueRate {
+ queueLength -= ci.options.dequeueRate
+ } else {
+ queueLength = 0
+ }
+
+ for idx := range logs {
+ log := &logs[idx]
+
+ // load transaction if not already loaded
+ if txHash == nil || !bytes.Equal(txHash, log.TxHash[:]) {
+ txDetails, err = ci.loadTransactionByHash(ctx, client, log.TxHash)
+ if err != nil {
+ return fmt.Errorf("could not load tx details (%v): %v", log.TxHash, err)
+ }
+
+ txHash = log.TxHash[:]
+ }
+
+ // load block header if not already loaded
+ if txBlockHeader == nil || !bytes.Equal(txHeaderHash, log.BlockHash[:]) {
+ txBlockHeader, err = ci.loadHeaderByHash(ctx, client, log.BlockHash)
+ if err != nil {
+ return fmt.Errorf("could not load block details (%v): %v", log.BlockHash, err)
+ }
+
+ txHeaderHash = log.BlockHash[:]
+ }
+
+ // get transaction sender
+ txFrom, err := types.Sender(types.LatestSignerForChainID(txDetails.ChainId()), txDetails)
+ if err != nil {
+ return fmt.Errorf("could not decode tx sender (%v): %v", log.TxHash, err)
+ }
+
+ // process queue decrease for past blocks
+ if queueBlock > log.BlockNumber {
+ ci.logger.Warnf("contract log for block %v received after block %v", log.BlockNumber, queueBlock)
+ return nil
+ } else if ci.options.dequeueRate > 0 && queueBlock < log.BlockNumber {
+ // calculate how many requests were dequeued since the last processed log
+ dequeuedRequests := (log.BlockNumber - queueBlock) * ci.options.dequeueRate
+ if dequeuedRequests > queueLength {
+ queueLength = 0
+ } else {
+ queueLength -= dequeuedRequests
+ }
+
+ queueBlock = log.BlockNumber
+ }
+
+ // calculate the dequeue block number for the current log
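+			// e.g. with a dequeue rate of 2 and 5 requests already queued, a request logged in block N
+			// is expected to be dequeued in block N+2 (5/2 = 2 full blocks ahead)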
+ var dequeueBlock uint64
+ if ci.options.dequeueRate > 0 {
+ dequeueBlock = log.BlockNumber + (queueLength / ci.options.dequeueRate)
+ queueLength++
+ } else {
+ dequeueBlock = log.BlockNumber
+ }
+
+ // process the log and get the corresponding transaction
+ requestTx, err := ci.options.processFinalTx(log, txDetails, txBlockHeader, txFrom, dequeueBlock)
+ if err != nil {
+ continue
+ }
+
+ if requestTx == nil {
+ continue
+ }
+
+ requestTxs = append(requestTxs, requestTx)
+ }
+
+ // calculate how many requests were dequeued at the end of the current block range
+ if ci.options.dequeueRate > 0 && queueBlock < toBlock {
+ dequeuedRequests := (toBlock - queueBlock) * ci.options.dequeueRate
+ if dequeuedRequests > queueLength {
+ queueLength = 0
+ } else {
+ queueLength -= dequeuedRequests
+ }
+
+ queueBlock = toBlock
+ }
+
+ if len(requestTxs) > 0 {
+ ci.logger.Infof("crawled transactions for block %v - %v: %v events", ci.state.FinalBlock, toBlock, len(requestTxs))
+ }
+
+ // persist the processed transactions and update the indexer state
+ err = ci.persistFinalizedRequestTxs(toBlock, queueLength, requestTxs)
+ if err != nil {
+ return fmt.Errorf("could not persist indexed transactions: %v", err)
+ }
+
+ // cooldown to avoid rate limiting from external archive nodes
+ time.Sleep(1 * time.Second)
+ }
+ return nil
+}
+
+// processRecentBlocks processes contract events from recent (non-finalized) blocks across all forks
+func (ci *contractIndexer[_]) processRecentBlocks() error {
+ headForks := ci.indexer.getForksWithClients(execution.AnyClient)
+ for _, headFork := range headForks {
+ err := ci.processRecentBlocksForFork(headFork)
+ if err != nil {
+ if headFork.canonical {
+ ci.logger.Errorf("could not process recent events from canonical fork %v: %v", headFork.forkId, err)
+ } else {
+ ci.logger.Warnf("could not process recent events from fork %v: %v", headFork.forkId, err)
+ }
+ }
+ }
+ return nil
+}
+
+// processRecentBlocksForFork processes contract events from recent blocks for a specific fork
+func (ci *contractIndexer[TxType]) processRecentBlocksForFork(headFork *forkWithClients) error {
+ // get the head el block number for the fork
+ elHeadBlock := ci.indexer.beaconIndexer.GetCanonicalHead(&headFork.forkId)
+ if elHeadBlock == nil {
+ return fmt.Errorf("head block not found")
+ }
+
+ elHeadBlockIndex := elHeadBlock.GetBlockIndex()
+ if elHeadBlockIndex == nil {
+ return fmt.Errorf("head block index not found")
+ }
+
+ elHeadBlockNumber := elHeadBlockIndex.ExecutionNumber
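+	// stay one block behind the fork head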
+ if elHeadBlockNumber > 0 {
+ elHeadBlockNumber--
+ }
+
+ startBlockNumber := ci.state.FinalBlock + 1
+ queueLength := ci.state.FinalQueueLen
+
+ // get last processed block for this fork
+ if forkState := ci.state.ForkStates[headFork.forkId]; forkState != nil && forkState.Block <= elHeadBlockNumber {
+ if forkState.Block == elHeadBlockNumber {
+ return nil // already processed
+ }
+
+ startBlockNumber = forkState.Block + 1
+ queueLength = forkState.QueueLen
+ } else {
+ // seems we haven't seen this fork before, check if we can continue from a parent fork
+		for _, parentForkId := range ci.indexer.beaconIndexer.GetParentForkIds(headFork.forkId) {
+			if parentForkState := ci.state.ForkStates[parentForkId]; parentForkState != nil && parentForkState.Block <= elHeadBlockNumber {
+ startBlockNumber = parentForkState.Block + 1
+ queueLength = parentForkState.QueueLen
+ }
+ }
+ }
+
+ var resError error
+ var ctxCancel context.CancelFunc
+ defer func() {
+ if ctxCancel != nil {
+ ctxCancel()
+ }
+ }()
+
+ queueBlock := startBlockNumber
+ // we start crawling from the next block, so we need to decrease the queue length for the current block
+ if queueLength > ci.options.dequeueRate {
+ queueLength -= ci.options.dequeueRate
+ } else {
+ queueLength = 0
+ }
+
+ // process blocks in range until the head el block is reached
+ for startBlockNumber <= elHeadBlockNumber {
+ var toBlock uint64
+ var logs []types.Log
+ var reqError error
+ var txHash, txHeaderHash []byte
+ var txDetails *types.Transaction
+ var txBlockHeader *types.Header
+
+ requestTxs := []*TxType{}
+
+ for retryCount := 0; retryCount < 3; retryCount++ {
+ client := headFork.clients[retryCount%len(headFork.clients)]
+
+ batchSize := uint64(ci.options.batchSize)
+ if retryCount > 0 {
+ // reduce batch size on retries to avoid response limit errors for block ranges with many logs
+ batchSize /= uint64(math.Pow(2, float64(retryCount)))
+ if batchSize < 10 {
+ batchSize = 10
+ }
+ }
+
+			toBlock = startBlockNumber + batchSize
+ if toBlock > elHeadBlockNumber {
+ toBlock = elHeadBlockNumber
+ }
+
+ if ctxCancel != nil {
+ ctxCancel()
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), 600*time.Second)
+ ctxCancel = cancel
+
+ // fetch logs from the execution client
+ query := ethereum.FilterQuery{
+ FromBlock: big.NewInt(0).SetUint64(startBlockNumber),
+ ToBlock: big.NewInt(0).SetUint64(toBlock),
+ Addresses: []common.Address{
+ ci.options.contractAddress,
+ },
+ }
+
+ logs, reqError = ci.loadFilteredLogs(ctx, client, query)
+ if reqError != nil {
+ ci.logger.Warnf("error fetching contract logs for fork %v (%v-%v): %v", headFork.forkId, startBlockNumber, toBlock, reqError)
+ continue
+ }
+
+ for idx := range logs {
+ var err error
+
+ log := &logs[idx]
+
+ // load transaction if not already loaded
+ if txHash == nil || !bytes.Equal(txHash, log.TxHash[:]) {
+ txDetails, err = ci.loadTransactionByHash(ctx, client, log.TxHash)
+ if err != nil {
+ return fmt.Errorf("could not load tx details (%v): %v", log.TxHash, err)
+ }
+
+ txHash = log.TxHash[:]
+ }
+
+ // load block header if not already loaded
+ if txBlockHeader == nil || !bytes.Equal(txHeaderHash, log.BlockHash[:]) {
+ txBlockHeader, err = ci.loadHeaderByHash(ctx, client, log.BlockHash)
+ if err != nil {
+ return fmt.Errorf("could not load block details (%v): %v", log.BlockHash, err)
+ }
+
+ txHeaderHash = log.BlockHash[:]
+ }
+
+ // get transaction sender
+ txFrom, err := types.Sender(types.LatestSignerForChainID(txDetails.ChainId()), txDetails)
+ if err != nil {
+ return fmt.Errorf("could not decode tx sender (%v): %v", log.TxHash, err)
+ }
+
+ // process queue decrease for past blocks
+ if queueBlock > log.BlockNumber {
+ ci.logger.Warnf("contract log for block %v received after block %v", log.BlockNumber, queueBlock)
+ return nil
+ } else if ci.options.dequeueRate > 0 && queueBlock < log.BlockNumber {
+ dequeuedRequests := (log.BlockNumber - queueBlock) * ci.options.dequeueRate
+ if dequeuedRequests > queueLength {
+ queueLength = 0
+ } else {
+ queueLength -= dequeuedRequests
+ }
+
+ queueBlock = log.BlockNumber
+ }
+
+ // calculate the dequeue block number for the current log
+ var dequeueBlock uint64
+ if ci.options.dequeueRate > 0 {
+ dequeueBlock = log.BlockNumber + (queueLength / ci.options.dequeueRate)
+ queueLength++
+ } else {
+ dequeueBlock = log.BlockNumber
+ }
+
+ // process the log and get the corresponding transaction
+ requestTx, err := ci.options.processRecentTx(log, txDetails, txBlockHeader, txFrom, dequeueBlock, headFork)
+ if err != nil {
+ continue
+ }
+
+ if requestTx == nil {
+ continue
+ }
+
+ requestTxs = append(requestTxs, requestTx)
+ }
+
+ // calculate how many requests were dequeued at the end of the current block range
+ if queueBlock < toBlock {
+ dequeuedRequests := (toBlock - queueBlock) * ci.options.dequeueRate
+ if dequeuedRequests > queueLength {
+ queueLength = 0
+ } else {
+ queueLength -= dequeuedRequests
+ }
+
+ queueBlock = toBlock
+ }
+
+ if len(requestTxs) > 0 {
+ ci.logger.Infof("crawled recent contract logs for fork %v (%v-%v): %v events", headFork.forkId, startBlockNumber, toBlock, len(requestTxs))
+ }
+
+ // persist the processed transactions and update the indexer state
+ err := ci.persistRecentRequestTxs(headFork.forkId, queueBlock, queueLength, requestTxs)
+ if err != nil {
+ return fmt.Errorf("could not persist contract logs: %v", err)
+ }
+
+ // cooldown to avoid rate limiting from external archive nodes
+ time.Sleep(1 * time.Second)
+
+ break
+ }
+
+ if reqError != nil {
+ return fmt.Errorf("error fetching contract logs for fork %v (%v-%v): %v", headFork.forkId, startBlockNumber, toBlock, reqError)
+ }
+
+ startBlockNumber = toBlock + 1
+ }
+
+ return resError
+}
+
+// persistFinalizedRequestTxs persists processed finalized transactions and the indexer state to the database
+func (ci *contractIndexer[TxType]) persistFinalizedRequestTxs(finalBlockNumber, finalQueueLen uint64, requests []*TxType) error {
+ return db.RunDBTransaction(func(tx *sqlx.Tx) error {
+ if len(requests) > 0 {
+ err := ci.options.persistTxs(tx, requests)
+ if err != nil {
+ return fmt.Errorf("error while persisting contract logs: %v", err)
+ }
+ }
+
+ ci.state.FinalBlock = finalBlockNumber
+ ci.state.FinalQueueLen = finalQueueLen
+
+ return ci.persistState(tx)
+ })
+}
+
+// persistRecentRequestTxs persists processed recent transactions and the indexer state to the database
+func (ci *contractIndexer[TxType]) persistRecentRequestTxs(forkId beacon.ForkKey, finalBlockNumber, finalQueueLen uint64, requests []*TxType) error {
+ return db.RunDBTransaction(func(tx *sqlx.Tx) error {
+ if len(requests) > 0 {
+ err := ci.options.persistTxs(tx, requests)
+ if err != nil {
+ return fmt.Errorf("error while persisting contract logs: %v", err)
+ }
+ }
+
+ ci.state.ForkStates[forkId] = &contractIndexerForkState{
+ Block: finalBlockNumber,
+ QueueLen: finalQueueLen,
+ }
+
+ return ci.persistState(tx)
+ })
+}
diff --git a/indexer/execution/deposit_indexer.go b/indexer/execution/deposit_indexer.go
index 67bfbb6e..5f855f40 100644
--- a/indexer/execution/deposit_indexer.go
+++ b/indexer/execution/deposit_indexer.go
@@ -2,16 +2,13 @@ package execution
import (
"bytes"
- "context"
"encoding/binary"
"fmt"
"log"
- "math/big"
"strings"
"time"
"github.com/attestantio/go-eth2-client/spec/phase0"
- "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
@@ -22,28 +19,27 @@ import (
"github.com/protolambda/ztyp/tree"
"github.com/sirupsen/logrus"
- "github.com/ethpandaops/dora/clients/execution"
"github.com/ethpandaops/dora/db"
"github.com/ethpandaops/dora/dbtypes"
"github.com/ethpandaops/dora/utils"
)
+const depositContractAbi = `[{"inputs":[],"stateMutability":"nonpayable","type":"constructor"},{"anonymous":false,"inputs":[{"indexed":false,"internalType":"bytes","name":"pubkey","type":"bytes"},{"indexed":false,"internalType":"bytes","name":"withdrawal_credentials","type":"bytes"},{"indexed":false,"internalType":"bytes","name":"amount","type":"bytes"},{"indexed":false,"internalType":"bytes","name":"signature","type":"bytes"},{"indexed":false,"internalType":"bytes","name":"index","type":"bytes"}],"name":"DepositEvent","type":"event"},{"inputs":[{"internalType":"bytes","name":"pubkey","type":"bytes"},{"internalType":"bytes","name":"withdrawal_credentials","type":"bytes"},{"internalType":"bytes","name":"signature","type":"bytes"},{"internalType":"bytes32","name":"deposit_data_root","type":"bytes32"}],"name":"deposit","outputs":[],"stateMutability":"payable","type":"function"},{"inputs":[],"name":"get_deposit_count","outputs":[{"internalType":"bytes","name":"","type":"bytes"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"get_deposit_root","outputs":[{"internalType":"bytes32","name":"","type":"bytes32"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"bytes4","name":"interfaceId","type":"bytes4"}],"name":"supportsInterface","outputs":[{"internalType":"bool","name":"","type":"bool"}],"stateMutability":"pure","type":"function"}]`
+
+// DepositIndexer is the indexer for the deposit contract
type DepositIndexer struct {
- indexer *IndexerCtx
- logger logrus.FieldLogger
- state *dbtypes.DepositIndexerState
- batchSize int
- depositContract common.Address
- depositContractAbi *abi.ABI
- depositEventTopic []byte
- depositSigDomain zrnt_common.BLSDomain
- unfinalizedDeposits map[uint64]map[common.Hash]bool
-}
+ indexerCtx *IndexerCtx
+ logger logrus.FieldLogger
+ indexer *contractIndexer[dbtypes.DepositTx]
-const depositContractAbi = `[{"inputs":[],"stateMutability":"nonpayable","type":"constructor"},{"anonymous":false,"inputs":[{"indexed":false,"internalType":"bytes","name":"pubkey","type":"bytes"},{"indexed":false,"internalType":"bytes","name":"withdrawal_credentials","type":"bytes"},{"indexed":false,"internalType":"bytes","name":"amount","type":"bytes"},{"indexed":false,"internalType":"bytes","name":"signature","type":"bytes"},{"indexed":false,"internalType":"bytes","name":"index","type":"bytes"}],"name":"DepositEvent","type":"event"},{"inputs":[{"internalType":"bytes","name":"pubkey","type":"bytes"},{"internalType":"bytes","name":"withdrawal_credentials","type":"bytes"},{"internalType":"bytes","name":"signature","type":"bytes"},{"internalType":"bytes32","name":"deposit_data_root","type":"bytes32"}],"name":"deposit","outputs":[],"stateMutability":"payable","type":"function"},{"inputs":[],"name":"get_deposit_count","outputs":[{"internalType":"bytes","name":"","type":"bytes"}],"stateMutability":"view","type":"function"},{"inputs":[],"name":"get_deposit_root","outputs":[{"internalType":"bytes32","name":"","type":"bytes32"}],"stateMutability":"view","type":"function"},{"inputs":[{"internalType":"bytes4","name":"interfaceId","type":"bytes4"}],"name":"supportsInterface","outputs":[{"internalType":"bool","name":"","type":"bool"}],"stateMutability":"pure","type":"function"}]`
+ depositContractAbi *abi.ABI
+ depositEventTopic []byte
+ depositSigDomain zrnt_common.BLSDomain
+}
+// NewDepositIndexer creates a new deposit contract indexer
func NewDepositIndexer(indexer *IndexerCtx) *DepositIndexer {
- batchSize := utils.Config.ExecutionApi.DepositLogBatchSize
+ batchSize := utils.Config.ExecutionApi.LogBatchSize
if batchSize == 0 {
batchSize = 1000
}
@@ -60,380 +56,137 @@ func NewDepositIndexer(indexer *IndexerCtx) *DepositIndexer {
depositSigDomain := zrnt_common.ComputeDomain(zrnt_common.DOMAIN_DEPOSIT, zrnt_common.Version(genesisForkVersion), zrnt_common.Root{})
ds := &DepositIndexer{
- indexer: indexer,
- logger: indexer.logger.WithField("indexer", "deposit"),
- batchSize: batchSize,
- depositContract: common.Address(specs.DepositContractAddress),
- depositContractAbi: &contractAbi,
- depositEventTopic: depositEventTopic[:],
- depositSigDomain: depositSigDomain,
- unfinalizedDeposits: map[uint64]map[common.Hash]bool{},
+ indexerCtx: indexer,
+ logger: indexer.logger.WithField("indexer", "deposit"),
+ depositContractAbi: &contractAbi,
+ depositEventTopic: depositEventTopic[:],
+ depositSigDomain: depositSigDomain,
}
+ // create contract indexer for the deposit contract
+ ds.indexer = newContractIndexer(
+ indexer,
+ ds.logger.WithField("routine", "crawler"),
+ &contractIndexerOptions[dbtypes.DepositTx]{
+ stateKey: "indexer.depositstate",
+ batchSize: batchSize,
+ contractAddress: common.Address(specs.DepositContractAddress),
+ deployBlock: uint64(utils.Config.ExecutionApi.DepositDeployBlock),
+ dequeueRate: 0,
+
+ processFinalTx: ds.processFinalTx,
+ processRecentTx: ds.processRecentTx,
+ persistTxs: ds.persistDepositTxs,
+ },
+ )
+
go ds.runDepositIndexerLoop()
return ds
}
+// runDepositIndexerLoop is the main loop for the deposit indexer
func (ds *DepositIndexer) runDepositIndexerLoop() {
- defer utils.HandleSubroutinePanic("runCacheLoop")
+ defer utils.HandleSubroutinePanic("DepositIndexer.runDepositIndexerLoop")
for {
time.Sleep(60 * time.Second)
ds.logger.Debugf("run deposit indexer logic")
- err := ds.runDepositIndexer()
+ err := ds.indexer.runContractIndexer()
if err != nil {
ds.logger.Errorf("deposit indexer error: %v", err)
}
}
}
-func (ds *DepositIndexer) runDepositIndexer() error {
- // get indexer state
- if ds.state == nil {
- ds.loadState()
+// processFinalTx is the callback for the contract indexer to process finalized transactions
+// it parses the deposit log and returns the corresponding deposit transaction
+func (ci *DepositIndexer) processFinalTx(log *types.Log, tx *types.Transaction, header *types.Header, txFrom common.Address, dequeueBlock uint64) (*dbtypes.DepositTx, error) {
+ requestTx := ci.parseDepositLog(log)
+ if requestTx == nil {
+ return nil, fmt.Errorf("invalid deposit log")
}
- justifiedEpoch, justifiedRoot := ds.indexer.chainState.GetJustifiedCheckpoint()
- if justifiedEpoch > 0 {
-
- finalizedBlock := ds.indexer.beaconIndexer.GetBlockByRoot(justifiedRoot)
- if finalizedBlock == nil {
- return fmt.Errorf("could not get finalized block from cache (0x%x)", justifiedRoot)
- }
-
- indexVals := finalizedBlock.GetBlockIndex()
- if indexVals == nil {
- return fmt.Errorf("could not get finalized block index values (0x%x)", justifiedRoot)
- }
-
- finalizedBlockNumber := indexVals.ExecutionNumber
-
- if finalizedBlockNumber < ds.state.FinalBlock {
- return fmt.Errorf("finalized block number (%v) smaller than index state (%v)", finalizedBlockNumber, ds.state.FinalBlock)
- }
-
- if finalizedBlockNumber > ds.state.FinalBlock {
- err := ds.processFinalizedBlocks(finalizedBlockNumber)
- if err != nil {
- return err
- }
- }
- }
-
- ds.processRecentBlocks()
-
- return nil
-}
-
-func (ds *DepositIndexer) loadState() {
- syncState := dbtypes.DepositIndexerState{}
- db.GetExplorerState("indexer.depositstate", &syncState)
- ds.state = &syncState
-}
-
-func (ds *DepositIndexer) loadFilteredLogs(ctx context.Context, client *execution.Client, query ethereum.FilterQuery) ([]types.Log, error) {
- ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
- defer cancel()
-
- return client.GetRPCClient().GetEthClient().FilterLogs(ctx, query)
-}
-
-func (ds *DepositIndexer) loadTransactionByHash(ctx context.Context, client *execution.Client, hash common.Hash) (*types.Transaction, error) {
- ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
- defer cancel()
-
- tx, _, err := client.GetRPCClient().GetEthClient().TransactionByHash(ctx, hash)
- return tx, err
-}
+ txTo := *tx.To()
-func (ds *DepositIndexer) loadHeaderByNumber(ctx context.Context, client *execution.Client, number uint64) (*types.Header, error) {
- ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
- defer cancel()
+ requestTx.BlockTime = header.Time
+ requestTx.TxSender = txFrom[:]
+ requestTx.TxTarget = txTo[:]
- return client.GetRPCClient().GetHeaderByNumber(ctx, number)
+ return requestTx, nil
}
-func (ds *DepositIndexer) processFinalizedBlocks(finalizedBlockNumber uint64) error {
- clients := ds.indexer.getFinalizedClients(execution.AnyClient)
- if len(clients) == 0 {
- return fmt.Errorf("no ready execution client found")
+// processRecentTx is the callback for the contract indexer to process recent transactions
+// it parses the deposit log and returns the corresponding deposit transaction
+func (ci *DepositIndexer) processRecentTx(log *types.Log, tx *types.Transaction, header *types.Header, txFrom common.Address, dequeueBlock uint64, fork *forkWithClients) (*dbtypes.DepositTx, error) {
+ requestTx := ci.parseDepositLog(log)
+ if requestTx == nil {
+ return nil, fmt.Errorf("invalid deposit log")
}
- client := clients[0]
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
+ txTo := *tx.To()
- for ds.state.FinalBlock < finalizedBlockNumber {
- toBlock := ds.state.FinalBlock + uint64(ds.batchSize)
- if toBlock > finalizedBlockNumber {
- toBlock = finalizedBlockNumber
- }
-
- query := ethereum.FilterQuery{
- FromBlock: big.NewInt(0).SetUint64(ds.state.FinalBlock + 1),
- ToBlock: big.NewInt(0).SetUint64(toBlock),
- Addresses: []common.Address{
- ds.depositContract,
- },
- }
+ requestTx.BlockTime = header.Time
+ requestTx.TxSender = txFrom[:]
+ requestTx.TxTarget = txTo[:]
- logs, err := ds.loadFilteredLogs(ctx, client, query)
- if err != nil {
- return fmt.Errorf("error fetching deposit contract logs: %v", err)
- }
-
- var txHash []byte
- var txDetails *types.Transaction
- var txBlockHeader *types.Header
-
- depositTxs := []*dbtypes.DepositTx{}
-
- ds.logger.Infof("received deposit log for block %v - %v: %v events", ds.state.FinalBlock, toBlock, len(logs))
-
- for idx := range logs {
- log := &logs[idx]
- if !bytes.Equal(log.Topics[0][:], ds.depositEventTopic) {
- continue
- }
-
- event, err := ds.depositContractAbi.Unpack("DepositEvent", log.Data)
- if err != nil {
- return fmt.Errorf("error decoding deposit event (%v): %v", log.TxHash, err)
-
- }
-
- if txHash == nil || !bytes.Equal(txHash, log.TxHash[:]) {
- txDetails, err = ds.loadTransactionByHash(ctx, client, log.TxHash)
- if err != nil {
- return fmt.Errorf("could not load tx details (%v): %v", log.TxHash, err)
- }
-
- txBlockHeader, err = ds.loadHeaderByNumber(ctx, client, log.BlockNumber)
- if err != nil {
- return fmt.Errorf("could not load block details (%v): %v", log.TxHash, err)
- }
-
- txHash = log.TxHash[:]
- }
-
- txFrom, err := types.Sender(types.LatestSignerForChainID(txDetails.ChainId()), txDetails)
- if err != nil {
- return fmt.Errorf("could not decode tx sender (%v): %v", log.TxHash, err)
- }
- txTo := *txDetails.To()
-
- depositTx := &dbtypes.DepositTx{
- Index: binary.LittleEndian.Uint64(event[4].([]byte)),
- BlockNumber: log.BlockNumber,
- BlockTime: txBlockHeader.Time,
- BlockRoot: log.BlockHash[:],
- PublicKey: event[0].([]byte),
- WithdrawalCredentials: event[1].([]byte),
- Amount: binary.LittleEndian.Uint64(event[2].([]byte)),
- Signature: event[3].([]byte),
- TxHash: log.TxHash[:],
- TxSender: txFrom[:],
- TxTarget: txTo[:],
- }
- ds.checkDepositValidity(depositTx)
- depositTxs = append(depositTxs, depositTx)
- }
-
- if len(depositTxs) > 0 {
- ds.logger.Infof("crawled deposits for block %v - %v: %v deposits", ds.state.FinalBlock, toBlock, len(depositTxs))
-
- depositCount := len(depositTxs)
- for depositIdx := 0; depositIdx < depositCount; depositIdx += 500 {
- endIdx := depositIdx + 500
- if endIdx > depositCount {
- endIdx = depositCount
- }
-
- err = ds.persistFinalizedDepositTxs(toBlock, depositTxs[depositIdx:endIdx])
- if err != nil {
- return fmt.Errorf("could not persist deposit txs: %v", err)
- }
- }
-
- for _, depositTx := range depositTxs {
- if ds.unfinalizedDeposits[depositTx.Index] != nil {
- delete(ds.unfinalizedDeposits, depositTx.Index)
- }
- }
-
- time.Sleep(1 * time.Second)
- } else {
- err = ds.persistFinalizedDepositTxs(toBlock, nil)
- if err != nil {
- return fmt.Errorf("could not persist deposit state: %v", err)
- }
- }
+ clBlock := ci.indexerCtx.beaconIndexer.GetBlocksByExecutionBlockHash(phase0.Hash32(log.BlockHash))
+ if len(clBlock) > 0 {
+ requestTx.ForkId = uint64(clBlock[0].GetForkId())
+ } else {
+ requestTx.ForkId = uint64(fork.forkId)
}
- return nil
-}
-func (ds *DepositIndexer) processRecentBlocks() error {
- headForks := ds.indexer.getForksWithClients(execution.AnyClient)
- for _, headFork := range headForks {
- err := ds.processRecentBlocksForFork(headFork)
- if err != nil {
- if headFork.canonical {
- ds.logger.Errorf("could not process recent events from canonical fork %v: %v", headFork.forkId, err)
- } else {
- ds.logger.Warnf("could not process recent events from fork %v: %v", headFork.forkId, err)
- }
- }
- }
- return nil
+ return requestTx, nil
}
-func (ds *DepositIndexer) processRecentBlocksForFork(headFork *forkWithClients) error {
- elHeadBlock := ds.indexer.beaconIndexer.GetCanonicalHead(&headFork.forkId)
- if elHeadBlock == nil {
- return fmt.Errorf("head block not found")
+// parseDepositLog parses a deposit log and returns the corresponding deposit transaction
+func (ci *DepositIndexer) parseDepositLog(log *types.Log) *dbtypes.DepositTx {
+ if !bytes.Equal(log.Topics[0][:], ci.depositEventTopic) {
+ return nil
}
- elHeadBlockIndex := elHeadBlock.GetBlockIndex()
- if elHeadBlockIndex == nil {
- return fmt.Errorf("head block index not found")
+ event, err := ci.depositContractAbi.Unpack("DepositEvent", log.Data)
+ if err != nil {
+ ci.logger.Errorf("error decoding deposit event (%v): %v", log.TxHash, err)
+ return nil
}
- elHeadBlockNumber := elHeadBlockIndex.ExecutionNumber
-
- var resError error
- var ctxCancel context.CancelFunc
- defer func() {
- if ctxCancel != nil {
- ctxCancel()
- }
- }()
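+	// DepositEvent fields: [0] pubkey, [1] withdrawal_credentials, [2] amount (little endian), [3] signature, [4] index (little endian)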
+ requestTx := &dbtypes.DepositTx{
+ Index: binary.LittleEndian.Uint64(event[4].([]byte)),
+ BlockNumber: log.BlockNumber,
+ BlockRoot: log.BlockHash[:],
+ PublicKey: event[0].([]byte),
+ WithdrawalCredentials: event[1].([]byte),
+ Amount: binary.LittleEndian.Uint64(event[2].([]byte)),
+ Signature: event[3].([]byte),
+ TxHash: log.TxHash[:],
+ }
+ ci.checkDepositValidity(requestTx)
- for retryCount := 0; retryCount < 3; retryCount++ {
- client := headFork.clients[retryCount%len(headFork.clients)]
+ return requestTx
+}
- if ctxCancel != nil {
- ctxCancel()
- }
- ctx, cancel := context.WithCancel(context.Background())
- ctxCancel = cancel
-
- query := ethereum.FilterQuery{
- FromBlock: big.NewInt(0).SetUint64(ds.state.FinalBlock + 1),
- ToBlock: big.NewInt(0).SetUint64(elHeadBlockNumber - 1),
- Addresses: []common.Address{
- ds.depositContract,
- },
+// persistDepositTxs is the callback for the contract indexer to persist deposit transactions to the database
+func (ci *DepositIndexer) persistDepositTxs(tx *sqlx.Tx, requests []*dbtypes.DepositTx) error {
+ requestCount := len(requests)
+ for requestIdx := 0; requestIdx < requestCount; requestIdx += 500 {
+ endIdx := requestIdx + 500
+ if endIdx > requestCount {
+ endIdx = requestCount
}
- logs, err := ds.loadFilteredLogs(ctx, client, query)
+ err := db.InsertDepositTxs(requests[requestIdx:endIdx], tx)
if err != nil {
- return fmt.Errorf("error fetching deposit contract logs: %v", err)
- }
-
- var txHash []byte
- var txDetails *types.Transaction
- var txBlockHeader *types.Header
-
- depositTxs := []*dbtypes.DepositTx{}
-
- for idx := range logs {
- log := &logs[idx]
- if !bytes.Equal(log.Topics[0][:], ds.depositEventTopic) {
- continue
- }
-
- event, err := ds.depositContractAbi.Unpack("DepositEvent", log.Data)
- if err != nil {
- return fmt.Errorf("error decoding deposit event (%v): %v", log.TxHash, err)
-
- }
-
- depositIndex := binary.LittleEndian.Uint64(event[4].([]byte))
- if ds.unfinalizedDeposits[depositIndex] != nil && ds.unfinalizedDeposits[depositIndex][log.BlockHash] {
- continue
- }
-
- if txHash == nil || !bytes.Equal(txHash, log.TxHash[:]) {
- txHash = log.TxHash[:]
-
- txDetails, err = ds.loadTransactionByHash(ctx, client, log.TxHash)
- if err != nil {
- return fmt.Errorf("could not load tx details (%v): %v", log.TxHash, err)
- }
-
- txBlockHeader, err = ds.loadHeaderByNumber(ctx, client, log.BlockNumber)
- if err != nil {
- return fmt.Errorf("could not load block details (%v): %v", log.TxHash, err)
- }
- }
-
- depositForkId := headFork.forkId
- beaconBlock := ds.indexer.beaconIndexer.GetBlocksByExecutionBlockHash(phase0.Hash32(log.BlockHash))
- if len(beaconBlock) == 1 {
- depositForkId = beaconBlock[0].GetForkId()
- } else if len(beaconBlock) > 1 {
- depositForkId = beaconBlock[0].GetForkId()
- ds.logger.Warnf("found multiple beacon blocks for deposit block hash %v", log.BlockHash)
- }
-
- txFrom, err := types.Sender(types.LatestSignerForChainID(txDetails.ChainId()), txDetails)
- if err != nil {
- return fmt.Errorf("could not decode tx sender (%v): %v", log.TxHash, err)
- }
- txTo := *txDetails.To()
-
- depositTx := &dbtypes.DepositTx{
- Index: depositIndex,
- BlockNumber: log.BlockNumber,
- BlockTime: txBlockHeader.Time,
- BlockRoot: log.BlockHash[:],
- PublicKey: event[0].([]byte),
- WithdrawalCredentials: event[1].([]byte),
- Amount: binary.LittleEndian.Uint64(event[2].([]byte)),
- Signature: event[3].([]byte),
- Orphaned: true,
- ForkId: uint64(depositForkId),
- TxHash: log.TxHash[:],
- TxSender: txFrom[:],
- TxTarget: txTo[:],
- }
-
- ds.checkDepositValidity(depositTx)
- depositTxs = append(depositTxs, depositTx)
- }
-
- if len(depositTxs) > 0 {
- ds.logger.Infof("crawled recent deposits for fork %v since block %v: %v deposits", headFork.forkId, ds.state.FinalBlock, len(depositTxs))
-
- depositCount := len(depositTxs)
- for depositIdx := 0; depositIdx < depositCount; depositIdx += 500 {
- endIdx := depositIdx + 500
- if endIdx > depositCount {
- endIdx = depositCount
- }
-
- err = ds.persistRecentDepositTxs(depositTxs[depositIdx:endIdx])
- if err != nil {
- return fmt.Errorf("could not persist deposit txs: %v", err)
- }
- }
-
- for _, depositTx := range depositTxs {
- if ds.unfinalizedDeposits[depositTx.Index] == nil {
- ds.unfinalizedDeposits[depositTx.Index] = map[common.Hash]bool{}
- }
- ds.unfinalizedDeposits[depositTx.Index][common.Hash(depositTx.BlockRoot)] = true
- }
-
- time.Sleep(1 * time.Second)
+ return fmt.Errorf("error while inserting deposit txs: %v", err)
}
}
- return resError
+ return nil
}
+// checkDepositValidity checks if a deposit transaction has a valid signature
func (ds *DepositIndexer) checkDepositValidity(depositTx *dbtypes.DepositTx) {
depositMsg := &zrnt_common.DepositMessage{
Pubkey: zrnt_common.BLSPubkey(depositTx.PublicKey),
@@ -453,37 +206,3 @@ func (ds *DepositIndexer) checkDepositValidity(depositTx *dbtypes.DepositTx) {
depositTx.ValidSignature = true
}
}
-
-func (ds *DepositIndexer) persistFinalizedDepositTxs(toBlockNumber uint64, deposits []*dbtypes.DepositTx) error {
- return db.RunDBTransaction(func(tx *sqlx.Tx) error {
- if len(deposits) > 0 {
- err := db.InsertDepositTxs(deposits, tx)
- if err != nil {
- return fmt.Errorf("error while inserting deposit txs: %v", err)
- }
- }
-
- ds.state.FinalBlock = toBlockNumber
- if toBlockNumber > ds.state.HeadBlock {
- ds.state.HeadBlock = toBlockNumber
- }
-
- err := db.SetExplorerState("indexer.depositstate", ds.state, tx)
- if err != nil {
- return fmt.Errorf("error while updating deposit state: %v", err)
- }
-
- return nil
- })
-}
-
-func (ds *DepositIndexer) persistRecentDepositTxs(deposits []*dbtypes.DepositTx) error {
- return db.RunDBTransaction(func(tx *sqlx.Tx) error {
- err := db.InsertDepositTxs(deposits, tx)
- if err != nil {
- return fmt.Errorf("error while inserting deposit txs: %v", err)
- }
-
- return nil
- })
-}
diff --git a/indexer/execution/indexerctx.go b/indexer/execution/indexerctx.go
index d7d4e515..c4fa7a26 100644
--- a/indexer/execution/indexerctx.go
+++ b/indexer/execution/indexerctx.go
@@ -11,6 +11,7 @@ import (
"github.com/sirupsen/logrus"
)
+// IndexerCtx is the context for the execution indexer
type IndexerCtx struct {
logger logrus.FieldLogger
beaconIndexer *beacon.Indexer
@@ -20,6 +21,7 @@ type IndexerCtx struct {
executionClients map[*execution.Client]*indexerElClientInfo
}
+// indexerElClientInfo holds information about a client and its priority
type indexerElClientInfo struct {
priority int
archive bool
@@ -37,6 +39,7 @@ func NewIndexerCtx(logger logrus.FieldLogger, executionPool *execution.Pool, con
}
}
+// AddClientInfo adds client info to the indexer context
func (ictx *IndexerCtx) AddClientInfo(client *execution.Client, priority int, archive bool) {
ictx.executionClients[client] = &indexerElClientInfo{
priority: priority,
@@ -44,6 +47,7 @@ func (ictx *IndexerCtx) AddClientInfo(client *execution.Client, priority int, ar
}
}
+// getFinalizedClients returns a list of clients that have reached the finalized el block
func (ictx *IndexerCtx) getFinalizedClients(clientType execution.ClientType) []*execution.Client {
_, finalizedRoot := ictx.consensusPool.GetChainState().GetJustifiedCheckpoint()
@@ -62,6 +66,7 @@ func (ictx *IndexerCtx) getFinalizedClients(clientType execution.ClientType) []*
return finalizedClients
}
+// sortClients sorts clients by priority, but randomizes the order for equal priority
func (ictx *IndexerCtx) sortClients(clientA *execution.Client, clientB *execution.Client, preferArchive bool) bool {
clientAInfo := ictx.executionClients[clientA]
clientBInfo := ictx.executionClients[clientB]
@@ -77,6 +82,7 @@ func (ictx *IndexerCtx) sortClients(clientA *execution.Client, clientB *executio
return rand.IntN(2) == 0
}
+// forkWithClients holds information about a fork and the clients following it
type forkWithClients struct {
canonical bool
forkId beacon.ForkKey
@@ -84,6 +90,8 @@ type forkWithClients struct {
clients []*execution.Client
}
+// getForksWithClients returns a list of forks with their clients
+// the list is sorted by the canonical head and the number of clients
func (ictx *IndexerCtx) getForksWithClients(clientType execution.ClientType) []*forkWithClients {
forksWithClients := make([]*forkWithClients, 0)
forkHeadMap := map[beacon.ForkKey]*beacon.ForkHead{}
diff --git a/indexer/execution/transaction_matcher.go b/indexer/execution/transaction_matcher.go
new file mode 100644
index 00000000..33c33820
--- /dev/null
+++ b/indexer/execution/transaction_matcher.go
@@ -0,0 +1,195 @@
+package execution
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/jmoiron/sqlx"
+ "github.com/sirupsen/logrus"
+
+ "github.com/ethpandaops/dora/db"
+)
+
+// transactionMatcher is used to match transactions to requests in the database
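+// it links the request transactions crawled from contract logs to the corresponding request operations in beacon blocks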
+type transactionMatcher[MatchType any] struct {
+ indexer *IndexerCtx
+ logger logrus.FieldLogger
+ options *transactionMatcherOptions[MatchType]
+ state *transactionMatcherState
+}
+
+// transactionMatcherOptions are the options for the transaction matcher
+type transactionMatcherOptions[MatchType any] struct {
+ stateKey string
+ deployBlock uint64
+ timeLimit time.Duration
+
+ matchBlockRange func(fromBlock uint64, toBlock uint64) ([]*MatchType, error)
+ persistMatches func(tx *sqlx.Tx, matches []*MatchType) error
+}
+
+// transactionMatcherState is the state of the transaction matcher
+type transactionMatcherState struct {
+ MatchHeight uint64 `json:"match_height"`
+}
+
+// newTransactionMatcher creates a new transaction matcher
+func newTransactionMatcher[MatchType any](indexer *IndexerCtx, logger logrus.FieldLogger, options *transactionMatcherOptions[MatchType]) *transactionMatcher[MatchType] {
+ ci := &transactionMatcher[MatchType]{
+ indexer: indexer,
+ logger: logger,
+ options: options,
+ }
+
+ return ci
+}
+
+// GetMatcherHeight returns the current match height of the transaction matcher
+func (ds *transactionMatcher[MatchType]) GetMatcherHeight() uint64 {
+ if ds.state == nil {
+ ds.loadState()
+ }
+
+ return ds.state.MatchHeight
+}
+
+// runTransactionMatcher runs the transaction matcher logic for the next block ranges once they are fully indexed and ready to be matched.
+func (ds *transactionMatcher[MatchType]) runTransactionMatcher(indexerBlock uint64) error {
+ // get matcher state
+ if ds.state == nil {
+ ds.loadState()
+ }
+
+ finalizedBlock := ds.getFinalizedBlockNumber()
+
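+	// match up to the lower of the finalized block and the indexer's last processed block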
+ matchTargetHeight := finalizedBlock
+ if indexerBlock < matchTargetHeight {
+ matchTargetHeight = indexerBlock
+ }
+
+	// check if the synchronization is running and, if so, whether it is ahead of the block range we want to match
+ _, syncEpoch := ds.indexer.beaconIndexer.GetSynchronizerState()
+ indexerFinalizedEpoch, _ := ds.indexer.beaconIndexer.GetBlockCacheState()
+ if syncEpoch < indexerFinalizedEpoch {
+ // synchronization is behind head, check if our block range is synced
+ syncSlot := ds.indexer.chainState.EpochToSlot(syncEpoch)
+ syncBlockRoot := db.GetHighestRootBeforeSlot(uint64(syncSlot), false)
+ if syncBlockRoot == nil {
+ // no block found, not synced at all
+ return nil
+ }
+
+ syncBlock := db.GetSlotByRoot(syncBlockRoot)
+ if syncBlock == nil {
+ // block not found, not synced at all
+ return nil
+ }
+
+ if syncBlock.EthBlockNumber == nil {
+			// synced block is from before bellatrix, no execution block number available
+ return nil
+ }
+
+ syncBlockNumber := *syncBlock.EthBlockNumber
+ if syncBlockNumber < matchTargetHeight {
+ matchTargetHeight = syncBlockNumber
+ }
+ }
+
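+	// track the start time for the time limit and the last height that was persisted together with matches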
+ t1 := time.Now()
+ persistedHeight := ds.state.MatchHeight
+
+ // match blocks until we reach the target height or the time limit is reached
+ for ds.state.MatchHeight < matchTargetHeight && time.Since(t1) < ds.options.timeLimit {
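+		// advance in chunks of at most 200 blocks per iteration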
+ matchTarget := ds.state.MatchHeight + 200
+ if matchTarget > matchTargetHeight {
+ matchTarget = matchTargetHeight
+ }
+
+ matches, err := ds.options.matchBlockRange(ds.state.MatchHeight, matchTarget)
+ if err != nil {
+ return err
+ }
+
+ ds.state.MatchHeight = matchTarget
+
+ if len(matches) > 0 {
+ // persist matches to the database
+ err := db.RunDBTransaction(func(tx *sqlx.Tx) error {
+			if err := ds.options.persistMatches(tx, matches); err != nil {
+				return fmt.Errorf("error while persisting tx matches: %v", err)
+			}
+
+ persistedHeight = matchTarget
+ return ds.persistState(tx)
+ })
+
+ if err != nil {
+ return err
+ }
+ }
+ }
+
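+	// persist the advanced match height even if the last ranges yielded no matches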
+ if ds.state.MatchHeight > persistedHeight {
+ err := db.RunDBTransaction(func(tx *sqlx.Tx) error {
+ return ds.persistState(tx)
+ })
+
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+// loadState loads the state of the transaction matcher from the database
+func (ds *transactionMatcher[_]) loadState() {
+ syncState := transactionMatcherState{}
+ db.GetExplorerState(ds.options.stateKey, &syncState)
+ ds.state = &syncState
+
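+	// start matching from the contract deployment block if no state has been stored yet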
+ if ds.state.MatchHeight == 0 {
+ ds.state.MatchHeight = ds.options.deployBlock
+ }
+}
+
+// persistState persists the state of the transaction matcher to the database
+func (ds *transactionMatcher[_]) persistState(tx *sqlx.Tx) error {
+ err := db.SetExplorerState(ds.options.stateKey, ds.state, tx)
+ if err != nil {
+ return fmt.Errorf("error while updating tx matcher state: %v", err)
+ }
+
+ return nil
+}
+
+// getFinalizedBlockNumber gets the highest available finalized el block number
+// available means the block has been processed by the finalization routine, so all request operations are available in the db
+func (ds *transactionMatcher[_]) getFinalizedBlockNumber() uint64 {
+ var finalizedBlockNumber uint64
+
+ finalizedEpoch, finalizedRoot := ds.indexer.chainState.GetFinalizedCheckpoint()
+ if finalizedBlock := ds.indexer.beaconIndexer.GetBlockByRoot(finalizedRoot); finalizedBlock != nil {
+ if indexVals := finalizedBlock.GetBlockIndex(); indexVals != nil {
+ finalizedBlockNumber = indexVals.ExecutionNumber
+ }
+ }
+
+ if finalizedBlockNumber == 0 {
+ // load from db
+ if finalizedBlock := db.GetSlotByRoot(finalizedRoot[:]); finalizedBlock != nil && finalizedBlock.EthBlockNumber != nil {
+ finalizedBlockNumber = *finalizedBlock.EthBlockNumber
+ }
+ }
+
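+	// block until the finalization routine has caught up to the finalized checkpoint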
+ for {
+ indexerFinalizedEpoch, _ := ds.indexer.beaconIndexer.GetBlockCacheState()
+ if indexerFinalizedEpoch >= finalizedEpoch {
+ break
+ }
+
+ // wait for finalization routine to catch up
+ time.Sleep(1 * time.Second)
+ }
+
+ return finalizedBlockNumber
+}
diff --git a/indexer/execution/withdrawal_indexer.go b/indexer/execution/withdrawal_indexer.go
new file mode 100644
index 00000000..a4f272bf
--- /dev/null
+++ b/indexer/execution/withdrawal_indexer.go
@@ -0,0 +1,284 @@
+package execution
+
+import (
+ "bytes"
+ "fmt"
+ "math/big"
+ "time"
+
+ "github.com/attestantio/go-eth2-client/spec/phase0"
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/jmoiron/sqlx"
+ "github.com/sirupsen/logrus"
+
+ "github.com/ethpandaops/dora/db"
+ "github.com/ethpandaops/dora/dbtypes"
+ "github.com/ethpandaops/dora/indexer/beacon"
+ "github.com/ethpandaops/dora/utils"
+)
+
+const withdrawalContractAddr = "0x09Fc772D0857550724b07B850a4323f39112aAaA"
+
+// WithdrawalIndexer is the indexer for the eip-7002 withdrawal request system contract
+type WithdrawalIndexer struct {
+ indexerCtx *IndexerCtx
+ logger logrus.FieldLogger
+ indexer *contractIndexer[dbtypes.WithdrawalRequestTx]
+ matcher *transactionMatcher[withdrawalRequestMatch]
+}
+
+type withdrawalRequestMatch struct {
+ slotRoot []byte
+ slotIndex uint64
+ txHash []byte
+}
+
+// NewWithdrawalIndexer creates a new withdrawal contract indexer
+func NewWithdrawalIndexer(indexer *IndexerCtx) *WithdrawalIndexer {
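+	// use the configured log batch size for the contract indexer, falling back to 1000 if unset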
+ batchSize := utils.Config.ExecutionApi.LogBatchSize
+ if batchSize == 0 {
+ batchSize = 1000
+ }
+
+ wi := &WithdrawalIndexer{
+ indexerCtx: indexer,
+ logger: indexer.logger.WithField("indexer", "withdrawals"),
+ }
+
+ specs := indexer.chainState.GetSpecs()
+
+ // create contract indexer for the withdrawal contract
+ wi.indexer = newContractIndexer(
+ indexer,
+ indexer.logger.WithField("contract-indexer", "withdrawals"),
+ &contractIndexerOptions[dbtypes.WithdrawalRequestTx]{
+ stateKey: "indexer.withdrawalindexer",
+ batchSize: batchSize,
+ contractAddress: common.HexToAddress(withdrawalContractAddr),
+ deployBlock: uint64(utils.Config.ExecutionApi.ElectraDeployBlock),
+ dequeueRate: specs.MaxWithdrawalRequestsPerPayload,
+
+ processFinalTx: wi.processFinalTx,
+ processRecentTx: wi.processRecentTx,
+ persistTxs: wi.persistWithdrawalTxs,
+ },
+ )
+
+ // create transaction matcher for the withdrawal contract
+ wi.matcher = newTransactionMatcher(
+ indexer,
+ indexer.logger.WithField("contract-matcher", "withdrawals"),
+ &transactionMatcherOptions[withdrawalRequestMatch]{
+ stateKey: "indexer.withdrawalmatcher",
+ deployBlock: uint64(utils.Config.ExecutionApi.ElectraDeployBlock),
+ timeLimit: 2 * time.Second,
+
+ matchBlockRange: wi.matchBlockRange,
+ persistMatches: wi.persistMatches,
+ },
+ )
+
+ go wi.runWithdrawalIndexerLoop()
+
+ return wi
+}
+
+// GetMatcherHeight returns the last processed el block number from the transaction matcher
+func (wi *WithdrawalIndexer) GetMatcherHeight() uint64 {
+ return wi.matcher.GetMatcherHeight()
+}
+
+// runWithdrawalIndexerLoop is the main loop for the withdrawal indexer
+func (wi *WithdrawalIndexer) runWithdrawalIndexerLoop() {
+ defer utils.HandleSubroutinePanic("WithdrawalIndexer.runWithdrawalIndexerLoop")
+
+ for {
+ time.Sleep(30 * time.Second)
+ wi.logger.Debugf("run withdrawal indexer logic")
+
+ err := wi.indexer.runContractIndexer()
+ if err != nil {
+ wi.logger.Errorf("indexer error: %v", err)
+ }
+
+ err = wi.matcher.runTransactionMatcher(wi.indexer.state.FinalBlock)
+ if err != nil {
+ wi.logger.Errorf("matcher error: %v", err)
+ }
+ }
+}
+
+// processFinalTx is the callback for the contract indexer to process logs from finalized blocks
+// it parses the request log and returns the corresponding withdrawal request transaction
+func (wi *WithdrawalIndexer) processFinalTx(log *types.Log, tx *types.Transaction, header *types.Header, txFrom common.Address, dequeueBlock uint64) (*dbtypes.WithdrawalRequestTx, error) {
+ requestTx := wi.parseRequestLog(log, nil)
+ if requestTx == nil {
+ return nil, fmt.Errorf("invalid withdrawal log")
+ }
+
+ txTo := *tx.To()
+
+ requestTx.BlockTime = header.Time
+ requestTx.TxSender = txFrom[:]
+ requestTx.TxTarget = txTo[:]
+ requestTx.DequeueBlock = dequeueBlock
+
+ return requestTx, nil
+}
+
+// processRecentTx is the callback for the contract indexer to process logs from recent, non-finalized blocks
+// it parses the request log and returns the corresponding withdrawal request transaction
+func (wi *WithdrawalIndexer) processRecentTx(log *types.Log, tx *types.Transaction, header *types.Header, txFrom common.Address, dequeueBlock uint64, fork *forkWithClients) (*dbtypes.WithdrawalRequestTx, error) {
+ requestTx := wi.parseRequestLog(log, &fork.forkId)
+ if requestTx == nil {
+ return nil, fmt.Errorf("invalid withdrawal log")
+ }
+
+ txTo := *tx.To()
+
+ requestTx.BlockTime = header.Time
+ requestTx.TxSender = txFrom[:]
+ requestTx.TxTarget = txTo[:]
+ requestTx.DequeueBlock = dequeueBlock
+
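+	// resolve the fork id from the beacon block that contains this execution block, falling back to the fork passed by the caller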
+ clBlock := wi.indexerCtx.beaconIndexer.GetBlocksByExecutionBlockHash(phase0.Hash32(log.BlockHash))
+ if len(clBlock) > 0 {
+ requestTx.ForkId = uint64(clBlock[0].GetForkId())
+ } else {
+ requestTx.ForkId = uint64(fork.forkId)
+ }
+
+ return requestTx, nil
+}
+
+// parseRequestLog parses a withdrawal request log and returns the corresponding withdrawal request transaction
+func (wi *WithdrawalIndexer) parseRequestLog(log *types.Log, forkId *beacon.ForkKey) *dbtypes.WithdrawalRequestTx {
+ // data layout:
+ // 0-20: sender address (20 bytes)
+ // 20-68: validator pubkey (48 bytes)
+ // 68-76: amount (8 bytes)
+
+ if len(log.Data) < 76 {
+ wi.logger.Warnf("invalid withdrawal log data length: %v", len(log.Data))
+ return nil
+ }
+
+ senderAddr := log.Data[:20]
+ validatorPubkey := log.Data[20:68]
+ amount := big.NewInt(0).SetBytes(log.Data[68:76]).Uint64()
+
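+	// look up the validator index for the parsed pubkey in the canonical validator set of the given fork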
+ validatorSet := wi.indexerCtx.beaconIndexer.GetCanonicalValidatorSet(forkId)
+
+ var validatorIndex *uint64
+ for _, validator := range validatorSet {
+ if bytes.Equal(validator.Validator.PublicKey[:], validatorPubkey) {
+ index := uint64(validator.Index)
+ validatorIndex = &index
+ break
+ }
+ }
+
+ requestTx := &dbtypes.WithdrawalRequestTx{
+ BlockNumber: log.BlockNumber,
+ BlockIndex: uint64(log.Index),
+ BlockRoot: log.BlockHash[:],
+ SourceAddress: senderAddr,
+ ValidatorPubkey: validatorPubkey,
+ ValidatorIndex: validatorIndex,
+ Amount: amount,
+ TxHash: log.TxHash[:],
+ }
+
+ return requestTx
+}
+
+// persistWithdrawalTxs is the callback for the contract indexer to persist withdrawal transactions to the database
+func (wi *WithdrawalIndexer) persistWithdrawalTxs(tx *sqlx.Tx, requests []*dbtypes.WithdrawalRequestTx) error {
+ requestCount := len(requests)
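+	// insert the requests in chunks of 500 rows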
+ for requestIdx := 0; requestIdx < requestCount; requestIdx += 500 {
+ endIdx := requestIdx + 500
+ if endIdx > requestCount {
+ endIdx = requestCount
+ }
+
+ err := db.InsertWithdrawalRequestTxs(requests[requestIdx:endIdx], tx)
+ if err != nil {
+ return fmt.Errorf("error while inserting withdrawal txs: %v", err)
+ }
+ }
+
+ return nil
+}
+
+// matchBlockRange is the callback for the transaction matcher to match withdrawal requests with their corresponding transactions
+func (wi *WithdrawalIndexer) matchBlockRange(fromBlock uint64, toBlock uint64) ([]*withdrawalRequestMatch, error) {
+ requestMatches := []*withdrawalRequestMatch{}
+
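+	// load all withdrawal request transactions that are dequeued within the given block range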
+ dequeueWithdrawalTxs := db.GetWithdrawalRequestTxsByDequeueRange(fromBlock, toBlock)
+ if len(dequeueWithdrawalTxs) > 0 {
+ firstBlock := dequeueWithdrawalTxs[0].DequeueBlock
+ lastBlock := dequeueWithdrawalTxs[len(dequeueWithdrawalTxs)-1].DequeueBlock
+
+ for _, withdrawalRequest := range db.GetWithdrawalRequestsByElBlockRange(firstBlock, lastBlock) {
+ if len(withdrawalRequest.TxHash) > 0 {
+ continue
+ }
+
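+			// prefer transactions from the request's own fork or one of its parent forks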
+ parentForkIds := wi.indexerCtx.beaconIndexer.GetParentForkIds(beacon.ForkKey(withdrawalRequest.ForkId))
+ isParentFork := func(forkId uint64) bool {
+ if forkId == withdrawalRequest.ForkId {
+ return true
+ }
+ for _, parentForkId := range parentForkIds {
+ if uint64(parentForkId) == forkId {
+ return true
+ }
+ }
+ return false
+ }
+
+ matchingTxs := []*dbtypes.WithdrawalRequestTx{}
+ for _, tx := range dequeueWithdrawalTxs {
+ if tx.DequeueBlock == withdrawalRequest.BlockNumber && isParentFork(tx.ForkId) {
+ matchingTxs = append(matchingTxs, tx)
+ }
+ }
+
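+			// fall back to fork-agnostic matching if no transaction matches on the request's fork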
+ if len(matchingTxs) == 0 {
+ for _, tx := range dequeueWithdrawalTxs {
+ if tx.DequeueBlock == withdrawalRequest.BlockNumber {
+ matchingTxs = append(matchingTxs, tx)
+ }
+ }
+ }
+
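+			// the request's index within its block selects the corresponding dequeued transaction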
+ if len(matchingTxs) < int(withdrawalRequest.SlotIndex)+1 {
+ continue
+ }
+
+ txHash := matchingTxs[withdrawalRequest.SlotIndex].TxHash
+ wi.logger.Debugf("Matched withdrawal request %d:%v with tx 0x%x", withdrawalRequest.SlotNumber, withdrawalRequest.SlotIndex, txHash)
+
+ requestMatches = append(requestMatches, &withdrawalRequestMatch{
+ slotRoot: withdrawalRequest.SlotRoot,
+ slotIndex: withdrawalRequest.SlotIndex,
+ txHash: txHash,
+ })
+ }
+ }
+
+ return requestMatches, nil
+}
+
+// persistMatches is the callback for the transaction matcher to persist matches to the database
+func (wi *WithdrawalIndexer) persistMatches(tx *sqlx.Tx, matches []*withdrawalRequestMatch) error {
+ for _, match := range matches {
+ err := db.UpdateWithdrawalRequestTxHash(match.slotRoot, match.slotIndex, match.txHash, tx)
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
diff --git a/services/chainservice.go b/services/chainservice.go
index f29ef0af..0fe5729c 100644
--- a/services/chainservice.go
+++ b/services/chainservice.go
@@ -24,13 +24,16 @@ import (
)
type ChainService struct {
- logger logrus.FieldLogger
- consensusPool *consensus.Pool
- executionPool *execution.Pool
- beaconIndexer *beacon.Indexer
- validatorNames *ValidatorNames
- mevRelayIndexer *mevrelay.MevIndexer
- started bool
+ logger logrus.FieldLogger
+ consensusPool *consensus.Pool
+ executionPool *execution.Pool
+ beaconIndexer *beacon.Indexer
+ validatorNames *ValidatorNames
+ depositIndexer *execindexer.DepositIndexer
+ consolidationIndexer *execindexer.ConsolidationIndexer
+ withdrawalIndexer *execindexer.WithdrawalIndexer
+ mevRelayIndexer *mevrelay.MevIndexer
+ started bool
}
var GlobalBeaconService *ChainService
@@ -179,7 +182,9 @@ func (cs *ChainService) StartService() error {
cs.beaconIndexer.StartIndexer()
// add execution indexers
- execindexer.NewDepositIndexer(executionIndexerCtx)
+ cs.depositIndexer = execindexer.NewDepositIndexer(executionIndexerCtx)
+ cs.consolidationIndexer = execindexer.NewConsolidationIndexer(executionIndexerCtx)
+ cs.withdrawalIndexer = execindexer.NewWithdrawalIndexer(executionIndexerCtx)
// start MEV relay indexer
cs.mevRelayIndexer.StartUpdater()
@@ -191,6 +196,14 @@ func (bs *ChainService) GetBeaconIndexer() *beacon.Indexer {
return bs.beaconIndexer
}
+func (bs *ChainService) GetConsolidationIndexer() *execindexer.ConsolidationIndexer {
+ return bs.consolidationIndexer
+}
+
+func (bs *ChainService) GetWithdrawalIndexer() *execindexer.WithdrawalIndexer {
+ return bs.withdrawalIndexer
+}
+
func (bs *ChainService) GetConsensusClients() []*consensus.Client {
if bs == nil || bs.consensusPool == nil {
return nil
@@ -245,6 +258,10 @@ func (bs *ChainService) GetGenesis() (*v1.Genesis, error) {
return chainState.GetGenesis(), nil
}
+func (bs *ChainService) GetParentForkIds(forkId beacon.ForkKey) []beacon.ForkKey {
+ return bs.beaconIndexer.GetParentForkIds(forkId)
+}
+
type ConsensusClientFork struct {
Slot phase0.Slot
Root phase0.Root
diff --git a/services/chainservice_objects.go b/services/chainservice_objects.go
index bab1c214..4869447a 100644
--- a/services/chainservice_objects.go
+++ b/services/chainservice_objects.go
@@ -442,7 +442,12 @@ func (bs *ChainService) GetWithdrawalRequestsByFilter(filter *dbtypes.Withdrawal
}
// load older objects from db
- dbPage := pageIdx - cachedPages
+ var dbPage uint64
+ if pageIdx > cachedPages {
+ dbPage = pageIdx - cachedPages
+ } else {
+ dbPage = 0
+ }
dbCacheOffset := uint64(pageSize) - (cachedMatchesLen % uint64(pageSize))
var dbObjects []*dbtypes.WithdrawalRequest
@@ -571,7 +576,12 @@ func (bs *ChainService) GetConsolidationRequestsByFilter(filter *dbtypes.Consoli
}
// load older objects from db
- dbPage := pageIdx - cachedPages
+ var dbPage uint64
+ if pageIdx > cachedPages {
+ dbPage = pageIdx - cachedPages
+ } else {
+ dbPage = 0
+ }
dbCacheOffset := uint64(pageSize) - (cachedMatchesLen % uint64(pageSize))
var dbObjects []*dbtypes.ConsolidationRequest
diff --git a/templates/_layout/layout.html b/templates/_layout/layout.html
index 1d3d7049..574d13c9 100644
--- a/templates/_layout/layout.html
+++ b/templates/_layout/layout.html
@@ -46,6 +46,9 @@