Skip to content

Commit

Permalink
notification: make each reorg handler indicate when it is finished
Browse files Browse the repository at this point in the history
Do not leave OnReorganization before each handler sets the reorg flag.

Make reorg channels unbuffered.
  • Loading branch information
chappjc committed May 15, 2018
1 parent ce05eab commit 23c8c80
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 8 deletions.
4 changes: 4 additions & 0 deletions blockdata/chainmonitor.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// Copyright (c) 2018, The Decred developers
// Copyright (c) 2017, Jonathan Chappelow
// See LICENSE for details.

Expand All @@ -17,6 +18,7 @@ type ReorgData struct {
OldChainHeight int32
NewChainHead chainhash.Hash
NewChainHeight int32
WG *sync.WaitGroup
}

// for getblock, ticketfeeinfo, estimatestakediff, etc.
Expand Down Expand Up @@ -233,6 +235,8 @@ out:
log.Infof("Reorganize started in blockdata. OLD head block %v at height %d.",
oldHash, oldHeight)

reorgData.WG.Done()

case _, ok := <-p.quit:
if !ok {
log.Debugf("Got quit signal. Exiting reorg notification handler.")
Expand Down
4 changes: 4 additions & 0 deletions db/dcrsqlite/chainmonitor.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// Copyright (c) 2018, The Decred developers
// Copyright (c) 2017, Jonathan Chappelow
// See LICENSE for details.

Expand All @@ -17,6 +18,7 @@ type ReorgData struct {
OldChainHeight int32
NewChainHead chainhash.Hash
NewChainHeight int32
WG *sync.WaitGroup
}

// ChainMonitor handles change notifications from the node client
Expand Down Expand Up @@ -247,6 +249,8 @@ out:
log.Infof("Reorganize started. OLD head block %v at height %d.",
oldHash, oldHeight)

reorgData.WG.Done()

case _, ok := <-p.quit:
if !ok {
log.Debugf("Got quit signal. Exiting reorg notification handler.")
Expand Down
11 changes: 5 additions & 6 deletions ntfnchans.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// Copyright (c) 2018, The Decred developers
// Copyright (c) 2017, Jonathan Chappelow
// See LICENSE for details.

Expand Down Expand Up @@ -25,14 +26,12 @@ const (
// expNewTxChanBuffer is the size of the new transaction buffer for explorer
expNewTxChanBuffer = 70

reorgBuffer = 2

// relevantMempoolTxChanBuffer is the size of the new transaction channel
// buffer, for relevant transactions that are added into mempool.
//relevantMempoolTxChanBuffer = 2048
)

// Channels are package-level variables for simplicity
// NtfnChans collects the chain server notification channels
var ntfnChans struct {
connectChan chan *chainhash.Hash
reorgChanBlockData chan *blockdata.ReorgData
Expand Down Expand Up @@ -63,9 +62,9 @@ func makeNtfnChans(cfg *config) {
ntfnChans.connectChanStakeDB = make(chan *chainhash.Hash)

// Reorg data channels
ntfnChans.reorgChanBlockData = make(chan *blockdata.ReorgData, reorgBuffer)
ntfnChans.reorgChanWiredDB = make(chan *dcrsqlite.ReorgData, reorgBuffer)
ntfnChans.reorgChanStakeDB = make(chan *stakedb.ReorgData, reorgBuffer)
ntfnChans.reorgChanBlockData = make(chan *blockdata.ReorgData)
ntfnChans.reorgChanWiredDB = make(chan *dcrsqlite.ReorgData)
ntfnChans.reorgChanStakeDB = make(chan *stakedb.ReorgData)

// To update app status
ntfnChans.updateStatusNodeHeight = make(chan uint32, blockConnChanBuffer)
Expand Down
13 changes: 13 additions & 0 deletions ntfnhandlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,36 +152,49 @@ func makeNodeNtfnHandlers(cfg *config) (*rpcclient.NotificationHandlers, *collec
},
OnReorganization: func(oldHash *chainhash.Hash, oldHeight int32,
newHash *chainhash.Hash, newHeight int32) {
wg := new(sync.WaitGroup)
// Send reorg data to dcrsqlite's monitor
wg.Add(1)
select {
case ntfnChans.reorgChanWiredDB <- &dcrsqlite.ReorgData{
OldChainHead: *oldHash,
OldChainHeight: oldHeight,
NewChainHead: *newHash,
NewChainHeight: newHeight,
WG: wg,
}:
default:
wg.Done()
}

// Send reorg data to blockdata's monitor (so that it stops collecting)
wg.Add(1)
select {
case ntfnChans.reorgChanBlockData <- &blockdata.ReorgData{
OldChainHead: *oldHash,
OldChainHeight: oldHeight,
NewChainHead: *newHash,
NewChainHeight: newHeight,
WG: wg,
}:
default:
wg.Done()
}

// Send reorg data to stakedb's monitor
wg.Add(1)
select {
case ntfnChans.reorgChanStakeDB <- &stakedb.ReorgData{
OldChainHead: *oldHash,
OldChainHeight: oldHeight,
NewChainHead: *newHash,
NewChainHeight: newHeight,
WG: wg,
}:
default:
wg.Done()
}
wg.Wait()
},

OnWinningTickets: func(blockHash *chainhash.Hash, blockHeight int64,
Expand Down
4 changes: 4 additions & 0 deletions stakedb/chainmonitor.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// Copyright (c) 2018, The Decred developers
// Copyright (c) 2017, Jonathan Chappelow
// See LICENSE for details.

Expand All @@ -17,6 +18,7 @@ type ReorgData struct {
OldChainHeight int32
NewChainHead chainhash.Hash
NewChainHeight int32
WG *sync.WaitGroup
}

// ChainMonitor connects blocks to the stake DB as they come in.
Expand Down Expand Up @@ -241,6 +243,8 @@ out:
log.Infof("Reorganize started. OLD head block %v at height %d.",
oldHash, oldHeight)

reorgData.WG.Done()

case _, ok := <-p.quit:
if !ok {
log.Debugf("Got quit signal. Exiting reorg notification handler.")
Expand Down
17 changes: 15 additions & 2 deletions stakedb/stakedb.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Copyright (c) 2018, The dcrdata developers.
// Copyright (c) 2018, The Decred developers
// Copyright (c) 2018, The dcrdata developers
// Copyright (c) 2017, Jonathan Chappelow
// See LICENSE for details.

Expand Down Expand Up @@ -417,11 +418,15 @@ func (db *StakeDatabase) connectBlock(block *dcrutil.Block, spent []chainhash.Ha
return fmt.Errorf("unable to serialize block header: %v", err)
}

db.BestNode, err = db.BestNode.ConnectNode(stake.CalcHash256PRNGIV(hB),
bestNode, err := db.BestNode.ConnectNode(stake.CalcHash256PRNGIV(hB),
spent, revoked, maturing)
if err != nil {
return err
}
if bestNode == nil {
return fmt.Errorf("failed to ConnectNode at BestNode")
}
db.BestNode = bestNode

if err = db.StakeDB.Update(func(dbTx database.Tx) error {
return stake.WriteConnectedBestNode(dbTx, db.BestNode, *block.Hash())
Expand Down Expand Up @@ -506,6 +511,9 @@ func (db *StakeDatabase) disconnectBlock() error {
if err != nil {
return err
}
if parentStakeNode == nil {
return fmt.Errorf("failed to DisconnectNode at BestNode")
}
db.BestNode = parentStakeNode

return db.StakeDB.Update(func(dbTx database.Tx) error {
Expand Down Expand Up @@ -641,6 +649,11 @@ func (db *StakeDatabase) expires() ([]chainhash.Hash, []bool) {
// structure including ticket pool value, size, and average value.
func (db *StakeDatabase) PoolInfoBest() *apitypes.TicketPoolInfo {
db.nodeMtx.RLock()
if db.BestNode == nil {
db.nodeMtx.RUnlock()
log.Errorf("PoolInfoBest: BestNode is nil!")
return nil
}
//poolSize := db.BestNode.PoolSize()
liveTickets := db.BestNode.LiveTickets()
winningTickets := db.BestNode.Winners()
Expand Down

0 comments on commit 23c8c80

Please sign in to comment.