Skip to content

Commit

Permalink
op-node: batch_decoder: Parallel tx fetching
Browse files Browse the repository at this point in the history
  • Loading branch information
pcw109550 committed Jul 26, 2023
1 parent 3891726 commit d73314f
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 10 deletions.
1 change: 0 additions & 1 deletion op-node/cmd/batch_decoder/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ jq '.batches|del(.[]|.Transactions)' $CHANNEL_FILE

## Roadmap

- Parallel transaction fetching (CLI-3563)
- Pull the batches out of channels & store that information inside the ChannelWithMetadata (CLI-3565)
- Transaction Bytes used
- Total uncompressed (different from tx bytes) + compressed bytes
Expand Down
34 changes: 25 additions & 9 deletions op-node/cmd/batch_decoder/fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"math/big"
"os"
"path"
"sync"
"sync/atomic"
"time"

"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
Expand Down Expand Up @@ -37,31 +39,45 @@ type Config struct {
BatchInbox common.Address
BatchSenders map[common.Address]struct{}
OutDirectory string
Parallel uint64
}

// Batches fetches & stores all transactions sent to the batch inbox address in
// the given block range (inclusive to exclusive).
// The transactions & metadata are written to the out directory.
func Batches(client *ethclient.Client, config Config) (totalValid, totalInvalid int) {
func Batches(client *ethclient.Client, config Config) (totalValid, totalInvalid uint64) {
if err := os.MkdirAll(config.OutDirectory, 0750); err != nil {
log.Fatal(err)
}
number := new(big.Int).SetUint64(config.Start)
signer := types.LatestSignerForChainID(config.ChainID)
for i := config.Start; i < config.End; i++ {
valid, invalid := fetchBatchesPerBlock(client, number, signer, config)
totalValid += valid
totalInvalid += invalid
number = number.Add(number, common.Big1)
parallel := config.Parallel

var wg sync.WaitGroup
for i := config.Start; i < config.End; i += parallel {
end := i + parallel - 1
if end >= config.End {
end = config.End - 1
}
for j := i; j <= end; j++ {
wg.Add(1)
number := j
go func() {
defer wg.Done()
valid, invalid := fetchBatchesPerBlock(client, number, signer, config)
atomic.AddUint64(&totalValid, valid)
atomic.AddUint64(&totalInvalid, invalid)
}()
}
wg.Wait()
}
return
}

// fetchBatchesPerBlock gets a block & the parses all of the transactions in the block.
func fetchBatchesPerBlock(client *ethclient.Client, number *big.Int, signer types.Signer, config Config) (validBatchCount, invalidBatchCount int) {
func fetchBatchesPerBlock(client *ethclient.Client, number uint64, signer types.Signer, config Config) (validBatchCount, invalidBatchCount uint64) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
block, err := client.BlockByNumber(ctx, number)
block, err := client.BlockByNumber(ctx, new(big.Int).SetUint64(number))
if err != nil {
log.Fatal(err)
}
Expand Down
6 changes: 6 additions & 0 deletions op-node/cmd/batch_decoder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ func main() {
Usage: "L1 RPC URL",
EnvVars: []string{"L1_RPC"},
},
&cli.IntFlag{
Name: "parallel",
Value: 10,
Usage: "Concurrency level when fetching L1",
},
},
Action: func(cliCtx *cli.Context) error {
client, err := ethclient.Dial(cliCtx.String("l1"))
Expand All @@ -76,6 +81,7 @@ func main() {
},
BatchInbox: common.HexToAddress(cliCtx.String("inbox")),
OutDirectory: cliCtx.String("out"),
Parallel: uint64(cliCtx.Int("parallel")),
}
totalValid, totalInvalid := fetch.Batches(client, config)
fmt.Printf("Fetched batches in range [%v,%v). Found %v valid & %v invalid batches\n", config.Start, config.End, totalValid, totalInvalid)
Expand Down

0 comments on commit d73314f

Please sign in to comment.