Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(rollup sync service): use CalldataBlobSource to retrieve data from L1 #1103

Open
wants to merge 21 commits into
base: omerfirmak/mpt
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -1629,15 +1629,15 @@ func setEnableRollupVerify(ctx *cli.Context, cfg *ethconfig.Config) {
func setDA(ctx *cli.Context, cfg *ethconfig.Config) {
if ctx.IsSet(DASyncEnabledFlag.Name) {
cfg.EnableDASyncing = ctx.Bool(DASyncEnabledFlag.Name)
if ctx.IsSet(DABlobScanAPIEndpointFlag.Name) {
cfg.DA.BlobScanAPIEndpoint = ctx.String(DABlobScanAPIEndpointFlag.Name)
}
if ctx.IsSet(DABlockNativeAPIEndpointFlag.Name) {
cfg.DA.BlockNativeAPIEndpoint = ctx.String(DABlockNativeAPIEndpointFlag.Name)
}
if ctx.IsSet(DABeaconNodeAPIEndpointFlag.Name) {
cfg.DA.BeaconNodeAPIEndpoint = ctx.String(DABeaconNodeAPIEndpointFlag.Name)
}
}
if ctx.IsSet(DABlobScanAPIEndpointFlag.Name) {
cfg.DA.BlobScanAPIEndpoint = ctx.String(DABlobScanAPIEndpointFlag.Name)
}
if ctx.IsSet(DABlockNativeAPIEndpointFlag.Name) {
cfg.DA.BlockNativeAPIEndpoint = ctx.String(DABlockNativeAPIEndpointFlag.Name)
}
if ctx.IsSet(DABeaconNodeAPIEndpointFlag.Name) {
cfg.DA.BeaconNodeAPIEndpoint = ctx.String(DABeaconNodeAPIEndpointFlag.Name)
}
}

Expand Down
90 changes: 90 additions & 0 deletions common/heapmap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package common

type HeapMap[K comparable, T Comparable[T]] struct {
h *Heap[T]
m *ShrinkingMap[K, *HeapElement[T]]
keyFromElement func(T) K
}

func NewHeapMap[K comparable, T Comparable[T]](keyFromElement func(T) K) *HeapMap[K, T] {
return &HeapMap[K, T]{
h: NewHeap[T](),
m: NewShrinkingMap[K, *HeapElement[T]](1000),
keyFromElement: keyFromElement,
}
}

func (hm *HeapMap[K, T]) Len() int {
return hm.h.Len()
}

func (hm *HeapMap[K, T]) Push(element T) bool {
k := hm.keyFromElement(element)

if hm.m.Has(k) {
return false
}

heapElement := hm.h.Push(element)
hm.m.Set(k, heapElement)

return true
}

func (hm *HeapMap[K, T]) Pop() T {
element := hm.h.Pop()
k := hm.keyFromElement(element.Value())
hm.m.Delete(k)

return element.Value()
}

func (hm *HeapMap[K, T]) Peek() T {
return hm.h.Peek().Value()
}

func (hm *HeapMap[K, T]) RemoveByElement(element T) bool {
key := hm.keyFromElement(element)
heapElement, exists := hm.m.Get(key)
if !exists {
return false
}

hm.h.Remove(heapElement)
hm.m.Delete(key)

return true
}

func (hm *HeapMap[K, T]) RemoveByKey(key K) bool {
heapElement, exists := hm.m.Get(key)
if !exists {
return false
}

hm.h.Remove(heapElement)
hm.m.Delete(key)

return true
}

func (hm *HeapMap[K, T]) Clear() {
hm.h.Clear()
hm.m = NewShrinkingMap[K, *HeapElement[T]](1000)
}

func (hm *HeapMap[K, T]) Keys() []K {
return hm.m.Keys()
}

func (hm *HeapMap[K, T]) Elements() []T {
var elements []T
for _, element := range hm.m.Values() {
elements = append(elements, element.Value())
}
return elements
}

func (hm *HeapMap[K, T]) Has(element T) bool {
return hm.m.Has(hm.keyFromElement(element))
}
16 changes: 16 additions & 0 deletions common/shrinkingmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,22 @@ func (s *ShrinkingMap[K, V]) Delete(key K) (deleted bool) {
return true
}

func (s *ShrinkingMap[K, V]) Keys() []K {
var keys []K
for k := range s.m {
keys = append(keys, k)
}
return keys
}

func (s *ShrinkingMap[K, V]) Values() []V {
var values []V
for _, v := range s.m {
values = append(values, v)
}
return values
}

func (s *ShrinkingMap[K, V]) Size() (size int) {
return len(s.m)
}
Expand Down
5 changes: 3 additions & 2 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import (
"github.com/scroll-tech/go-ethereum/rlp"
"github.com/scroll-tech/go-ethereum/rollup/ccc"
"github.com/scroll-tech/go-ethereum/rollup/da_syncer"
"github.com/scroll-tech/go-ethereum/rollup/l1"
"github.com/scroll-tech/go-ethereum/rollup/rollup_sync_service"
"github.com/scroll-tech/go-ethereum/rollup/sync_service"
"github.com/scroll-tech/go-ethereum/rpc"
Expand Down Expand Up @@ -109,7 +110,7 @@ type Ethereum struct {

// New creates a new Ethereum object (including the
// initialisation of the common Ethereum object)
func New(stack *node.Node, config *ethconfig.Config, l1Client sync_service.EthClient) (*Ethereum, error) {
func New(stack *node.Node, config *ethconfig.Config, l1Client l1.Client) (*Ethereum, error) {
// Ensure configuration values are compatible and sane
if config.SyncMode == downloader.LightSync {
return nil, errors.New("can't run eth.Ethereum in light sync mode, use les.LightEthereum")
Expand Down Expand Up @@ -244,7 +245,7 @@ func New(stack *node.Node, config *ethconfig.Config, l1Client sync_service.EthCl

if config.EnableRollupVerify {
// initialize and start rollup event sync service
eth.rollupSyncService, err = rollup_sync_service.NewRollupSyncService(context.Background(), chainConfig, eth.chainDb, l1Client, eth.blockchain, stack)
eth.rollupSyncService, err = rollup_sync_service.NewRollupSyncService(context.Background(), chainConfig, eth.chainDb, l1Client, eth.blockchain, stack, config.DA)
if err != nil {
return nil, fmt.Errorf("cannot initialize rollup event sync service: %w", err)
}
Expand Down
16 changes: 4 additions & 12 deletions rollup/da_syncer/blob_client/beacon_node_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,10 @@ import (

"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/crypto/kzg4844"
"github.com/scroll-tech/go-ethereum/rollup/rollup_sync_service"
)

type BeaconNodeClient struct {
apiEndpoint string
l1Client *rollup_sync_service.L1Client
genesisTime uint64
secondsPerSlot uint64
}
Expand All @@ -28,7 +26,7 @@ var (
beaconNodeBlobEndpoint = "/eth/v1/beacon/blob_sidecars"
)

func NewBeaconNodeClient(apiEndpoint string, l1Client *rollup_sync_service.L1Client) (*BeaconNodeClient, error) {
func NewBeaconNodeClient(apiEndpoint string) (*BeaconNodeClient, error) {
// get genesis time
genesisPath, err := url.JoinPath(apiEndpoint, beaconNodeGenesisEndpoint)
if err != nil {
Expand Down Expand Up @@ -94,19 +92,13 @@ func NewBeaconNodeClient(apiEndpoint string, l1Client *rollup_sync_service.L1Cli

return &BeaconNodeClient{
apiEndpoint: apiEndpoint,
l1Client: l1Client,
genesisTime: genesisTime,
secondsPerSlot: secondsPerSlot,
}, nil
}

func (c *BeaconNodeClient) GetBlobByVersionedHashAndBlockNumber(ctx context.Context, versionedHash common.Hash, blockNumber uint64) (*kzg4844.Blob, error) {
// get block timestamp to calculate slot
header, err := c.l1Client.GetHeaderByNumber(blockNumber)
if err != nil {
return nil, fmt.Errorf("failed to get header by number, err: %w", err)
}
slot := (header.Time - c.genesisTime) / c.secondsPerSlot
func (c *BeaconNodeClient) GetBlobByVersionedHashAndBlockTime(ctx context.Context, versionedHash common.Hash, blockTime uint64) (*kzg4844.Blob, error) {
slot := (blockTime - c.genesisTime) / c.secondsPerSlot

// get blob sidecar for slot
blobSidecarPath, err := url.JoinPath(c.apiEndpoint, beaconNodeBlobEndpoint, fmt.Sprintf("%d", slot))
Expand Down Expand Up @@ -156,7 +148,7 @@ func (c *BeaconNodeClient) GetBlobByVersionedHashAndBlockNumber(ctx context.Cont
}
}

return nil, fmt.Errorf("missing blob %v in slot %d, block number %d", versionedHash, slot, blockNumber)
return nil, fmt.Errorf("missing blob %v in slot %d", versionedHash, slot)
}

type GenesisResp struct {
Expand Down
6 changes: 3 additions & 3 deletions rollup/da_syncer/blob_client/blob_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const (
)

type BlobClient interface {
GetBlobByVersionedHashAndBlockNumber(ctx context.Context, versionedHash common.Hash, blockNumber uint64) (*kzg4844.Blob, error)
GetBlobByVersionedHashAndBlockTime(ctx context.Context, versionedHash common.Hash, blockTime uint64) (*kzg4844.Blob, error)
}

type BlobClients struct {
Expand All @@ -32,13 +32,13 @@ func NewBlobClients(blobClients ...BlobClient) *BlobClients {
}
}

func (c *BlobClients) GetBlobByVersionedHashAndBlockNumber(ctx context.Context, versionedHash common.Hash, blockNumber uint64) (*kzg4844.Blob, error) {
func (c *BlobClients) GetBlobByVersionedHashAndBlockTime(ctx context.Context, versionedHash common.Hash, blockTime uint64) (*kzg4844.Blob, error) {
if len(c.list) == 0 {
return nil, fmt.Errorf("BlobClients.GetBlobByVersionedHash: list of BlobClients is empty")
}

for i := 0; i < len(c.list); i++ {
blob, err := c.list[c.curPos].GetBlobByVersionedHashAndBlockNumber(ctx, versionedHash, blockNumber)
blob, err := c.list[c.curPos].GetBlobByVersionedHashAndBlockTime(ctx, versionedHash, blockTime)
if err == nil {
return blob, nil
}
Expand Down
2 changes: 1 addition & 1 deletion rollup/da_syncer/blob_client/blob_scan_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func NewBlobScanClient(apiEndpoint string) *BlobScanClient {
}
}

func (c *BlobScanClient) GetBlobByVersionedHashAndBlockNumber(ctx context.Context, versionedHash common.Hash, blockNumber uint64) (*kzg4844.Blob, error) {
func (c *BlobScanClient) GetBlobByVersionedHashAndBlockTime(ctx context.Context, versionedHash common.Hash, blockTime uint64) (*kzg4844.Blob, error) {
// blobscan api docs https://api.blobscan.com/#/blobs/blob-getByBlobId
path, err := url.JoinPath(c.apiEndpoint, versionedHash.String())
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion rollup/da_syncer/blob_client/block_native_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func NewBlockNativeClient(apiEndpoint string) *BlockNativeClient {
}
}

func (c *BlockNativeClient) GetBlobByVersionedHashAndBlockNumber(ctx context.Context, versionedHash common.Hash, blockNumber uint64) (*kzg4844.Blob, error) {
func (c *BlockNativeClient) GetBlobByVersionedHashAndBlockTime(ctx context.Context, versionedHash common.Hash, blockTime uint64) (*kzg4844.Blob, error) {
// blocknative api docs https://docs.blocknative.com/blocknative-data-archive/blob-archive
path, err := url.JoinPath(c.apiEndpoint, versionedHash.String())
if err != nil {
Expand Down
Loading
Loading