Skip to content

Commit

Permalink
Merge pull request #647 from nspcc-dev/fix-mempool-and-chain-locking
Browse files Browse the repository at this point in the history
Fix mempool and chain locking

This allows us easily make 1000 Tx/s in 4-nodes privnet, fixes potential
double spends and improves mempool testing coverage.
  • Loading branch information
roman-khimov authored Feb 6, 2020
2 parents 1381196 + 7445655 commit ab14a46
Show file tree
Hide file tree
Showing 11 changed files with 431 additions and 243 deletions.
6 changes: 5 additions & 1 deletion pkg/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,11 @@ func (s *service) processBlock(b block.Block) {
bb.Script = *(s.getBlockWitness(bb))

if err := s.Chain.AddBlock(bb); err != nil {
s.log.Warn("error on add block", zap.Error(err))
// The block might already be added via the regular network
// interaction.
if _, errget := s.Chain.GetBlock(bb.Hash()); errget != nil {
s.log.Warn("error on add block", zap.Error(err))
}
} else {
s.Config.RelayBlock(bb)
}
Expand Down
15 changes: 4 additions & 11 deletions pkg/consensus/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

"github.com/CityOfZion/neo-go/config"
"github.com/CityOfZion/neo-go/pkg/core"
"github.com/CityOfZion/neo-go/pkg/core/mempool"
"github.com/CityOfZion/neo-go/pkg/core/storage"
"github.com/CityOfZion/neo-go/pkg/core/transaction"
"github.com/CityOfZion/neo-go/pkg/crypto/keys"
Expand All @@ -22,8 +21,7 @@ func TestNewService(t *testing.T) {
Type: transaction.MinerType,
Data: &transaction.MinerTX{Nonce: 12345},
}
item := mempool.NewPoolItem(tx, new(feer))
srv.Chain.GetMemPool().TryAdd(tx.Hash(), item)
require.NoError(t, srv.Chain.PoolTx(tx))

var txx []block.Transaction
require.NotPanics(t, func() { txx = srv.getVerifiedTx(1) })
Expand All @@ -40,10 +38,7 @@ func TestService_GetVerified(t *testing.T) {
newMinerTx(3),
newMinerTx(4),
}
pool := srv.Chain.GetMemPool()
item := mempool.NewPoolItem(txs[3], new(feer))

require.True(t, pool.TryAdd(txs[3].Hash(), item))
require.NoError(t, srv.Chain.PoolTx(txs[3]))

hashes := []util.Uint256{txs[0].Hash(), txs[1].Hash(), txs[2].Hash()}

Expand All @@ -68,8 +63,7 @@ func TestService_GetVerified(t *testing.T) {

t.Run("more than half of the last proposal will be reused", func(t *testing.T) {
for _, tx := range txs[:2] {
item := mempool.NewPoolItem(tx, new(feer))
require.True(t, pool.TryAdd(tx.Hash(), item))
require.NoError(t, srv.Chain.PoolTx(tx))
}

txx := srv.getVerifiedTx(10)
Expand Down Expand Up @@ -119,8 +113,7 @@ func TestService_getTx(t *testing.T) {

require.Equal(t, nil, srv.getTx(h))

item := mempool.NewPoolItem(tx, new(feer))
srv.Chain.GetMemPool().TryAdd(h, item)
require.NoError(t, srv.Chain.PoolTx(tx))

got := srv.getTx(h)
require.NotNil(t, got)
Expand Down
107 changes: 97 additions & 10 deletions pkg/core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math/big"
"sort"
"strconv"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -41,6 +42,15 @@ const (
defaultMemPoolSize = 50000
)

var (
// ErrAlreadyExists is returned when trying to add some already existing
// transaction into the pool (not specifying whether it exists in the
// chain or mempool).
ErrAlreadyExists = errors.New("already exists")
// ErrOOM is returned when adding transaction to the memory pool because
// it reached its full capacity.
ErrOOM = errors.New("no space left in the memory pool")
)
var (
genAmount = []int{8, 7, 6, 5, 4, 3, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}
decrementInterval = 2000000
Expand All @@ -51,6 +61,19 @@ var (
type Blockchain struct {
config config.ProtocolConfiguration

// The only way chain state changes is by adding blocks, so we can't
// allow concurrent block additions. It differs from the next lock in
// that it's only for AddBlock method itself, the chain state is
// protected by the lock below, but holding it during all of AddBlock
// is too expensive (because the state only changes when persisting
// change cache).
addLock sync.Mutex

// This lock ensures blockchain immutability for operations that need
// that while performing their tasks. It's mostly used as a read lock
// with the only writer being the block addition logic.
lock sync.RWMutex

// Data access object for CRUD operations around storage.
dao *dao

Expand Down Expand Up @@ -251,6 +274,9 @@ func (bc *Blockchain) Close() {
// AddBlock accepts successive block for the Blockchain, verifies it and
// stores internally. Eventually it will be persisted to the backing storage.
func (bc *Blockchain) AddBlock(block *block.Block) error {
bc.addLock.Lock()
defer bc.addLock.Unlock()

expectedHeight := bc.BlockHeight() + 1
if expectedHeight != block.Index {
return fmt.Errorf("expected block %d, but passed block %d", expectedHeight, block.Index)
Expand Down Expand Up @@ -575,16 +601,17 @@ func (bc *Blockchain) storeBlock(block *block.Block) error {
}
}
}
bc.lock.Lock()
defer bc.lock.Unlock()

_, err := cache.Persist()
if err != nil {
return err
}
bc.topBlock.Store(block)
atomic.StoreUint32(&bc.blockHeight, block.Index)
updateBlockHeightMetric(block.Index)
for _, tx := range block.Transactions {
bc.memPool.Remove(tx.Hash())
}
bc.memPool.RemoveStale(bc.isTxStillRelevant)
return nil
}

Expand Down Expand Up @@ -963,8 +990,8 @@ func (bc *Blockchain) IsLowPriority(t *transaction.Transaction) bool {
}

// GetMemPool returns the memory pool of the blockchain.
func (bc *Blockchain) GetMemPool() mempool.Pool {
return bc.memPool
func (bc *Blockchain) GetMemPool() *mempool.Pool {
return &bc.memPool
}

// VerifyBlock verifies block against its current state.
Expand All @@ -982,11 +1009,8 @@ func (bc *Blockchain) VerifyBlock(block *block.Block) error {
return bc.verifyBlockWitnesses(block, prevHeader)
}

// VerifyTx verifies whether a transaction is bonafide or not. Block parameter
// is used for easy interop access and can be omitted for transactions that are
// not yet added into any block.
// Golang implementation of Verify method in C# (https://github.com/neo-project/neo/blob/master/neo/Network/P2P/Payloads/Transaction.cs#L270).
func (bc *Blockchain) VerifyTx(t *transaction.Transaction, block *block.Block) error {
// verifyTx verifies whether a transaction is bonafide or not.
func (bc *Blockchain) verifyTx(t *transaction.Transaction, block *block.Block) error {
if io.GetVarSize(t) > transaction.MaxTransactionSize {
return errors.Errorf("invalid transaction size = %d. It shoud be less then MaxTransactionSize = %d", io.GetVarSize(t), transaction.MaxTransactionSize)
}
Expand Down Expand Up @@ -1017,6 +1041,69 @@ func (bc *Blockchain) VerifyTx(t *transaction.Transaction, block *block.Block) e
return bc.verifyTxWitnesses(t, block)
}

// isTxStillRelevant is a callback for mempool transaction filtering after the
// new block addition. It returns false for transactions already present in the
// chain (added by the new block), transactions using some inputs that are
// already used (double spends) and does witness reverification for non-standard
// contracts. It operates under the assumption that full transaction verification
// was already done so we don't need to check basic things like size, input/output
// correctness, etc.
func (bc *Blockchain) isTxStillRelevant(t *transaction.Transaction) bool {
var recheckWitness bool

if bc.dao.HasTransaction(t.Hash()) {
return false
}
if bc.dao.IsDoubleSpend(t) {
return false
}
for i := range t.Scripts {
if !vm.IsStandardContract(t.Scripts[i].VerificationScript) {
recheckWitness = true
break
}
}
if recheckWitness {
return bc.verifyTxWitnesses(t, nil) == nil
}
return true

}

// VerifyTx verifies whether a transaction is bonafide or not. Block parameter
// is used for easy interop access and can be omitted for transactions that are
// not yet added into any block.
// Golang implementation of Verify method in C# (https://github.com/neo-project/neo/blob/master/neo/Network/P2P/Payloads/Transaction.cs#L270).
func (bc *Blockchain) VerifyTx(t *transaction.Transaction, block *block.Block) error {
bc.lock.RLock()
defer bc.lock.RUnlock()
return bc.verifyTx(t, block)
}

// PoolTx verifies and tries to add given transaction into the mempool.
func (bc *Blockchain) PoolTx(t *transaction.Transaction) error {
bc.lock.RLock()
defer bc.lock.RUnlock()

if bc.HasTransaction(t.Hash()) {
return ErrAlreadyExists
}
if err := bc.verifyTx(t, nil); err != nil {
return err
}
if err := bc.memPool.Add(t, bc); err != nil {
switch err {
case mempool.ErrOOM:
return ErrOOM
case mempool.ErrConflict:
return ErrAlreadyExists
default:
return err
}
}
return nil
}

func (bc *Blockchain) verifyInputs(t *transaction.Transaction) bool {
for i := 1; i < len(t.Inputs); i++ {
for j := 0; j < i; j++ {
Expand Down
3 changes: 2 additions & 1 deletion pkg/core/blockchainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type Blockchainer interface {
GetUnspentCoinState(util.Uint256) *UnspentCoinState
References(t *transaction.Transaction) map[transaction.Input]*transaction.Output
mempool.Feer // fee interface
PoolTx(*transaction.Transaction) error
VerifyTx(*transaction.Transaction, *block.Block) error
GetMemPool() mempool.Pool
GetMemPool() *mempool.Pool
}
Loading

0 comments on commit ab14a46

Please sign in to comment.