diff --git a/op-node/cmd/batch_decoder/README.md b/op-node/cmd/batch_decoder/README.md index 38f97a2d4f3e6..522725359a784 100644 --- a/op-node/cmd/batch_decoder/README.md +++ b/op-node/cmd/batch_decoder/README.md @@ -63,7 +63,6 @@ jq '.batches|del(.[]|.Transactions)' $CHANNEL_FILE ## Roadmap -- Parallel transaction fetching (CLI-3563) - Pull the batches out of channels & store that information inside the ChannelWithMetadata (CLI-3565) - Transaction Bytes used - Total uncompressed (different from tx bytes) + compressed bytes diff --git a/op-node/cmd/batch_decoder/fetch/fetch.go b/op-node/cmd/batch_decoder/fetch/fetch.go index 188db7f2a695e..96ec64b4d598f 100644 --- a/op-node/cmd/batch_decoder/fetch/fetch.go +++ b/op-node/cmd/batch_decoder/fetch/fetch.go @@ -8,6 +8,8 @@ import ( "math/big" "os" "path" + "sync" + "sync/atomic" "time" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" @@ -37,31 +39,45 @@ type Config struct { BatchInbox common.Address BatchSenders map[common.Address]struct{} OutDirectory string + Parallel uint64 } // Batches fetches & stores all transactions sent to the batch inbox address in // the given block range (inclusive to exclusive). // The transactions & metadata are written to the out directory. -func Batches(client *ethclient.Client, config Config) (totalValid, totalInvalid int) { +func Batches(client *ethclient.Client, config Config) (totalValid, totalInvalid uint64) { if err := os.MkdirAll(config.OutDirectory, 0750); err != nil { log.Fatal(err) } - number := new(big.Int).SetUint64(config.Start) signer := types.LatestSignerForChainID(config.ChainID) - for i := config.Start; i < config.End; i++ { - valid, invalid := fetchBatchesPerBlock(client, number, signer, config) - totalValid += valid - totalInvalid += invalid - number = number.Add(number, common.Big1) + parallel := config.Parallel + + var wg sync.WaitGroup + for i := config.Start; i < config.End; i += parallel { + end := i + parallel - 1 + if end >= config.End { + end = config.End - 1 + } + for j := i; j <= end; j++ { + wg.Add(1) + number := j + go func() { + defer wg.Done() + valid, invalid := fetchBatchesPerBlock(client, number, signer, config) + atomic.AddUint64(&totalValid, valid) + atomic.AddUint64(&totalInvalid, invalid) + }() + } + wg.Wait() } return } // fetchBatchesPerBlock gets a block & the parses all of the transactions in the block. -func fetchBatchesPerBlock(client *ethclient.Client, number *big.Int, signer types.Signer, config Config) (validBatchCount, invalidBatchCount int) { +func fetchBatchesPerBlock(client *ethclient.Client, number uint64, signer types.Signer, config Config) (validBatchCount, invalidBatchCount uint64) { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() - block, err := client.BlockByNumber(ctx, number) + block, err := client.BlockByNumber(ctx, new(big.Int).SetUint64(number)) if err != nil { log.Fatal(err) } diff --git a/op-node/cmd/batch_decoder/main.go b/op-node/cmd/batch_decoder/main.go index fd96deefc5ef9..b8673ea458113 100644 --- a/op-node/cmd/batch_decoder/main.go +++ b/op-node/cmd/batch_decoder/main.go @@ -55,6 +55,11 @@ func main() { Usage: "L1 RPC URL", EnvVars: []string{"L1_RPC"}, }, + &cli.IntFlag{ + Name: "parallel", + Value: 10, + Usage: "Concurrency level when fetching L1", + }, }, Action: func(cliCtx *cli.Context) error { client, err := ethclient.Dial(cliCtx.String("l1")) @@ -76,6 +81,7 @@ func main() { }, BatchInbox: common.HexToAddress(cliCtx.String("inbox")), OutDirectory: cliCtx.String("out"), + Parallel: uint64(cliCtx.Int("parallel")), } totalValid, totalInvalid := fetch.Batches(client, config) fmt.Printf("Fetched batches in range [%v,%v). Found %v valid & %v invalid batches\n", config.Start, config.End, totalValid, totalInvalid)