[Storage Refactor] Refactor saving execution results#6906
[Storage Refactor] Refactor saving execution results#6906zhangchiqing merged 56 commits intomasterfrom
Conversation
f793dcf to
51b6fa0
Compare
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #6906 +/- ##
==========================================
- Coverage 41.26% 41.25% -0.02%
==========================================
Files 2155 2158 +3
Lines 189309 189421 +112
==========================================
+ Hits 78125 78150 +25
- Misses 104673 104754 +81
- Partials 6511 6517 +6
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
7aaf9ae to
868a31e
Compare
| events storage.Events | ||
| lightTransactionResults storage.LightTransactionResults | ||
| transactionResultErrorMessages storage.TransactionResultErrorMessages |
There was a problem hiding this comment.
Access node syncs events, transaction results from EN with indexer engine, I'm refactoring them altogether along with the execution node's saving results process.
| }). | ||
| Module("transaction results storage", func(node *cmd.NodeConfig) error { | ||
| builder.Storage.LightTransactionResults = bstorage.NewLightTransactionResults(node.Metrics.Cache, node.DB, bstorage.DefaultCacheSize) | ||
| builder.lightTransactionResults = store.NewLightTransactionResults(node.Metrics.Cache, node.ProtocolDB, bstorage.DefaultCacheSize) |
There was a problem hiding this comment.
ProtocolDB is a storage abstraction, so that we can change in a single place to decide using badger or pebble.
| exeNode.txResults, | ||
| node.DB, | ||
| node.ProtocolDB, | ||
| getLatestFinalized, |
There was a problem hiding this comment.
This is for the execution state to implement getting highest finalized and executed block.
|
|
||
| var blockID flow.Identifier | ||
| var lastExecuted uint64 | ||
| err = db.View(procedure.GetLastExecutedBlock(&lastExecuted, &blockID)) |
There was a problem hiding this comment.
The GetLastExecutedBlock procedure is replaced by RetrieveExecutedBlock which returns only the block ID, and getting the block with protocol State.
| } | ||
|
|
||
| // RemoveStateCommitment removes the state commitment by block ID | ||
| func RemoveStateCommitment(w storage.Writer, blockID flow.Identifier) error { |
There was a problem hiding this comment.
This file is mostly identical with storage/badger/operation/commits.go, except BatchRemoveStateCommitment and BatchIndexStateCommitment are removed, since RemoveStateCommitment is always a batch updates.
| @@ -0,0 +1,31 @@ | |||
| package operation_test | |||
There was a problem hiding this comment.
this file is identical to the same file in storage/badger/operation/
| @@ -0,0 +1,166 @@ | |||
| package operation | |||
There was a problem hiding this comment.
this file is identical to the same file in storage/badger/operation/
Except the following functions are removed:
- BatchInsertTransactionResult
- BatchIndexTransactionResult
| @@ -0,0 +1,82 @@ | |||
| package store | |||
There was a problem hiding this comment.
this file is identical to the same file in storage/badger/operation/
storage/store/my_receipts.go
Outdated
| blockID := receipt.ExecutionResult.BlockID | ||
| receiptID := receipt.ID() | ||
| var myOwnReceiptExecutedBefore flow.Identifier | ||
| err := operation.LookupOwnExecutionReceipt(rw.GlobalReader(), blockID, &myOwnReceiptExecutedBefore) |
There was a problem hiding this comment.
This is where we need to pay attention to, the original implementation can check if an own receipt already exist during insert.
if errors.Is(err, storage.ErrAlreadyExists) {
var savedReceiptID flow.Identifier
err := operation.LookupOwnExecutionReceipt(blockID, &savedReceiptID)(tx)
if err != nil {
return err
}
The original implementation can do so because it's using badger transaction. However, the new storage abstraction uses badger updates, so we have to check it ourselves.
The new way it works is to acquire the indexingOwnReceipts and ensure no other process is storing own receipts, so that the check for if any own receipts for this block won't be stale.
storage/store/my_receipts_test.go
Outdated
| }) | ||
| }) | ||
|
|
||
| t.Run("store1 different receipt concurrent for same block should fail", func(t *testing.T) { |
There was a problem hiding this comment.
This file is the same as the original, except this added concurrent tests.
| block := unittest.BlockFixture() | ||
| receipt1 := unittest.ReceiptForBlockFixture(&block) | ||
|
|
||
| err := db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { |
There was a problem hiding this comment.
This file is the same as the deleted storage/badger/my_receipts_test.go, except the StoreMyReceipt call is replaced with BatchStoreMyReceipt, since we never call StoreMyReceipt alone.
peterargue
left a comment
There was a problem hiding this comment.
long, but straight forward. looks good to me
fxamacker
left a comment
There was a problem hiding this comment.
LGTM! I mostly focused on general Go programming.
| reader := badgerimpl.ToDB(db).Reader() | ||
| err = operation.RetrieveExecutedBlock(reader, &blockID) |
There was a problem hiding this comment.
findLastExecutedAndSealedHeight() (not added in this PR) appears to be very similar to latest.LatestSealedAndExecutedHeight(). Maybe reuse that function.
This PR updates the execution node and access node’s saving result process to use the new storage abstraction.
The following database modules are refactored into new storage abstraction: