Skip to content

Commit 0527dde

Browse files
committed
Added ScalaDoc, added basic test for iterateAVLTree
1 parent 31f1502 commit 0527dde

File tree

5 files changed

+78
-12
lines changed

5 files changed

+78
-12
lines changed

avldb/src/main/scala/scorex/crypto/authds/avltree/batch/VersionedLDBAVLStorage.scala

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package scorex.crypto.authds.avltree.batch
22

33
import com.google.common.primitives.Ints
4-
import org.ergoplatform.serialization.ManifestSerializer.MainnetManifestDepth
54
import scorex.crypto.authds.avltree.batch.Constants.{DigestType, HashFnType, hashFn}
65
import scorex.crypto.authds.avltree.batch.VersionedLDBAVLStorage.{noStoreSerializer, topNodeHashKey, topNodeHeightKey}
76
import scorex.crypto.authds.avltree.batch.serialization.{BatchAVLProverManifest, BatchAVLProverSubtree, ProxyInternalNode}
@@ -155,7 +154,13 @@ class VersionedLDBAVLStorage(store: LDBVersionedStore)
155154
}
156155
}
157156

158-
def iterateAVLTree(fromIndex: Int)(handleSubtree: BatchAVLProverSubtree[DigestType] => Unit): Unit =
157+
/**
158+
* Split the AVL+ tree to 2^depth number of subtrees and process them
159+
* @param fromIndex - only start processing subtrees from this index
160+
* @param depth - depth at whitch to split AVL+ tree to subtrees
161+
* @param handleSubtree - function to process subtree
162+
*/
163+
def iterateAVLTree(fromIndex: Int, depth: Int)(handleSubtree: BatchAVLProverSubtree[DigestType] => Unit): Unit =
159164
store.processSnapshot { dbReader =>
160165

161166
var current: Int = 0
@@ -172,7 +177,7 @@ class VersionedLDBAVLStorage(store: LDBVersionedStore)
172177

173178
def proxyLoop(label: Array[Byte], level: Int): Unit =
174179
noStoreSerializer.parseBytes(dbReader.get(label)) match {
175-
case in: ProxyInternalNode[DigestType] if level == MainnetManifestDepth =>
180+
case in: ProxyInternalNode[DigestType] if level == depth =>
176181
if(current >= fromIndex) handleSubtree(subtree(in.leftLabel))
177182
current += 1
178183
if(current >= fromIndex) handleSubtree(subtree(in.rightLabel))

avldb/src/test/scala/scorex/crypto/authds/avltree/batch/VersionedLDBAVLStorageSpecification.scala

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,11 @@ import scorex.crypto.authds.{ADDigest, ADKey, ADValue, SerializedAdProof}
1212
import scorex.util.encode.Base16
1313
import scorex.crypto.hash.{Blake2b256, Digest32}
1414
import scorex.db.{LDBFactory, LDBVersionedStore}
15-
import scorex.util.ByteArrayBuilder
15+
import scorex.util.{ByteArrayBuilder, ModifierId, bytesToId}
1616
import scorex.util.serialization.VLQByteBufferWriter
1717
import scorex.utils.{Random => RandomBytes}
1818

19+
import scala.collection.mutable.ArrayBuffer
1920
import scala.concurrent.ExecutionContext.Implicits.global
2021
import scala.concurrent.duration._
2122
import scala.concurrent.{Await, Future}
@@ -370,4 +371,22 @@ class VersionedLDBAVLStorageSpecification
370371
}
371372
}
372373

374+
property("iterate AVL tree") {
375+
val prover = createPersistentProver()
376+
val current = 11
377+
val depth = 6
378+
blockchainWorkflowTest(prover)
379+
380+
val chunkBuffer: ArrayBuffer[(ModifierId,Array[Array[Byte]])] = ArrayBuffer.empty[(ModifierId,Array[Array[Byte]])]
381+
382+
prover.storage.asInstanceOf[VersionedLDBAVLStorage].iterateAVLTree(current, depth) { subtree =>
383+
chunkBuffer += ((
384+
bytesToId(subtree.id),
385+
subtree.leafValues.toArray
386+
))
387+
}
388+
389+
chunkBuffer.size shouldBe math.pow(2, depth) - current
390+
}
391+
373392
}

