Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement request log crawler for consolidation & withdrawal request transactions #141

Merged
merged 29 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
26b9d3e
implement consolidation request log crawler
pk910 Oct 8, 2024
88db084
add pgsql schema
pk910 Oct 8, 2024
7fb76b1
add withdrawal indexer
pk910 Oct 8, 2024
8c48ada
start consolidation & withdrawal indexers
pk910 Oct 8, 2024
dfe4b80
add consolidation & withdrawal matchers
pk910 Oct 8, 2024
eb46ea3
remove dead code
pk910 Oct 8, 2024
b12c885
Merge branch 'master' into pk910/pectra-log-crawler
pk910 Oct 9, 2024
a0203d1
deduplicate contract indexer logic
pk910 Oct 9, 2024
16a8c3d
refactoring
pk910 Oct 9, 2024
00d00b6
generalize transaction matcher logic
pk910 Oct 9, 2024
e502857
lookup validator indexes for request transactions
pk910 Oct 9, 2024
0d9551e
show transaction hashes on withdrawal & consolidation requests pages
pk910 Oct 9, 2024
e9443f7
cleanup & docs
pk910 Oct 10, 2024
aa99d4b
show more transaction details as popover for consolidation/withdrawal…
pk910 Oct 10, 2024
3e06f4d
fix queue calculation in contract indexer
pk910 Oct 10, 2024
1297c87
cleanup
pk910 Oct 10, 2024
ef41830
add missing settings to example & default config
pk910 Oct 10, 2024
a61b59c
update eip7002 & eip7251 contract addresses
pk910 Oct 15, 2024
e378602
revert contract address update for testing with devnet3 client images
pk910 Oct 15, 2024
60a1bf1
fix dequeue rate for withdrawal contract
pk910 Oct 15, 2024
64038db
fix index underflow in GetWithdrawalRequestsByFilter / GetConsolidati…
pk910 Oct 16, 2024
fffe1c4
rename `DepositLogBatchSize` setting
pk910 Oct 16, 2024
ca6a836
fix comment
pk910 Oct 16, 2024
0d7d7e3
Merge commit 'c720fbdf4a9efd607b560984cb099e79cabae772' into pk910/pe…
pk910 Oct 16, 2024
d6aa153
load dequeue rate from specs
pk910 Oct 16, 2024
af34759
improve log fields for contract indexer/matcher
pk910 Oct 24, 2024
8bfc4b9
update contract addresses for devnet 4
pk910 Oct 24, 2024
52a45a2
Merge commit 'a53497d48937d0154275376d59f7cb71bfe47c27' into pk910/pe…
pk910 Oct 24, 2024
ebad165
Merge branch 'master' into pk910/pectra-log-crawler
pk910 Oct 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .hack/devnet/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ fi

