Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OrderedTxPool rework #1958

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.ergoplatform.nodeView.mempool

import org.ergoplatform.modifiers.mempool.{ErgoTransaction, UnconfirmedTransaction}
import org.ergoplatform.modifiers.mempool.ErgoTransaction
import org.ergoplatform.utils.generators.ErgoTransactionGenerators
import org.scalameter.KeyValue
import org.scalameter.api._
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.ergoplatform.nodeView.mempool

import org.ergoplatform.modifiers.mempool.{ErgoTransaction, UnconfirmedTransaction}
import org.ergoplatform.modifiers.mempool.ErgoTransaction
import org.ergoplatform.nodeView.mempool
import org.ergoplatform.utils.generators.{ErgoGenerators, ErgoTransactionGenerators}
import org.scalacheck.Gen
import org.scalatest.propspec.AnyPropSpec
Expand All @@ -15,5 +16,5 @@ class MempoolPerformanceBench extends AnyPropSpec
override val memPoolGenerator: Gen[ErgoMemPool] = emptyMemPoolGen
override val transactionGenerator: Gen[ErgoTransaction] = invalidErgoTransactionGen
override val unconfirmedTxGenerator: Gen[UnconfirmedTransaction] =
invalidErgoTransactionGen.map(tx => UnconfirmedTransaction(tx, None))
invalidErgoTransactionGen.map(tx => mempool.UnconfirmedTransaction(tx, None))
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package org.ergoplatform.http.api
import akka.actor.ActorRef
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.server.{Directive1, ValidationRejection}
import org.ergoplatform.modifiers.mempool.{ErgoTransaction, UnconfirmedTransaction}
import org.ergoplatform.modifiers.mempool.ErgoTransaction
import org.ergoplatform.nodeView.ErgoReadersHolder.{GetReaders, Readers}
import org.ergoplatform.nodeView.mempool.ErgoMemPoolReader
import org.ergoplatform.nodeView.mempool.{ErgoMemPoolReader, UnconfirmedTransaction}
import org.ergoplatform.nodeView.state.{ErgoStateReader, UtxoStateReader}
import org.ergoplatform.settings.{Algos, ErgoSettings}
import scorex.core.api.http.{ApiError, ApiRoute}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import akka.pattern.ask
import io.circe.Json
import io.circe.syntax._
import org.ergoplatform.ErgoBox.{BoxId, NonMandatoryRegisterId, TokenId}
import org.ergoplatform.modifiers.mempool.{ErgoTransaction, ErgoTransactionSerializer, UnconfirmedTransaction}
import org.ergoplatform.modifiers.mempool.{ErgoTransaction, ErgoTransactionSerializer}
import org.ergoplatform.nodeView.ErgoReadersHolder.{GetReaders, Readers}
import org.ergoplatform.nodeView.mempool.ErgoMemPoolReader
import org.ergoplatform.nodeView.mempool.{ErgoMemPoolReader, UnconfirmedTransaction}
import org.ergoplatform.nodeView.mempool.HistogramStats.getFeeHistogram
import org.ergoplatform.nodeView.state.{ErgoStateReader, UtxoStateReader}
import org.ergoplatform.settings.{Algos, ErgoSettings}
Expand Down Expand Up @@ -135,7 +135,7 @@ case class TransactionsApiRoute(readersHolder: ActorRef,
val feeHistogramParameters: Directive[(Int, Long)] = parameters("bins".as[Int] ? 10, "maxtime".as[Long] ? (60*1000L))

def getFeeHistogramR: Route = (path("poolHistogram") & get & feeHistogramParameters) { (bins, maxtime) =>
ApiResponse(getMemPool.map(p => getFeeHistogram(System.currentTimeMillis(), bins, maxtime, p.weightedTransactionIds(Int.MaxValue)).asJson))
ApiResponse(getMemPool.map(p => getFeeHistogram(System.currentTimeMillis(), bins, maxtime, p.txTimesAndWeights).asJson))
}

val feeRequestParameters: Directive[(Int, Int)] = parameters("waitTime".as[Int] ? 1, "txSize".as[Int] ? 100)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import io.circe.syntax._
import io.circe.{Encoder, Json}
import org.ergoplatform._
import org.ergoplatform.http.api.requests.HintExtractionRequest
import org.ergoplatform.modifiers.mempool.{ErgoTransaction, UnconfirmedTransaction}
import org.ergoplatform.modifiers.mempool.ErgoTransaction
import org.ergoplatform.nodeView.ErgoReadersHolder.{GetReaders, Readers}
import org.ergoplatform.nodeView.wallet._
import org.ergoplatform.nodeView.wallet.requests._
Expand All @@ -25,6 +25,7 @@ import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}
import akka.http.scaladsl.server.MissingQueryParamRejection
import org.ergoplatform.nodeView.mempool.UnconfirmedTransaction

