From 23c8c80c7956f1e5f46eaa1501d45c038105fe36 Mon Sep 17 00:00:00 2001 From: Jonathan Chappelow Date: Thu, 10 May 2018 18:49:41 -0700 Subject: [PATCH] notification: make each reorg handler indicate when it is finished Do not leave OnReorganization before each handler sets the reorg flag. Make reorg channels unbuffered. --- blockdata/chainmonitor.go | 4 ++++ db/dcrsqlite/chainmonitor.go | 4 ++++ ntfnchans.go | 11 +++++------ ntfnhandlers.go | 13 +++++++++++++ stakedb/chainmonitor.go | 4 ++++ stakedb/stakedb.go | 17 +++++++++++++++-- 6 files changed, 45 insertions(+), 8 deletions(-) diff --git a/blockdata/chainmonitor.go b/blockdata/chainmonitor.go index c29ccf415..ee4f09a72 100644 --- a/blockdata/chainmonitor.go +++ b/blockdata/chainmonitor.go @@ -1,3 +1,4 @@ +// Copyright (c) 2018, The Decred developers // Copyright (c) 2017, Jonathan Chappelow // See LICENSE for details. @@ -17,6 +18,7 @@ type ReorgData struct { OldChainHeight int32 NewChainHead chainhash.Hash NewChainHeight int32 + WG *sync.WaitGroup } // for getblock, ticketfeeinfo, estimatestakediff, etc. @@ -233,6 +235,8 @@ out: log.Infof("Reorganize started in blockdata. OLD head block %v at height %d.", oldHash, oldHeight) + reorgData.WG.Done() + case _, ok := <-p.quit: if !ok { log.Debugf("Got quit signal. Exiting reorg notification handler.") diff --git a/db/dcrsqlite/chainmonitor.go b/db/dcrsqlite/chainmonitor.go index 6bb7600df..d2d784b37 100644 --- a/db/dcrsqlite/chainmonitor.go +++ b/db/dcrsqlite/chainmonitor.go @@ -1,3 +1,4 @@ +// Copyright (c) 2018, The Decred developers // Copyright (c) 2017, Jonathan Chappelow // See LICENSE for details. @@ -17,6 +18,7 @@ type ReorgData struct { OldChainHeight int32 NewChainHead chainhash.Hash NewChainHeight int32 + WG *sync.WaitGroup } // ChainMonitor handles change notifications from the node client @@ -247,6 +249,8 @@ out: log.Infof("Reorganize started. OLD head block %v at height %d.", oldHash, oldHeight) + reorgData.WG.Done() + case _, ok := <-p.quit: if !ok { log.Debugf("Got quit signal. Exiting reorg notification handler.") diff --git a/ntfnchans.go b/ntfnchans.go index 6b9acc47a..2332c2180 100644 --- a/ntfnchans.go +++ b/ntfnchans.go @@ -1,3 +1,4 @@ +// Copyright (c) 2018, The Decred developers // Copyright (c) 2017, Jonathan Chappelow // See LICENSE for details. @@ -25,14 +26,12 @@ const ( // expNewTxChanBuffer is the size of the new transaction buffer for explorer expNewTxChanBuffer = 70 - reorgBuffer = 2 - // relevantMempoolTxChanBuffer is the size of the new transaction channel // buffer, for relevant transactions that are added into mempool. //relevantMempoolTxChanBuffer = 2048 ) -// Channels are package-level variables for simplicity +// NtfnChans collects the chain server notification channels var ntfnChans struct { connectChan chan *chainhash.Hash reorgChanBlockData chan *blockdata.ReorgData @@ -63,9 +62,9 @@ func makeNtfnChans(cfg *config) { ntfnChans.connectChanStakeDB = make(chan *chainhash.Hash) // Reorg data channels - ntfnChans.reorgChanBlockData = make(chan *blockdata.ReorgData, reorgBuffer) - ntfnChans.reorgChanWiredDB = make(chan *dcrsqlite.ReorgData, reorgBuffer) - ntfnChans.reorgChanStakeDB = make(chan *stakedb.ReorgData, reorgBuffer) + ntfnChans.reorgChanBlockData = make(chan *blockdata.ReorgData) + ntfnChans.reorgChanWiredDB = make(chan *dcrsqlite.ReorgData) + ntfnChans.reorgChanStakeDB = make(chan *stakedb.ReorgData) // To update app status ntfnChans.updateStatusNodeHeight = make(chan uint32, blockConnChanBuffer) diff --git a/ntfnhandlers.go b/ntfnhandlers.go index a326d2b7c..71dc2dda7 100644 --- a/ntfnhandlers.go +++ b/ntfnhandlers.go @@ -152,36 +152,49 @@ func makeNodeNtfnHandlers(cfg *config) (*rpcclient.NotificationHandlers, *collec }, OnReorganization: func(oldHash *chainhash.Hash, oldHeight int32, newHash *chainhash.Hash, newHeight int32) { + wg := new(sync.WaitGroup) // Send reorg data to dcrsqlite's monitor + wg.Add(1) select { case ntfnChans.reorgChanWiredDB <- &dcrsqlite.ReorgData{ OldChainHead: *oldHash, OldChainHeight: oldHeight, NewChainHead: *newHash, NewChainHeight: newHeight, + WG: wg, }: default: + wg.Done() } + // Send reorg data to blockdata's monitor (so that it stops collecting) + wg.Add(1) select { case ntfnChans.reorgChanBlockData <- &blockdata.ReorgData{ OldChainHead: *oldHash, OldChainHeight: oldHeight, NewChainHead: *newHash, NewChainHeight: newHeight, + WG: wg, }: default: + wg.Done() } + // Send reorg data to stakedb's monitor + wg.Add(1) select { case ntfnChans.reorgChanStakeDB <- &stakedb.ReorgData{ OldChainHead: *oldHash, OldChainHeight: oldHeight, NewChainHead: *newHash, NewChainHeight: newHeight, + WG: wg, }: default: + wg.Done() } + wg.Wait() }, OnWinningTickets: func(blockHash *chainhash.Hash, blockHeight int64, diff --git a/stakedb/chainmonitor.go b/stakedb/chainmonitor.go index f5b44345e..fec2ee060 100644 --- a/stakedb/chainmonitor.go +++ b/stakedb/chainmonitor.go @@ -1,3 +1,4 @@ +// Copyright (c) 2018, The Decred developers // Copyright (c) 2017, Jonathan Chappelow // See LICENSE for details. @@ -17,6 +18,7 @@ type ReorgData struct { OldChainHeight int32 NewChainHead chainhash.Hash NewChainHeight int32 + WG *sync.WaitGroup } // ChainMonitor connects blocks to the stake DB as they come in. @@ -241,6 +243,8 @@ out: log.Infof("Reorganize started. OLD head block %v at height %d.", oldHash, oldHeight) + reorgData.WG.Done() + case _, ok := <-p.quit: if !ok { log.Debugf("Got quit signal. Exiting reorg notification handler.") diff --git a/stakedb/stakedb.go b/stakedb/stakedb.go index e68794ada..ff899ca5a 100644 --- a/stakedb/stakedb.go +++ b/stakedb/stakedb.go @@ -1,4 +1,5 @@ -// Copyright (c) 2018, The dcrdata developers. +// Copyright (c) 2018, The Decred developers +// Copyright (c) 2018, The dcrdata developers // Copyright (c) 2017, Jonathan Chappelow // See LICENSE for details. @@ -417,11 +418,15 @@ func (db *StakeDatabase) connectBlock(block *dcrutil.Block, spent []chainhash.Ha return fmt.Errorf("unable to serialize block header: %v", err) } - db.BestNode, err = db.BestNode.ConnectNode(stake.CalcHash256PRNGIV(hB), + bestNode, err := db.BestNode.ConnectNode(stake.CalcHash256PRNGIV(hB), spent, revoked, maturing) if err != nil { return err } + if bestNode == nil { + return fmt.Errorf("failed to ConnectNode at BestNode") + } + db.BestNode = bestNode if err = db.StakeDB.Update(func(dbTx database.Tx) error { return stake.WriteConnectedBestNode(dbTx, db.BestNode, *block.Hash()) @@ -506,6 +511,9 @@ func (db *StakeDatabase) disconnectBlock() error { if err != nil { return err } + if parentStakeNode == nil { + return fmt.Errorf("failed to DisconnectNode at BestNode") + } db.BestNode = parentStakeNode return db.StakeDB.Update(func(dbTx database.Tx) error { @@ -641,6 +649,11 @@ func (db *StakeDatabase) expires() ([]chainhash.Hash, []bool) { // structure including ticket pool value, size, and average value. func (db *StakeDatabase) PoolInfoBest() *apitypes.TicketPoolInfo { db.nodeMtx.RLock() + if db.BestNode == nil { + db.nodeMtx.RUnlock() + log.Errorf("PoolInfoBest: BestNode is nil!") + return nil + } //poolSize := db.BestNode.PoolSize() liveTickets := db.BestNode.LiveTickets() winningTickets := db.BestNode.Winners()