Skip to content

Commit

Permalink
Merge pull request #3110 from nspcc-dev/fix-tests
Browse files Browse the repository at this point in the history
Fix race caused by design of native Neo validators/committee cache
  • Loading branch information
roman-khimov authored Oct 10, 2023
2 parents 19c59c3 + d964420 commit 8fcf1ee
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 77 deletions.
22 changes: 9 additions & 13 deletions pkg/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type Ledger interface {
GetNextBlockValidators() ([]*keys.PublicKey, error)
GetStateRoot(height uint32) (*state.MPTRoot, error)
GetTransaction(util.Uint256) (*transaction.Transaction, uint32, error)
GetValidators() ([]*keys.PublicKey, error)
ComputeNextBlockValidators() []*keys.PublicKey
PoolTx(t *transaction.Transaction, pools ...*mempool.Pool) error
SubscribeForBlocks(ch chan *coreb.Block)
UnsubscribeFromBlocks(ch chan *coreb.Block)
Expand Down Expand Up @@ -677,9 +677,15 @@ func (s *service) getValidators(txes ...block.Transaction) []crypto.PublicKey {
err error
)
if txes == nil {
// getValidators with empty args is used by dbft to fill the list of
// block's validators, thus should return validators from the current
// epoch without recalculation.
pKeys, err = s.Chain.GetNextBlockValidators()
} else {
pKeys, err = s.Chain.GetValidators()
// getValidators with non-empty args is used by dbft to fill block's
// NextConsensus field, ComputeNextBlockValidators will return proper
// value for NextConsensus wrt dBFT epoch start/end.
pKeys = s.Chain.ComputeNextBlockValidators()
}
if err != nil {
s.log.Error("error while trying to get validators", zap.Error(err))
Expand Down Expand Up @@ -721,17 +727,7 @@ func (s *service) newBlockFromContext(ctx *dbft.Context) block.Block {
block.PrevStateRoot = sr.Root
}

var validators keys.PublicKeys
var err error
cfg := s.Chain.GetConfig().ProtocolConfiguration
if cfg.ShouldUpdateCommitteeAt(ctx.BlockIndex) {
validators, err = s.Chain.GetValidators()
} else {
validators, err = s.Chain.GetNextBlockValidators()
}
if err != nil {
s.log.Fatal(fmt.Sprintf("failed to get validators: %s", err.Error()))
}
var validators = s.Chain.ComputeNextBlockValidators()
script, err := smartcontract.CreateDefaultMultiSigRedeemScript(validators)
if err != nil {
s.log.Fatal(fmt.Sprintf("failed to create multisignature script: %s", err.Error()))
Expand Down
23 changes: 17 additions & 6 deletions pkg/core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2697,12 +2697,23 @@ func (bc *Blockchain) GetCommittee() (keys.PublicKeys, error) {
return pubs, nil
}

// GetValidators returns current validators.
func (bc *Blockchain) GetValidators() ([]*keys.PublicKey, error) {
return bc.contracts.NEO.ComputeNextBlockValidators(bc.blockHeight, bc.dao)
}

// GetNextBlockValidators returns next block validators.
// ComputeNextBlockValidators returns current validators. Validators list
// returned from this method is updated once per CommitteeSize number of blocks.
// For the last block in the dBFT epoch this method returns the list of validators
// recalculated from the latest relevant information about NEO votes; in this case
// list of validators may differ from the one returned by GetNextBlockValidators.
// For the not-last block of dBFT epoch this method returns the same list as
// GetNextBlockValidators.
func (bc *Blockchain) ComputeNextBlockValidators() []*keys.PublicKey {
return bc.contracts.NEO.ComputeNextBlockValidators(bc.dao)
}

// GetNextBlockValidators returns next block validators. Validators list returned
// from this method is the sorted top NumOfCNs number of public keys from the
// committee of the current dBFT round (that was calculated once for the
// CommitteeSize number of blocks), thus, validators list returned from this
// method is being updated once per (committee size) number of blocks, but not
// every block.
func (bc *Blockchain) GetNextBlockValidators() ([]*keys.PublicKey, error) {
return bc.contracts.NEO.GetNextBlockValidatorsInternal(bc.dao), nil
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/core/blockchain_core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,7 @@ func TestChainWithVolatileNumOfValidators(t *testing.T) {

priv0 := testchain.PrivateKeyByID(0)

vals, err := bc.GetValidators()
require.NoError(t, err)
vals := bc.ComputeNextBlockValidators()
script, err := smartcontract.CreateDefaultMultiSigRedeemScript(vals)
require.NoError(t, err)
curWit := transaction.Witness{
Expand All @@ -280,7 +279,7 @@ func TestChainWithVolatileNumOfValidators(t *testing.T) {
}
// Mimic consensus.
if bc.config.ShouldUpdateCommitteeAt(uint32(i)) {
vals, err = bc.GetValidators()
vals = bc.ComputeNextBlockValidators()
} else {
vals, err = bc.GetNextBlockValidators()
}
Expand Down
164 changes: 114 additions & 50 deletions pkg/core/native/native_neo.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,26 @@ type NeoCache struct {

votesChanged bool
nextValidators keys.PublicKeys
validators keys.PublicKeys
// newEpochNextValidators contains cached next block newEpochNextValidators. This list is updated once
// per dBFT epoch in PostPersist of the last block in the epoch if candidates
// votes ratio has been changed or register/unregister operation was performed
// within the last processed epoch. The updated value is being persisted
// following the standard layered DAO persist rules, so that external users
// will get the proper value with upper Blockchain's DAO (but this value is
// relevant only by the moment of first epoch block creation).
newEpochNextValidators keys.PublicKeys
// committee contains cached committee members and their votes.
// It is updated once in a while depending on committee size
// (every 28 blocks for mainnet). It's value
// is always equal to the value stored by `prefixCommittee`.
committee keysWithVotes
// newEpochCommittee contains cached committee members updated once per dBFT
// epoch in PostPersist of the last block in the epoch.
newEpochCommittee keysWithVotes
// committeeHash contains the script hash of the committee.
committeeHash util.Uint160
// newEpochCommitteeHash contains the script hash of the newEpochCommittee.
newEpochCommitteeHash util.Uint160

// gasPerVoteCache contains the last updated value of GAS per vote reward for candidates.
// It is set in state-modifying methods only and read in `PostPersist`, thus is not protected
Expand Down Expand Up @@ -125,11 +137,13 @@ func (c *NeoCache) Copy() dao.NativeContractCache {
func copyNeoCache(src, dst *NeoCache) {
dst.votesChanged = src.votesChanged
// Can safely omit copying because the new array is created each time
// validators list, nextValidators and committee are updated.
// newEpochNextValidators list, nextValidators and committee are updated.
dst.nextValidators = src.nextValidators
dst.validators = src.validators
dst.committee = src.committee
dst.committeeHash = src.committeeHash
dst.newEpochNextValidators = src.newEpochNextValidators
dst.newEpochCommittee = src.newEpochCommittee
dst.newEpochCommitteeHash = src.newEpochCommitteeHash

dst.registerPrice = src.registerPrice

Expand Down Expand Up @@ -300,6 +314,11 @@ func (n *NEO) Initialize(ic *interop.Context) error {
setIntWithKey(n.ID, ic.DAO, []byte{prefixRegisterPrice}, DefaultRegisterPrice)
cache.registerPrice = int64(DefaultRegisterPrice)

var numOfCNs = n.cfg.GetNumOfCNs(ic.Block.Index + 1)
err = n.updateCachedNewEpochValues(ic.DAO, cache, ic.BlockHeight(), numOfCNs)
if err != nil {
return fmt.Errorf("failed to update next block newEpoch* cache: %w", err)
}
return nil
}

Expand All @@ -325,6 +344,23 @@ func (n *NEO) InitializeCache(blockHeight uint32, d *dao.Simple) error {
cache.gasPerBlock = n.getSortedGASRecordFromDAO(d)
cache.registerPrice = getIntWithKey(n.ID, d, []byte{prefixRegisterPrice})

// Update newEpoch* cache for external users. It holds values for the previous
// dBFT epoch if the current one isn't yet finished.
if n.cfg.ShouldUpdateCommitteeAt(blockHeight + 1) {
var numOfCNs = n.cfg.GetNumOfCNs(blockHeight + 1)
err := n.updateCachedNewEpochValues(d, cache, blockHeight, numOfCNs)
if err != nil {
return fmt.Errorf("failed to update next block newEpoch* cache: %w", err)
}
} else {
// nextValidators, committee and committee hash are filled in by this moment
// via n.updateCache call.
cache.newEpochNextValidators = cache.nextValidators.Copy()
cache.newEpochCommittee = make(keysWithVotes, len(cache.committee))
copy(cache.newEpochCommittee, cache.committee)
cache.newEpochCommitteeHash = cache.committeeHash
}

d.SetCache(n.ID, cache)
return nil
}
Expand All @@ -340,7 +376,7 @@ func (n *NEO) initConfigCache(cfg config.ProtocolConfiguration) error {
func (n *NEO) updateCache(cache *NeoCache, cvs keysWithVotes, blockHeight uint32) error {
cache.committee = cvs

var committee = getCommitteeMembers(cache)
var committee = getCommitteeMembers(cache.committee)
script, err := smartcontract.CreateMajorityMultiSigRedeemScript(committee.Copy())
if err != nil {
return err
Expand All @@ -353,38 +389,44 @@ func (n *NEO) updateCache(cache *NeoCache, cvs keysWithVotes, blockHeight uint32
return nil
}

func (n *NEO) updateCommittee(cache *NeoCache, ic *interop.Context) error {
if !cache.votesChanged {
// We need to put in storage anyway, as it affects dumps
ic.DAO.PutStorageItem(n.ID, prefixCommittee, cache.committee.Bytes(ic.DAO.GetItemCtx()))
return nil
// updateCachedNewEpochValues sets newEpochNextValidators, newEpochCommittee and
// newEpochCommitteeHash cache that will be used by external users to retrieve
// next block validators list of the next dBFT epoch that wasn't yet started and
// will be used by corresponding values initialisation on the next epoch start.
// The updated new epoch cached values computed using the persisted blocks state
// of the latest epoch.
func (n *NEO) updateCachedNewEpochValues(d *dao.Simple, cache *NeoCache, blockHeight uint32, numOfCNs int) error {
committee, cvs, err := n.computeCommitteeMembers(blockHeight, d)
if err != nil {
return fmt.Errorf("failed to compute committee members: %w", err)
}
cache.newEpochCommittee = cvs

_, cvs, err := n.computeCommitteeMembers(ic.BlockHeight(), ic.DAO)
script, err := smartcontract.CreateMajorityMultiSigRedeemScript(committee.Copy())
if err != nil {
return err
}
if err := n.updateCache(cache, cvs, ic.BlockHeight()); err != nil {
return err
}
cache.votesChanged = false
ic.DAO.PutStorageItem(n.ID, prefixCommittee, cvs.Bytes(ic.DAO.GetItemCtx()))
cache.newEpochCommitteeHash = hash.Hash160(script)

nextVals := committee[:numOfCNs].Copy()
sort.Sort(nextVals)
cache.newEpochNextValidators = nextVals
return nil
}

// OnPersist implements the Contract interface.
func (n *NEO) OnPersist(ic *interop.Context) error {
if n.cfg.ShouldUpdateCommitteeAt(ic.Block.Index) {
cache := ic.DAO.GetRWCache(n.ID).(*NeoCache)
oldKeys := cache.nextValidators
oldCom := cache.committee
if n.cfg.GetNumOfCNs(ic.Block.Index) != len(oldKeys) ||
n.cfg.GetCommitteeSize(ic.Block.Index) != len(oldCom) {
cache.votesChanged = true
}
if err := n.updateCommittee(cache, ic); err != nil {
return err
}
// Cached newEpoch* values always have proper value set (either by PostPersist
// during the last epoch block handling or by initialization code).
cache.nextValidators = cache.newEpochNextValidators
cache.committee = cache.newEpochCommittee
cache.committeeHash = cache.newEpochCommitteeHash
cache.votesChanged = false

// We need to put in storage anyway, as it affects dumps
ic.DAO.PutStorageItem(n.ID, prefixCommittee, cache.committee.Bytes(ic.DAO.GetItemCtx()))
}
return nil
}
Expand All @@ -393,12 +435,13 @@ func (n *NEO) OnPersist(ic *interop.Context) error {
func (n *NEO) PostPersist(ic *interop.Context) error {
gas := n.GetGASPerBlock(ic.DAO, ic.Block.Index)
cache := ic.DAO.GetROCache(n.ID).(*NeoCache)
pubs := getCommitteeMembers(cache)
pubs := getCommitteeMembers(cache.committee)
committeeSize := n.cfg.GetCommitteeSize(ic.Block.Index)
index := int(ic.Block.Index) % committeeSize
committeeReward := new(big.Int).Mul(gas, bigCommitteeRewardRatio)
n.GAS.mint(ic, pubs[index].GetScriptHash(), committeeReward.Div(committeeReward, big100), false)

var isCacheRW bool
if n.cfg.ShouldUpdateCommitteeAt(ic.Block.Index) {
var voterReward = new(big.Int).Set(bigVoterRewardRatio)
voterReward.Mul(voterReward, gas)
Expand All @@ -408,9 +451,8 @@ func (n *NEO) PostPersist(ic *interop.Context) error {
voterReward.Div(voterReward, big100)

var (
cs = cache.committee
isCacheRW bool
key = make([]byte, 34)
cs = cache.committee
key = make([]byte, 34)
)
for i := range cs {
if cs[i].Votes.Sign() > 0 {
Expand All @@ -437,6 +479,27 @@ func (n *NEO) PostPersist(ic *interop.Context) error {
}
}
}
// Update newEpoch cache for external users and further committee, committeeHash
// and nextBlockValidators cache initialisation if committee should be updated in
// the next block.
if n.cfg.ShouldUpdateCommitteeAt(ic.Block.Index + 1) {
var (
h = ic.Block.Index // consider persisting block as stored to get _next_ block newEpochNextValidators
numOfCNs = n.cfg.GetNumOfCNs(h + 1)
)
if cache.votesChanged ||
numOfCNs != len(cache.newEpochNextValidators) ||
n.cfg.GetCommitteeSize(h+1) != len(cache.newEpochCommittee) {
if !isCacheRW {
cache = ic.DAO.GetRWCache(n.ID).(*NeoCache)
}
err := n.updateCachedNewEpochValues(ic.DAO, cache, h, numOfCNs)
if err != nil {
return fmt.Errorf("failed to update next block newEpoch* cache: %w", err)
}
}
}

return nil
}

Expand Down Expand Up @@ -775,7 +838,9 @@ func (n *NEO) UnregisterCandidateInternal(ic *interop.Context, pub *keys.PublicK
return nil
}
cache := ic.DAO.GetRWCache(n.ID).(*NeoCache)
cache.validators = nil
// Not only current committee/validators cache is interested in votesChanged, but also
// newEpoch cache, thus, modify votesChanged to update the latter.
cache.votesChanged = true
c := new(candidate).FromBytes(si)
emitEvent := c.Registered
c.Registered = false
Expand Down Expand Up @@ -901,7 +966,7 @@ func (n *NEO) ModifyAccountVotes(acc *state.NEOBalance, d *dao.Simple, value *bi
return nil
}
}
cache.validators = nil
cache.newEpochNextValidators = nil
return putConvertibleToDAO(n.ID, d, key, cd)
}
return nil
Expand Down Expand Up @@ -1042,24 +1107,24 @@ func (n *NEO) getAccountState(ic *interop.Context, args []stackitem.Item) stacki
return item
}

// ComputeNextBlockValidators returns an actual list of current validators.
func (n *NEO) ComputeNextBlockValidators(blockHeight uint32, d *dao.Simple) (keys.PublicKeys, error) {
numOfCNs := n.cfg.GetNumOfCNs(blockHeight + 1)
// Most of the time it should be OK with RO cache, thus try to retrieve
// validators without RW cache creation to avoid cached values copying.
// ComputeNextBlockValidators computes an actual list of current validators that is
// relevant for the latest processed dBFT epoch and based on the changes made by
// register/unregister/vote calls during the latest epoch.
// Note: this method isn't actually "computes" new committee list and calculates
// new validators list from it. Instead, it uses cache, and the cache itself is
// updated during the PostPersist of the last block of every epoch.
func (n *NEO) ComputeNextBlockValidators(d *dao.Simple) keys.PublicKeys {
// It should always be OK with RO cache if using lower-layered DAO with proper
// cache set.
cache := d.GetROCache(n.ID).(*NeoCache)
if vals := cache.validators; vals != nil && numOfCNs == len(vals) {
return vals.Copy(), nil
}
cache = d.GetRWCache(n.ID).(*NeoCache)
result, _, err := n.computeCommitteeMembers(blockHeight, d)
if err != nil {
return nil, err
}
result = result[:numOfCNs]
sort.Sort(result)
cache.validators = result
return result, nil
if vals := cache.newEpochNextValidators; vals != nil {
return vals.Copy()
}
// It's a caller's program error to call ComputeNextBlockValidators not having
// the right value in lower cache. With the current scheme of handling
// newEpochNextValidators cache this code is expected to be unreachable, but
// let's have this panic here just in case.
panic("bug: unexpected external call to newEpochNextValidators cache")
}

func (n *NEO) getCommittee(ic *interop.Context, _ []stackitem.Item) stackitem.Item {
Expand All @@ -1083,11 +1148,10 @@ func (n *NEO) modifyVoterTurnout(d *dao.Simple, amount *big.Int) error {
// GetCommitteeMembers returns public keys of nodes in committee using cached value.
func (n *NEO) GetCommitteeMembers(d *dao.Simple) keys.PublicKeys {
cache := d.GetROCache(n.ID).(*NeoCache)
return getCommitteeMembers(cache)
return getCommitteeMembers(cache.committee)
}

func getCommitteeMembers(cache *NeoCache) keys.PublicKeys {
var cvs = cache.committee
func getCommitteeMembers(cvs keysWithVotes) keys.PublicKeys {
var committee = make(keys.PublicKeys, len(cvs))
var err error
for i := range committee {
Expand Down
Loading

0 comments on commit 8fcf1ee

Please sign in to comment.