case class WalletApiRoute(readersHolder: ActorRef,
nodeViewActorRef: ActorRef,
Expand Down
3 changes: 1 addition & 2 deletions src/main/scala/org/ergoplatform/local/CleanupWorker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ package org.ergoplatform.local
import akka.actor.{Actor, ActorRef}
import org.ergoplatform.local.CleanupWorker.RunCleanup
import org.ergoplatform.local.MempoolAuditor.CleanupDone
import org.ergoplatform.modifiers.mempool.UnconfirmedTransaction
import org.ergoplatform.nodeView.mempool.ErgoMemPoolReader
import org.ergoplatform.nodeView.mempool.{ErgoMemPoolReader, UnconfirmedTransaction}
import org.ergoplatform.nodeView.state.UtxoStateReader
import org.ergoplatform.settings.NodeConfigurationSettings
import org.ergoplatform.nodeView.ErgoNodeViewHolder.ReceivableMessages.{EliminateTransactions, RecheckedTransactions}
Expand Down
10 changes: 5 additions & 5 deletions src/main/scala/org/ergoplatform/local/MempoolAuditor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import akka.actor.SupervisorStrategy.{Restart, Stop}
import akka.actor.{Actor, ActorInitializationException, ActorKilledException, ActorRef, ActorRefFactory, DeathPactException, OneForOneStrategy, Props}
import org.ergoplatform.local.CleanupWorker.RunCleanup
import org.ergoplatform.local.MempoolAuditor.CleanupDone
import org.ergoplatform.modifiers.mempool.{ErgoTransaction, UnconfirmedTransaction}
import org.ergoplatform.nodeView.mempool.ErgoMemPoolReader
import org.ergoplatform.modifiers.mempool.ErgoTransaction
import org.ergoplatform.nodeView.mempool.{ErgoMemPoolReader, UnconfirmedTransaction}
import org.ergoplatform.settings.ErgoSettings
import scorex.core.network.Broadcast
import scorex.core.network.NetworkController.ReceivableMessages.SendToNetwork
Expand Down Expand Up @@ -101,15 +101,15 @@ class MempoolAuditor(nodeViewHolderRef: ActorRef,
val stateToCheck = utxoState.withUnconfirmedTransactions(toBroadcast)
toBroadcast.foreach { unconfirmedTx =>
if (unconfirmedTx.transaction.inputIds.forall(inputBoxId => stateToCheck.boxById(inputBoxId).isDefined)) {
log.info(s"Rebroadcasting $unconfirmedTx")
log.info(s"Rebroadcasting ${unconfirmedTx.id}")
broadcastTx(unconfirmedTx)
} else {
log.info(s"Not rebroadcasting $unconfirmedTx as not all the inputs are in place")
log.info(s"Not rebroadcasting ${unconfirmedTx.id} as not all the inputs are in place")
}
}
case _ =>
toBroadcast.foreach { unconfirmedTx =>
log.warn(s"Rebroadcasting $unconfirmedTx while state is not ready or not UTXO set")
log.warn(s"Rebroadcasting ${unconfirmedTx.id} while state is not ready or not UTXO set")
broadcastTx(unconfirmedTx)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ import org.ergoplatform.modifiers.history._
import org.ergoplatform.modifiers.history.extension.Extension
import org.ergoplatform.modifiers.history.header.{Header, HeaderWithoutPow}
import org.ergoplatform.modifiers.history.popow.NipopowAlgos
import org.ergoplatform.modifiers.mempool.{ErgoTransaction, UnconfirmedTransaction}
import org.ergoplatform.modifiers.mempool.ErgoTransaction
import org.ergoplatform.network.ErgoNodeViewSynchronizer.ReceivableMessages._
import org.ergoplatform.nodeView.ErgoNodeViewHolder.ReceivableMessages.{EliminateTransactions, LocallyGeneratedModifier}
import org.ergoplatform.nodeView.ErgoReadersHolder.{GetReaders, Readers}
import org.ergoplatform.nodeView.history.ErgoHistory.Height
import org.ergoplatform.nodeView.history.{ErgoHistory, ErgoHistoryReader}
import org.ergoplatform.nodeView.mempool.ErgoMemPoolReader
import org.ergoplatform.nodeView.mempool.{ErgoMemPoolReader, UnconfirmedTransaction}
import org.ergoplatform.nodeView.state.{ErgoState, ErgoStateContext, StateType, UtxoStateReader}
import org.ergoplatform.settings.{ErgoSettings, ErgoValidationSettingsUpdate, Parameters}
import org.ergoplatform.sdk.wallet.Constants.MaxAssetsPerBox
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ package org.ergoplatform.network
import akka.actor.SupervisorStrategy.{Restart, Stop}
import akka.actor.{Actor, ActorInitializationException, ActorKilledException, ActorRef, ActorRefFactory, DeathPactException, OneForOneStrategy, Props}
import org.ergoplatform.modifiers.history.header.Header
import org.ergoplatform.modifiers.mempool.{ErgoTransaction, ErgoTransactionSerializer, UnconfirmedTransaction}
import org.ergoplatform.modifiers.mempool.{ErgoTransaction, ErgoTransactionSerializer}
import org.ergoplatform.modifiers.{BlockSection, ManifestTypeId, NetworkObjectTypeId, SnapshotsInfoTypeId, UtxoSnapshotChunkTypeId}
import org.ergoplatform.nodeView.history.{ErgoSyncInfoV1, ErgoSyncInfoV2}
import org.ergoplatform.nodeView.history._
import ErgoNodeViewSynchronizer.{CheckModifiersToDownload, IncomingTxInfo, TransactionProcessingCacheRecord}
import org.ergoplatform.nodeView.ErgoNodeViewHolder.BlockAppliedTransactions
import org.ergoplatform.nodeView.history.{ErgoHistory, ErgoSyncInfo, ErgoSyncInfoMessageSpec}
import org.ergoplatform.nodeView.mempool.{ErgoMemPool, ErgoMemPoolReader}
import org.ergoplatform.nodeView.mempool.{ErgoMemPool, ErgoMemPoolReader, UnconfirmedTransaction}
import org.ergoplatform.settings.{Algos, Constants, ErgoSettings}
import org.ergoplatform.nodeView.ErgoNodeViewHolder.ReceivableMessages._
import org.ergoplatform.nodeView.ErgoNodeViewHolder.ReceivableMessages.{ChainIsHealthy, ChainIsStuck, GetNodeViewChanges, IsChainHealthy, ModifiersFromRemote}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import akka.actor.{Actor, ActorRef, ActorSystem, OneForOneStrategy, Props}
import org.ergoplatform.ErgoApp
import org.ergoplatform.ErgoApp.CriticalSystemException
import org.ergoplatform.modifiers.history.header.Header
import org.ergoplatform.modifiers.mempool.{ErgoTransaction, UnconfirmedTransaction}
import org.ergoplatform.modifiers.mempool.ErgoTransaction
import org.ergoplatform.modifiers.{BlockSection, ErgoFullBlock, NetworkObjectTypeId}
import org.ergoplatform.nodeView.history.ErgoHistory
import org.ergoplatform.nodeView.mempool.ErgoMemPool
import org.ergoplatform.nodeView.mempool.{ErgoMemPool, UnconfirmedTransaction}
import org.ergoplatform.nodeView.mempool.ErgoMemPool.ProcessingOutcome
import org.ergoplatform.nodeView.state._
import org.ergoplatform.nodeView.wallet.ErgoWallet
Expand Down
83 changes: 33 additions & 50 deletions src/main/scala/org/ergoplatform/nodeView/mempool/ErgoMemPool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ package org.ergoplatform.nodeView.mempool

import org.ergoplatform.ErgoBox.BoxId
import org.ergoplatform.mining.emission.EmissionRules
import org.ergoplatform.modifiers.mempool.{ErgoTransaction, UnconfirmedTransaction}
import org.ergoplatform.nodeView.mempool.OrderedTxPool.WeightedTxId
import org.ergoplatform.modifiers.mempool.ErgoTransaction
import org.ergoplatform.nodeView.mempool.ErgoMemPool.SortingOption
import org.ergoplatform.nodeView.state.{ErgoState, UtxoState}
import org.ergoplatform.settings.{ErgoSettings, MonetarySettings, NodeConfigurationSettings}
import scorex.core.transaction.state.TransactionValidation
import scorex.util.{ModifierId, ScorexLogging, bytesToId}
import OrderedTxPool.weighted
import org.ergoplatform.nodeView.mempool.ErgoMemPool.SortingOption
import scorex.util.{ModifierId, ScorexLogging}
import spire.syntax.all.cfor

import scala.annotation.tailrec
Expand All @@ -32,13 +30,9 @@ class ErgoMemPool private[mempool](private[mempool] val pool: OrderedTxPool,
(implicit settings: ErgoSettings)
extends ErgoMemPoolReader with ScorexLogging {

import ErgoMemPool._
import EmissionRules.CoinsInOneErgo
import ErgoMemPool._

/**
* When there's no reason to re-check transactions immediately, we assign fake cost to them
*/
private val FakeCost = 1000

private val nodeSettings: NodeConfigurationSettings = settings.nodeSettings
private implicit val monetarySettings: MonetarySettings = settings.chainSettings.monetary
Expand All @@ -49,8 +43,8 @@ class ErgoMemPool private[mempool](private[mempool] val pool: OrderedTxPool,
pool.get(modifierId).map(unconfirmedTx => unconfirmedTx.transaction)
}

override def contains(modifierId: ModifierId): Boolean = {
pool.contains(modifierId)
def contains(uTx: UnconfirmedTransaction): Boolean = {
pool.contains(uTx)
}

override def take(limit: Int): Iterable[UnconfirmedTransaction] = {
Expand Down Expand Up @@ -93,7 +87,7 @@ class ErgoMemPool private[mempool](private[mempool] val pool: OrderedTxPool,
* @return Success(updatedPool), if transaction successfully added to the pool, Failure(_) otherwise
*/
def put(unconfirmedTx: UnconfirmedTransaction): ErgoMemPool = {
val updatedPool = pool.put(unconfirmedTx, feeFactor(unconfirmedTx))
val updatedPool = pool.put(unconfirmedTx)
new ErgoMemPool(updatedPool, stats, sortingOption)
}

Expand All @@ -102,14 +96,21 @@ class ErgoMemPool private[mempool](private[mempool] val pool: OrderedTxPool,
}

private def updateStatsOnRemoval(tx: ErgoTransaction): MemPoolStatistics = {
val wtx = pool.transactionsRegistry.get(tx.id)
wtx.map(wgtx => stats.add(System.currentTimeMillis(), wgtx))
pool.get(tx.id).map(tx => stats.add(System.currentTimeMillis(), tx)(sortingOption))
.getOrElse(MemPoolStatistics(System.currentTimeMillis(), 0, System.currentTimeMillis()))
}

/**
* Remove transaction from the pool
* Remove unconfirmed transaction from the pool
*/
def remove(utx: UnconfirmedTransaction): ErgoMemPool = {
log.debug(s"Removing unconfirmed transaction ${utx.id} from the mempool")
new ErgoMemPool(pool.remove(utx), updateStatsOnRemoval(utx.transaction), sortingOption)
}

/**
* Remove transaction from the pool
*/
def remove(tx: ErgoTransaction): ErgoMemPool = {
log.debug(s"Removing transaction ${tx.id} from the mempool")
new ErgoMemPool(pool.remove(tx), updateStatsOnRemoval(tx), sortingOption)
Expand All @@ -133,14 +134,8 @@ class ErgoMemPool private[mempool](private[mempool] val pool: OrderedTxPool,
pool.get(unconfirmedTransactionId) match {
case Some(utx) => invalidate(utx)
case None =>
log.warn(s"pool.get failed for $unconfirmedTransactionId")
pool.orderedTransactions.valuesIterator.find(_.id == unconfirmedTransactionId) match {
case Some(utx) =>
invalidate(utx)
case None =>
log.warn(s"Can't invalidate transaction $unconfirmedTransactionId as it is not in the pool")
this
}
log.warn(s"Can't invalidate transaction $unconfirmedTransactionId as it is not in the pool")
this
}
}

Expand All @@ -154,14 +149,6 @@ class ErgoMemPool private[mempool](private[mempool] val pool: OrderedTxPool,
*/
override def spentInputs: Iterator[BoxId] = pool.inputs.keysIterator

private def feeFactor(unconfirmedTransaction: UnconfirmedTransaction): Int = {
sortingOption match {
case SortingOption.FeePerByte =>
unconfirmedTransaction.transactionBytes.map(_.length).getOrElse(unconfirmedTransaction.transaction.size)
case SortingOption.FeePerCycle =>
unconfirmedTransaction.lastCost.getOrElse(FakeCost)
}
}

// Check if transaction is double-spending inputs spent in the mempool.
// If so, the new transacting is replacing older ones if it has bigger weight (fee/byte) than them on average.
Expand All @@ -170,31 +157,30 @@ class ErgoMemPool private[mempool](private[mempool] val pool: OrderedTxPool,
validationStartTime: Long): (ErgoMemPool, ProcessingOutcome) = {
val tx = unconfirmedTransaction.transaction

val doubleSpendingWtxs = tx.inputs.flatMap { inp =>
val doubleSpendingIds = tx.inputs.flatMap { inp =>
pool.inputs.get(inp.boxId)
}.toSet

val feeF = feeFactor(unconfirmedTransaction)
implicit val so: SortingOption = sortingOption

if (doubleSpendingWtxs.nonEmpty) {
val ownWtx = weighted(tx, feeF)
val doubleSpendingTotalWeight = doubleSpendingWtxs.map(_.weight).sum / doubleSpendingWtxs.size
if (ownWtx.weight > doubleSpendingTotalWeight) {
val doubleSpendingTxs = doubleSpendingWtxs.map(wtx => pool.orderedTransactions(wtx)).toSeq
val p = pool.put(unconfirmedTransaction, feeF).remove(doubleSpendingTxs)
if (doubleSpendingIds.nonEmpty) {
val doubleSpendingTxs = doubleSpendingIds.map(pool.get(_).get).toSeq
val doubleSpendingTotalWeight = doubleSpendingTxs.map(_.weight).sum / doubleSpendingTxs.size
if (unconfirmedTransaction.weight > doubleSpendingTotalWeight) {
val p = pool.remove(doubleSpendingTxs).put(unconfirmedTransaction)
val updPool = new ErgoMemPool(p, stats, sortingOption)
updPool -> new ProcessingOutcome.Accepted(unconfirmedTransaction, validationStartTime)
} else {
this -> new ProcessingOutcome.DoubleSpendingLoser(doubleSpendingWtxs.map(_.id), validationStartTime)
this -> new ProcessingOutcome.DoubleSpendingLoser(doubleSpendingIds, validationStartTime)
}
} else {
val poolSizeLimit = nodeSettings.mempoolCapacity
if (pool.size == poolSizeLimit &&
weighted(tx, feeF).weight <= pool.orderedTransactions.lastKey.weight) {
unconfirmedTransaction.weight <= pool.orderedTransactions.last._2.weight) {
val exc = new Exception("Transaction pays less than any other in the pool being full")
this -> new ProcessingOutcome.Declined(exc, validationStartTime)
} else {
val updPool = new ErgoMemPool(pool.put(unconfirmedTransaction, feeF), stats, sortingOption)
val updPool = new ErgoMemPool(pool.put(unconfirmedTransaction), stats, sortingOption)
updPool -> new ProcessingOutcome.Accepted(unconfirmedTransaction, validationStartTime)
}
}
Expand Down Expand Up @@ -253,7 +239,7 @@ class ErgoMemPool private[mempool](private[mempool] val pool: OrderedTxPool,
acceptIfNoDoubleSpend(unconfirmedTx, validationStartTime)
}
} else {
val msg = if (this.contains(tx.id)) {
val msg = if (contains(unconfirmedTx)) {
s"Pool can not accept transaction ${tx.id}, it is already in the mempool"
} else if (pool.size == settings.nodeSettings.mempoolCapacity) {
s"Pool can not accept transaction ${tx.id}, the mempool is full"
Expand All @@ -272,7 +258,7 @@ class ErgoMemPool private[mempool](private[mempool] val pool: OrderedTxPool,
}
}

def weightedTransactionIds(limit: Int): Seq[WeightedTxId] = pool.orderedTransactions.keysIterator.take(limit).toSeq
def txTimesAndWeights: Seq[(Long,Long)] = getAll.map(uTx => uTx.createdTime -> uTx.weight(monetarySettings, sortingOption))

private def extractFee(tx: ErgoTransaction): Long = {
tx.outputs
Expand Down Expand Up @@ -309,13 +295,10 @@ class ErgoMemPool private[mempool](private[mempool] val pool: OrderedTxPool,
* @return average time for this transaction to be placed in block
*/
def getExpectedWaitTime(txFee : Long, txSize : Int): Long = {
// Create dummy transaction entry
val feePerKb = txFee * 1024 / txSize
val dummyModifierId = bytesToId(Array.fill(32)(0.toByte))
val wtx = WeightedTxId(dummyModifierId, feePerKb, feePerKb, 0)

// Find position of entry in mempool
val posInPool = pool.orderedTransactions.keySet.until(wtx).size
val posInPool = pool.orderedTransactions.dropWhile(_._2.weight(monetarySettings, sortingOption) > feePerKb).size

// Time since statistics measurement interval (needed to calculate average tx rate)
val elapsed = System.currentTimeMillis() - stats.startMeasurement
Expand Down Expand Up @@ -445,7 +428,7 @@ object ErgoMemPool extends ScorexLogging {
case SortingOption.FeePerCycle => log.info("Sorting mempool by fee-per-cycle")
}
new ErgoMemPool(
OrderedTxPool.empty(settings),
OrderedTxPool.empty(settings, sortingOption),
MemPoolStatistics(System.currentTimeMillis(), 0, System.currentTimeMillis()),
sortingOption
)(settings)
Expand Down
Loading