Skip to content

[Chunk Data Pack Pruner] Add block view index #6933

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 22 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
7b3df54
add block view index
zhangchiqing Jan 24, 2025
b1f6f92
add headers.BlockIDByView methods
zhangchiqing Jan 24, 2025
fbe7d58
refactor index block by view
zhangchiqing Jan 31, 2025
b5d1cf6
Apply suggestions from code review
zhangchiqing Jan 31, 2025
d36f839
fix tests
zhangchiqing Jan 31, 2025
53a8271
fix lint
zhangchiqing Jan 31, 2025
0354fbe
skip duplicates
zhangchiqing Feb 1, 2025
f383dcb
rename IndexBlockHeight to IndexFinalizedBlockByHeight
zhangchiqing Feb 26, 2025
cd0ec0c
rename IndexBlockHeight to IndexFinalizedBlockByHeight
zhangchiqing Feb 26, 2025
04b58b8
added documentation for mutator around its assumption wrt concurrent …
AlexHentschel Feb 28, 2025
6cf38df
Merge branch 'leo/add-block-view-index' into alex/add-block-view-inde…
AlexHentschel Feb 28, 2025
13c527f
Merge pull request #7101 from onflow/alex/add-block-view-index_-_sugg…
zhangchiqing Feb 28, 2025
4095646
Merge branch 'master' into leo/add-block-view-index
zhangchiqing Feb 28, 2025
58c7e3a
fix lint
zhangchiqing Feb 28, 2025
ad62c7f
update blocks.ByView
zhangchiqing Feb 28, 2025
8e916f7
fix test to add BlockWithParentFixtureAndUniqueView
zhangchiqing Mar 3, 2025
5599405
Merge branch 'master' into leo/add-block-view-index
zhangchiqing Mar 6, 2025
b18c086
Merge branch 'master' into leo/add-block-view-index
zhangchiqing Apr 29, 2025
48d93aa
Merge branch 'master' into leo/add-block-view-index
zhangchiqing May 2, 2025
bd56282
Apply suggestions from code review
zhangchiqing May 2, 2025
fc55db3
Merge branch 'master' into leo/add-block-view-index
zhangchiqing Jul 10, 2025
7bd3f18
fix lint
zhangchiqing Jul 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions engine/access/access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ func (suite *Suite) TestGetBlockByIDAndHeight() {
require.NoError(suite.T(), all.Blocks.Store(&block2))

// the follower logic should update height index on the block storage when a block is finalized
err := db.Update(operation.IndexBlockHeight(block2.Header.Height, block2.ID()))
err := db.Update(operation.IndexFinalizedBlockByHeight(block2.Header.Height, block2.ID()))
require.NoError(suite.T(), err)

assertHeaderResp := func(
Expand Down Expand Up @@ -717,7 +717,7 @@ func (suite *Suite) TestGetSealedTransaction() {
err = all.Blocks.Store(block)
require.NoError(suite.T(), err)

err = db.Update(operation.IndexBlockHeight(block.Header.Height, block.ID()))
err = db.Update(operation.IndexFinalizedBlockByHeight(block.Header.Height, block.ID()))
require.NoError(suite.T(), err)

suite.sealedBlock = block.Header
Expand Down Expand Up @@ -935,7 +935,7 @@ func (suite *Suite) TestGetTransactionResult() {
require.NoError(suite.T(), err)
}
}
err = db.Update(operation.IndexBlockHeight(block.Header.Height, block.ID()))
err = db.Update(operation.IndexFinalizedBlockByHeight(block.Header.Height, block.ID()))
require.NoError(suite.T(), err)
finalSnapshot.On("Head").Return(block.Header, nil)

Expand Down Expand Up @@ -1168,7 +1168,7 @@ func (suite *Suite) TestExecuteScript() {
lastBlock := unittest.BlockWithParentFixture(prevBlock.Header)
err = all.Blocks.Store(lastBlock)
require.NoError(suite.T(), err)
err = db.Update(operation.IndexBlockHeight(lastBlock.Header.Height, lastBlock.ID()))
err = db.Update(operation.IndexFinalizedBlockByHeight(lastBlock.Header.Height, lastBlock.ID()))
require.NoError(suite.T(), err)
//update latest sealed block
suite.sealedBlock = lastBlock.Header
Expand All @@ -1182,7 +1182,7 @@ func (suite *Suite) TestExecuteScript() {

err = all.Blocks.Store(prevBlock)
require.NoError(suite.T(), err)
err = db.Update(operation.IndexBlockHeight(prevBlock.Header.Height, prevBlock.ID()))
err = db.Update(operation.IndexFinalizedBlockByHeight(prevBlock.Header.Height, prevBlock.ID()))
require.NoError(suite.T(), err)

// create execution receipts for each of the execution node and the previous block
Expand Down
2 changes: 1 addition & 1 deletion engine/execution/pruner/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestLoopPruneExecutionDataFromRootToLatestSealed(t *testing.T) {
})
chunks[i] = chunk // index by height
require.NoError(t, headers.Store(chunk.Header))
require.NoError(t, bdb.Update(operation.IndexBlockHeight(chunk.Header.Height, chunk.Header.ID())))
require.NoError(t, bdb.Update(operation.IndexFinalizedBlockByHeight(chunk.Header.Height, chunk.Header.ID())))
require.NoError(t, results.Store(chunk.Result))
require.NoError(t, results.Index(chunk.Result.BlockID, chunk.Result.ID()))
require.NoError(t, chunkDataPacks.Store([]*flow.ChunkDataPack{chunk.ChunkDataPack}))
Expand Down
2 changes: 1 addition & 1 deletion module/block_iterator/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestIterateHeight(t *testing.T) {

// index height
for _, b := range bs {
require.NoError(t, db.Update(operation.IndexBlockHeight(b.Height, b.ID())))
require.NoError(t, db.Update(operation.IndexFinalizedBlockByHeight(b.Height, b.ID())))
}

progress := &saveNextHeight{}
Expand Down
4 changes: 2 additions & 2 deletions module/builder/consensus/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,15 +253,15 @@ func (bs *BuilderSuite) SetupTest() {

err := bs.db.Update(operation.InsertFinalizedHeight(final.Header.Height))
bs.Require().NoError(err)
err = bs.db.Update(operation.IndexBlockHeight(final.Header.Height, bs.finalID))
err = bs.db.Update(operation.IndexFinalizedBlockByHeight(final.Header.Height, bs.finalID))
bs.Require().NoError(err)

err = bs.db.Update(operation.InsertRootHeight(13))
bs.Require().NoError(err)

err = bs.db.Update(operation.InsertSealedHeight(first.Header.Height))
bs.Require().NoError(err)
err = bs.db.Update(operation.IndexBlockHeight(first.Header.Height, first.ID()))
err = bs.db.Update(operation.IndexFinalizedBlockByHeight(first.Header.Height, first.ID()))
bs.Require().NoError(err)

bs.sentinel = 1337
Expand Down
4 changes: 2 additions & 2 deletions module/finalizedreader/finalizedreader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestFinalizedReader(t *testing.T) {
require.NoError(t, err)

// index the header
err = db.Update(operation.IndexBlockHeight(block.Header.Height, block.ID()))
err = db.Update(operation.IndexFinalizedBlockByHeight(block.Header.Height, block.ID()))
require.NoError(t, err)

// verify is able to reader the finalized block ID
Expand All @@ -44,7 +44,7 @@ func TestFinalizedReader(t *testing.T) {
// finalize one more block
block2 := unittest.BlockWithParentFixture(block.Header)
require.NoError(t, headers.Store(block2.Header))
err = db.Update(operation.IndexBlockHeight(block2.Header.Height, block2.ID()))
err = db.Update(operation.IndexFinalizedBlockByHeight(block2.Header.Height, block2.ID()))
require.NoError(t, err)
reader.BlockFinalized(block2.Header)

Expand Down
6 changes: 3 additions & 3 deletions module/finalizer/consensus/finalizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func TestMakeFinalValidChain(t *testing.T) {
require.NoError(t, err)

// map the finalized height to the finalized block ID
err = db.Update(operation.IndexBlockHeight(final.Height, final.ID()))
err = db.Update(operation.IndexFinalizedBlockByHeight(final.Height, final.ID()))
require.NoError(t, err)

// insert the finalized block header into the DB
Expand Down Expand Up @@ -139,7 +139,7 @@ func TestMakeFinalInvalidHeight(t *testing.T) {
require.NoError(t, err)

// map the finalized height to the finalized block ID
err = db.Update(operation.IndexBlockHeight(final.Height, final.ID()))
err = db.Update(operation.IndexFinalizedBlockByHeight(final.Height, final.ID()))
require.NoError(t, err)

// insert the finalized block header into the DB
Expand Down Expand Up @@ -191,7 +191,7 @@ func TestMakeFinalDuplicate(t *testing.T) {
require.NoError(t, err)

// map the finalized height to the finalized block ID
err = db.Update(operation.IndexBlockHeight(final.Height, final.ID()))
err = db.Update(operation.IndexFinalizedBlockByHeight(final.Height, final.ID()))
require.NoError(t, err)

// insert the finalized block header into the DB
Expand Down
1 change: 1 addition & 0 deletions module/metrics/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ const (
ResourceProposal = "proposal"
ResourceHeader = "header"
ResourceFinalizedHeight = "finalized_height"
ResourceCertifiedView = "certified_view"
ResourceIndex = "index"
ResourceIdentity = "identity"
ResourceGuarantee = "guarantee"
Expand Down
73 changes: 62 additions & 11 deletions state/protocol/badger/mutator.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@
// CAUTION:
// - This function expects that `certifyingQC ` has been validated. (otherwise, the state will be corrupted)
// - The parent block must already have been ingested.
// - Attempts to extend the state with the _same block concurrently_ are not allowed.
// (will not corrupt the state, but may lead to an exception)

Check failure on line 139 in state/protocol/badger/mutator.go

View workflow job for this annotation

GitHub Actions / Lint (./)

File is not properly formatted (goimports)
// Orphaned blocks are excepted.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Orphaned blocks are excepted.

I'm not clear on what this means. Excepted as in the above caution doesn't apply? Or do they trigger an exception? @AlexHentschel

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it means "above caution does not apply", I don't think it is accurate. If it means "triggers an exception" I think we should say that instead.

//
// Per convention, the protocol state requires that the candidate's parent has already been ingested.
// Other than that, all valid extensions are accepted. Even if we have enough information to determine that
Expand All @@ -147,6 +150,13 @@
// determine it is orphaned and drop it, attempt to ingest Y re-request the unknown parent X and repeat
// potentially very often.
//
// To ensure that all ancestors of a candidate block are correct and known to the FollowerState, some external
// ordering and queuing of incoming blocks is generally necessary (responsibility of Compliance Layer). Once a block
// is successfully ingested, repeated extension requests with this block are no-ops. This is convenient for the
// Compliance Layer after a crash, so it doesn't have to worry about which blocks have already been ingested before
// the crash. However, while running it is very easy for the Compliance Layer to avoid concurrent extension requests
// with the same block. Hence, for simplicity, the FollowerState may reject such requests with an exception.
//
// No errors are expected during normal operations.
func (m *FollowerState) ExtendCertified(ctx context.Context, candidate *flow.Block, certifyingQC *flow.QuorumCertificate) error {
span, ctx := m.tracer.StartSpanFromContext(ctx, trace.ProtoStateMutatorHeaderExtend)
Expand Down Expand Up @@ -190,6 +200,9 @@

// Execute the deferred database operations as one atomic transaction and emit scheduled notifications on success.
// The `candidate` block _must be valid_ (otherwise, the state will be corrupted)!
// Therefore, they should be identified
// additional are just a no-op if they are identified as such early enough by the `checkBlockAlreadyProcessed` - though if the same extension is added concurrently, we might return a
// [storage.ErrAlreadyExists] error.
Comment on lines +204 to +206
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Therefore, they should be identified
additional are just a no-op if they are identified as such early enough by the `checkBlockAlreadyProcessed [...]

I'm not understanding this sentence... 😅 @AlexHentschel

err = operation.RetryOnConflictTx(m.db, transaction.Update, deferredDbOps.Pending()) // No errors are expected during normal operations
if err != nil {
return fmt.Errorf("failed to persist candidate block %v and its dependencies: %w", blockID, err)
Expand All @@ -201,8 +214,12 @@
// Extend extends the protocol state of a CONSENSUS PARTICIPANT. It checks
// the validity of the _entire block_ (header and full payload).
//
// CAUTION: per convention, the protocol state requires that the candidate's
// parent has already been ingested. Otherwise, an exception is returned.
// CAUTION:
// - per convention, the protocol state requires that the candidate's
// parent has already been ingested. Otherwise, an exception is returned.
// - Attempts to extend the state with the _same block concurrently_ are not allowed.
// (will not corrupt the state, but may lead to an exception)
// Orphaned blocks are excepted.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment here - it isn't clear what "orphaned blocks are excepted" means.

//
// Per convention, the protocol state requires that the candidate's parent has already been ingested.
// Other than that, all valid extensions are accepted. Even if we have enough information to determine that
Expand All @@ -215,6 +232,13 @@
// determine it is orphaned and drop it, attempt to ingest Y re-request the unknown parent X and repeat
// potentially very often.
//
// To ensure that all ancestors of a candidate block are correct and known to the Protocol State, some external
// ordering and queuing of incoming blocks is generally necessary (responsibility of Compliance Layer). Once a block
// is successfully ingested, repeated extension requests with this block are no-ops. This is convenient for the
// Compliance Layer after a crash, so it doesn't have to worry about which blocks have already been ingested before
// the crash. However, while running it is very easy for the Compliance Layer to avoid concurrent extension requests
// with the same block. Hence, for simplicity, the Protocol State may reject such requests with an exception.
//
// Expected errors during normal operations:
// - state.OutdatedExtensionError if the candidate block is outdated (e.g. orphaned)
// - state.InvalidExtensionError if the candidate block is invalid
Expand Down Expand Up @@ -290,16 +314,26 @@
// If all checks pass, this method queues the following operations to persist the candidate block and
// schedules `BlockProcessable` notification to be emitted in order of increasing height:
//
// 5a. store QC embedded into the candidate block and emit `BlockProcessable` notification for the parent
// 5a. if and only if the candidate block's parent has not been certified yet:
// - store QC embedded into the candidate block
// - add the parent to the index of certified blocks (index: view → parent block's ID)
// - queue a `BlockProcessable` notification for the parent
// 5b. store candidate block and index it as a child of its parent (needed for recovery to traverse unfinalized blocks)
// 5c. if we are given a certifyingQC, store it and queue a `BlockProcessable` notification for the candidate block
// 5c. if and only if we are given a `certifyingQC`
// - store this QC certifying the candidate block
// - add candidate to the index of certified blocks (index: view → candidate block's ID)
// - queue a `BlockProcessable` notification for the candidate block
//
// If `headerExtend` is called by `ParticipantState.Extend` (full consensus participant) then `certifyingQC` will be nil,
// but the block payload will be validated. If `headerExtend` is called by `FollowerState.Extend` (consensus follower),
// then `certifyingQC` must be not nil which proves payload validity.
//
// If the candidate block has already been ingested, the deferred database operations returned by this function call
// will errors with the benign sentinel [storage.ErrAlreadyExists], aborting the database transaction (without corrupting
// the protocol state).
//
// Expected errors during normal operations:
// - state.InvalidExtensionError if the candidate block is invalid
// - [state.InvalidExtensionError] if the candidate block is invalid
func (m *FollowerState) headerExtend(ctx context.Context, candidate *flow.Block, certifyingQC *flow.QuorumCertificate, deferredDbOps *transaction.DeferredDbOps) error {
span, _ := m.tracer.StartSpanFromContext(ctx, trace.ProtoStateMutatorExtendCheckHeader)
defer span.End()
Expand Down Expand Up @@ -355,16 +389,26 @@
// STEP 5:
qc := candidate.Header.QuorumCertificate()
deferredDbOps.AddDbOp(func(tx *transaction.Tx) error {
// STEP 5a: Store QC for parent block and emit `BlockProcessable` notification if and only if
// - the QC for the parent has not been stored before (otherwise, we already emitted the notification) and
// - the parent block's height is larger than the finalized root height (the root block is already considered processed)
// Thereby, we reduce duplicated `BlockProcessable` notifications.
// STEP 5a: Deciding whether the candidate's parent has already been certified or not.
// Here, we populate the [storage.QuorumCertificates] index: certified block ID → QC. Except for bootstrapping, this is the
// only place where this index is updated. Therefore, the parent is certified if and only if [storage.QuorumCertificates]
// contains an entry for `qc.BlockID`. We optimistically attempt to add a new element to the index. We receive a
// [storage.ErrAlreadyExists] sentinel if and only if step 5a has already been executed for the parent.
// CAUTION: This approach only works for database backends that support reads and writes as part of the same atomic transaction!
// For Pebble, where only batch writes with asynchronous reads are supported, the following logic has to be reworked.
err := m.qcs.StoreTx(qc)(tx)
if err != nil {
// storage.ErrAlreadyExists guarantees that 5a has already been executed for the parent.
if !errors.Is(err, storage.ErrAlreadyExists) {
return fmt.Errorf("could not store incorporated qc: %w", err)
}
} else {
} else { // no error entails that 5a has never been executed for the parent block
// add parent to index of certified blocks:
err := transaction.WithTx(operation.IndexCertifiedBlockByView(parent.View, qc.BlockID))(tx)
if err != nil {
return fmt.Errorf("could not index certified block: %w", err)
}

// trigger BlockProcessable for parent block above root height
if parent.Height > m.finalizedRootHeight {
tx.OnSucceed(func() {
Expand All @@ -389,6 +433,13 @@
if err != nil {
return fmt.Errorf("could not store certifying qc: %w", err)
}

// add candidate to index of certified blocks:
err := transaction.WithTx(operation.IndexCertifiedBlockByView(candidate.Header.View, blockID))(tx)
if err != nil {
return fmt.Errorf("could not index certified block: %w", err)
}

tx.OnSucceed(func() { // queue a BlockProcessable event for candidate block, since it is certified
m.consumer.BlockProcessable(candidate.Header, certifyingQC)
})
Expand Down Expand Up @@ -735,7 +786,7 @@
// its payload, in which case the parent's seal is the same.
// * set the epoch fallback flag, if it is triggered
err = operation.RetryOnConflict(m.db.Update, func(tx *badger.Txn) error {
err = operation.IndexBlockHeight(header.Height, blockID)(tx)
err = operation.IndexFinalizedBlockByHeight(header.Height, blockID)(tx)
if err != nil {
return fmt.Errorf("could not insert number mapping: %w", err)
}
Expand Down
11 changes: 10 additions & 1 deletion state/protocol/badger/mutator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,15 @@ func TestExtendValid(t *testing.T) {
consumer.On("BlockProcessable", block1.Header, mock.Anything).Once()
err := fullState.Extend(context.Background(), block2)
require.NoError(t, err)

// verify that block1's view is indexed
var indexedID flow.Identifier
require.NoError(t, db.View(operation.LookupCertifiedBlockByView(block1.Header.View, &indexedID)))
require.Equal(t, block1.ID(), indexedID)

// verify that block2's view is not indexed
err = db.View(operation.LookupCertifiedBlockByView(block2.Header.View, &indexedID))
require.ErrorIs(t, err, stoerr.ErrNotFound)
})
})
}
Expand Down Expand Up @@ -1530,7 +1539,7 @@ func TestExtendEpochCommitInvalid(t *testing.T) {
// swap consensus node for a new one for epoch 2
epoch2NewParticipant := unittest.IdentityFixture(unittest.WithRole(flow.RoleConsensus))
epoch2Participants := append(
participants.Filter(filter.Not[flow.Identity](filter.HasRole[flow.Identity](flow.RoleConsensus))),
participants.Filter(filter.Not(filter.HasRole[flow.Identity](flow.RoleConsensus))),
epoch2NewParticipant,
).Sort(flow.Canonical[flow.Identity]).ToSkeleton()

Expand Down
4 changes: 2 additions & 2 deletions state/protocol/badger/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func bootstrapSealingSegment(
if err != nil {
return fmt.Errorf("could not insert SealingSegment extra block: %w", err)
}
err = operation.IndexBlockHeight(height, blockID)(txn)
err = operation.IndexFinalizedBlockByHeight(height, blockID)(txn)
if err != nil {
return fmt.Errorf("could not index SealingSegment extra block (id=%x): %w", blockID, err)
}
Expand All @@ -340,7 +340,7 @@ func bootstrapSealingSegment(
if err != nil {
return fmt.Errorf("could not insert SealingSegment block: %w", err)
}
err = operation.IndexBlockHeight(height, blockID)(txn)
err = operation.IndexFinalizedBlockByHeight(height, blockID)(txn)
if err != nil {
return fmt.Errorf("could not index SealingSegment block (id=%x): %w", blockID, err)
}
Expand Down
Loading
Loading