From 112044cba355f848579fcef7a7d945c2aa61850e Mon Sep 17 00:00:00 2001 From: Brett Boston Date: Wed, 10 Jan 2024 17:36:33 -0800 Subject: [PATCH] Draft: Recover SCP messages when replaying from history This PR enables updating the `scphistory` table during catchup from history. It allows users to specify which archives to use via the `SCP_HISTORY_ARCHIVES` config option. If a user specifies multiple archives, stellar-core will merge the messages from the archives. This is a draft PR because I'm looking for feedback on the approach, but still have some work to do before it is in a mergeable state. Most of the remaining work is to: * Document the new functionality * Write additional tests for: * Merging * Failed downloads * No `SCP_HISTORY_ARCHIVES` * Multiple `SCP_HISTORY_ARCHIVES` * Changes to `ReplayDebugMetaWork` * Integrate changes from #4121 * Address other `TODO`s in the changes --- Builds/VisualStudio/stellar-core.vcxproj | 2 + .../VisualStudio/stellar-core.vcxproj.filters | 6 + src/catchup/ApplyBufferedLedgersWork.cpp | 4 +- src/catchup/ApplyCheckpointWork.cpp | 88 +++++++++- src/catchup/ApplyCheckpointWork.h | 27 +++- src/catchup/ApplyLedgerWork.cpp | 13 +- src/catchup/ApplyLedgerWork.h | 9 +- src/catchup/CatchupWork.cpp | 35 +++- src/catchup/CatchupWork.h | 5 +- src/catchup/DownloadApplyTxsWork.cpp | 5 +- src/catchup/DownloadApplyTxsWork.h | 4 + src/catchup/ReplayDebugMetaWork.cpp | 22 ++- src/herder/HerderPersistence.h | 8 + src/herder/HerderPersistenceImpl.cpp | 153 +++++++++++++++--- src/herder/HerderPersistenceImpl.h | 10 ++ src/history/test/HistoryTests.cpp | 13 +- src/history/test/HistoryTestsUtils.cpp | 54 ++++++- src/history/test/HistoryTestsUtils.h | 1 + src/historywork/BatchDownloadWork.cpp | 4 +- src/historywork/BatchDownloadWork.h | 5 +- .../BestEffortBatchDownloadWork.cpp | 20 +++ src/historywork/BestEffortBatchDownloadWork.h | 27 ++++ src/historywork/FetchRecentQsetsWork.cpp | 4 +- src/historywork/FetchRecentQsetsWork.h | 2 +- .../WriteVerifiedCheckpointHashesWork.cpp | 2 +- src/main/Config.cpp | 6 + src/main/Config.h | 4 + src/work/BatchWork.cpp | 8 +- src/work/BatchWork.h | 4 + src/work/Work.cpp | 25 ++- src/work/Work.h | 4 + 31 files changed, 521 insertions(+), 53 deletions(-) create mode 100644 src/historywork/BestEffortBatchDownloadWork.cpp create mode 100644 src/historywork/BestEffortBatchDownloadWork.h diff --git a/Builds/VisualStudio/stellar-core.vcxproj b/Builds/VisualStudio/stellar-core.vcxproj index 6f0566be60..634074adac 100644 --- a/Builds/VisualStudio/stellar-core.vcxproj +++ b/Builds/VisualStudio/stellar-core.vcxproj @@ -506,6 +506,7 @@ exit /b 0 + @@ -938,6 +939,7 @@ exit /b 0 + diff --git a/Builds/VisualStudio/stellar-core.vcxproj.filters b/Builds/VisualStudio/stellar-core.vcxproj.filters index d34f209046..e07ad5f3ca 100644 --- a/Builds/VisualStudio/stellar-core.vcxproj.filters +++ b/Builds/VisualStudio/stellar-core.vcxproj.filters @@ -762,6 +762,9 @@ historyWork + + historyWork + historyWork @@ -1916,6 +1919,9 @@ historyWork + + historyWork + historyWork diff --git a/src/catchup/ApplyBufferedLedgersWork.cpp b/src/catchup/ApplyBufferedLedgersWork.cpp index 6b5865b344..41306794a5 100644 --- a/src/catchup/ApplyBufferedLedgersWork.cpp +++ b/src/catchup/ApplyBufferedLedgersWork.cpp @@ -55,7 +55,9 @@ ApplyBufferedLedgersWork::onRun() lcd.getTxSet()->sizeTxTotal(), lcd.getTxSet()->sizeOpTotalForLogging(), stellarValueToString(mApp.getConfig(), lcd.getValue())); - auto applyLedger = std::make_shared(mApp, lcd); + // Pass `nullptr` for `hEntries` because SCP 
messages of buffered ledgers + // have already been logged. + auto applyLedger = std::make_shared(mApp, lcd, nullptr); auto predicate = [](Application& app) { auto& bl = app.getBucketManager().getBucketList(); diff --git a/src/catchup/ApplyCheckpointWork.cpp b/src/catchup/ApplyCheckpointWork.cpp index 4cc8007263..9cfdb48522 100644 --- a/src/catchup/ApplyCheckpointWork.cpp +++ b/src/catchup/ApplyCheckpointWork.cpp @@ -24,10 +24,9 @@ namespace stellar { -ApplyCheckpointWork::ApplyCheckpointWork(Application& app, - TmpDir const& downloadDir, - LedgerRange const& range, - OnFailureCallback cb) +ApplyCheckpointWork::ApplyCheckpointWork( + Application& app, TmpDir const& downloadDir, LedgerRange const& range, + OnFailureCallback cb, std::shared_ptr& scpDownloadDirs) : BasicWork(app, "apply-ledgers-" + fmt::format(FMT_STRING("{}-{}"), range.mFirst, range.limit()), @@ -36,6 +35,7 @@ ApplyCheckpointWork::ApplyCheckpointWork(Application& app, , mLedgerRange(range) , mCheckpoint( app.getHistoryManager().checkpointContainingLedger(range.mFirst)) + , mSCPDownloadDirs(scpDownloadDirs) , mOnFailure(cb) { // Ledger range check to enforce application of a single checkpoint @@ -69,6 +69,11 @@ ApplyCheckpointWork::closeFiles() { mHdrIn.close(); mTxIn.close(); + for (auto& scpInfo : mSCPCheckpointInfo) + { + scpInfo.scpHistoryIn.close(); + } + mSCPCheckpointInfo.clear(); mFilesOpen = false; } @@ -83,8 +88,7 @@ void ApplyCheckpointWork::openInputFiles() { ZoneScoped; - mHdrIn.close(); - mTxIn.close(); + closeFiles(); FileTransferInfo hi(mDownloadDir, HISTORY_FILE_TYPE_LEDGER, mCheckpoint); FileTransferInfo ti(mDownloadDir, HISTORY_FILE_TYPE_TRANSACTIONS, mCheckpoint); @@ -95,6 +99,32 @@ ApplyCheckpointWork::openInputFiles() mTxIn.open(ti.localPath_nogz()); mTxHistoryEntry = TransactionHistoryEntry(); mHeaderHistoryEntry = LedgerHeaderHistoryEntry(); + + if (mSCPDownloadDirs) + { + // Initialize `SCPCheckpointInfo`s for each SCP history download + // directory. + for (auto const& scpDir : *mSCPDownloadDirs) + { + FileTransferInfo si(*scpDir, HISTORY_FILE_TYPE_SCP, mCheckpoint); + CLOG_DEBUG(History, "Saving SCP messages from {}", + si.localPath_nogz()); + mSCPCheckpointInfo.emplace_back(); + SCPCheckpointInfo& scpInfo = mSCPCheckpointInfo.back(); + scpInfo.scpHistoryEntry = std::make_shared(); + try + { + scpInfo.scpHistoryIn.open(si.localPath_nogz()); + } + catch (FileSystemException const&) + { + // File doesn't exist for this checkpoint. That's ok. Skip it. + mSCPCheckpointInfo.pop_back(); + continue; + } + } + } + mFilesOpen = true; } @@ -251,6 +281,49 @@ ApplyCheckpointWork::getNextLedgerCloseData() std::make_optional(mHeaderHistoryEntry.hash)); } +std::unique_ptr +ApplyCheckpointWork::getNextSCPHistoryEntries() +{ + ZoneScoped; + auto ret = std::make_unique(); + uint32_t ledgerSeq = mApp.getLedgerManager().getLastClosedLedgerNum(); + for (SCPCheckpointInfo& info : mSCPCheckpointInfo) + { + XDRInputFileStream& in = info.scpHistoryIn; + std::shared_ptr& entry = info.scpHistoryEntry; + do + { + uint32 scpHistSeq = entry->v0().ledgerMessages.ledgerSeq; + + if (scpHistSeq <= ledgerSeq) + { + // Catching up to `ledgerSeq + 1` + CLOG_DEBUG(History, "Skipping SCP messages for ledger {}", + scpHistSeq); + } + else if (scpHistSeq == ledgerSeq + 1) + { + // Caught up + CLOG_DEBUG(History, "Loaded SCP messages for ledger {}", + ledgerSeq); + ret->push_back(entry); + break; + } + else + { + // Ahead. 
This archive does not have messages for `ledgerSeq+1` + // TODO: Log which archive is missing the messages (also applies + // to other logging statements in this function) + CLOG_WARNING(History, + "Archive missing SCP messages for ledger {}", + scpHistSeq); + break; + } + } while (in && in.readOne(*entry)); + } + return ret; +} + BasicWork::State ApplyCheckpointWork::onRun() { @@ -308,7 +381,8 @@ ApplyCheckpointWork::onRun() return State::WORK_RUNNING; } - auto applyLedger = std::make_shared(mApp, *lcd); + auto applyLedger = std::make_shared( + mApp, *lcd, getNextSCPHistoryEntries()); auto predicate = [](Application& app) { auto& bl = app.getBucketManager().getBucketList(); diff --git a/src/catchup/ApplyCheckpointWork.h b/src/catchup/ApplyCheckpointWork.h index 93c4168b21..4c7db14411 100644 --- a/src/catchup/ApplyCheckpointWork.h +++ b/src/catchup/ApplyCheckpointWork.h @@ -4,6 +4,7 @@ #pragma once +#include "catchup/ApplyLedgerWork.h" #include "herder/LedgerCloseData.h" #include "herder/TxSetFrame.h" #include "history/HistoryArchive.h" @@ -20,6 +21,18 @@ namespace stellar class TmpDir; struct LedgerHeaderHistoryEntry; +using TmpDirVec = std::vector>; + +// This struct stores information about SCP messages in a single history +// checkpoint +struct SCPCheckpointInfo +{ + // Input stream holding `SCPHistoryEntry`s + XDRInputFileStream scpHistoryIn; + // Most recent SCP history entry read from `scpHistoryIn` + std::shared_ptr scpHistoryEntry; +}; + /** * This class is responsible for applying transactions stored in files on * temporary directory (downloadDir) to local ledger. It requires two sets of @@ -45,11 +58,17 @@ class ApplyCheckpointWork : public BasicWork TmpDir const& mDownloadDir; LedgerRange const mLedgerRange; uint32_t const mCheckpoint; + // The directories containing downloaded SCP history. May be null if no such + // directories exist. + std::shared_ptr mSCPDownloadDirs; XDRInputFileStream mHdrIn; XDRInputFileStream mTxIn; TransactionHistoryEntry mTxHistoryEntry; LedgerHeaderHistoryEntry mHeaderHistoryEntry; + // Vector containing each archive's SCP messages for the current checkpoint + // being processed + std::vector mSCPCheckpointInfo; OnFailureCallback mOnFailure; bool mFilesOpen{false}; @@ -61,11 +80,17 @@ class ApplyCheckpointWork : public BasicWork std::shared_ptr getNextLedgerCloseData(); + // Returns a vector holding SCP messages from each archive for the ledger + // being processed. This vector may be smaller than the total number of + // archives if some archives did not contain messages for the ledger. 
+ std::unique_ptr getNextSCPHistoryEntries(); + void closeFiles(); public: ApplyCheckpointWork(Application& app, TmpDir const& downloadDir, - LedgerRange const& range, OnFailureCallback cb); + LedgerRange const& range, OnFailureCallback cb, + std::shared_ptr& scpDownloadDirs); ~ApplyCheckpointWork() = default; std::string getStatus() const override; void onFailureRaise() override; diff --git a/src/catchup/ApplyLedgerWork.cpp b/src/catchup/ApplyLedgerWork.cpp index 5d910f8bf5..58811fce27 100644 --- a/src/catchup/ApplyLedgerWork.cpp +++ b/src/catchup/ApplyLedgerWork.cpp @@ -3,6 +3,7 @@ // of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 #include "catchup/ApplyLedgerWork.h" +#include "herder/HerderPersistence.h" #include "ledger/LedgerManager.h" #include "main/Application.h" #include @@ -10,12 +11,15 @@ namespace stellar { -ApplyLedgerWork::ApplyLedgerWork(Application& app, - LedgerCloseData const& ledgerCloseData) +ApplyLedgerWork::ApplyLedgerWork( + Application& app, LedgerCloseData const& ledgerCloseData, + std::unique_ptr hEntries) : BasicWork( app, "apply-ledger-" + std::to_string(ledgerCloseData.getLedgerSeq()), BasicWork::RETRY_NEVER) + , mApp(app) , mLedgerCloseData(ledgerCloseData) + , mHEntries(std::move(hEntries)) { } @@ -24,6 +28,11 @@ ApplyLedgerWork::onRun() { ZoneScoped; mApp.getLedgerManager().closeLedger(mLedgerCloseData); + if (mHEntries) + { + mApp.getHerderPersistence().copySCPHistoryFromEntries( + *mHEntries, mLedgerCloseData.getLedgerSeq()); + } return BasicWork::State::WORK_SUCCESS; } diff --git a/src/catchup/ApplyLedgerWork.h b/src/catchup/ApplyLedgerWork.h index 558aad27e1..fbf965cc3a 100644 --- a/src/catchup/ApplyLedgerWork.h +++ b/src/catchup/ApplyLedgerWork.h @@ -6,16 +6,23 @@ #include "herder/LedgerCloseData.h" #include "work/Work.h" +#include namespace stellar { +using SCPHistoryEntryVec = std::vector>; + class ApplyLedgerWork : public BasicWork { + Application& mApp; LedgerCloseData const mLedgerCloseData; + // SCP messages for the ledger to be applied + std::unique_ptr mHEntries; public: - ApplyLedgerWork(Application& app, LedgerCloseData const& ledgerCloseData); + ApplyLedgerWork(Application& app, LedgerCloseData const& ledgerCloseData, + std::unique_ptr hEntries); std::string getStatus() const override; diff --git a/src/catchup/CatchupWork.cpp b/src/catchup/CatchupWork.cpp index 760c15436c..55ae1d9fe5 100644 --- a/src/catchup/CatchupWork.cpp +++ b/src/catchup/CatchupWork.cpp @@ -14,8 +14,10 @@ #include "catchup/VerifyLedgerChainWork.h" #include "herder/Herder.h" #include "history/FileTransferInfo.h" +#include "history/HistoryArchiveManager.h" #include "history/HistoryManager.h" #include "historywork/BatchDownloadWork.h" +#include "historywork/BestEffortBatchDownloadWork.h" #include "historywork/DownloadBucketsWork.h" #include "historywork/DownloadVerifyTxResultsWork.h" #include "historywork/GetAndUnzipRemoteFileWork.h" @@ -81,7 +83,7 @@ CatchupWork::CatchupWork(Application& app, std::shared_ptr archive) : Work(app, "catchup", BasicWork::RETRY_NEVER) , mLocalState{app.getLedgerManager().getLastClosedLedgerHAS()} - , mDownloadDir{std::make_unique( + , mDownloadDir{std::make_shared( mApp.getTmpDirManager().tmpDir(getName()))} , mCatchupConfiguration{catchupConfiguration} , mArchive{archive} @@ -150,6 +152,7 @@ CatchupWork::doReset() mHAS.reset(); mBucketHAS.reset(); mRetainedBuckets.clear(); + mSCPDownloadDirs->clear(); } void @@ -166,7 +169,7 @@ CatchupWork::downloadVerifyLedgerChain(CatchupRange const& catchupRange, // Batch download has 
default retries ("a few") to ensure we rotate through // archives auto getLedgers = std::make_shared( - mApp, checkpointRange, HISTORY_FILE_TYPE_LEDGER, *mDownloadDir, + mApp, checkpointRange, HISTORY_FILE_TYPE_LEDGER, mDownloadDir, mArchive); mRangeEndPromise = std::promise(); mRangeEndFuture = mRangeEndPromise.get_future().share(); @@ -179,9 +182,32 @@ CatchupWork::downloadVerifyLedgerChain(CatchupRange const& catchupRange, mApp, *mDownloadDir, verifyRange, mLastClosedLedgerHashPair, mRangeEndFuture, std::move(fatalFailurePromise)); + std::vector> seq{getLedgers, mVerifyLedgers}; + + Config const& cfg = mApp.getConfig(); + // TODO: Test with this flag set to `false`. An empty `mSCPDownloadDirs` + // should prevent any further stages from running. + if (cfg.MODE_STORES_HISTORY_MISC) + { + for (std::string const& scpHistoryArchive : cfg.SCP_HISTORY_ARCHIVES) + { + CLOG_DEBUG(History, "Downloading SCP history from {}", + scpHistoryArchive); + std::shared_ptr scpArchive = + mApp.getHistoryArchiveManager().getHistoryArchive( + scpHistoryArchive); + + auto scpDownloadDir = + std::make_shared(mApp.getTmpDirManager().tmpDir( + "scp-history-" + scpHistoryArchive)); + mSCPDownloadDirs->push_back(scpDownloadDir); + seq.emplace_back(std::make_shared( + mApp, checkpointRange, HISTORY_FILE_TYPE_SCP, scpDownloadDir, + scpArchive)); + } + } // Never retry the sequence: downloads already have retries, and there's no // point retrying verification - std::vector> seq{getLedgers, mVerifyLedgers}; mDownloadVerifyLedgersSeq = addWork( "download-verify-ledgers-seq", seq, BasicWork::RETRY_NEVER); mCurrentWork = mDownloadVerifyLedgersSeq; @@ -305,7 +331,8 @@ CatchupWork::downloadApplyTransactions(CatchupRange const& catchupRange) auto waitForPublish = mCatchupConfiguration.offline(); auto range = catchupRange.getReplayRange(); mTransactionsVerifyApplySeq = std::make_shared( - mApp, *mDownloadDir, range, mLastApplied, waitForPublish, mArchive); + mApp, *mDownloadDir, range, mLastApplied, waitForPublish, + mSCPDownloadDirs, mArchive); } BasicWork::State diff --git a/src/catchup/CatchupWork.h b/src/catchup/CatchupWork.h index ed36c75f5c..286f4eed8c 100644 --- a/src/catchup/CatchupWork.h +++ b/src/catchup/CatchupWork.h @@ -4,6 +4,7 @@ #pragma once +#include "catchup/ApplyCheckpointWork.h" #include "catchup/CatchupConfiguration.h" #include "catchup/VerifyLedgerChainWork.h" #include "history/HistoryArchive.h" @@ -46,8 +47,10 @@ class CatchupWork : public Work { protected: HistoryArchiveState mLocalState; - std::unique_ptr mDownloadDir; + std::shared_ptr mDownloadDir; std::map> mBuckets; + // Download directories for SCP message history + std::shared_ptr mSCPDownloadDirs = std::make_shared(); void doReset() override; BasicWork::State doWork() override; diff --git a/src/catchup/DownloadApplyTxsWork.cpp b/src/catchup/DownloadApplyTxsWork.cpp index 1746060d69..83188ed862 100644 --- a/src/catchup/DownloadApplyTxsWork.cpp +++ b/src/catchup/DownloadApplyTxsWork.cpp @@ -23,6 +23,7 @@ namespace stellar DownloadApplyTxsWork::DownloadApplyTxsWork( Application& app, TmpDir const& downloadDir, LedgerRange const& range, LedgerHeaderHistoryEntry& lastApplied, bool waitForPublish, + std::shared_ptr scpDownloadDirs, std::shared_ptr archive) : BatchWork(app, "download-apply-ledgers") , mRange(range) @@ -31,6 +32,7 @@ DownloadApplyTxsWork::DownloadApplyTxsWork( , mCheckpointToQueue( app.getHistoryManager().checkpointContainingLedger(range.mFirst)) , mWaitForPublish(waitForPublish) + , mSCPDownloadDirs(std::move(scpDownloadDirs)) , 
mArchive(archive) { } @@ -76,7 +78,8 @@ DownloadApplyTxsWork::yieldMoreWork() }; auto apply = std::make_shared( - mApp, mDownloadDir, LedgerRange::inclusive(low, high), cb); + mApp, mDownloadDir, LedgerRange::inclusive(low, high), cb, + mSCPDownloadDirs); std::vector> seq{getAndUnzip}; diff --git a/src/catchup/DownloadApplyTxsWork.h b/src/catchup/DownloadApplyTxsWork.h index e5a81d9240..97bee2bdc3 100644 --- a/src/catchup/DownloadApplyTxsWork.h +++ b/src/catchup/DownloadApplyTxsWork.h @@ -4,6 +4,7 @@ #pragma once +#include "catchup/ApplyCheckpointWork.h" #include "ledger/LedgerRange.h" #include "util/XDRStream.h" #include "work/BatchWork.h" @@ -29,6 +30,8 @@ class DownloadApplyTxsWork : public BatchWork uint32_t mCheckpointToQueue; std::shared_ptr mLastYieldedWork; bool const mWaitForPublish; + // Download directories for SCP message history + std::shared_ptr mSCPDownloadDirs; std::shared_ptr mArchive; public: @@ -36,6 +39,7 @@ class DownloadApplyTxsWork : public BatchWork LedgerRange const& range, LedgerHeaderHistoryEntry& lastApplied, bool waitForPublish, + std::shared_ptr mSCPDownloadDirs, std::shared_ptr archive = nullptr); std::string getStatus() const override; diff --git a/src/catchup/ReplayDebugMetaWork.cpp b/src/catchup/ReplayDebugMetaWork.cpp index 2d2dcd7fde..a461cb969c 100644 --- a/src/catchup/ReplayDebugMetaWork.cpp +++ b/src/catchup/ReplayDebugMetaWork.cpp @@ -19,6 +19,24 @@ namespace stellar { +namespace +{ +// Extract the `SCPHistoryEntry`s within a `LedgerCloseMeta` +std::unique_ptr +scpHistoryEntriesFromLedgerCloseMeta(LedgerCloseMeta const& lcm) +{ + xdr::xvector const& hist = + lcm.v() == 0 ? lcm.v0().scpInfo : lcm.v1().scpInfo; + + auto ret = std::make_unique(); + for (auto const& e : hist) + { + ret->emplace_back(std::make_shared(e)); + } + return ret; +} +} // namespace + // Helper class to apply ledgers from a single debug meta file class ApplyLedgersFromMetaWork : public Work { @@ -122,7 +140,9 @@ class ApplyLedgersFromMetaWork : public Work lh.header.scpValue); releaseAssert(!mApplyLedgerWork); - mApplyLedgerWork = addWork(ledgerCloseData); + // TODO: Test + mApplyLedgerWork = addWork( + ledgerCloseData, scpHistoryEntriesFromLedgerCloseMeta(lcm)); return BasicWork::State::WORK_RUNNING; } diff --git a/src/herder/HerderPersistence.h b/src/herder/HerderPersistence.h index 354e58c013..8ae2830ffc 100644 --- a/src/herder/HerderPersistence.h +++ b/src/herder/HerderPersistence.h @@ -4,6 +4,7 @@ // under the Apache License, Version 2.0. See the COPYING file at the root // of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 +#include "catchup/ApplyLedgerWork.h" #include "herder/QuorumTracker.h" #include "overlay/Peer.h" #include "xdr/Stellar-SCP.h" @@ -40,6 +41,13 @@ class HerderPersistence uint32_t ledgerSeq, uint32_t ledgerCount, XDROutputFileStream& scpHistory); + + // Given a set of SCP history entries from multiple archives, merge the + // entries--taking the newest entry for each node--and store the result in + // the database. 
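+ // A rough sketch of the intended call pattern (it mirrors the call made
+ // from ApplyLedgerWork::onRun; `entries` here stands for whatever
+ // SCPHistoryEntryVec was collected for the ledger that was just closed):
+ //
+ //   app.getHerderPersistence().copySCPHistoryFromEntries(entries, seq);
+ //
+ // Implementations are expected to replace any existing `scphistory` rows
+ // for `ledgerSeq` with the merged result.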
+ virtual void copySCPHistoryFromEntries(SCPHistoryEntryVec const& hEntries, + uint32_t ledgerSeq) = 0; + // quorum information lookup static std::optional getNodeQuorumSet(Database& db, soci::session& sess, NodeID const& nodeID); diff --git a/src/herder/HerderPersistenceImpl.cpp b/src/herder/HerderPersistenceImpl.cpp index 1afe36cc6a..3bb6ad643b 100644 --- a/src/herder/HerderPersistenceImpl.cpp +++ b/src/herder/HerderPersistenceImpl.cpp @@ -34,6 +34,22 @@ HerderPersistenceImpl::~HerderPersistenceImpl() { } +void +HerderPersistenceImpl::clearSCPHistoryAtSeq(uint32_t seq) +{ + auto& db = mApp.getDatabase(); + auto prepClean = + db.getPreparedStatement("DELETE FROM scphistory WHERE ledgerseq =:l"); + + auto& st = prepClean.statement(); + st.exchange(soci::use(seq)); + st.define_and_bind(); + { + ZoneNamedN(deleteSCPHistoryZone, "delete scphistory", true); + st.execute(true); + } +} + void HerderPersistenceImpl::saveSCPHistory(uint32_t seq, std::vector const& envs, @@ -49,19 +65,7 @@ HerderPersistenceImpl::saveSCPHistory(uint32_t seq, auto& db = mApp.getDatabase(); soci::transaction txscope(db.getSession()); - - { - auto prepClean = db.getPreparedStatement( - "DELETE FROM scphistory WHERE ledgerseq =:l"); - - auto& st = prepClean.statement(); - st.exchange(soci::use(seq)); - st.define_and_bind(); - { - ZoneNamedN(deleteSCPHistoryZone, "delete scphistory", true); - st.execute(true); - } - } + clearSCPHistoryAtSeq(seq); for (auto const& e : envs) { auto const& qHash = @@ -82,9 +86,9 @@ HerderPersistenceImpl::saveSCPHistory(uint32_t seq, "(:n, :l, :e)"); auto& st = prepEnv.statement(); - st.exchange(soci::use(nodeIDStrKey)); - st.exchange(soci::use(seq)); - st.exchange(soci::use(envelopeEncoded)); + st.exchange(soci::use(nodeIDStrKey, "n")); + st.exchange(soci::use(seq, "l")); + st.exchange(soci::use(envelopeEncoded, "e")); st.define_and_bind(); { ZoneNamedN(insertSCPHistoryZone, "insert scphistory", true); @@ -140,7 +144,18 @@ HerderPersistenceImpl::saveSCPHistory(uint32_t seq, } } // save quorum sets - for (auto const& p : usedQSets) + saveQuorumSets(seq, usedQSets); + + txscope.commit(); +} + +void +HerderPersistenceImpl::saveQuorumSets( + uint32_t seq, UnorderedMap const& qsets) +{ + ZoneScoped; + auto& db = mApp.getDatabase(); + for (auto const& p : qsets) { std::string qSetH = binToHex(p.first); @@ -207,8 +222,6 @@ HerderPersistenceImpl::saveSCPHistory(uint32_t seq, } } } - - txscope.commit(); } size_t @@ -294,9 +307,111 @@ HerderPersistence::copySCPHistoryToStream(Database& db, soci::session& sess, } } + // TODO: What about quoruminfo table? That doesn't seem to be recorded. Do + // we care about it? + return n; } +namespace +{ +// Merge the info in an `hEntry` with the info in `envs` and `qsets` by +// overwriting older entries (as determined by BallotProtocol::isNewerStatement) +// with newer ones. 
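+// For example (illustrative only): if one archive recorded a CONFIRM
+// statement from node N for this ledger and another archive recorded an
+// EXTERNALIZE statement from N, the EXTERNALIZE message is kept, since
+// BallotProtocol::isNewerStatement orders it after CONFIRM. Quorum sets
+// from all archives are unioned, keyed by their hash.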
+void +mergeSCPHistory(SCPHistoryEntry const& hEntry, uint32_t ledgerSeq, + UnorderedMap& envs, + UnorderedMap& qsets) +{ + SCPHistoryEntryV0 const& hEntryV0 = hEntry.v0(); + releaseAssert(hEntryV0.ledgerMessages.ledgerSeq == ledgerSeq); + for (SCPEnvelope const& e : hEntryV0.ledgerMessages.messages) + { + NodeID const& nodeID = e.statement.nodeID; + SCPStatement const& statement = e.statement; + auto it = envs.find(nodeID); + if (it == envs.end()) + { + envs[nodeID] = e; + } + else if (BallotProtocol::isNewerStatement(it->second.statement, + statement)) + { + it->second = e; + } + } + + // Merge qset info + for (auto const& qset : hEntryV0.quorumSets) + { + qsets.try_emplace(xdrSha256(qset), + std::make_shared(qset)); + } +} +} // namespace + +void +HerderPersistenceImpl::copySCPHistoryFromEntries( + SCPHistoryEntryVec const& hEntries, uint32_t ledgerSeq) +{ + ZoneScoped; + + if (hEntries.empty()) + { + return; + } + + // Merge entries + UnorderedMap envs; + UnorderedMap qsets; + for (auto const& hEntry : hEntries) + { + mergeSCPHistory(*hEntry, ledgerSeq, envs, qsets); + } + + // TODO: Dedup with saveSCPHistory after changes to it merge + std::vector nodeIDs; + std::vector seqs(envs.size(), ledgerSeq); + std::vector envelopes; + for (auto const& kv : envs) + { + nodeIDs.emplace_back(KeyUtils::toStrKey(kv.first)); + envelopes.emplace_back( + decoder::encode_b64(xdr::xdr_to_opaque(kv.second))); + } + + CLOG_DEBUG(Herder, "Copying {} SCP history entries from ledger {}", + envs.size(), ledgerSeq); + + Database& db = mApp.getDatabase(); + soci::transaction txScope(db.getSession()); + clearSCPHistoryAtSeq(ledgerSeq); + if (!envs.empty()) + { + // Perform multi-row insert into scphistory + auto prepEnv = + db.getPreparedStatement("INSERT INTO scphistory " + "(nodeid, ledgerseq, envelope) VALUES " + "(:n, :l, :e)"); + auto& st = prepEnv.statement(); + st.exchange(soci::use(nodeIDs, "n")); + st.exchange(soci::use(seqs, "l")); + st.exchange(soci::use(envelopes, "e")); + st.define_and_bind(); + { + ZoneNamedN(insertSCPHistoryZone, "insert scphistory", true); + st.execute(true); + } + if (st.get_affected_rows() != envs.size()) + { + throw std::runtime_error("Could not update data in SQL"); + } + } + + saveQuorumSets(ledgerSeq, qsets); + txScope.commit(); +} + std::optional HerderPersistence::getNodeQuorumSet(Database& db, soci::session& sess, NodeID const& nodeID) diff --git a/src/herder/HerderPersistenceImpl.h b/src/herder/HerderPersistenceImpl.h index 968d66bb1a..688b2fdeeb 100644 --- a/src/herder/HerderPersistenceImpl.h +++ b/src/herder/HerderPersistenceImpl.h @@ -20,7 +20,17 @@ class HerderPersistenceImpl : public HerderPersistence void saveSCPHistory(uint32_t seq, std::vector const& envs, QuorumTracker::QuorumMap const& qmap) override; + void copySCPHistoryFromEntries(SCPHistoryEntryVec const& hEntries, + uint32_t ledgerSeq) override; + private: Application& mApp; + + // Save quorum sets at a given sequence number + void saveQuorumSets(uint32_t seq, + UnorderedMap const& qsets); + + // Delete `scphistory` entries at a given sequence number. 
+ void clearSCPHistoryAtSeq(uint32_t seq); }; } diff --git a/src/history/test/HistoryTests.cpp b/src/history/test/HistoryTests.cpp index 7e4821f039..25681bab5c 100644 --- a/src/history/test/HistoryTests.cpp +++ b/src/history/test/HistoryTests.cpp @@ -375,14 +375,16 @@ TEST_CASE("Tx results verification", "[batching][resultsverification]") auto checkpointLedger = catchupSimulation.getLastCheckpointLedger(2); catchupSimulation.ensureOfflineCatchupPossible(checkpointLedger); - auto tmpDir = - catchupSimulation.getApp().getTmpDirManager().tmpDir("tx-results-test"); + auto tmpDirPtr = std::make_shared( + catchupSimulation.getApp().getTmpDirManager().tmpDir( + "tx-results-test")); + auto& tmpDir = *tmpDirPtr; auto& wm = catchupSimulation.getApp().getWorkScheduler(); CheckpointRange range{LedgerRange::inclusive(1, checkpointLedger), catchupSimulation.getApp().getHistoryManager()}; auto verifyHeadersWork = wm.executeWork( - range, HISTORY_FILE_TYPE_LEDGER, tmpDir); + range, HISTORY_FILE_TYPE_LEDGER, tmpDirPtr); REQUIRE(verifyHeadersWork->getState() == BasicWork::State::WORK_SUCCESS); SECTION("basic") { @@ -431,7 +433,7 @@ TEST_CASE("Tx results verification", "[batching][resultsverification]") SECTION("invalid result entries") { auto getResults = wm.executeWork( - range, HISTORY_FILE_TYPE_RESULTS, tmpDir); + range, HISTORY_FILE_TYPE_RESULTS, tmpDirPtr); REQUIRE(getResults->getState() == BasicWork::State::WORK_SUCCESS); FileTransferInfo ft(tmpDir, HISTORY_FILE_TYPE_RESULTS, range.last()); @@ -1490,6 +1492,9 @@ TEST_CASE("Catchup failure recovery with buffered checkpoint", TEST_CASE("Change ordering of buffered ledgers", "[history][catchup]") { + // TODO: Fix `cp` failures this test spits out. SCP history is optional so + // there shouldn't be errors when it doesn't exist. 
+ CatchupSimulation catchupSimulation{}; auto app = catchupSimulation.createCatchupApplication( diff --git a/src/history/test/HistoryTestsUtils.cpp b/src/history/test/HistoryTestsUtils.cpp index 8eb6c39ad9..0d7928cf73 100644 --- a/src/history/test/HistoryTestsUtils.cpp +++ b/src/history/test/HistoryTestsUtils.cpp @@ -7,6 +7,7 @@ #include "catchup/CatchupRange.h" #include "crypto/Hex.h" #include "crypto/Random.h" +#include "herder/HerderPersistence.h" #include "herder/TxSetFrame.h" #include "history/FileTransferInfo.h" #include "history/HistoryArchiveManager.h" @@ -16,6 +17,7 @@ #include "ledger/LedgerTxnHeader.h" #include "lib/catch.hpp" #include "main/ApplicationUtils.h" +#include "scp/LocalNode.h" #include "test/TestAccount.h" #include "test/TestUtils.h" #include "test/TxTests.h" @@ -64,6 +66,8 @@ TmpDirHistoryConfigurator::configure(Config& cfg, bool writable) const } cfg.HISTORY[d] = HistoryArchiveConfiguration{d, getCmd, putCmd, mkdirCmd}; + // TODO: Probably want different variations of this vv + cfg.SCP_HISTORY_ARCHIVES = {d}; cfg.TESTING_SOROBAN_HIGH_LIMIT_OVERRIDE = true; return cfg; } @@ -408,6 +412,22 @@ CatchupSimulation::getLastCheckpointLedger(uint32_t checkpointIndex) const 1; } +namespace +{ +// Make an envelope with enough detail for +// `HerderPersistenceImpl::saveSCPHistory` +SCPEnvelope +makeEnvelope(HerderImpl& herder) +{ + SCPEnvelope result; + result.statement.nodeID = PubKeyUtils::pseudoRandomForTesting(); + result.statement.pledges.type(SCP_ST_EXTERNALIZE); + result.statement.pledges.externalize().commitQuorumSetHash = + herder.getSCP().getLocalNode()->getQuorumSetHash(); + return result; +} +} // namespace + void CatchupSimulation::generateRandomLedger(uint32_t version) { @@ -525,6 +545,21 @@ CatchupSimulation::generateRandomLedger(uint32_t version) mLedgerCloseDatas.emplace_back(ledgerSeq, txSet, sv); + mEnvelopes.emplace_back(); + auto& envs = mEnvelopes.back(); + auto& herder = dynamic_cast(mApp.getHerder()); + for (int i = 0; i < 5; ++i) + { + // Generate at least one envelope + if (i == 0 || rand_flip()) + { + envs.push_back(makeEnvelope(herder)); + } + } + // Save SCP history to the database + mApp.getHerderPersistence().saveSCPHistory(ledgerSeq, envs, + QuorumTracker::QuorumMap()); + auto& txsSucceeded = mApp.getMetrics().NewCounter({"ledger", "apply", "success"}); auto lastSucceeded = txsSucceeded.count(); @@ -903,7 +938,8 @@ void CatchupSimulation::externalizeLedger(HerderImpl& herder, uint32_t ledger) { // Remember the vectors count from 2, not 0. 
- if (ledger - 2 >= mLedgerCloseDatas.size()) + if (ledger - 2 >= mLedgerCloseDatas.size() || + ledger - 2 >= mEnvelopes.size()) { return; } @@ -1033,6 +1069,22 @@ CatchupSimulation::validateCatchup(Application::pointer app) CHECK(haveCarolSeq == wantCarolSeq); CHECK(haveEveSeq == wantEveSeq); CHECK(haveStrpSeq == wantStrpSeq); + + // Check that `scphistory` has the expected number of entries + size_t j; + size_t count; + auto st = mApp.getDatabase() + .getPreparedStatement( + "SELECT COUNT(*) FROM scphistory WHERE ledgerseq = :seq") + .statement(); + st.exchange(soci::into(count)); + st.exchange(soci::use(j)); + st.define_and_bind(); + for (j = 2; j < nextLedger - 1; ++j) + { + st.execute(true); + CHECK(count == mEnvelopes.at(j - 2).size()); + } } CatchupPerformedWork diff --git a/src/history/test/HistoryTestsUtils.h b/src/history/test/HistoryTestsUtils.h index 863a0f9fed..ba2eb36c6a 100644 --- a/src/history/test/HistoryTestsUtils.h +++ b/src/history/test/HistoryTestsUtils.h @@ -188,6 +188,7 @@ class CatchupSimulation BucketList mBucketListAtLastPublish; std::vector mLedgerCloseDatas; + std::vector> mEnvelopes; std::vector mLedgerSeqs; std::vector mLedgerHashes; diff --git a/src/historywork/BatchDownloadWork.cpp b/src/historywork/BatchDownloadWork.cpp index f74a9a6aa4..3e2655ee83 100644 --- a/src/historywork/BatchDownloadWork.cpp +++ b/src/historywork/BatchDownloadWork.cpp @@ -16,7 +16,7 @@ namespace stellar { BatchDownloadWork::BatchDownloadWork(Application& app, CheckpointRange range, std::string const& type, - TmpDir const& downloadDir, + std::shared_ptr downloadDir, std::shared_ptr archive) : BatchWork(app, fmt::format(FMT_STRING("batch-download-{:s}-{:08x}-{:08x}"), @@ -52,7 +52,7 @@ BatchDownloadWork::yieldMoreWork() return nullptr; } - FileTransferInfo ft(mDownloadDir, mFileType, mNext); + FileTransferInfo ft(*mDownloadDir, mFileType, mNext); CLOG_DEBUG(History, "Downloading and unzipping {} for checkpoint {}", mFileType, mNext); auto getAndUnzip = diff --git a/src/historywork/BatchDownloadWork.h b/src/historywork/BatchDownloadWork.h index 3e355599df..539075984f 100644 --- a/src/historywork/BatchDownloadWork.h +++ b/src/historywork/BatchDownloadWork.h @@ -23,12 +23,13 @@ class BatchDownloadWork : public BatchWork CheckpointRange const mRange; uint32_t mNext; std::string const mFileType; - TmpDir const& mDownloadDir; + std::shared_ptr mDownloadDir; std::shared_ptr mArchive; public: BatchDownloadWork(Application& app, CheckpointRange range, - std::string const& type, TmpDir const& downloadDir, + std::string const& type, + std::shared_ptr downloadDir, std::shared_ptr archive = nullptr); ~BatchDownloadWork() = default; std::string getStatus() const override; diff --git a/src/historywork/BestEffortBatchDownloadWork.cpp b/src/historywork/BestEffortBatchDownloadWork.cpp new file mode 100644 index 0000000000..24c1318553 --- /dev/null +++ b/src/historywork/BestEffortBatchDownloadWork.cpp @@ -0,0 +1,20 @@ +#include "historywork/BestEffortBatchDownloadWork.h" + +namespace stellar +{ +BestEffortBatchDownloadWork::BestEffortBatchDownloadWork( + Application& app, CheckpointRange range, std::string const& type, + std::shared_ptr downloadDir, + std::shared_ptr archive) + : BatchDownloadWork(app, range, type, downloadDir, archive) +{ +} + +BasicWork::State +BestEffortBatchDownloadWork::onChildFailure() +{ + // TODO: Emit a warning about the failed download + abortSuccess(); + return State::WORK_RUNNING; +} +} // namespace stellar \ No newline at end of file diff --git 
a/src/historywork/BestEffortBatchDownloadWork.h b/src/historywork/BestEffortBatchDownloadWork.h new file mode 100644 index 0000000000..042a2f710f --- /dev/null +++ b/src/historywork/BestEffortBatchDownloadWork.h @@ -0,0 +1,27 @@ +#pragma once + +// Copyright 2023 Stellar Development Foundation and contributors. Licensed +// under the Apache License, Version 2.0. See the COPYING file at the root +// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 + +#include "historywork/BatchDownloadWork.h" +#include "work/BasicWork.h" +#include + +namespace stellar +{ +// A version of BatchDownloadWork that does not fail if a download fails. +class BestEffortBatchDownloadWork : public BatchDownloadWork +{ + public: + BestEffortBatchDownloadWork( + Application& app, CheckpointRange range, std::string const& type, + std::shared_ptr downloadDir, + std::shared_ptr archive = nullptr); + ~BestEffortBatchDownloadWork() = default; + + protected: + State onChildFailure() override; +}; + +} // namespace stellar \ No newline at end of file diff --git a/src/historywork/FetchRecentQsetsWork.cpp b/src/historywork/FetchRecentQsetsWork.cpp index 6b38200061..2aad41bd06 100644 --- a/src/historywork/FetchRecentQsetsWork.cpp +++ b/src/historywork/FetchRecentQsetsWork.cpp @@ -28,7 +28,7 @@ FetchRecentQsetsWork::doReset() mGetHistoryArchiveStateWork.reset(); mDownloadSCPMessagesWork.reset(); mDownloadDir = - std::make_unique(mApp.getTmpDirManager().tmpDir(getName())); + std::make_shared(mApp.getTmpDirManager().tmpDir(getName())); } BasicWork::State @@ -62,7 +62,7 @@ FetchRecentQsetsWork::doWork() firstSeq, lastSeq); auto range = CheckpointRange::inclusive(firstSeq, lastSeq, step); mDownloadSCPMessagesWork = addWork( - range, HISTORY_FILE_TYPE_SCP, *mDownloadDir); + range, HISTORY_FILE_TYPE_SCP, mDownloadDir); return State::WORK_RUNNING; } else if (mDownloadSCPMessagesWork->getState() != State::WORK_SUCCESS) diff --git a/src/historywork/FetchRecentQsetsWork.h b/src/historywork/FetchRecentQsetsWork.h index 168341f191..4c4c376180 100644 --- a/src/historywork/FetchRecentQsetsWork.h +++ b/src/historywork/FetchRecentQsetsWork.h @@ -15,7 +15,7 @@ class TmpDir; class FetchRecentQsetsWork : public Work { - std::unique_ptr mDownloadDir; + std::shared_ptr mDownloadDir; uint32_t mLedgerNum; std::shared_ptr mGetHistoryArchiveStateWork; std::shared_ptr mDownloadSCPMessagesWork; diff --git a/src/historywork/WriteVerifiedCheckpointHashesWork.cpp b/src/historywork/WriteVerifiedCheckpointHashesWork.cpp index 200c0f9e64..537fff8096 100644 --- a/src/historywork/WriteVerifiedCheckpointHashesWork.cpp +++ b/src/historywork/WriteVerifiedCheckpointHashesWork.cpp @@ -122,7 +122,7 @@ WriteVerifiedCheckpointHashesWork::yieldMoreWork() auto tmpDir = std::make_shared( mApp.getTmpDirManager().tmpDir("verify-" + checkpointStr)); auto getWork = std::make_shared( - mApp, checkpointRange, HISTORY_FILE_TYPE_LEDGER, *tmpDir, mArchive); + mApp, checkpointRange, HISTORY_FILE_TYPE_LEDGER, tmpDir, mArchive); // When we have a previous-work, we grab a future attached to the promise it // will fulfill when it runs. 
This promise might not have a value _yet_ but diff --git a/src/main/Config.cpp b/src/main/Config.cpp index 703eeba1c8..939e7974f2 100644 --- a/src/main/Config.cpp +++ b/src/main/Config.cpp @@ -1349,6 +1349,12 @@ Config::processConfig(std::shared_ptr t) throw std::invalid_argument("incomplete HISTORY block"); } } + else if (item.first == "SCP_HISTORY_ARCHIVES") + { + // TODO: Check for entries that aren't in HISTORY + // TODO: Deduplicate. Maybe just make SCP_HISTORY_ARCHIVES a set + SCP_HISTORY_ARCHIVES = readArray(item); + } else if (item.first == "DATABASE") { DATABASE = SecretValue{readString(item)}; diff --git a/src/main/Config.h b/src/main/Config.h index 1a2da8b65b..b5980c3e84 100644 --- a/src/main/Config.h +++ b/src/main/Config.h @@ -532,6 +532,10 @@ class Config : public std::enable_shared_from_this // History config std::map HISTORY; + // Archives to use for downloading SCP history + // TODO: Document in user facing documentation + std::vector SCP_HISTORY_ARCHIVES; + // Database config SecretValue DATABASE; diff --git a/src/work/BatchWork.cpp b/src/work/BatchWork.cpp index 1a2d4a16f7..cd9f76b70f 100644 --- a/src/work/BatchWork.cpp +++ b/src/work/BatchWork.cpp @@ -29,7 +29,7 @@ BatchWork::doWork() ZoneScoped; if (anyChildRaiseFailure()) { - return State::WORK_FAILURE; + return onChildFailure(); } // Clean up completed children @@ -83,4 +83,10 @@ BatchWork::addMoreWorkIfNeeded() mBatch.insert(std::make_pair(w->getName(), w)); } } + +BasicWork::State +BatchWork::onChildFailure() +{ + return State::WORK_FAILURE; +} } diff --git a/src/work/BatchWork.h b/src/work/BatchWork.h index 76195fc12d..71c0c0470f 100644 --- a/src/work/BatchWork.h +++ b/src/work/BatchWork.h @@ -41,5 +41,9 @@ class BatchWork : public Work virtual bool hasNext() const = 0; virtual std::shared_ptr yieldMoreWork() = 0; virtual void resetIter() = 0; + + // Function to call when child work fails. Returns the state that `doWork` + // should return. + virtual State onChildFailure(); }; } diff --git a/src/work/Work.cpp b/src/work/Work.cpp index da53ab5c0c..46d8729162 100644 --- a/src/work/Work.cpp +++ b/src/work/Work.cpp @@ -52,7 +52,22 @@ Work::onRun() if (mAbortChildrenButNotSelf) { // Stop whatever work was doing, just wait for children to abort - return onAbort() ? State::WORK_FAILURE : State::WORK_RUNNING; + if (onAbort()) + { + if (mReportSuccessOnAbort) + { + clearChildren(); + return State::WORK_SUCCESS; + } + else + { + return State::WORK_FAILURE; + } + } + else + { + return State::WORK_RUNNING; + } } auto child = yieldNextRunningChild(); @@ -85,6 +100,14 @@ Work::onRun() } } +void +Work::abortSuccess() +{ + mReportSuccessOnAbort = true; + mAbortChildrenButNotSelf = true; + shutdownChildren(); +} + bool Work::onAbort() { diff --git a/src/work/Work.h b/src/work/Work.h index 7c946f62df..845a523404 100644 --- a/src/work/Work.h +++ b/src/work/Work.h @@ -123,6 +123,9 @@ class Work : public BasicWork // Provide additional cleanup logic for reset virtual void doReset(); + // Abort, but report success + void abortSuccess(); + private: std::list> mChildren; std::list>::const_iterator mNextChild; @@ -136,6 +139,7 @@ class Work : public BasicWork void shutdownChildren(); bool mAbortChildrenButNotSelf{false}; + bool mReportSuccessOnAbort{false}; }; namespace WorkUtils
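For reference, here is a minimal sketch of what the new option could look like in a node's TOML config. It assumes an operator who wants SCP history merged from two archives; the archive names and `get` commands are illustrative only and are not part of this PR:

[HISTORY.sdf1]
get="curl -sf http://history.stellar.org/prd/core-live/core_live_001/{0} -o {1}"

[HISTORY.sdf2]
get="curl -sf http://history.stellar.org/prd/core-live/core_live_002/{0} -o {1}"

# Archives (referenced by name) to download and merge SCP history from during catchup
SCP_HISTORY_ARCHIVES=["sdf1", "sdf2"]

Since the option is parsed with `readArray`, it must be a TOML array of archive names; as noted in the TODOs above, validating those names against `HISTORY` and deduplicating them is still outstanding.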