diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index b49f3ace5..1f069b1c9 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -180,13 +180,27 @@ func (txmp *TxMempool) Unlock() { // Size returns the number of valid transactions in the mempool. It is // thread-safe. func (txmp *TxMempool) Size() int { - return txmp.SizeWithoutPending() + txmp.PendingSize() + return txmp.NumTxsNotPending() + txmp.PendingSize() } -func (txmp *TxMempool) SizeWithoutPending() int { +func (txmp *TxMempool) NumTxsNotPending() int { return txmp.txStore.Size() } +func (txmp *TxMempool) BytesNotPending() int64 { + txmp.txStore.mtx.RLock() + defer txmp.txStore.mtx.RUnlock() + totalBytes := int64(0) + for _, wrappedTx := range txmp.txStore.hashTxs { + totalBytes += int64(len(wrappedTx.tx)) + } + return totalBytes +} + +func (txmp *TxMempool) TotalTxsBytesSize() int64 { + return txmp.BytesNotPending() + int64(txmp.pendingTxs.sizeBytes) +} + // PendingSize returns the number of pending transactions in the mempool. func (txmp *TxMempool) PendingSize() int { return txmp.pendingTxs.Size() @@ -427,7 +441,7 @@ func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs { ) var txs []types.Tx - if uint64(txmp.SizeWithoutPending()) < txmp.config.TxNotifyThreshold { + if uint64(txmp.NumTxsNotPending()) < txmp.config.TxNotifyThreshold { // do not reap anything if threshold is not met return txs } @@ -549,7 +563,8 @@ func (txmp *TxMempool) Update( } } - txmp.metrics.Size.Set(float64(txmp.SizeWithoutPending())) + txmp.metrics.Size.Set(float64(txmp.NumTxsNotPending())) + txmp.metrics.TotalTxsSizeBytes.Set(float64(txmp.TotalTxsBytesSize())) txmp.metrics.PendingSize.Set(float64(txmp.PendingSize())) return nil } @@ -676,7 +691,7 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, res *abci.ResponseCheck "priority", wtx.priority, "tx", fmt.Sprintf("%X", wtx.tx.Hash()), "height", txmp.height, - "num_txs", txmp.SizeWithoutPending(), + "num_txs", txmp.NumTxsNotPending(), ) txmp.notifyTxsAvailable() } @@ -772,13 +787,14 @@ func (txmp *TxMempool) handleRecheckResult(tx types.Tx, res *abci.ResponseCheckT if txmp.recheckCursor == nil { txmp.logger.Debug("finished rechecking transactions") - if txmp.SizeWithoutPending() > 0 { + if txmp.NumTxsNotPending() > 0 { txmp.notifyTxsAvailable() } } - txmp.metrics.Size.Set(float64(txmp.SizeWithoutPending())) + txmp.metrics.Size.Set(float64(txmp.NumTxsNotPending())) txmp.metrics.PendingSize.Set(float64(txmp.PendingSize())) + txmp.metrics.TotalTxsSizeBytes.Set(float64(txmp.TotalTxsBytesSize())) } // updateReCheckTxs updates the recheck cursors using the gossipIndex. For @@ -830,7 +846,7 @@ func (txmp *TxMempool) updateReCheckTxs(ctx context.Context) { // the transaction can be inserted into the mempool. func (txmp *TxMempool) canAddTx(wtx *WrappedTx) error { var ( - numTxs = txmp.SizeWithoutPending() + numTxs = txmp.NumTxsNotPending() sizeBytes = txmp.SizeBytes() ) @@ -869,9 +885,10 @@ func (txmp *TxMempool) insertTx(wtx *WrappedTx) bool { if !inserted { return false } - txmp.metrics.TxSizeBytes.Observe(float64(wtx.Size())) - txmp.metrics.Size.Set(float64(txmp.SizeWithoutPending())) + txmp.metrics.TxSizeBytes.Add(float64(wtx.Size())) + txmp.metrics.Size.Set(float64(txmp.NumTxsNotPending())) txmp.metrics.PendingSize.Set(float64(txmp.PendingSize())) + txmp.metrics.TotalTxsSizeBytes.Set(float64(txmp.TotalTxsBytesSize())) if replacedTx != nil { txmp.removeTx(replacedTx, true, false, false) @@ -1013,7 +1030,7 @@ func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) { } func (txmp *TxMempool) notifyTxsAvailable() { - if txmp.SizeWithoutPending() == 0 { + if txmp.NumTxsNotPending() == 0 { return } diff --git a/internal/mempool/metrics.gen.go b/internal/mempool/metrics.gen.go index cfff9ad4c..c9dea0810 100644 --- a/internal/mempool/metrics.gen.go +++ b/internal/mempool/metrics.gen.go @@ -26,13 +26,17 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Name: "pending_size", Help: "Number of pending transactions in mempool", }, labels).With(labelsAndValues...), - TxSizeBytes: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ + TxSizeBytes: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ Namespace: namespace, Subsystem: MetricsSubsystem, Name: "tx_size_bytes", - Help: "Histogram of transaction sizes in bytes.", - - Buckets: stdprometheus.ExponentialBuckets(1, 3, 7), + Help: "Accumulated transaction sizes in bytes.", + }, labels).With(labelsAndValues...), + TotalTxsSizeBytes: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "total_txs_size_bytes", + Help: "Total current mempool uncommitted txs bytes", }, labels).With(labelsAndValues...), FailedTxs: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ Namespace: namespace, @@ -81,15 +85,16 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { func NopMetrics() *Metrics { return &Metrics{ - Size: discard.NewGauge(), - PendingSize: discard.NewGauge(), - TxSizeBytes: discard.NewHistogram(), - FailedTxs: discard.NewCounter(), - RejectedTxs: discard.NewCounter(), - EvictedTxs: discard.NewCounter(), - ExpiredTxs: discard.NewCounter(), - RecheckTimes: discard.NewCounter(), - RemovedTxs: discard.NewCounter(), - InsertedTxs: discard.NewCounter(), + Size: discard.NewGauge(), + PendingSize: discard.NewGauge(), + TxSizeBytes: discard.NewCounter(), + TotalTxsSizeBytes: discard.NewGauge(), + FailedTxs: discard.NewCounter(), + RejectedTxs: discard.NewCounter(), + EvictedTxs: discard.NewCounter(), + ExpiredTxs: discard.NewCounter(), + RecheckTimes: discard.NewCounter(), + RemovedTxs: discard.NewCounter(), + InsertedTxs: discard.NewCounter(), } } diff --git a/internal/mempool/metrics.go b/internal/mempool/metrics.go index fb68f7918..84cb95f5d 100644 --- a/internal/mempool/metrics.go +++ b/internal/mempool/metrics.go @@ -21,8 +21,11 @@ type Metrics struct { // Number of pending transactions in mempool PendingSize metrics.Gauge - // Histogram of transaction sizes in bytes. - TxSizeBytes metrics.Histogram `metrics_buckettype:"exp" metrics_bucketsizes:"1,3,7"` + // Accumulated transaction sizes in bytes. + TxSizeBytes metrics.Counter + + // Total current mempool uncommitted txs bytes + TotalTxsSizeBytes metrics.Gauge // Number of failed transactions. FailedTxs metrics.Counter