Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: rework rescan logic - index side #23

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
2 changes: 2 additions & 0 deletions src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ BITCOIN_CORE_H = \
netmessagemaker.h \
node/coin.h \
node/psbt.h \
node/rescan.h \
node/transaction.h \
noui.h \
optional.h \
Expand Down Expand Up @@ -277,6 +278,7 @@ libbitcoin_server_a_SOURCES = \
net_processing.cpp \
node/coin.cpp \
node/psbt.cpp \
node/rescan.cpp \
node/transaction.cpp \
noui.cpp \
policy/fees.cpp \
Expand Down
8 changes: 4 additions & 4 deletions src/bitcoind.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ const std::function<std::string(const char*)> G_TRANSLATION_FUN = nullptr;
* Use the buttons <code>Namespaces</code>, <code>Classes</code> or <code>Files</code> at the top of the page to start navigating the code.
*/

static void WaitForShutdown()
static void WaitForShutdown(InitInterfaces& interfaces)
{
while (!ShutdownRequested())
{
MilliSleep(200);
}
Interrupt();
Interrupt(interfaces);
}

//////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -172,9 +172,9 @@ static bool AppInit(int argc, char* argv[])

if (!fRet)
{
Interrupt();
Interrupt(interfaces);
} else {
WaitForShutdown();
WaitForShutdown(interfaces);
}
Shutdown(interfaces);

Expand Down
275 changes: 61 additions & 214 deletions src/index/base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,116 +46,22 @@ void BaseIndex::DB::WriteBestBlock(CDBBatch& batch, const CBlockLocator& locator
batch.Write(DB_BEST_BLOCK, locator);
}

BaseIndex::~BaseIndex()
{
Interrupt();
Stop();
}

