Cherry-pick 0xPolygonHermez#3240, 0xPolygonHermez#3137: Sequence batch only when sanity check is done. Do fatal for some finalizer halt events. Several fixes. Optimize purge pool (0xPolygonHermez#3241)

* sequence batch only when sanity check is done. Do fatal for some halt events. Several fixes

* feat: optimize purge txn pool query (0xPolygonHermez#3137)

* feat: optimize purge txn pool query

* fix: add new method to state interface

* feat: generate new state mocks

* fix merge

* fix %w

---------

Co-authored-by: Idris Hanafi <ifh101@gmail.com>
agnusmor and IdrisHanafi authored Feb 8, 2024
1 parent 8d135de commit 0e47e24
Showing 23 changed files with 667 additions and 780 deletions.
7 changes: 7 additions & 0 deletions db/migrations/state/0016.sql
@@ -0,0 +1,7 @@
-- +migrate Up
ALTER TABLE state.batch
ADD COLUMN IF NOT EXISTS checked BOOLEAN NOT NULL DEFAULT TRUE;

-- +migrate Down
ALTER TABLE state.batch
DROP COLUMN IF EXISTS checked;
64 changes: 64 additions & 0 deletions db/migrations/state/0016_test.go
@@ -0,0 +1,64 @@
package migrations_test

import (
"database/sql"
"testing"

"github.com/stretchr/testify/assert"
)

type migrationTest0016 struct{}

func (m migrationTest0016) InsertData(db *sql.DB) error {
const insertBatch0 = `
INSERT INTO state.batch (batch_num, global_exit_root, local_exit_root, acc_input_hash, state_root, timestamp, coinbase, raw_txs_data, forced_batch_num, wip)
VALUES (0,'0x0000', '0x0000', '0x0000', '0x0000', now(), '0x0000', null, null, true)`

// insert batch
_, err := db.Exec(insertBatch0)
if err != nil {
return err
}

return nil
}

func (m migrationTest0016) RunAssertsAfterMigrationUp(t *testing.T, db *sql.DB) {
var result int

// Check column checked exists in state.batch table
const getCheckedColumn = `SELECT count(*) FROM information_schema.columns WHERE table_name='batch' and column_name='checked'`
row := db.QueryRow(getCheckedColumn)
assert.NoError(t, row.Scan(&result))
assert.Equal(t, 1, result)

const insertBatch0 = `
INSERT INTO state.batch (batch_num, global_exit_root, local_exit_root, acc_input_hash, state_root, timestamp, coinbase, raw_txs_data, forced_batch_num, wip, checked)
VALUES (1,'0x0001', '0x0001', '0x0001', '0x0001', now(), '0x0001', null, null, true, false)`

// insert batch 1
_, err := db.Exec(insertBatch0)
assert.NoError(t, err)

const insertBatch1 = `
INSERT INTO state.batch (batch_num, global_exit_root, local_exit_root, acc_input_hash, state_root, timestamp, coinbase, raw_txs_data, forced_batch_num, wip, checked)
VALUES (2,'0x0002', '0x0002', '0x0002', '0x0002', now(), '0x0002', null, null, false, true)`

// insert batch 2
_, err = db.Exec(insertBatch1)
assert.NoError(t, err)
}

func (m migrationTest0016) RunAssertsAfterMigrationDown(t *testing.T, db *sql.DB) {
var result int

// Check column checked doesn't exist in state.batch table
const getCheckedColumn = `SELECT count(*) FROM information_schema.columns WHERE table_name='batch' and column_name='checked'`
row := db.QueryRow(getCheckedColumn)
assert.NoError(t, row.Scan(&result))
assert.Equal(t, 0, result)
}

func TestMigration0016(t *testing.T) {
runMigrationTest(t, 16, migrationTest0016{})
}
1 change: 1 addition & 0 deletions pool/interfaces.go
@@ -38,6 +38,7 @@ type storage interface {
MarkWIPTxsAsPending(ctx context.Context) error
GetAllAddressesBlocked(ctx context.Context) ([]common.Address, error)
MinL2GasPriceSince(ctx context.Context, timestamp time.Time) (uint64, error)
GetEarliestProcessedTx(ctx context.Context) (common.Hash, error)
}

type stateInterface interface {
20 changes: 20 additions & 0 deletions pool/pgpoolstorage/pgpoolstorage.go
@@ -790,3 +790,23 @@ func (p *PostgresPoolStorage) GetAllAddressesBlocked(ctx context.Context) ([]com

return addrs, nil
}

// GetEarliestProcessedTx gets the earliest processed tx from the pool. Mainly used for cleanup
func (p *PostgresPoolStorage) GetEarliestProcessedTx(ctx context.Context) (common.Hash, error) {
const getEarliestProcessedTxnFromTxnPool = `SELECT hash
FROM pool.transaction
WHERE
status = 'selected'
ORDER BY received_at ASC
LIMIT 1`

var txnHash string
err := p.db.QueryRow(ctx, getEarliestProcessedTxnFromTxnPool).Scan(&txnHash)
if errors.Is(err, pgx.ErrNoRows) {
return common.Hash{}, nil
} else if err != nil {
return common.Hash{}, err
}

return common.HexToHash(txnHash), nil
}
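
For orientation, here is a minimal sketch of how a cleanup routine could consume this helper to bound its purge pass. The earliestTxGetter interface and purgeBound function below are illustrative assumptions, not code from this commit.

package sketch

import (
	"context"
	"fmt"

	"github.com/ethereum/go-ethereum/common"
)

// earliestTxGetter is the slice of the pool storage interface this sketch needs.
type earliestTxGetter interface {
	GetEarliestProcessedTx(ctx context.Context) (common.Hash, error)
}

// purgeBound returns the hash of the earliest selected (processed) tx, which a
// cleanup job can use as a bound instead of scanning the whole pool.transaction
// table. The second return value is false when the pool has no selected txs yet.
func purgeBound(ctx context.Context, s earliestTxGetter) (common.Hash, bool, error) {
	h, err := s.GetEarliestProcessedTx(ctx)
	if err != nil {
		return common.Hash{}, false, fmt.Errorf("failed to get earliest processed tx: %w", err)
	}
	if h == (common.Hash{}) {
		return common.Hash{}, false, nil
	}
	return h, true, nil
}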
68 changes: 54 additions & 14 deletions sequencer/batch.go
@@ -33,6 +33,32 @@ func (w *Batch) isEmpty() bool {
return w.countOfL2Blocks == 0
}

// processBatchesPendingtoCheck performs a sanity check for batches closed but pending to be checked
func (f *finalizer) processBatchesPendingtoCheck(ctx context.Context) {
notCheckedBatches, err := f.stateIntf.GetNotCheckedBatches(ctx, nil)
if err != nil && err != state.ErrNotFound {
log.Fatalf("failed to get batches not checked, error: ", err)
}

if len(notCheckedBatches) == 0 {
return
}

log.Infof("executing sanity check for not checked batches")

prevBatchNumber := notCheckedBatches[0].BatchNumber - 1
prevBatch, err := f.stateIntf.GetBatchByNumber(ctx, prevBatchNumber, nil)
if err != nil {
log.Fatalf("failed to get batch %d, error: ", prevBatchNumber, err)
}
oldStateRoot := prevBatch.StateRoot

for _, notCheckedBatch := range notCheckedBatches {
_, _ = f.batchSanityCheck(ctx, notCheckedBatch.BatchNumber, oldStateRoot, notCheckedBatch.StateRoot)
oldStateRoot = notCheckedBatch.StateRoot
}
}

// setWIPBatch sets finalizer wip batch to the state batch passed as parameter
func (f *finalizer) setWIPBatch(ctx context.Context, wipStateBatch *state.Batch) (*Batch, error) {
// Retrieve prevStateBatch to init the initialStateRoot of the wip batch
@@ -89,7 +115,7 @@ func (f *finalizer) initWIPBatch(ctx context.Context) {
// Get the last batch in trusted state
lastStateBatch, err := f.stateIntf.GetBatchByNumber(ctx, lastBatchNum, nil)
if err != nil {
log.Fatalf("failed to get last batch, error: %v", err)
log.Fatalf("failed to get last batch %d, error: %v", lastBatchNum, err)
}

isClosed := !lastStateBatch.WIP
@@ -98,7 +124,7 @@ func (f *finalizer) initWIPBatch(ctx context.Context) {

if isClosed { //if the last batch is close then open a new wip batch
if lastStateBatch.BatchNumber+1 == f.cfg.HaltOnBatchNumber {
f.Halt(ctx, fmt.Errorf("finalizer reached stop sequencer on batch number: %d", f.cfg.HaltOnBatchNumber))
f.Halt(ctx, fmt.Errorf("finalizer reached stop sequencer on batch number: %d", f.cfg.HaltOnBatchNumber), false)
}

f.wipBatch, err = f.openNewWIPBatch(ctx, lastStateBatch.BatchNumber+1, lastStateBatch.StateRoot)
@@ -126,14 +152,14 @@ func (f *finalizer) finalizeWIPBatch(ctx context.Context, closeReason state.Clos
prevTimestamp := f.wipL2Block.timestamp
prevL1InfoTreeIndex := f.wipL2Block.l1InfoTreeExitRoot.L1InfoTreeIndex

// Close the wip L2 block if it has transactions, if not we keep it open to store it in the new wip batch
// Close the wip L2 block if it has transactions, otherwise we keep the wip L2 block to store it in the new wip batch
if !f.wipL2Block.isEmpty() {
f.closeWIPL2Block(ctx)
}

err := f.closeAndOpenNewWIPBatch(ctx, closeReason)
if err != nil {
f.Halt(ctx, fmt.Errorf("failed to create new WIP batch, error: %v", err))
f.Halt(ctx, fmt.Errorf("failed to create new WIP batch, error: %v", err), true)
}

// If we have closed the wipL2Block then we open a new one
@@ -142,8 +168,18 @@ func (f *finalizer) finalizeWIPBatch(ctx context.Context, closeReason state.Clos
}
}

// closeAndOpenNewWIPBatch closes the current batch and opens a new one, potentially processing forced batches between the batch is closed and the resulting new empty batch
// closeAndOpenNewWIPBatch closes the current batch and opens a new one, potentially processing forced batches between closing the current batch and opening the new wip batch
func (f *finalizer) closeAndOpenNewWIPBatch(ctx context.Context, closeReason state.ClosingReason) error {
f.nextForcedBatchesMux.Lock()
processForcedBatches := len(f.nextForcedBatches) > 0
f.nextForcedBatchesMux.Unlock()

// If we will process forced batches after we close the wip batch then we must close the current wip L2 block,
// since the processForcedBatches function needs to create new L2 blocks (cannot "reuse" the current wip L2 block if it's empty)
if processForcedBatches {
f.closeWIPL2Block(ctx)
}

// Wait until all L2 blocks are processed by the executor
startWait := time.Now()
f.pendingL2BlocksToProcessWG.Wait()
@@ -170,11 +206,7 @@ func (f *finalizer) closeAndOpenNewWIPBatch(ctx context.Context, closeReason sta
// Reprocess full batch as sanity check
if f.cfg.SequentialBatchSanityCheck {
// Do the full batch reprocess now
_, err := f.batchSanityCheck(ctx, f.wipBatch.batchNumber, f.wipBatch.initialStateRoot, f.wipBatch.finalStateRoot)
if err != nil {
// There is an error reprocessing the batch. We halt the execution of the Sequencer at this point
return fmt.Errorf("halting sequencer because of error reprocessing full batch %d (sanity check), error: %v ", f.wipBatch.batchNumber, err)
}
_, _ = f.batchSanityCheck(ctx, f.wipBatch.batchNumber, f.wipBatch.initialStateRoot, f.wipBatch.finalStateRoot)
} else {
// Do the full batch reprocess in parallel
go func() {
@@ -183,15 +215,15 @@ func (f *finalizer) closeAndOpenNewWIPBatch(ctx context.Context, closeReason sta
}

if f.wipBatch.batchNumber+1 == f.cfg.HaltOnBatchNumber {
f.Halt(ctx, fmt.Errorf("finalizer reached stop sequencer on batch number: %d", f.cfg.HaltOnBatchNumber))
f.Halt(ctx, fmt.Errorf("finalizer reached stop sequencer on batch number: %d", f.cfg.HaltOnBatchNumber), false)
}

// Metadata for the next batch
stateRoot := f.wipBatch.finalStateRoot
lastBatchNumber := f.wipBatch.batchNumber

// Process forced batches
if len(f.nextForcedBatches) > 0 {
if processForcedBatches {
lastBatchNumber, stateRoot = f.processForcedBatches(ctx, lastBatchNumber, stateRoot)
// We must init/reset the wip L2 block from the state since processForcedBatches can create new L2 blocks
f.initWIPL2Block(ctx)
@@ -315,12 +347,13 @@ func (f *finalizer) batchSanityCheck(ctx context.Context, batchNum uint64, initi
// Log batch detailed info
log.Errorf("batch %d sanity check error: initialStateRoot: %s, expectedNewStateRoot: %s", batch.BatchNumber, initialStateRoot, expectedNewStateRoot)
for i, rawL2block := range rawL2Blocks.Blocks {
log.Infof("block[%d], txs: %d, deltaTimestamp: %d, l1InfoTreeIndex: %d", i, len(rawL2block.Transactions), rawL2block.DeltaTimestamp, rawL2block.IndexL1InfoTree)
for j, rawTx := range rawL2block.Transactions {
log.Infof("batch %d, block position: %d, tx position: %d, tx hash: %s", batch.BatchNumber, i, j, rawTx.Tx.Hash())
log.Infof("block[%d].tx[%d]: %s, egpPct: %d", batch.BatchNumber, i, j, rawTx.Tx.Hash(), rawTx.EfficiencyPercentage)
}
}

f.Halt(ctx, fmt.Errorf("batch sanity check error. Check previous errors in logs to know which was the cause"))
f.Halt(ctx, fmt.Errorf("batch sanity check error. Check previous errors in logs to know which was the cause"), false)
}

log.Debugf("batch %d sanity check: initialStateRoot: %s, expectedNewStateRoot: %s", batchNum, initialStateRoot, expectedNewStateRoot)
@@ -404,6 +437,13 @@ func (f *finalizer) batchSanityCheck(ctx context.Context, batchNum uint64, initi
return nil, ErrStateRootNoMatch
}

err = f.stateIntf.UpdateBatchAsChecked(ctx, batch.BatchNumber, nil)
if err != nil {
log.Errorf("failed to update batch %d as checked, error: %v", batch.BatchNumber, err)
reprocessError(batch)
return nil, ErrUpdateBatchAsChecked
}

log.Infof("successful sanity check for batch %d, initialStateRoot: %s, stateRoot: %s, l2Blocks: %d, time: %v, used counters: %s",
batch.BatchNumber, initialStateRoot, batchResponse.NewStateRoot.String(), len(batchResponse.BlockResponses),
endProcessing.Sub(startProcessing), f.logZKCounters(batchResponse.UsedZkCounters))
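The sanity-check code above relies on two state-layer helpers tied to the new checked column. Their real declarations (in sequencer/interfaces.go and the state package) are not part of this excerpt, so the interface below is only an assumed shape inferred from the call sites, not the repository's actual definition.

package sketch

import (
	"context"

	"github.com/0xPolygonHermez/zkevm-node/state"
	"github.com/jackc/pgx/v4"
)

// stateBatchCheckIntf is an assumed view of the sequencer state interface,
// inferred from the calls f.stateIntf.GetNotCheckedBatches(...) and
// f.stateIntf.UpdateBatchAsChecked(...) in batch.go above.
type stateBatchCheckIntf interface {
	// GetNotCheckedBatches returns closed batches whose checked flag is still false.
	GetNotCheckedBatches(ctx context.Context, dbTx pgx.Tx) ([]*state.Batch, error)
	// UpdateBatchAsChecked flags the batch as checked once its sanity-check reprocess succeeds.
	UpdateBatchAsChecked(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) error
}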
2 changes: 2 additions & 0 deletions sequencer/errors.go
@@ -14,6 +14,8 @@ var (
ErrReplacedTransaction = errors.New("replaced transaction")
// ErrGetBatchByNumber happens when we get an error trying to get a batch by number (GetBatchByNumber)
ErrGetBatchByNumber = errors.New("get batch by number error")
// ErrUpdateBatchAsChecked happens when we get an error trying to update a batch as checked (UpdateBatchAsChecked)
ErrUpdateBatchAsChecked = errors.New("update batch as checked error")
// ErrDecodeBatchL2Data happens when we get an error trying to decode BatchL2Data (DecodeTxs)
ErrDecodeBatchL2Data = errors.New("decoding BatchL2Data error")
// ErrProcessBatch happens when we get an error trying to process (executor) a batch
20 changes: 13 additions & 7 deletions sequencer/finalizer.go
@@ -143,6 +143,9 @@ func (f *finalizer) Start(ctx context.Context) {
mockL1InfoRoot[i] = byte(i)
}

// Do sanity check for batches closed but pending to be checked
f.processBatchesPendingtoCheck(ctx)

// Update L1InfoRoot
go f.checkL1InfoTreeUpdate(ctx)

Expand Down Expand Up @@ -390,14 +393,13 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker, first

// If EffectiveGasPrice >= txGasPrice, we process the tx with tx.GasPrice
if tx.EffectiveGasPrice.Cmp(txGasPrice) >= 0 {
tx.EffectiveGasPrice.Set(txGasPrice)

loss := new(big.Int).Sub(tx.EffectiveGasPrice, txGasPrice)
// If loss > 0, log a warning message indicating we lose fees for this tx
if loss.Cmp(new(big.Int).SetUint64(0)) == 1 {
log.Warnf("egp-loss: gasPrice: %d, effectiveGasPrice1: %d, loss: %d, tx: %s", txGasPrice, tx.EffectiveGasPrice, loss, tx.HashStr)
}

tx.EffectiveGasPrice.Set(txGasPrice)
tx.IsLastExecution = true
}
}
@@ -438,7 +440,7 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker, first
return nil, err
} else if err == nil && !batchResponse.IsRomLevelError && len(batchResponse.BlockResponses) == 0 {
err = fmt.Errorf("executor returned no errors and no responses for tx %s", tx.HashStr)
f.Halt(ctx, err)
f.Halt(ctx, err, false)
} else if batchResponse.IsExecutorLevelError {
log.Errorf("error received from executor, error: %v", err)
// Delete tx from the worker
@@ -721,7 +723,7 @@ func (f *finalizer) logZKCounters(counters state.ZKCounters) string {
}

// Halt halts the finalizer
func (f *finalizer) Halt(ctx context.Context, err error) {
func (f *finalizer) Halt(ctx context.Context, err error, isFatal bool) {
f.haltFinalizer.Store(true)

event := &event.Event{
@@ -738,8 +740,12 @@ func (f *finalizer) Halt(ctx context.Context, err error, isFatal bool) {
log.Errorf("error storing finalizer halt event, error: %v", eventErr)
}

for {
log.Errorf("halting finalizer, fatal error: %v", err)
time.Sleep(5 * time.Second) //nolint:gomnd
if isFatal {
log.Fatalf("fatal error on finalizer, error: %v", err)
} else {
for {
log.Errorf("halting finalizer, error: %v", err)
time.Sleep(5 * time.Second) //nolint:gomnd
}
}
}
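
To make the new isFatal flag concrete, here is a small illustrative sketch (not from the repository) of how a caller might decide between the two halt modes introduced above: a fatal halt exits the process via log.Fatalf, while a non-fatal halt keeps the process alive and logs the error every 5 seconds. The halter interface, errUnrecoverable sentinel, and haltOn function are assumptions made for this example only.

package sketch

import (
	"context"
	"errors"
)

// halter mirrors the finalizer's new Halt signature.
type halter interface {
	Halt(ctx context.Context, err error, isFatal bool)
}

// errUnrecoverable is a hypothetical sentinel used only for this example.
var errUnrecoverable = errors.New("unrecoverable finalizer error")

// haltOn picks the halt mode: unrecoverable errors terminate the process so an
// orchestrator can restart it, anything else blocks forever while logging so
// operators can inspect the node before restarting it manually.
func haltOn(ctx context.Context, h halter, err error) {
	if errors.Is(err, errUnrecoverable) {
		h.Halt(ctx, err, true) // exits via log.Fatalf inside Halt
		return
	}
	h.Halt(ctx, err, false) // loops forever, logging the error every 5 seconds
}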
18 changes: 10 additions & 8 deletions sequencer/forcedbatch.go
@@ -129,16 +129,18 @@ func (f *finalizer) processForcedBatch(ctx context.Context, forcedBatch state.Fo
return rollbackOnError(fmt.Errorf("error closing state batch %d for forced batch %d, error: %v", newBatchNumber, forcedBatch.ForcedBatchNumber, err))
}

if len(batchResponse.BlockResponses) > 0 && !batchResponse.IsRomOOCError {
err = f.handleProcessForcedBatchResponse(ctx, newBatchNumber, batchResponse, dbTx)
if err != nil {
return rollbackOnError(fmt.Errorf("error when handling batch response for forced batch %d, error: %v", forcedBatch.ForcedBatchNumber, err))
}
}

err = dbTx.Commit(ctx)
if err != nil {
return rollbackOnError(fmt.Errorf("error when commit dbTx when processing forced batch %d, error: %v", forcedBatch.ForcedBatchNumber, err))
}

if len(batchResponse.BlockResponses) > 0 && !batchResponse.IsRomOOCError {
err = f.handleProcessForcedBatchResponse(ctx, batchResponse, dbTx)
return rollbackOnError(fmt.Errorf("error when handling batch response for forced batch %d, error: %v", forcedBatch.ForcedBatchNumber, err))
}

return newBatchNumber, batchResponse.NewStateRoot, nil
}

@@ -157,7 +159,7 @@ func (f *finalizer) addForcedTxToWorker(forcedBatchResponse *state.ProcessBatchR
}

// handleProcessForcedTxsResponse handles the block/transactions responses for the processed forced batch.
func (f *finalizer) handleProcessForcedBatchResponse(ctx context.Context, batchResponse *state.ProcessBatchResponse, dbTx pgx.Tx) error {
func (f *finalizer) handleProcessForcedBatchResponse(ctx context.Context, newBatchNumber uint64, batchResponse *state.ProcessBatchResponse, dbTx pgx.Tx) error {
f.addForcedTxToWorker(batchResponse)

f.updateFlushIDs(batchResponse.FlushID, batchResponse.StoredFlushID)
@@ -177,7 +179,7 @@ func (f *finalizer) handleProcessForcedBatchResponse(ctx context.Context, batchR
// process L2 blocks responses for the forced batch
for _, forcedL2BlockResponse := range batchResponse.BlockResponses {
// Store forced L2 blocks in the state
err := f.stateIntf.StoreL2Block(ctx, batchResponse.NewBatchNumber, forcedL2BlockResponse, nil, dbTx)
err := f.stateIntf.StoreL2Block(ctx, newBatchNumber, forcedL2BlockResponse, nil, dbTx)
if err != nil {
return fmt.Errorf("database error on storing L2 block %d, error: %v", forcedL2BlockResponse.BlockNumber, err)
}
@@ -195,7 +197,7 @@ func (f *finalizer) handleProcessForcedBatchResponse(ctx context.Context, batchR
}

// Send L2 block to data streamer
err = f.DSSendL2Block(batchResponse.NewBatchNumber, forcedL2BlockResponse, 0)
err = f.DSSendL2Block(newBatchNumber, forcedL2BlockResponse, 0)
if err != nil {
//TODO: we need to halt/rollback the L2 block if we had an error sending to the data streamer?
log.Errorf("error sending L2 block %d to data streamer, error: %v", forcedL2BlockResponse.BlockNumber, err)