Skip to content

Commit

Permalink
Txpool optimization: disable cache if --mine is not enabled (#245)
Browse files Browse the repository at this point in the history
Co-authored-by: Owen <103096885+owen-reorg@users.noreply.github.com>
Co-authored-by: andyzhang2023 <andyzhang2023@gmail.com>
  • Loading branch information
3 people authored Jan 7, 2025
1 parent 84232cb commit 1168abd
Show file tree
Hide file tree
Showing 9 changed files with 144 additions and 100 deletions.
3 changes: 0 additions & 3 deletions accounts/abi/bind/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ func TestWaitDeployed(t *testing.T) {

// Send and mine the transaction.
backend.Client().SendTransaction(ctx, tx)
time.Sleep(500 * time.Millisecond) //wait for the tx to be mined
backend.Commit()

select {
Expand Down Expand Up @@ -118,7 +117,6 @@ func TestWaitDeployedCornerCases(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
backend.Client().SendTransaction(ctx, tx)
time.Sleep(500 * time.Millisecond) //wait for the tx to be mined
backend.Commit()
notContractCreation := errors.New("tx is not contract creation")
if _, err := bind.WaitDeployed(ctx, backend.Client(), tx); err.Error() != notContractCreation.Error() {
Expand All @@ -137,6 +135,5 @@ func TestWaitDeployedCornerCases(t *testing.T) {
}()

backend.Client().SendTransaction(ctx, tx)
time.Sleep(500 * time.Millisecond) //wait for the tx to be mined
cancel()
}
4 changes: 4 additions & 0 deletions cmd/geth/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,10 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
cfg.Eth.OverrideVerkle = &v
}

if ctx.Bool(utils.MiningEnabledFlag.Name) {
cfg.Eth.TxPool.EnableCache = true
}

backend, eth := utils.RegisterEthService(stack, &cfg.Eth)

// Create gauge with geth system and build information
Expand Down
63 changes: 13 additions & 50 deletions core/txpool/legacypool/cache_for_miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,35 @@ import (
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/metrics"
"github.com/holiman/uint256"
)

var (
pendingCacheGauge = metrics.NewRegisteredGauge("txpool/legacypool/pending/cache", nil)
localCacheGauge = metrics.NewRegisteredGauge("txpool/legacypool/local/cache", nil)
)

type pendingCache interface {
add(types.Transactions, types.Signer)
del(types.Transactions, types.Signer)
dump() map[common.Address]types.Transactions
markLocal(common.Address)
flattenLocals() []common.Address
}

// copy of pending transactions
type cacheForMiner struct {
txLock sync.Mutex
pending map[common.Address]map[*types.Transaction]struct{}
locals map[common.Address]bool
addrLock sync.Mutex

allCache map[common.Address][]*txpool.LazyTransaction
filteredCache map[common.Address][]*txpool.LazyTransaction
cacheLock sync.Mutex
}

func newCacheForMiner() *cacheForMiner {
return &cacheForMiner{
pending: make(map[common.Address]map[*types.Transaction]struct{}),
locals: make(map[common.Address]bool),
allCache: make(map[common.Address][]*txpool.LazyTransaction),
filteredCache: make(map[common.Address][]*txpool.LazyTransaction),
pending: make(map[common.Address]map[*types.Transaction]struct{}),
locals: make(map[common.Address]bool),
}
}

Expand Down Expand Up @@ -75,9 +75,8 @@ func (pc *cacheForMiner) del(txs types.Transactions, signer types.Signer) {
}
}

func (pc *cacheForMiner) sync2cache(pool txpool.LazyResolver, filter func(txs types.Transactions, addr common.Address) types.Transactions) {
func (pc *cacheForMiner) dump() map[common.Address]types.Transactions {
pending := make(map[common.Address]types.Transactions)

pc.txLock.Lock()
for addr, txlist := range pc.pending {
pending[addr] = make(types.Transactions, 0, len(txlist))
Expand All @@ -86,46 +85,10 @@ func (pc *cacheForMiner) sync2cache(pool txpool.LazyResolver, filter func(txs ty
}
}
pc.txLock.Unlock()

// convert pending to lazyTransactions
filteredLazy := make(map[common.Address][]*txpool.LazyTransaction)
allLazy := make(map[common.Address][]*txpool.LazyTransaction)
for addr, txs := range pending {
for _, txs := range pending {
// sorted by nonce
sort.Sort(types.TxByNonce(txs))
filterd := filter(txs, addr)
if len(txs) > 0 {
lazies := make([]*txpool.LazyTransaction, len(txs))
for i, tx := range txs {
lazies[i] = &txpool.LazyTransaction{
Pool: pool,
Hash: tx.Hash(),
Tx: tx,
Time: tx.Time(),
GasFeeCap: uint256.MustFromBig(tx.GasFeeCap()),
GasTipCap: uint256.MustFromBig(tx.GasTipCap()),
Gas: tx.Gas(),
BlobGas: tx.BlobGas(),
}
}
allLazy[addr] = lazies
filteredLazy[addr] = lazies[:len(filterd)]
}
}

pc.cacheLock.Lock()
pc.filteredCache = filteredLazy
pc.allCache = allLazy
pc.cacheLock.Unlock()
}

func (pc *cacheForMiner) dump(filtered bool) map[common.Address][]*txpool.LazyTransaction {
pc.cacheLock.Lock()
pending := pc.allCache
if filtered {
pending = pc.filteredCache
}
pc.cacheLock.Unlock()
return pending
}

Expand All @@ -136,7 +99,7 @@ func (pc *cacheForMiner) markLocal(addr common.Address) {
pc.locals[addr] = true
}

func (pc *cacheForMiner) IsLocal(addr common.Address) bool {
func (pc *cacheForMiner) isLocal(addr common.Address) bool {
pc.addrLock.Lock()
defer pc.addrLock.Unlock()
return pc.locals[addr]
Expand Down
91 changes: 49 additions & 42 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ type BlockChain interface {

// Config are the configuration parameters of the transaction pool.
type Config struct {
EnableCache bool // enable pending cache for mining. Set as true only --mine option is enabled

Locals []common.Address // Addresses that should be treated by default as local
NoLocals bool // Whether local transaction handling should be disabled
Journal string // Journal of local transactions to survive node restarts
Expand Down Expand Up @@ -236,6 +238,12 @@ func (config *Config) sanitize() Config {
log.Warn("Sanitizing invalid txpool reannounce time", "provided", conf.ReannounceTime, "updated", time.Minute)
conf.ReannounceTime = time.Minute
}
// log to inform user if the cache is enabled or not
if conf.EnableCache {
log.Info("legacytxpool Pending Cache is enabled")
} else {
log.Info("legacytxpool Pending Cache is disabled")
}
return conf
}

Expand Down Expand Up @@ -270,7 +278,7 @@ type LegacyPool struct {
all *lookup // All transactions to allow lookups
priced *pricedList // All transactions sorted by price

pendingCache *cacheForMiner //pending list cache for miner
pendingCache pendingCache //pending list cache for miner

reqResetCh chan *txpoolResetRequest
reqPromoteCh chan *accountSet
Expand Down Expand Up @@ -313,6 +321,9 @@ func New(config Config, chain BlockChain) *LegacyPool {
initDoneCh: make(chan struct{}),
pendingCache: newCacheForMiner(),
}
if !config.EnableCache {
pool.pendingCache = newNoneCacheForMiner(pool)
}
pool.locals = newAccountSet(pool.signer)
for _, addr := range config.Locals {
log.Info("Setting new local account", "address", addr)
Expand Down Expand Up @@ -349,9 +360,6 @@ func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.A
// Set the basic pool parameters
pool.gasTip.Store(uint256.NewInt(gasTip))

// set dumper
pool.pendingCache.sync2cache(pool, pool.createFilter(pool.gasTip.Load().ToBig(), head.BaseFee))

// Initialize the state with head block, or fallback to empty one in
// case the head state is not available (might occur when node is not
// fully synced).
Expand Down Expand Up @@ -386,27 +394,9 @@ func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.A
}
pool.wg.Add(1)
go pool.loop()
go pool.loopOfSync()
return nil
}

func (pool *LegacyPool) loopOfSync() {
ticker := time.NewTicker(400 * time.Millisecond)
for {
select {
case <-pool.reorgShutdownCh:
return
case <-ticker.C:
gasTip := pool.gasTip.Load()
currHead := pool.currentHead.Load()
if gasTip == nil || currHead == nil {
continue
}
pool.pendingCache.sync2cache(pool, pool.createFilter(gasTip.ToBig(), currHead.BaseFee))
}
}
}

// loop is the transaction pool's main event loop, waiting for and reacting to
// outside blockchain events as well as for various reporting and transaction
// eviction events.
Expand Down Expand Up @@ -645,35 +635,56 @@ func (pool *LegacyPool) ContentFrom(addr common.Address) ([]*types.Transaction,
// The transactions can also be pre-filtered by the dynamic fee components to
// reduce allocations and load on downstream subsystems.
func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction {
empty := txpool.PendingFilter{}
if filter == empty {
// return all pending transactions, no filtering
return pool.pendingCache.dump(false)
}
defer func(t0 time.Time) {
getPendingDurationTimer.Update(time.Since(t0))
}(time.Now())
// If only blob transactions are requested, this pool is unsuitable as it
// contains none, don't even bother.
if filter.OnlyBlobTxs {
return nil
}
defer func(t0 time.Time) {
getPendingDurationTimer.Update(time.Since(t0))
}(time.Now())
// It is a bit tricky here, we don't do the filtering here.
return pool.pendingCache.dump(true)
}

func (pool *LegacyPool) createFilter(gasPrice, baseFee *big.Int) func(txs types.Transactions, addr common.Address) types.Transactions {
return func(txs types.Transactions, addr common.Address) types.Transactions {
if !pool.pendingCache.IsLocal(addr) {
// Convert the new uint256.Int types to the old big.Int ones used by the legacy pool
var (
minTipBig *big.Int
baseFeeBig *big.Int
)
if filter.MinTip != nil {
minTipBig = filter.MinTip.ToBig()
}
if filter.BaseFee != nil {
baseFeeBig = filter.BaseFee.ToBig()
}
pending := make(map[common.Address][]*txpool.LazyTransaction, len(pool.pending))
for addr, txs := range pool.pendingCache.dump() {

// If the miner requests tip enforcement, cap the lists now
if minTipBig != nil && !pool.locals.contains(addr) {
for i, tx := range txs {
if tx.EffectiveGasTipIntCmp(gasPrice, baseFee) < 0 {
if tx.EffectiveGasTipIntCmp(minTipBig, baseFeeBig) < 0 {
txs = txs[:i]
break
}
}
}
return txs
if len(txs) > 0 {
lazies := make([]*txpool.LazyTransaction, len(txs))
for i := 0; i < len(txs); i++ {
lazies[i] = &txpool.LazyTransaction{
Pool: pool,
Hash: txs[i].Hash(),
Tx: txs[i],
Time: txs[i].Time(),
GasFeeCap: uint256.MustFromBig(txs[i].GasFeeCap()),
GasTipCap: uint256.MustFromBig(txs[i].GasTipCap()),
Gas: txs[i].Gas(),
BlobGas: txs[i].BlobGas(),
}
}
pending[addr] = lazies
}
}
return pending
}

// Locals retrieves the accounts currently considered local by the pool.
Expand Down Expand Up @@ -1469,10 +1480,6 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
pool.priced.SetBaseFee(pendingBaseFee)
}
}
gasTip, baseFee := pool.gasTip.Load(), pendingBaseFee
go func() {
pool.pendingCache.sync2cache(pool, pool.createFilter(gasTip.ToBig(), baseFee))
}()
// Update all accounts to the latest known pending nonce
nonces := make(map[common.Address]uint64, len(pool.pending))
for addr, list := range pool.pending {
Expand Down
26 changes: 25 additions & 1 deletion core/txpool/legacypool/legacypool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ var (
// sideeffects used during testing.
testTxPoolConfig Config

//
testTxPoolConfigEnableCache Config

enableCache bool

// eip1559Config is a chain config with EIP-1559 enabled at block 0.
eip1559Config *params.ChainConfig
)
Expand All @@ -55,6 +60,10 @@ func init() {
testTxPoolConfig = DefaultConfig
testTxPoolConfig.Journal = ""

testTxPoolConfigEnableCache = DefaultConfig
testTxPoolConfigEnableCache.Journal = ""
testTxPoolConfigEnableCache.EnableCache = true

cpy := *params.TestChainConfig
eip1559Config = &cpy
eip1559Config.BerlinBlock = common.Big0
Expand Down Expand Up @@ -163,7 +172,12 @@ func setupPoolWithConfig(config *params.ChainConfig) (*LegacyPool, *ecdsa.Privat
blockchain := newTestBlockChain(config, 10000000, statedb, new(event.Feed))

key, _ := crypto.GenerateKey()
pool := New(testTxPoolConfig, blockchain)
var pool *LegacyPool
if enableCache {
pool = New(testTxPoolConfigEnableCache, blockchain)
} else {
pool = New(testTxPoolConfig, blockchain)
}
if err := pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()); err != nil {
panic(err)
}
Expand Down Expand Up @@ -1534,12 +1548,22 @@ func TestMinGasPriceEnforced(t *testing.T) {
}
}

func TestRepricingDynamicFeeEnableCache(t *testing.T) {
enableCache = true
repricingDynamicFee(t)
enableCache = false
}

// Tests that setting the transaction pool gas price to a higher value correctly
// discards everything cheaper (legacy & dynamic fee) than that and moves any
// gapped transactions back from the pending pool to the queue.
//
// Note, local transactions are never allowed to be dropped.
func TestRepricingDynamicFee(t *testing.T) {
repricingDynamicFee(t)
}

func repricingDynamicFee(t *testing.T) {
t.Parallel()

// Create the pool to test the pricing enforcement with
Expand Down
Loading

0 comments on commit 1168abd

Please sign in to comment.