bool BaseIndex::Init()
{
CBlockLocator locator;
if (!GetDB().ReadBestBlock(locator)) {
locator.SetNull();
}

LOCK(cs_main);
if (locator.IsNull()) {
m_best_block_index = nullptr;
} else {
m_best_block_index = FindForkInGlobalIndex(::ChainActive(), locator);
if (!locator.IsNull()) {
LOCK(cs_main);
CBlockIndex* pindex = FindForkInGlobalIndex(::ChainActive(), locator);
m_last_block_processed_height = pindex->nHeight;
}
m_synced = m_best_block_index.load() == ::ChainActive().Tip();
m_chain->registerNotifications(*this, const_cast<CBlockLocator&>(locator));
return true;
}

static const CBlockIndex* NextSyncBlock(const CBlockIndex* pindex_prev) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
{
AssertLockHeld(cs_main);

if (!pindex_prev) {
return ::ChainActive().Genesis();
}

const CBlockIndex* pindex = ::ChainActive().Next(pindex_prev);
if (pindex) {
return pindex;
}

return ::ChainActive().Next(::ChainActive().FindFork(pindex_prev));
}

void BaseIndex::ThreadSync()
{
const CBlockIndex* pindex = m_best_block_index.load();
if (!m_synced) {
auto& consensus_params = Params().GetConsensus();

int64_t last_log_time = 0;
int64_t last_locator_write_time = 0;
while (true) {
if (m_interrupt) {
m_best_block_index = pindex;
// No need to handle errors in Commit. If it fails, the error will be already be
// logged. The best way to recover is to continue, as index cannot be corrupted by
// a missed commit to disk for an advanced index state.
Commit();
return;
}

{
LOCK(cs_main);
const CBlockIndex* pindex_next = NextSyncBlock(pindex);
if (!pindex_next) {
m_best_block_index = pindex;
m_synced = true;
// No need to handle errors in Commit. See rationale above.
Commit();
break;
}
if (pindex_next->pprev != pindex && !Rewind(pindex, pindex_next->pprev)) {
FatalError("%s: Failed to rewind index %s to a previous chain tip",
__func__, GetName());
return;
}
pindex = pindex_next;
}

int64_t current_time = GetTime();
if (last_log_time + SYNC_LOG_INTERVAL < current_time) {
LogPrintf("Syncing %s with block chain from height %d\n",
GetName(), pindex->nHeight);
last_log_time = current_time;
}

if (last_locator_write_time + SYNC_LOCATOR_WRITE_INTERVAL < current_time) {
m_best_block_index = pindex;
last_locator_write_time = current_time;
// No need to handle errors in Commit. See rationale above.
Commit();
}

CBlock block;
if (!ReadBlockFromDisk(block, pindex, consensus_params)) {
FatalError("%s: Failed to read block %s from disk",
__func__, pindex->GetBlockHash().ToString());
return;
}
if (!WriteBlock(block, pindex)) {
FatalError("%s: Failed to write block %s to index database",
__func__, pindex->GetBlockHash().ToString());
return;
}
}
}

if (pindex) {
LogPrintf("%s is enabled at height %d\n", GetName(), pindex->nHeight);
} else {
LogPrintf("%s is enabled\n", GetName());
}
}

bool BaseIndex::Commit()
{
CDBBatch batch(GetDB());
Expand All @@ -168,156 +74,97 @@ bool BaseIndex::Commit()
bool BaseIndex::CommitInternal(CDBBatch& batch)
{
LOCK(cs_main);
GetDB().WriteBestBlock(batch, ::ChainActive().GetLocator(m_best_block_index));
const CBlockIndex *pindex = ::ChainActive()[m_last_block_processed_height];
GetDB().WriteBestBlock(batch, ::ChainActive().GetLocator(pindex));
return true;
}

bool BaseIndex::Rewind(const CBlockIndex* current_tip, const CBlockIndex* new_tip)
void BaseIndex::Rewind(int forked_height, int ancestor_height)
{
assert(current_tip == m_best_block_index);
assert(current_tip->GetAncestor(new_tip->nHeight) == new_tip);
assert(forked_height == m_last_block_processed_height);

// In the case of a reorg, ensure persisted block locator is not stale.
m_best_block_index = new_tip;
m_last_block_processed_height = ancestor_height;
if (!Commit()) {
// If commit fails, revert the best block index to avoid corruption.
m_best_block_index = current_tip;
return false;
// If commit fails, revert the best processed height to avoid corruption.
m_last_block_processed_height = forked_height;
}

return true;
}

void BaseIndex::BlockConnected(const std::shared_ptr<const CBlock>& block, const CBlockIndex* pindex,
const std::vector<CTransactionRef>& txn_conflicted)
void BaseIndex::BlockConnected(const CBlock& block, const std::vector<CTransactionRef>& txn_conflicted,
int height, FlatFilePos block_pos)
{
if (!m_synced) {
if (m_last_block_processed_height == -1 && height != 0) {
FatalError("%s: First block connected is not the genesis block (height=%d)",
__func__, height);
return;
}
// In the new model, if we are relying on ThreadServiceRequests to get block connection, in case of fork,
// we are going to Rewind and restart rescan from then. If we rely on ValidationInterface (i.e we reach
// tip at least once), we should receive BlockDisconnected event. In case of reorg, we don't overwrite
// data committed in data base, so may have false elements but we don't miss right ones.

const CBlockIndex* best_block_index = m_best_block_index.load();
if (!best_block_index) {
if (pindex->nHeight != 0) {
FatalError("%s: First block connected is not the genesis block (height=%d)",
__func__, pindex->nHeight);
return;
}
} else {
// Ensure block connects to an ancestor of the current best block. This should be the case
// most of the time, but may not be immediately after the sync thread catches up and sets
// m_synced. Consider the case where there is a reorg and the blocks on the stale branch are
// in the ValidationInterface queue backlog even after the sync thread has caught up to the
// new chain tip. In this unlikely event, log a warning and let the queue clear.
if (best_block_index->GetAncestor(pindex->nHeight - 1) != pindex->pprev) {
LogPrintf("%s: WARNING: Block %s does not connect to an ancestor of " /* Continued */
"known best chain (tip=%s); not updating index\n",
__func__, pindex->GetBlockHash().ToString(),
best_block_index->GetBlockHash().ToString());
return;
}
if (best_block_index != pindex->pprev && !Rewind(best_block_index, pindex->pprev)) {
FatalError("%s: Failed to rewind index %s to a previous chain tip",
__func__, GetName());
return;
}
}

if (WriteBlock(*block, pindex)) {
m_best_block_index = pindex;
if (WriteBlock(block, height, block_pos, m_last_block_processed)) {
m_last_block_processed_height = height;
m_last_block_processed = block.GetBlockHeader().GetHash();
} else {
FatalError("%s: Failed to write block %s to index",
__func__, pindex->GetBlockHash().ToString());
__func__, block.GetBlockHeader().GetHash().ToString());
return;
}
// To avoid performance hit, we flush every SYNC_LOCATOR_WRITE_INTERVAL until catch up to tip,
// then after every block connection.
if (m_synced) {
int64_t current_time = GetTime();
if (m_last_locator_write_time + SYNC_LOCATOR_WRITE_INTERVAL < current_time) {
m_last_locator_write_time = current_time;
// No need to handle errors in Commit. If it fails, the error will be already be
// logged. The best way to recover is to continue, as index cannot be corrupted by
// a missed commit to disk for an advanced index state.
Commit();
}
} else {
Commit();
}
}


void BaseIndex::ChainStateFlushed(const CBlockLocator& locator)
{
if (!m_synced) {
return;
}

const uint256& locator_tip_hash = locator.vHave.front();
const CBlockIndex* locator_tip_index;
{
LOCK(cs_main);
locator_tip_index = LookupBlockIndex(locator_tip_hash);
}

if (!locator_tip_index) {
FatalError("%s: First block (hash=%s) in locator was not found",
__func__, locator_tip_hash.ToString());
return;
}

// This checks that ChainStateFlushed callbacks are received after BlockConnected. The check may fail
// immediately after the sync thread catches up and sets m_synced. Consider the case where
// there is a reorg and the blocks on the stale branch are in the ValidationInterface queue
// backlog even after the sync thread has caught up to the new chain tip. In this unlikely
// event, log a warning and let the queue clear.
const CBlockIndex* best_block_index = m_best_block_index.load();
if (best_block_index->GetAncestor(locator_tip_index->nHeight) != locator_tip_index) {
LogPrintf("%s: WARNING: Locator contains block (hash=%s) not on known best " /* Continued */
"chain (tip=%s); not writing index locator\n",
__func__, locator_tip_hash.ToString(),
best_block_index->GetBlockHash().ToString());
return;
}

// No need to handle errors in Commit. If it fails, the error will be already be logged. The
// best way to recover is to continue, as index cannot be corrupted by a missed commit to disk
// for an advanced index state.
// No need to handle errors in Commit. If it fails, the error will be already be
// logged. The best way to recover is to continue, as index cannot be corrupted by
// a missed commit to disk for an advanced index state.
Commit();
}

bool BaseIndex::BlockUntilSyncedToCurrentChain()
void BaseIndex::BlockDisconnected(const CBlock& block, int height)
{
AssertLockNotHeld(cs_main);

if (!m_synced) {
return false;
}

{
// Skip the queue-draining stuff if we know we're caught up with
// ::ChainActive().Tip().
LOCK(cs_main);
const CBlockIndex* chain_tip = ::ChainActive().Tip();
const CBlockIndex* best_block_index = m_best_block_index.load();
if (best_block_index->GetAncestor(chain_tip->nHeight) == chain_tip) {
return true;
}
}

LogPrintf("%s: %s is catching up on block notifications\n", __func__, GetName());
SyncWithValidationInterfaceQueue();
return true;
Rewind(height, height - 1);
// If commit fails in Rewind, don't update last block processed to avoid corruption
if (m_last_block_processed_height != height) m_last_block_processed = block.hashPrevBlock;
}

void BaseIndex::Interrupt()
void BaseIndex::UpdatedBlockTip()
{
m_interrupt();
// Starting from now, index synchronization state relies on CValidationInterface and not
// on ThreadServiceRequest anymore.
m_synced = true;
}

void BaseIndex::Start()
void BaseIndex::HandleNotifications()
{
// Need to register this ValidationInterface before running Init(), so that
// callbacks are not missed if Init sets m_synced to true.
RegisterValidationInterface(this);
if (!Init()) {
FatalError("%s: %s failed to initialize", __func__, GetName());
return;
}

m_thread_sync = std::thread(&TraceThread<std::function<void()>>, GetName(),
std::bind(&BaseIndex::ThreadSync, this));
m_chain_notifications_handler = m_chain->handleNotifications(*this);
}

void BaseIndex::Stop()
bool BaseIndex::BlockUntilSyncedToCurrentChain()
{
UnregisterValidationInterface(this);
AssertLockNotHeld(cs_main);

if (m_thread_sync.joinable()) {
m_thread_sync.join();
// If index wasn't synced at least once until tip, it's not receiving yet updates
// from validation interface, no need to hold.
if (!m_synced) {
return false;
}
m_chain->waitForNotificationsIfNewBlocksConnected(m_last_block_processed);
return true;
}
Loading