2 changes: 2 additions & 0 deletions block/components.go
@@ -137,6 +137,7 @@ func NewSyncComponents(
metrics *Metrics,
blockOpts BlockOptions,
) (*Components, error) {
logger.Info().Msg("Starting in sync-mode")
cacheManager, err := cache.NewManager(config, store, logger)
if err != nil {
return nil, fmt.Errorf("failed to create cache manager: %w", err)
@@ -200,6 +201,7 @@ func NewAggregatorComponents(
metrics *Metrics,
blockOpts BlockOptions,
) (*Components, error) {
logger.Info().Msg("Starting in aggregator-mode")
cacheManager, err := cache.NewManager(config, store, logger)
if err != nil {
return nil, fmt.Errorf("failed to create cache manager: %w", err)
30 changes: 1 addition & 29 deletions block/internal/executing/executor.go
@@ -595,35 +595,7 @@ func (e *Executor) validateBlock(lastState types.State, header *types.SignedHead
return fmt.Errorf("invalid header: %w", err)
}

// Validate header against data
if err := types.Validate(header, data); err != nil {
return fmt.Errorf("header-data validation failed: %w", err)
}

// Check chain ID
if header.ChainID() != lastState.ChainID {
return fmt.Errorf("chain ID mismatch: expected %s, got %s",
lastState.ChainID, header.ChainID())
}

// Check height
expectedHeight := lastState.LastBlockHeight + 1
if header.Height() != expectedHeight {
return fmt.Errorf("invalid height: expected %d, got %d",
expectedHeight, header.Height())
}

// Check timestamp
if header.Height() > 1 && lastState.LastBlockTime.After(header.Time()) {
return fmt.Errorf("block time must be strictly increasing")
}

// Check app hash
if !bytes.Equal(header.AppHash, lastState.AppHash) {
return fmt.Errorf("app hash mismatch")
}

return nil
return lastState.AssertValidForNextState(header, data)
}

// sendCriticalError sends a critical error to the error channel without blocking
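The inline checks deleted above are consolidated into a single call to lastState.AssertValidForNextState(header, data). A rough sketch of what such a method could look like, reconstructed purely from the checks removed in this hunk; the actual implementation lives in the types package and may differ (for example, it may also compare the LastHeaderHash field introduced elsewhere in this PR):

// Sketch only, reconstructed from the removed checks above; not the actual types package code.
package types

import (
	"bytes"
	"fmt"
)

func (s State) AssertValidForNextState(header *SignedHeader, data *Data) error {
	// Header must be consistent with its data (previously types.Validate).
	if err := Validate(header, data); err != nil {
		return fmt.Errorf("header-data validation failed: %w", err)
	}
	// Chain ID must match the local state.
	if header.ChainID() != s.ChainID {
		return fmt.Errorf("chain ID mismatch: expected %s, got %s", s.ChainID, header.ChainID())
	}
	// Height must be exactly last height + 1.
	if expected := s.LastBlockHeight + 1; header.Height() != expected {
		return fmt.Errorf("invalid height: expected %d, got %d", expected, header.Height())
	}
	// Block time must be strictly increasing after the first block.
	if header.Height() > 1 && s.LastBlockTime.After(header.Time()) {
		return fmt.Errorf("block time must be strictly increasing")
	}
	// Header must commit to the app hash produced by the previous block.
	if !bytes.Equal(header.AppHash, s.AppHash) {
		return fmt.Errorf("app hash mismatch")
	}
	return nil
}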
2 changes: 1 addition & 1 deletion block/internal/submitting/submitter_test.go
@@ -303,7 +303,7 @@ func newHeaderAndData(chainID string, height uint64, nonEmpty bool) (*types.Sign
h := &types.SignedHeader{Header: types.Header{BaseHeader: types.BaseHeader{ChainID: chainID, Height: height, Time: uint64(now.UnixNano())}, ProposerAddress: []byte{1}}}
d := &types.Data{Metadata: &types.Metadata{ChainID: chainID, Height: height, Time: uint64(now.UnixNano())}}
if nonEmpty {
d.Txs = types.Txs{types.Tx(fmt.Sprintf("any-unique-tx-%d", now.UnixNano()))}
d.Txs = types.Txs{types.Tx(fmt.Sprintf("any-unique-tx-%s-%d-%d", chainID, height, now.UnixNano()))}
}
return h, d
}
16 changes: 8 additions & 8 deletions block/internal/syncing/da_retriever_test.go
@@ -168,7 +168,7 @@ func TestDARetriever_ProcessBlobs_HeaderAndData_Success(t *testing.T) {
r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop())

dataBin, data := makeSignedDataBytes(t, gen.ChainID, 2, addr, pub, signer, 2)
hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, nil, &data.Data)
hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, nil, &data.Data, nil)

events := r.processBlobs(context.Background(), [][]byte{hdrBin, dataBin}, 77)
require.Len(t, events, 1)
@@ -196,7 +196,7 @@ func TestDARetriever_ProcessBlobs_HeaderOnly_EmptyDataExpected(t *testing.T) {
r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop())

// Header with no data hash present should trigger empty data creation (per current logic)
hb, _ := makeSignedHeaderBytes(t, gen.ChainID, 3, addr, pub, signer, nil, nil)
hb, _ := makeSignedHeaderBytes(t, gen.ChainID, 3, addr, pub, signer, nil, nil, nil)

events := r.processBlobs(context.Background(), [][]byte{hb}, 88)
require.Len(t, events, 1)
@@ -223,7 +223,7 @@ func TestDARetriever_TryDecodeHeaderAndData_Basic(t *testing.T) {
gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr}
r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop())

hb, sh := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, nil)
hb, sh := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, nil, nil)
gotH := r.tryDecodeHeader(hb, 123)
require.NotNil(t, gotH)
assert.Equal(t, sh.Hash().String(), gotH.Hash().String())
@@ -279,7 +279,7 @@ func TestDARetriever_RetrieveFromDA_TwoNamespaces_Success(t *testing.T) {

// Prepare header/data blobs
dataBin, data := makeSignedDataBytes(t, gen.ChainID, 9, addr, pub, signer, 1)
hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 9, addr, pub, signer, nil, &data.Data)
hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 9, addr, pub, signer, nil, &data.Data, nil)

cfg := config.DefaultConfig()
cfg.DA.Namespace = "nsHdr"
@@ -322,7 +322,7 @@ func TestDARetriever_ProcessBlobs_CrossDAHeightMatching(t *testing.T) {

// Create header and data for the same block height but from different DA heights
dataBin, data := makeSignedDataBytes(t, gen.ChainID, 5, addr, pub, signer, 2)
hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, &data.Data)
hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, &data.Data, nil)

// Process header from DA height 100 first
events1 := r.processBlobs(context.Background(), [][]byte{hdrBin}, 100)
@@ -361,9 +361,9 @@ func TestDARetriever_ProcessBlobs_MultipleHeadersCrossDAHeightMatching(t *testin
data4Bin, data4 := makeSignedDataBytes(t, gen.ChainID, 4, addr, pub, signer, 2)
data5Bin, data5 := makeSignedDataBytes(t, gen.ChainID, 5, addr, pub, signer, 1)

hdr3Bin, _ := makeSignedHeaderBytes(t, gen.ChainID, 3, addr, pub, signer, nil, &data3.Data)
hdr4Bin, _ := makeSignedHeaderBytes(t, gen.ChainID, 4, addr, pub, signer, nil, &data4.Data)
hdr5Bin, _ := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, &data5.Data)
hdr3Bin, _ := makeSignedHeaderBytes(t, gen.ChainID, 3, addr, pub, signer, nil, &data3.Data, nil)
hdr4Bin, _ := makeSignedHeaderBytes(t, gen.ChainID, 4, addr, pub, signer, nil, &data4.Data, nil)
hdr5Bin, _ := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, &data5.Data, nil)

// Process multiple headers from DA height 200 - should be stored as pending
events1 := r.processBlobs(context.Background(), [][]byte{hdr3Bin, hdr4Bin, hdr5Bin}, 200)
37 changes: 21 additions & 16 deletions block/internal/syncing/syncer.go
@@ -158,6 +158,7 @@ func (s *Syncer) GetLastState() types.State {

stateCopy := *state
stateCopy.AppHash = bytes.Clone(state.AppHash)
stateCopy.LastHeaderHash = bytes.Clone(state.LastHeaderHash)

return stateCopy
}
@@ -379,7 +380,14 @@ func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) {
if err := s.trySyncNextBlock(event); err != nil {
s.logger.Error().Err(err).Msg("failed to sync next block")
// If the error is not due to a validation error, re-store the event as pending
if !errors.Is(err, errInvalidBlock) {
switch {
case errors.Is(err, errInvalidBlock):
// do not reschedule
case errors.Is(err, errInvalidState):
Review comment (Member): Ditto
s.logger.Fatal().Uint64("block_height", event.Header.Height()).
Review comment (Member): Fatal sends an os.Exit(1), which will short-circuit all the cleanup logic we have.
Uint64("state_height", s.GetLastState().LastBlockHeight).Err(err).
Msg("Invalid state detected - block references do not match local state. Manual intervention required.")
default:
s.cache.SetPendingEvent(height, event)
}
return
@@ -396,8 +404,12 @@ func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) {
}
}

// errInvalidBlock is returned when a block is failing validation
var errInvalidBlock = errors.New("invalid block")
var (
// errInvalidBlock is returned when a block is failing validation
errInvalidBlock = errors.New("invalid block")
// errInvalidState is returned when the state has diverged from the DA blocks
errInvalidState = errors.New("invalid state")
Review comment (Member): This error isn't used anymore.
)

// trySyncNextBlock attempts to sync the next available block
// the event is always the next block in sequence as processHeightEvent ensures it.
@@ -418,10 +430,12 @@ func (s *Syncer) trySyncNextBlock(event *common.DAHeightEvent) error {
// Compared to the executor logic where the current block needs to be applied first,
// here only the previous block needs to be applied to proceed to the verification.
// The header validation must be done before applying the block to avoid executing gibberish
if err := s.validateBlock(header, data); err != nil {
if err := s.validateBlock(currentState, data, header); err != nil {
// remove header as da included (not per se needed, but keep cache clean)
s.cache.RemoveHeaderDAIncluded(header.Hash().String())
return errors.Join(errInvalidBlock, fmt.Errorf("failed to validate block: %w", err))
Review comment (Member): We can just keep this.
if !errors.Is(err, errInvalidState) && !errors.Is(err, errInvalidBlock) {
return errors.Join(errInvalidBlock, err)
}
}

// Apply block
@@ -527,24 +541,15 @@ func (s *Syncer) executeTxsWithRetry(ctx context.Context, rawTxs [][]byte, heade
// NOTE: if the header was gibberish and somehow passed all validation prior but the data was correct
// or if the data was gibberish and somehow passed all validation prior but the header was correct
// we are still losing both in the pending event. This should never happen.
func (s *Syncer) validateBlock(
header *types.SignedHeader,
data *types.Data,
) error {
func (s *Syncer) validateBlock(currState types.State, data *types.Data, header *types.SignedHeader) error {
// Set custom verifier for aggregator node signature
header.SetCustomVerifierForSyncNode(s.options.SyncNodeSignatureBytesProvider)

// Validate header with data
if err := header.ValidateBasicWithData(data); err != nil {
return fmt.Errorf("invalid header: %w", err)
}

// Validate header against data
if err := types.Validate(header, data); err != nil {
return fmt.Errorf("header-data validation failed: %w", err)
}

return nil
return currState.AssertValidForNextState(header, data)
}

// sendCriticalError sends a critical error to the error channel without blocking
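trySyncNextBlock wraps validation failures with errors.Join(errInvalidBlock, err), so processHeightEvent can still match the sentinel with errors.Is and decide whether to drop the event or keep it pending. A small standalone illustration of that wrapping pattern (generic Go, not project code):

package main

import (
	"errors"
	"fmt"
)

var errInvalidBlock = errors.New("invalid block")

func validate() error {
	// Join keeps both the sentinel and the detailed cause in the error tree.
	return errors.Join(errInvalidBlock, fmt.Errorf("app hash mismatch"))
}

func main() {
	err := validate()
	// errors.Is walks the joined tree, so callers can branch on the sentinel
	// while the log message still carries the underlying cause.
	fmt.Println(errors.Is(err, errInvalidBlock)) // true
	fmt.Println(err)                             // prints both joined messages
}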
2 changes: 1 addition & 1 deletion block/internal/syncing/syncer_backoff_test.go
@@ -197,7 +197,7 @@ func TestSyncer_BackoffResetOnSuccess(t *testing.T) {
Return(nil, errors.New("temporary failure")).Once()

// Second call - success (should reset backoff and increment DA height)
_, header := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, nil, nil)
_, header := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, nil, nil, nil)
data := &types.Data{
Metadata: &types.Metadata{
ChainID: gen.ChainID,
2 changes: 1 addition & 1 deletion block/internal/syncing/syncer_benchmark_test.go
@@ -120,7 +120,7 @@ func newBenchFixture(b *testing.B, totalHeights uint64, shuffledTx bool, daDelay
heightEvents := make([]common.DAHeightEvent, totalHeights)
for i := uint64(0); i < totalHeights; i++ {
blockHeight, daHeight := i+gen.InitialHeight, i+daHeightOffset
_, sh := makeSignedHeaderBytes(b, gen.ChainID, blockHeight, addr, pub, signer, nil, nil)
_, sh := makeSignedHeaderBytes(b, gen.ChainID, blockHeight, addr, pub, signer, nil, nil, nil)
d := &types.Data{Metadata: &types.Metadata{ChainID: gen.ChainID, Height: blockHeight, Time: uint64(time.Now().UnixNano())}}
heightEvents[i] = common.DAHeightEvent{Header: sh, Data: d, DaHeight: daHeight}
}
45 changes: 30 additions & 15 deletions block/internal/syncing/syncer_test.go
@@ -3,6 +3,7 @@ package syncing
import (
"context"
crand "crypto/rand"
"crypto/sha512"
"errors"
"testing"
"time"
@@ -44,7 +45,17 @@ func buildSyncTestSigner(tb testing.TB) (addr []byte, pub crypto.PubKey, signer
}

// makeSignedHeaderBytes builds a valid SignedHeader and returns its binary encoding and the object
func makeSignedHeaderBytes(tb testing.TB, chainID string, height uint64, proposer []byte, pub crypto.PubKey, signer signerpkg.Signer, appHash []byte, data *types.Data) ([]byte, *types.SignedHeader) {
func makeSignedHeaderBytes(
tb testing.TB,
chainID string,
height uint64,
proposer []byte,
pub crypto.PubKey,
signer signerpkg.Signer,
appHash []byte,
data *types.Data,
lastHeaderHash []byte,
) ([]byte, *types.SignedHeader) {
time := uint64(time.Now().UnixNano())
dataHash := common.DataHashForEmptyTxs
if data != nil {
@@ -58,6 +69,7 @@ func makeSignedHeaderBytes(tb testing.TB, chainID string, height uint64, propose
AppHash: appHash,
DataHash: dataHash,
ProposerAddress: proposer,
LastHeaderHash: lastHeaderHash,
},
Signer: types.Signer{PubKey: pub, Address: proposer},
}
@@ -97,7 +109,6 @@ func TestSyncer_validateBlock_DataHashMismatch(t *testing.T) {

cfg := config.DefaultConfig()
gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr}

mockExec := testmocks.NewMockExecutor(t)

s := NewSyncer(
@@ -114,24 +125,24 @@
common.DefaultBlockOptions(),
make(chan error, 1),
)

require.NoError(t, s.initializeState())
// Create header and data with correct hash
data := makeData(gen.ChainID, 1, 2) // non-empty
_, header := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, nil, data)
_, header := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, nil, data, nil)

err = s.validateBlock(header, data)
err = s.validateBlock(s.GetLastState(), data, header)
require.NoError(t, err)

// Create header and data with mismatched hash
data = makeData(gen.ChainID, 1, 2) // non-empty
_, header = makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, nil, nil)
err = s.validateBlock(header, data)
_, header = makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, nil, nil, nil)
err = s.validateBlock(s.GetLastState(), data, header)
require.Error(t, err)

// Create header and empty data
data = makeData(gen.ChainID, 1, 0) // empty
_, header = makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, nil, nil)
err = s.validateBlock(header, data)
_, header = makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, nil, nil, nil)
err = s.validateBlock(s.GetLastState(), data, header)
require.Error(t, err)
}

@@ -169,7 +180,7 @@ func TestProcessHeightEvent_SyncsAndUpdatesState(t *testing.T) {
// Create signed header & data for height 1
lastState := s.GetLastState()
data := makeData(gen.ChainID, 1, 0)
_, hdr := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, lastState.AppHash, data)
_, hdr := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, lastState.AppHash, data, nil)

// Expect ExecuteTxs call for height 1
mockExec.EXPECT().ExecuteTxs(mock.Anything, mock.Anything, uint64(1), mock.Anything, lastState.AppHash).
@@ -218,7 +229,7 @@ func TestSequentialBlockSync(t *testing.T) {
// Sync two consecutive blocks via processHeightEvent so ExecuteTxs is called and state stored
st0 := s.GetLastState()
data1 := makeData(gen.ChainID, 1, 1) // non-empty
_, hdr1 := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, st0.AppHash, data1)
_, hdr1 := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, st0.AppHash, data1, st0.LastHeaderHash)
// Expect ExecuteTxs call for height 1
mockExec.EXPECT().ExecuteTxs(mock.Anything, mock.Anything, uint64(1), mock.Anything, st0.AppHash).
Return([]byte("app1"), uint64(1024), nil).Once()
@@ -227,7 +238,7 @@

st1, _ := st.GetState(context.Background())
data2 := makeData(gen.ChainID, 2, 0) // empty data
_, hdr2 := makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, st1.AppHash, data2)
_, hdr2 := makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, st1.AppHash, data2, st1.LastHeaderHash)
// Expect ExecuteTxs call for height 2
mockExec.EXPECT().ExecuteTxs(mock.Anything, mock.Anything, uint64(2), mock.Anything, st1.AppHash).
Return([]byte("app2"), uint64(1024), nil).Once()
@@ -365,22 +376,27 @@ func TestSyncLoopPersistState(t *testing.T) {
syncerInst1.daRetriever, syncerInst1.p2pHandler = daRtrMock, p2pHndlMock

// with n da blobs fetched
var prevHeaderHash, prevAppHash []byte
for i := range myFutureDAHeight - myDAHeightOffset {
chainHeight, daHeight := i, i+myDAHeightOffset
chainHeight, daHeight := i+1, i+myDAHeightOffset
emptyData := &types.Data{
Metadata: &types.Metadata{
ChainID: gen.ChainID,
Height: chainHeight,
Time: uint64(time.Now().Add(time.Duration(chainHeight) * time.Second).UnixNano()),
},
}
_, sigHeader := makeSignedHeaderBytes(t, gen.ChainID, chainHeight, addr, pub, signer, nil, emptyData)
_, sigHeader := makeSignedHeaderBytes(t, gen.ChainID, chainHeight, addr, pub, signer, prevAppHash, emptyData, prevHeaderHash)
evts := []common.DAHeightEvent{{
Header: sigHeader,
Data: emptyData,
DaHeight: daHeight,
}}
daRtrMock.On("RetrieveFromDA", mock.Anything, daHeight).Return(evts, nil)
prevHeaderHash = sigHeader.Hash()
hasher := sha512.New()
hasher.Write(prevAppHash)
prevAppHash = hasher.Sum(nil)
}

// stop at next height
@@ -395,7 +411,6 @@
Return(nil, coreda.ErrHeightFromFuture)

go syncerInst1.processLoop()

// dssync from DA until stop height reached
syncerInst1.syncLoop()
t.Log("syncLoop on instance1 completed")
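TestSyncLoopPersistState now chains its synthetic blocks: each header carries the previous header hash and the app hash the previous block is expected to produce, with the next app hash derived by feeding the prior one through sha512. A minimal standalone sketch of that chaining, assuming (as the test appears to) that the mock executor produces app hashes the same way:

package main

import (
	"crypto/sha512"
	"fmt"
)

func main() {
	var prevAppHash []byte
	for height := 1; height <= 3; height++ {
		// Each synthetic block's app hash is sha512(previous app hash), giving a
		// deterministic chain that the generated headers can commit to.
		h := sha512.New()
		h.Write(prevAppHash)
		prevAppHash = h.Sum(nil)
		fmt.Printf("height %d app hash prefix: %x\n", height, prevAppHash[:4])
	}
}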
1 change: 1 addition & 0 deletions proto/evnode/v1/state.proto
@@ -15,6 +15,7 @@ message State {
google.protobuf.Timestamp last_block_time = 5;
uint64 da_height = 6;
bytes app_hash = 8;
bytes LastHeaderHash = 9;

reserved 7;
}
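The new proto field persists the last header hash alongside the app hash, so a restarted node can keep enforcing the block-to-block link. A minimal, self-contained sketch (toy types, not project code) of the check this enables:

package main

import (
	"bytes"
	"fmt"
)

// Toy versions of the relevant pieces of state and header, for illustration only.
type state struct {
	lastBlockHeight uint64
	lastHeaderHash  []byte
}

type header struct {
	height         uint64
	lastHeaderHash []byte
}

func (s state) assertValidForNext(h header) error {
	if h.height != s.lastBlockHeight+1 {
		return fmt.Errorf("invalid height: expected %d, got %d", s.lastBlockHeight+1, h.height)
	}
	// The incoming header must reference the header hash recorded for the previous block.
	if len(s.lastHeaderHash) > 0 && !bytes.Equal(h.lastHeaderHash, s.lastHeaderHash) {
		return fmt.Errorf("last header hash mismatch at height %d", h.height)
	}
	return nil
}

func main() {
	st := state{lastBlockHeight: 1, lastHeaderHash: []byte{0xaa}}
	fmt.Println(st.assertValidForNext(header{height: 2, lastHeaderHash: []byte{0xaa}})) // <nil>
	fmt.Println(st.assertValidForNext(header{height: 2, lastHeaderHash: []byte{0xcc}})) // mismatch error
}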
1 change: 1 addition & 0 deletions test/e2e/.gitignore
@@ -0,0 +1 @@
/testnet/
1 change: 1 addition & 0 deletions test/e2e/evm_full_node_e2e_test.go
@@ -948,6 +948,7 @@ func restartSequencerAndFullNode(t *testing.T, sut *SystemUnderTest, sequencerHo
// Now restart the full node (without init - node already exists)
sut.ExecCmd(evmSingleBinaryPath,
"start",
"--evnode.log.format", "json",
"--home", fullNodeHome,
"--evm.jwt-secret", fullNodeJwtSecret,
"--evm.genesis-hash", genesisHash,
1 change: 1 addition & 0 deletions test/e2e/evm_test_common.go
@@ -563,6 +563,7 @@ func restartDAAndSequencer(t *testing.T, sut *SystemUnderTest, sequencerHome, jw
// Then restart the sequencer node (without init - node already exists)
sut.ExecCmd(evmSingleBinaryPath,
"start",
"--evnode.log.format", "json",
"--evm.jwt-secret", jwtSecret,
"--evm.genesis-hash", genesisHash,
"--rollkit.node.block_time", DefaultBlockTime,