## Run devnet using kurtosis
ENCLAVE_NAME="${ENCLAVE_NAME:-dora}"
ETHEREUM_PACKAGE="${ETHEREUM_PACKAGE:-github.com/ethpandaops/ethereum-package}"
if kurtosis enclave inspect "$ENCLAVE_NAME" > /dev/null; then
echo "Kurtosis enclave '$ENCLAVE_NAME' is already up."
else
kurtosis run github.com/ethpandaops/ethereum-package \
kurtosis run "$ETHEREUM_PACKAGE" \
--image-download always \
--enclave "$ENCLAVE_NAME" \
--args-file "${config_file}"
Expand Down
70 changes: 36 additions & 34 deletions clients/consensus/chainspec.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,40 +16,42 @@ type ForkVersion struct {

// https://github.com/ethereum/consensus-specs/blob/dev/configs/mainnet.yaml
type ChainSpec struct {
PresetBase string `yaml:"PRESET_BASE"`
ConfigName string `yaml:"CONFIG_NAME"`
MinGenesisTime time.Time `yaml:"MIN_GENESIS_TIME"`
GenesisForkVersion phase0.Version `yaml:"GENESIS_FORK_VERSION"`
AltairForkVersion phase0.Version `yaml:"ALTAIR_FORK_VERSION"`
AltairForkEpoch *uint64 `yaml:"ALTAIR_FORK_EPOCH"`
BellatrixForkVersion phase0.Version `yaml:"BELLATRIX_FORK_VERSION"`
BellatrixForkEpoch *uint64 `yaml:"BELLATRIX_FORK_EPOCH"`
CapellaForkVersion phase0.Version `yaml:"CAPELLA_FORK_VERSION"`
CapellaForkEpoch *uint64 `yaml:"CAPELLA_FORK_EPOCH"`
DenebForkVersion phase0.Version `yaml:"DENEB_FORK_VERSION"`
DenebForkEpoch *uint64 `yaml:"DENEB_FORK_EPOCH"`
ElectraForkVersion phase0.Version `yaml:"ELECTRA_FORK_VERSION"`
ElectraForkEpoch *uint64 `yaml:"ELECTRA_FORK_EPOCH"`
Eip7594ForkVersion phase0.Version `yaml:"EIP7594_FORK_VERSION"`
Eip7594ForkEpoch *uint64 `yaml:"EIP7594_FORK_EPOCH"`
SecondsPerSlot time.Duration `yaml:"SECONDS_PER_SLOT"`
SlotsPerEpoch uint64 `yaml:"SLOTS_PER_EPOCH"`
EpochsPerHistoricalVector uint64 `yaml:"EPOCHS_PER_HISTORICAL_VECTOR"`
EpochsPerSlashingVector uint64 `yaml:"EPOCHS_PER_SLASHINGS_VECTOR"`
EpochsPerSyncCommitteePeriod uint64 `yaml:"EPOCHS_PER_SYNC_COMMITTEE_PERIOD"`
MinSeedLookahead uint64 `yaml:"MIN_SEED_LOOKAHEAD"`
ShuffleRoundCount uint64 `yaml:"SHUFFLE_ROUND_COUNT"`
MaxEffectiveBalance uint64 `yaml:"MAX_EFFECTIVE_BALANCE"`
MaxEffectiveBalanceElectra uint64 `yaml:"MAX_EFFECTIVE_BALANCE_ELECTRA"`
TargetCommitteeSize uint64 `yaml:"TARGET_COMMITTEE_SIZE"`
MaxCommitteesPerSlot uint64 `yaml:"MAX_COMMITTEES_PER_SLOT"`
MinPerEpochChurnLimit uint64 `yaml:"MIN_PER_EPOCH_CHURN_LIMIT"`
ChurnLimitQuotient uint64 `yaml:"CHURN_LIMIT_QUOTIENT"`
DomainBeaconProposer phase0.DomainType `yaml:"DOMAIN_BEACON_PROPOSER"`
DomainBeaconAttester phase0.DomainType `yaml:"DOMAIN_BEACON_ATTESTER"`
DomainSyncCommittee phase0.DomainType `yaml:"DOMAIN_SYNC_COMMITTEE"`
SyncCommitteeSize uint64 `yaml:"SYNC_COMMITTEE_SIZE"`
DepositContractAddress []byte `yaml:"DEPOSIT_CONTRACT_ADDRESS"`
PresetBase string `yaml:"PRESET_BASE"`
ConfigName string `yaml:"CONFIG_NAME"`
MinGenesisTime time.Time `yaml:"MIN_GENESIS_TIME"`
GenesisForkVersion phase0.Version `yaml:"GENESIS_FORK_VERSION"`
AltairForkVersion phase0.Version `yaml:"ALTAIR_FORK_VERSION"`
AltairForkEpoch *uint64 `yaml:"ALTAIR_FORK_EPOCH"`
BellatrixForkVersion phase0.Version `yaml:"BELLATRIX_FORK_VERSION"`
BellatrixForkEpoch *uint64 `yaml:"BELLATRIX_FORK_EPOCH"`
CapellaForkVersion phase0.Version `yaml:"CAPELLA_FORK_VERSION"`
CapellaForkEpoch *uint64 `yaml:"CAPELLA_FORK_EPOCH"`
DenebForkVersion phase0.Version `yaml:"DENEB_FORK_VERSION"`
DenebForkEpoch *uint64 `yaml:"DENEB_FORK_EPOCH"`
ElectraForkVersion phase0.Version `yaml:"ELECTRA_FORK_VERSION"`
ElectraForkEpoch *uint64 `yaml:"ELECTRA_FORK_EPOCH"`
Eip7594ForkVersion phase0.Version `yaml:"EIP7594_FORK_VERSION"`
Eip7594ForkEpoch *uint64 `yaml:"EIP7594_FORK_EPOCH"`
SecondsPerSlot time.Duration `yaml:"SECONDS_PER_SLOT"`
SlotsPerEpoch uint64 `yaml:"SLOTS_PER_EPOCH"`
EpochsPerHistoricalVector uint64 `yaml:"EPOCHS_PER_HISTORICAL_VECTOR"`
EpochsPerSlashingVector uint64 `yaml:"EPOCHS_PER_SLASHINGS_VECTOR"`
EpochsPerSyncCommitteePeriod uint64 `yaml:"EPOCHS_PER_SYNC_COMMITTEE_PERIOD"`
MinSeedLookahead uint64 `yaml:"MIN_SEED_LOOKAHEAD"`
ShuffleRoundCount uint64 `yaml:"SHUFFLE_ROUND_COUNT"`
MaxEffectiveBalance uint64 `yaml:"MAX_EFFECTIVE_BALANCE"`
MaxEffectiveBalanceElectra uint64 `yaml:"MAX_EFFECTIVE_BALANCE_ELECTRA"`
TargetCommitteeSize uint64 `yaml:"TARGET_COMMITTEE_SIZE"`
MaxCommitteesPerSlot uint64 `yaml:"MAX_COMMITTEES_PER_SLOT"`
MinPerEpochChurnLimit uint64 `yaml:"MIN_PER_EPOCH_CHURN_LIMIT"`
ChurnLimitQuotient uint64 `yaml:"CHURN_LIMIT_QUOTIENT"`
DomainBeaconProposer phase0.DomainType `yaml:"DOMAIN_BEACON_PROPOSER"`
DomainBeaconAttester phase0.DomainType `yaml:"DOMAIN_BEACON_ATTESTER"`
DomainSyncCommittee phase0.DomainType `yaml:"DOMAIN_SYNC_COMMITTEE"`
SyncCommitteeSize uint64 `yaml:"SYNC_COMMITTEE_SIZE"`
DepositContractAddress []byte `yaml:"DEPOSIT_CONTRACT_ADDRESS"`
MaxConsolidationRequestsPerPayload uint64 `yaml:"MAX_CONSOLIDATION_REQUESTS_PER_PAYLOAD"`
MaxWithdrawalRequestsPerPayload uint64 `yaml:"MAX_WITHDRAWAL_REQUESTS_PER_PAYLOAD"`

// EIP7594: PeerDAS
NumberOfColumns *uint64 `yaml:"NUMBER_OF_COLUMNS"`
Expand Down
4 changes: 3 additions & 1 deletion config/default.config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ executionapi:
- name: "local"
url: "http://127.0.0.1:8545"

depositLogBatchSize: 1000
logBatchSize: 1000
depositDeployBlock: 0 # el block number from where to crawl the deposit contract (should be <=, but close to the deposit contract deployment block)
electraDeployBlock: 0 # el block number from where to crawl the electra system contracts (should be <=, but close to electra fork activation block)

# indexer keeps track of the latest epochs in memory.
indexer:
Expand Down
243 changes: 243 additions & 0 deletions db/consolidation_request_txs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
package db

import (
"fmt"
"strings"

"github.com/ethpandaops/dora/dbtypes"
"github.com/jmoiron/sqlx"
)

// InsertConsolidationRequestTxs bulk-inserts consolidation request transactions
// within the given database transaction.
// On pgsql, conflicting (block_number, block_index) rows are updated with the
// newly resolved validator indexes and fork id; on sqlite the whole row is
// replaced via INSERT OR REPLACE.
func InsertConsolidationRequestTxs(consolidationTxs []*dbtypes.ConsolidationRequestTx, tx *sqlx.Tx) error {
	// Guard against an empty batch: without it the generated statement would be
	// a malformed "INSERT ... VALUES" with no value tuples.
	if len(consolidationTxs) == 0 {
		return nil
	}

	var sql strings.Builder
	fmt.Fprint(&sql,
		EngineQuery(map[dbtypes.DBEngineType]string{
			dbtypes.DBEnginePgsql:  "INSERT INTO consolidation_request_txs ",
			dbtypes.DBEngineSqlite: "INSERT OR REPLACE INTO consolidation_request_txs ",
		}),
		"(block_number, block_index, block_time, block_root, fork_id, source_address, source_pubkey, source_index, target_pubkey, target_index, tx_hash, tx_sender, tx_target, dequeue_block)",
		" VALUES ",
	)
	argIdx := 0
	fieldCount := 14 // must match the column list above

	args := make([]any, len(consolidationTxs)*fieldCount)
	for i, consolidationTx := range consolidationTxs {
		if i > 0 {
			fmt.Fprintf(&sql, ", ")
		}
		// emit one "($n, $n+1, ...)" tuple per row
		fmt.Fprintf(&sql, "(")
		for f := 0; f < fieldCount; f++ {
			if f > 0 {
				fmt.Fprintf(&sql, ", ")
			}
			fmt.Fprintf(&sql, "$%v", argIdx+f+1)
		}
		fmt.Fprintf(&sql, ")")

		args[argIdx+0] = consolidationTx.BlockNumber
		args[argIdx+1] = consolidationTx.BlockIndex
		args[argIdx+2] = consolidationTx.BlockTime
		args[argIdx+3] = consolidationTx.BlockRoot
		args[argIdx+4] = consolidationTx.ForkId
		args[argIdx+5] = consolidationTx.SourceAddress
		args[argIdx+6] = consolidationTx.SourcePubkey
		args[argIdx+7] = consolidationTx.SourceIndex
		args[argIdx+8] = consolidationTx.TargetPubkey
		args[argIdx+9] = consolidationTx.TargetIndex
		args[argIdx+10] = consolidationTx.TxHash
		args[argIdx+11] = consolidationTx.TxSender
		args[argIdx+12] = consolidationTx.TxTarget
		args[argIdx+13] = consolidationTx.DequeueBlock
		argIdx += fieldCount
	}
	fmt.Fprint(&sql, EngineQuery(map[dbtypes.DBEngineType]string{
		dbtypes.DBEnginePgsql:  " ON CONFLICT (block_number, block_index) DO UPDATE SET source_index = excluded.source_index, target_index = excluded.target_index, fork_id = excluded.fork_id",
		dbtypes.DBEngineSqlite: "",
	}))

	if _, err := tx.Exec(sql.String(), args...); err != nil {
		return err
	}
	return nil
}

// GetConsolidationRequestTxsByDequeueRange returns all consolidation request
// transactions whose dequeue_block falls within [dequeueFirst, dequeueLast],
// ordered by dequeue block and then by their position within the source block.
// Returns nil if the database query fails (the error is logged).
func GetConsolidationRequestTxsByDequeueRange(dequeueFirst uint64, dequeueLast uint64) []*dbtypes.ConsolidationRequestTx {
	requestTxs := []*dbtypes.ConsolidationRequestTx{}

	query := `SELECT consolidation_request_txs.*
	FROM consolidation_request_txs
	WHERE dequeue_block >= $1 AND dequeue_block <= $2
	ORDER BY dequeue_block ASC, block_number ASC, block_index ASC
	`
	if err := ReaderDb.Select(&requestTxs, query, dequeueFirst, dequeueLast); err != nil {
		logger.Errorf("Error while fetching consolidation request txs: %v", err)
		return nil
	}

	return requestTxs
}

// GetConsolidationRequestTxsByTxHashes returns all consolidation request
// transactions whose tx_hash matches one of the supplied hashes.
// Returns an empty slice for an empty input and nil if the query fails
// (the error is logged).
func GetConsolidationRequestTxsByTxHashes(txHashes [][]byte) []*dbtypes.ConsolidationRequestTx {
	// Guard against an empty hash list: without it the statement would contain
	// an invalid empty "IN ()" clause.
	if len(txHashes) == 0 {
		return []*dbtypes.ConsolidationRequestTx{}
	}

	var sql strings.Builder
	args := []interface{}{}

	fmt.Fprint(&sql, `SELECT consolidation_request_txs.*
	FROM consolidation_request_txs
	WHERE tx_hash IN (
	`)

	for idx, txHash := range txHashes {
		if idx > 0 {
			fmt.Fprintf(&sql, ", ")
		}
		args = append(args, txHash)
		fmt.Fprintf(&sql, "$%v", len(args))
	}
	fmt.Fprintf(&sql, ")")

	consolidationTxs := []*dbtypes.ConsolidationRequestTx{}
	err := ReaderDb.Select(&consolidationTxs, sql.String(), args...)
	if err != nil {
		logger.Errorf("Error while fetching consolidation request txs: %v", err)
		return nil
	}

	return consolidationTxs
}

// GetConsolidationRequestTxsFiltered returns a page of consolidation request
// transactions matching the given filter, together with the total number of
// matching rows.
//
// The query builds a CTE with all matching rows, then UNIONs a synthetic
// count row (total in block_number, all other columns zero/null) with the
// requested page. The first result row therefore carries the total count and
// is stripped from the returned slice.
//
// canonicalForkIds restricts results to canonical forks depending on
// filter.WithOrphaned (0 = canonical only, 1 = all, 2 = orphaned only).
func GetConsolidationRequestTxsFiltered(offset uint64, limit uint32, canonicalForkIds []uint64, filter *dbtypes.ConsolidationRequestTxFilter) ([]*dbtypes.ConsolidationRequestTx, uint64, error) {
	var sql strings.Builder
	args := []interface{}{}
	fmt.Fprint(&sql, `
	WITH cte AS (
		SELECT
			block_number, block_index, block_time, block_root, fork_id, source_address, source_pubkey, source_index, target_pubkey, target_index, tx_hash, tx_sender, tx_target, dequeue_block
		FROM consolidation_request_txs
	`)

	if filter.SrcValidatorName != "" {
		fmt.Fprint(&sql, `
		LEFT JOIN validator_names AS source_names ON source_names."index" = consolidation_request_txs.source_index
		`)
	}
	if filter.TgtValidatorName != "" {
		fmt.Fprint(&sql, `
		LEFT JOIN validator_names AS target_names ON target_names."index" = consolidation_request_txs.target_index
		`)
	}

	filterOp := "WHERE"
	if filter.MinDequeue > 0 {
		args = append(args, filter.MinDequeue)
		fmt.Fprintf(&sql, " %v dequeue_block >= $%v", filterOp, len(args))
		filterOp = "AND"
	}
	if filter.MaxDequeue > 0 {
		args = append(args, filter.MaxDequeue)
		fmt.Fprintf(&sql, " %v dequeue_block <= $%v", filterOp, len(args))
		filterOp = "AND"
	}
	if len(filter.SourceAddress) > 0 {
		args = append(args, filter.SourceAddress)
		fmt.Fprintf(&sql, " %v source_address = $%v", filterOp, len(args))
		filterOp = "AND"
	}
	if filter.MinSrcIndex > 0 {
		args = append(args, filter.MinSrcIndex)
		fmt.Fprintf(&sql, " %v source_index >= $%v", filterOp, len(args))
		filterOp = "AND"
	}
	if filter.MaxSrcIndex > 0 {
		args = append(args, filter.MaxSrcIndex)
		fmt.Fprintf(&sql, " %v source_index <= $%v", filterOp, len(args))
		filterOp = "AND"
	}
	if filter.MinTgtIndex > 0 {
		args = append(args, filter.MinTgtIndex)
		fmt.Fprintf(&sql, " %v target_index >= $%v", filterOp, len(args))
		filterOp = "AND"
	}
	if filter.MaxTgtIndex > 0 {
		args = append(args, filter.MaxTgtIndex)
		fmt.Fprintf(&sql, " %v target_index <= $%v", filterOp, len(args))
		filterOp = "AND"
	}
	if filter.SrcValidatorName != "" {
		args = append(args, "%"+filter.SrcValidatorName+"%")
		fmt.Fprintf(&sql, " %v ", filterOp)
		fmt.Fprintf(&sql, EngineQuery(map[dbtypes.DBEngineType]string{
			dbtypes.DBEnginePgsql:  ` source_names.name ilike $%v `,
			dbtypes.DBEngineSqlite: ` source_names.name LIKE $%v `,
		}), len(args))
		filterOp = "AND"
	}
	if filter.TgtValidatorName != "" {
		args = append(args, "%"+filter.TgtValidatorName+"%")
		fmt.Fprintf(&sql, " %v ", filterOp)
		fmt.Fprintf(&sql, EngineQuery(map[dbtypes.DBEngineType]string{
			dbtypes.DBEnginePgsql:  ` target_names.name ilike $%v `,
			dbtypes.DBEngineSqlite: ` target_names.name LIKE $%v `,
		}), len(args))
		filterOp = "AND"
	}

	if filter.WithOrphaned != 1 {
		// fork ids come from internal state (not user input), so inlining them
		// into the statement is safe here
		forkIdStr := make([]string, len(canonicalForkIds))
		for i, forkId := range canonicalForkIds {
			forkIdStr[i] = fmt.Sprintf("%v", forkId)
		}
		if len(forkIdStr) == 0 {
			forkIdStr = append(forkIdStr, "0")
		}

		if filter.WithOrphaned == 0 {
			fmt.Fprintf(&sql, " %v fork_id IN (%v)", filterOp, strings.Join(forkIdStr, ","))
			filterOp = "AND"
		} else if filter.WithOrphaned == 2 {
			fmt.Fprintf(&sql, " %v fork_id NOT IN (%v)", filterOp, strings.Join(forkIdStr, ","))
			filterOp = "AND"
		}
	}

	args = append(args, limit)
	// NOTE: the synthetic count row's column order must match the CTE's column
	// order exactly — UNION ALL matches columns by position, not by alias, and
	// a misordered row would place an integer in a pubkey column (a type
	// mismatch on pgsql).
	fmt.Fprintf(&sql, `)
	SELECT
		count(*) AS block_number,
		0 AS block_index,
		0 AS block_time,
		null AS block_root,
		0 AS fork_id,
		null AS source_address,
		null AS source_pubkey,
		0 AS source_index,
		null AS target_pubkey,
		0 AS target_index,
		null AS tx_hash,
		null AS tx_sender,
		null AS tx_target,
		0 AS dequeue_block
	FROM cte
	UNION ALL SELECT * FROM (
		SELECT * FROM cte
		ORDER BY block_time DESC
		LIMIT $%v
	`, len(args))

	if offset > 0 {
		args = append(args, offset)
		fmt.Fprintf(&sql, " OFFSET $%v ", len(args))
	}
	fmt.Fprintf(&sql, ") AS t1")

	consolidationRequestTxs := []*dbtypes.ConsolidationRequestTx{}
	err := ReaderDb.Select(&consolidationRequestTxs, sql.String(), args...)
	if err != nil {
		logger.Errorf("Error while fetching filtered consolidation request txs: %v", err)
		return nil, 0, err
	}

	// Guard against an unexpectedly empty result set before stripping the
	// leading count row (avoids an index-out-of-range panic).
	if len(consolidationRequestTxs) == 0 {
		return []*dbtypes.ConsolidationRequestTx{}, 0, nil
	}

	return consolidationRequestTxs[1:], consolidationRequestTxs[0].BlockNumber, nil
}
Loading
Loading