src/main/scala/org/ergoplatform/nodeView/history/ErgoHistory.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import org.ergoplatform.mining.AutolykosPowScheme
88
import org.ergoplatform.modifiers.history._
99
import org.ergoplatform.modifiers.history.header.{Header, PreGenesisHeader}
1010
import org.ergoplatform.modifiers.{BlockSection, ErgoFullBlock, NonHeaderBlockSection}
11-
import org.ergoplatform.nodeView.history.UtxoSetScanner.StartUtxoSetScanWithHistory
11+
import org.ergoplatform.nodeView.history.UtxoSetScanner.InitializeUtxoSetScannerWithHistory
1212
import org.ergoplatform.nodeView.history.extra.ExtraIndexer.ReceivableMessages.StartExtraIndexer
1313
import org.ergoplatform.nodeView.history.extra.ExtraIndexer.{IndexedHeightKey, NewestVersion, NewestVersionBytes, SchemaVersionKey, getIndex}
1414
import org.ergoplatform.nodeView.history.storage.HistoryStorage
@@ -304,7 +304,7 @@ object ErgoHistory extends ScorexLogging {
304304

305305
// set history for utxo set scanner, if bootstrapping by snapshot
306306
if(ergoSettings.nodeSettings.utxoSettings.utxoBootstrap) {
307-
context.system.eventStream.publish(StartUtxoSetScanWithHistory(history))
307+
context.system.eventStream.publish(InitializeUtxoSetScannerWithHistory(history))
308308
}
309309

310310
history

src/main/scala/org/ergoplatform/nodeView/history/UtxoSetScanner.scala

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import org.ergoplatform.nodeView.state.UtxoState
1212
import org.ergoplatform.nodeView.wallet.ErgoWallet
1313
import org.ergoplatform.nodeView.wallet.ErgoWalletActorMessages.ScanBoxesFromUtxoSnapshot
1414
import org.ergoplatform.serialization.ManifestSerializer
15+
import org.ergoplatform.serialization.ManifestSerializer.MainnetManifestDepth
1516
import org.ergoplatform.wallet.boxes.ErgoBoxSerializer
1617
import scorex.crypto.authds.avltree.batch.VersionedLDBAVLStorage
1718
import scorex.crypto.hash.Blake2b256
@@ -25,6 +26,12 @@ import scala.concurrent.Await
2526
import scala.concurrent.ExecutionContext.Implicits.global
2627
import scala.concurrent.duration.Duration
2728

29+
/**
30+
* This class is used to provide the current UTXO set for wallet scans when bootstrapping
31+
* by UTXO set snapshot. This is done by creating a snapshot of the UTXO set, deserializing
32+
* the raw bytes to ErgoBoxes and sending them to the wallet actor in chunks.
33+
* @param nodeView - NodeView actor to get wallet and UTXOs from
34+
*/
2835
class UtxoSetScanner(nodeView: ActorRef) extends Actor with ScorexLogging {
2936

3037
private var history: ErgoHistory = _
@@ -33,22 +40,39 @@ class UtxoSetScanner(nodeView: ActorRef) extends Actor with ScorexLogging {
3340
private implicit val timeout: Timeout = Timeout(10, TimeUnit.SECONDS)
3441
private implicit val duration: Duration = Duration.create(10, TimeUnit.SECONDS)
3542

43+
/**
44+
* Internal buffer that holds deserialized AVL subtrees until they are sent to wallet
45+
*/
3646
private val chunkBuffer: ArrayBuffer[(ModifierId,Array[ErgoBox])] = ArrayBuffer.empty[(ModifierId,Array[ErgoBox])]
3747

48+
/**
49+
* Reads the current progress of the scanner.
50+
* @return (current segment, total segments)
51+
*/
3852
private def readProgress(): (Int, Int) =
3953
historyStorage.getIndex(utxoSetScanProgressKey).map(ByteBuffer.wrap).map { buffer =>
4054
val current = buffer.getInt
4155
val total = buffer.getInt
4256
(current, total)
4357
}.getOrElse((0, 0))
4458

59+
/**
60+
* Writes progress to db.
61+
* @param current - current retrieved segment
62+
* @param total - total segment count
63+
*/
4564
private def writeProgress(current: Int, total: Int): Unit = {
4665
val buffer: ByteBuffer = ByteBuffer.allocate(8)
4766
buffer.putInt(current)
4867
buffer.putInt(total)
4968
historyStorage.insert(Array((utxoSetScanProgressKey, buffer.array)), Array.empty[BlockSection])
5069
}
5170

71+
/**
72+
* Send deserialized AVL subtrees to wallet for scanning.
73+
* @param wallet - wallet to send to
74+
* @param current - current retrieved segment
75+
*/
5276
private def sendBufferToWallet(wallet: ErgoWallet, current: Int): Unit = {
5377
wallet.scanUtxoSnapshot(ScanBoxesFromUtxoSnapshot(chunkBuffer, current, MainnetTotal))
5478
writeProgress(current, MainnetTotal)
@@ -69,11 +93,12 @@ class UtxoSetScanner(nodeView: ActorRef) extends Actor with ScorexLogging {
6993
)
7094

7195
val initialized: Boolean = Await.result(wallet.getWalletStatus.map(_.initialized), duration)
72-
if(!initialized) return
96+
if(!initialized) // wallet is not initialized
97+
return
7398

7499
log.info(s"Starting UTXO set snapshot scan for $total chunks")
75100

76-
state.persistentProver.storage.asInstanceOf[VersionedLDBAVLStorage].iterateAVLTree(current) { subtree =>
101+
state.persistentProver.storage.asInstanceOf[VersionedLDBAVLStorage].iterateAVLTree(current, MainnetManifestDepth) { subtree =>
77102
current += 1
78103

79104
chunkBuffer += ((
@@ -93,6 +118,7 @@ class UtxoSetScanner(nodeView: ActorRef) extends Actor with ScorexLogging {
93118

94119
if(current == total) {
95120
log.info(s"Successfully scanned $total Utxo set subtrees")
121+
// send newest block to wallet, if blocks were applied since scan began it will go back to scan them
96122
wallet.scanPersistent(history.bestFullBlockOpt.get)
97123
}else {
98124
log.error(s"Inconsistent Utxo set scan state: $current scanned subtrees out of $total")
@@ -101,27 +127,40 @@ class UtxoSetScanner(nodeView: ActorRef) extends Actor with ScorexLogging {
101127
}
102128

103129
override def receive: Receive = {
104-
case StartUtxoSetScanWithHistory(history: ErgoHistory) =>
130+
case InitializeUtxoSetScannerWithHistory(history: ErgoHistory) =>
105131
this.history = history
106132
run()
107133
case StartUtxoSetScan(rescan: Boolean) =>
108-
if(readProgress()._1 == 0 || rescan) writeProgress(0, MainnetTotal)
134+
if(readProgress()._1 == 0 || //
135+
rescan) // start over UTXO set scan
136+
writeProgress(0, MainnetTotal)
109137
run()
110138
}
111139

112140
override def preStart(): Unit = {
113-
context.system.eventStream.subscribe(self, classOf[StartUtxoSetScanWithHistory])
141+
context.system.eventStream.subscribe(self, classOf[InitializeUtxoSetScannerWithHistory])
114142
context.system.eventStream.subscribe(self, classOf[StartUtxoSetScan])
115143
}
116144

117145
}
118146

119147
object UtxoSetScanner {
120148

121-
case class StartUtxoSetScanWithHistory(history: ErgoHistory)
149+
/**
150+
* Initialize UTXO set scanner with database and try continuing scan if possible
151+
* @param history - database handle
152+
*/
153+
case class InitializeUtxoSetScannerWithHistory(history: ErgoHistory)
122154

155+
/**
156+
* Start scanning UTXO set, or continue if the scan was interrupted, or start over if rescan = true
157+
* @param rescan - whether to start over or just continue scan
158+
*/
123159
case class StartUtxoSetScan(rescan: Boolean)
124160

161+
/**
162+
* Number of subtrees to divide AVL tree to
163+
*/
125164
private val MainnetTotal: Int = math.pow(2, ManifestSerializer.MainnetManifestDepth).toInt
126165

127166
private val utxoSetScanProgressKey: ByteArrayWrapper =

src/main/scala/org/ergoplatform/nodeView/wallet/WalletScanLogic.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,9 @@ object WalletScanLogic extends ScorexLogging {
198198

199199
val scanRes = ScanResults(myOutputs, Seq.empty, Seq.empty)
200200

201+
/** Pass subtreeId as blockId; set height to 0 so when UTXO set scan is finished normal wallet scan
202+
* will start with the first non-pruned block (see [[ErgoWalletState.expectedNextBlockHeight]])
203+
*/
201204
updateRegistryAndOffchain(registry, offChainRegistry, outputsFilter, scanRes, subtreeId, 0)
202205
}
203206

0 commit comments

Comments
 (0)