Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: Recover SCP messages when replaying from history #4156

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Builds/VisualStudio/stellar-core.vcxproj
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,7 @@ exit /b 0
<ClCompile Include="..\..\src\herder\TxSetUtils.cpp" />
<ClCompile Include="..\..\src\herder\Upgrades.cpp" />
<ClCompile Include="..\..\src\historywork\BatchDownloadWork.cpp" />
<ClCompile Include="..\..\src\historywork\BestEffortBatchDownloadWork.cpp" />
<ClCompile Include="..\..\src\historywork\CheckSingleLedgerHeaderWork.cpp" />
<ClCompile Include="..\..\src\historywork\DownloadBucketsWork.cpp" />
<ClCompile Include="..\..\src\historywork\DownloadVerifyTxResultsWork.cpp" />
Expand Down Expand Up @@ -938,6 +939,7 @@ exit /b 0
<ClInclude Include="..\..\src\herder\TxSetUtils.h" />
<ClInclude Include="..\..\src\herder\Upgrades.h" />
<ClInclude Include="..\..\src\historywork\BatchDownloadWork.h" />
<ClInclude Include="..\..\src\historywork\BestEffortBatchDownloadWork.h" />
<ClInclude Include="..\..\src\historywork\CheckSingleLedgerHeaderWork.h" />
<ClInclude Include="..\..\src\historywork\DownloadBucketsWork.h" />
<ClInclude Include="..\..\src\historywork\DownloadVerifyTxResultsWork.h" />
Expand Down
6 changes: 6 additions & 0 deletions Builds/VisualStudio/stellar-core.vcxproj.filters
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,9 @@
<ClCompile Include="..\..\src\historywork\BatchDownloadWork.cpp">
<Filter>historyWork</Filter>
</ClCompile>
<ClCompile Include="..\..\src\historywork\BestEffortBatchDownloadWork.cpp">
<Filter>historyWork</Filter>
</ClCompile>
<ClCompile Include="..\..\src\historywork\CheckSingleLedgerHeaderWork.cpp">
<Filter>historyWork</Filter>
</ClCompile>
Expand Down Expand Up @@ -1916,6 +1919,9 @@
<ClInclude Include="..\..\src\historywork\BatchDownloadWork.h">
<Filter>historyWork</Filter>
</ClInclude>
<ClInclude Include="..\..\src\historywork\BestEffortBatchDownloadWork.h">
<Filter>historyWork</Filter>
</ClInclude>
<ClInclude Include="..\..\src\historywork\CheckSingleLedgerHeaderWork.h">
<Filter>historyWork</Filter>
</ClInclude>
Expand Down
4 changes: 3 additions & 1 deletion src/catchup/ApplyBufferedLedgersWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ ApplyBufferedLedgersWork::onRun()
lcd.getTxSet()->sizeTxTotal(), lcd.getTxSet()->sizeOpTotalForLogging(),
stellarValueToString(mApp.getConfig(), lcd.getValue()));

auto applyLedger = std::make_shared<ApplyLedgerWork>(mApp, lcd);
// Pass `nullptr` for `hEntries` because SCP messages of buffered ledgers
// have already been logged.
auto applyLedger = std::make_shared<ApplyLedgerWork>(mApp, lcd, nullptr);

auto predicate = [](Application& app) {
auto& bl = app.getBucketManager().getBucketList();
Expand Down
88 changes: 81 additions & 7 deletions src/catchup/ApplyCheckpointWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@
namespace stellar
{

ApplyCheckpointWork::ApplyCheckpointWork(Application& app,
TmpDir const& downloadDir,
LedgerRange const& range,
OnFailureCallback cb)
ApplyCheckpointWork::ApplyCheckpointWork(
Application& app, TmpDir const& downloadDir, LedgerRange const& range,
OnFailureCallback cb, std::shared_ptr<TmpDirVec const>& scpDownloadDirs)
: BasicWork(app,
"apply-ledgers-" + fmt::format(FMT_STRING("{}-{}"),
range.mFirst, range.limit()),
Expand All @@ -36,6 +35,7 @@ ApplyCheckpointWork::ApplyCheckpointWork(Application& app,
, mLedgerRange(range)
, mCheckpoint(
app.getHistoryManager().checkpointContainingLedger(range.mFirst))
, mSCPDownloadDirs(scpDownloadDirs)
, mOnFailure(cb)
{
// Ledger range check to enforce application of a single checkpoint
Expand Down Expand Up @@ -69,6 +69,11 @@ ApplyCheckpointWork::closeFiles()
{
mHdrIn.close();
mTxIn.close();
for (auto& scpInfo : mSCPCheckpointInfo)
{
scpInfo.scpHistoryIn.close();
}
mSCPCheckpointInfo.clear();
mFilesOpen = false;
}

Expand All @@ -83,8 +88,7 @@ void
ApplyCheckpointWork::openInputFiles()
{
ZoneScoped;
mHdrIn.close();
mTxIn.close();
closeFiles();
FileTransferInfo hi(mDownloadDir, HISTORY_FILE_TYPE_LEDGER, mCheckpoint);
FileTransferInfo ti(mDownloadDir, HISTORY_FILE_TYPE_TRANSACTIONS,
mCheckpoint);
Expand All @@ -95,6 +99,32 @@ ApplyCheckpointWork::openInputFiles()
mTxIn.open(ti.localPath_nogz());
mTxHistoryEntry = TransactionHistoryEntry();
mHeaderHistoryEntry = LedgerHeaderHistoryEntry();

if (mSCPDownloadDirs)
{
// Initialize `SCPCheckpointInfo`s for each SCP history download
// directory.
for (auto const& scpDir : *mSCPDownloadDirs)
{
FileTransferInfo si(*scpDir, HISTORY_FILE_TYPE_SCP, mCheckpoint);
CLOG_DEBUG(History, "Saving SCP messages from {}",
si.localPath_nogz());
mSCPCheckpointInfo.emplace_back();
SCPCheckpointInfo& scpInfo = mSCPCheckpointInfo.back();
scpInfo.scpHistoryEntry = std::make_shared<SCPHistoryEntry>();
try
{
scpInfo.scpHistoryIn.open(si.localPath_nogz());
}
catch (FileSystemException const&)
{
// File doesn't exist for this checkpoint. That's ok. Skip it.
mSCPCheckpointInfo.pop_back();
continue;
}
}
}

mFilesOpen = true;
}

Expand Down Expand Up @@ -251,6 +281,49 @@ ApplyCheckpointWork::getNextLedgerCloseData()
std::make_optional<Hash>(mHeaderHistoryEntry.hash));
}

std::unique_ptr<SCPHistoryEntryVec>
ApplyCheckpointWork::getNextSCPHistoryEntries()
{
ZoneScoped;
auto ret = std::make_unique<SCPHistoryEntryVec>();
uint32_t ledgerSeq = mApp.getLedgerManager().getLastClosedLedgerNum();
for (SCPCheckpointInfo& info : mSCPCheckpointInfo)
{
XDRInputFileStream& in = info.scpHistoryIn;
std::shared_ptr<SCPHistoryEntry>& entry = info.scpHistoryEntry;
do
{
uint32 scpHistSeq = entry->v0().ledgerMessages.ledgerSeq;

if (scpHistSeq <= ledgerSeq)
{
// Catching up to `ledgerSeq + 1`
CLOG_DEBUG(History, "Skipping SCP messages for ledger {}",
scpHistSeq);
}
else if (scpHistSeq == ledgerSeq + 1)
{
// Caught up
CLOG_DEBUG(History, "Loaded SCP messages for ledger {}",
ledgerSeq);
ret->push_back(entry);
break;
}
else
{
// Ahead. This archive does not have messages for `ledgerSeq+1`
// TODO: Log which archive is missing the messages (also applies
// to other logging statements in this function)
CLOG_WARNING(History,
"Archive missing SCP messages for ledger {}",
scpHistSeq);
break;
}
} while (in && in.readOne(*entry));
}
return ret;
}

BasicWork::State
ApplyCheckpointWork::onRun()
{
Expand Down Expand Up @@ -308,7 +381,8 @@ ApplyCheckpointWork::onRun()
return State::WORK_RUNNING;
}

auto applyLedger = std::make_shared<ApplyLedgerWork>(mApp, *lcd);
auto applyLedger = std::make_shared<ApplyLedgerWork>(
mApp, *lcd, getNextSCPHistoryEntries());

auto predicate = [](Application& app) {
auto& bl = app.getBucketManager().getBucketList();
Expand Down
27 changes: 26 additions & 1 deletion src/catchup/ApplyCheckpointWork.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#pragma once

#include "catchup/ApplyLedgerWork.h"
#include "herder/LedgerCloseData.h"
#include "herder/TxSetFrame.h"
#include "history/HistoryArchive.h"
Expand All @@ -20,6 +21,18 @@ namespace stellar
class TmpDir;
struct LedgerHeaderHistoryEntry;

using TmpDirVec = std::vector<std::shared_ptr<TmpDir const>>;

// This struct stores information about SCP messages in a single history
// checkpoint
struct SCPCheckpointInfo
{
// Input stream holding `SCPHistoryEntry`s
XDRInputFileStream scpHistoryIn;
// Most recent SCP history entry read from `scpHistoryIn`
std::shared_ptr<SCPHistoryEntry> scpHistoryEntry;
};

/**
* This class is responsible for applying transactions stored in files on
* temporary directory (downloadDir) to local ledger. It requires two sets of
Expand All @@ -45,11 +58,17 @@ class ApplyCheckpointWork : public BasicWork
TmpDir const& mDownloadDir;
LedgerRange const mLedgerRange;
uint32_t const mCheckpoint;
// The directories containing downloaded SCP history. May be null if no such
// directories exist.
std::shared_ptr<TmpDirVec const> mSCPDownloadDirs;

XDRInputFileStream mHdrIn;
XDRInputFileStream mTxIn;
TransactionHistoryEntry mTxHistoryEntry;
LedgerHeaderHistoryEntry mHeaderHistoryEntry;
// Vector containing each archive's SCP messages for the current checkpoint
// being processed
std::vector<SCPCheckpointInfo> mSCPCheckpointInfo;
OnFailureCallback mOnFailure;

bool mFilesOpen{false};
Expand All @@ -61,11 +80,17 @@ class ApplyCheckpointWork : public BasicWork

std::shared_ptr<LedgerCloseData> getNextLedgerCloseData();

// Returns a vector holding SCP messages from each archive for the ledger
// being processed. This vector may be smaller than the total number of
// archives if some archives did not contain messages for the ledger.
std::unique_ptr<SCPHistoryEntryVec> getNextSCPHistoryEntries();

void closeFiles();

public:
ApplyCheckpointWork(Application& app, TmpDir const& downloadDir,
LedgerRange const& range, OnFailureCallback cb);
LedgerRange const& range, OnFailureCallback cb,
std::shared_ptr<TmpDirVec const>& scpDownloadDirs);
~ApplyCheckpointWork() = default;
std::string getStatus() const override;
void onFailureRaise() override;
Expand Down
13 changes: 11 additions & 2 deletions src/catchup/ApplyLedgerWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,23 @@
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0

#include "catchup/ApplyLedgerWork.h"
#include "herder/HerderPersistence.h"
#include "ledger/LedgerManager.h"
#include "main/Application.h"
#include <Tracy.hpp>
#include <fmt/format.h>

namespace stellar
{
ApplyLedgerWork::ApplyLedgerWork(Application& app,
LedgerCloseData const& ledgerCloseData)
ApplyLedgerWork::ApplyLedgerWork(
Application& app, LedgerCloseData const& ledgerCloseData,
std::unique_ptr<SCPHistoryEntryVec const> hEntries)
: BasicWork(
app, "apply-ledger-" + std::to_string(ledgerCloseData.getLedgerSeq()),
BasicWork::RETRY_NEVER)
, mApp(app)
, mLedgerCloseData(ledgerCloseData)
, mHEntries(std::move(hEntries))
{
}

Expand All @@ -24,6 +28,11 @@ ApplyLedgerWork::onRun()
{
ZoneScoped;
mApp.getLedgerManager().closeLedger(mLedgerCloseData);
if (mHEntries)
{
mApp.getHerderPersistence().copySCPHistoryFromEntries(
*mHEntries, mLedgerCloseData.getLedgerSeq());
}
return BasicWork::State::WORK_SUCCESS;
}

Expand Down
9 changes: 8 additions & 1 deletion src/catchup/ApplyLedgerWork.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,23 @@

#include "herder/LedgerCloseData.h"
#include "work/Work.h"
#include <memory>

namespace stellar
{

using SCPHistoryEntryVec = std::vector<std::shared_ptr<SCPHistoryEntry>>;

class ApplyLedgerWork : public BasicWork
{
Application& mApp;
LedgerCloseData const mLedgerCloseData;
// SCP messages for the ledger to be applied
std::unique_ptr<SCPHistoryEntryVec const> mHEntries;

public:
ApplyLedgerWork(Application& app, LedgerCloseData const& ledgerCloseData);
ApplyLedgerWork(Application& app, LedgerCloseData const& ledgerCloseData,
std::unique_ptr<SCPHistoryEntryVec const> hEntries);

std::string getStatus() const override;

Expand Down
35 changes: 31 additions & 4 deletions src/catchup/CatchupWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
#include "catchup/VerifyLedgerChainWork.h"
#include "herder/Herder.h"
#include "history/FileTransferInfo.h"
#include "history/HistoryArchiveManager.h"
#include "history/HistoryManager.h"
#include "historywork/BatchDownloadWork.h"
#include "historywork/BestEffortBatchDownloadWork.h"
#include "historywork/DownloadBucketsWork.h"
#include "historywork/DownloadVerifyTxResultsWork.h"
#include "historywork/GetAndUnzipRemoteFileWork.h"
Expand Down Expand Up @@ -81,7 +83,7 @@ CatchupWork::CatchupWork(Application& app,
std::shared_ptr<HistoryArchive> archive)
: Work(app, "catchup", BasicWork::RETRY_NEVER)
, mLocalState{app.getLedgerManager().getLastClosedLedgerHAS()}
, mDownloadDir{std::make_unique<TmpDir>(
, mDownloadDir{std::make_shared<TmpDir>(
mApp.getTmpDirManager().tmpDir(getName()))}
, mCatchupConfiguration{catchupConfiguration}
, mArchive{archive}
Expand Down Expand Up @@ -150,6 +152,7 @@ CatchupWork::doReset()
mHAS.reset();
mBucketHAS.reset();
mRetainedBuckets.clear();
mSCPDownloadDirs->clear();
}

void
Expand All @@ -166,7 +169,7 @@ CatchupWork::downloadVerifyLedgerChain(CatchupRange const& catchupRange,
// Batch download has default retries ("a few") to ensure we rotate through
// archives
auto getLedgers = std::make_shared<BatchDownloadWork>(
mApp, checkpointRange, HISTORY_FILE_TYPE_LEDGER, *mDownloadDir,
mApp, checkpointRange, HISTORY_FILE_TYPE_LEDGER, mDownloadDir,
mArchive);
mRangeEndPromise = std::promise<LedgerNumHashPair>();
mRangeEndFuture = mRangeEndPromise.get_future().share();
Expand All @@ -179,9 +182,32 @@ CatchupWork::downloadVerifyLedgerChain(CatchupRange const& catchupRange,
mApp, *mDownloadDir, verifyRange, mLastClosedLedgerHashPair,
mRangeEndFuture, std::move(fatalFailurePromise));

std::vector<std::shared_ptr<BasicWork>> seq{getLedgers, mVerifyLedgers};

Config const& cfg = mApp.getConfig();
// TODO: Test with this flag set to `false`. An empty `mSCPDownloadDirs`
// should prevent any further stages from running.
if (cfg.MODE_STORES_HISTORY_MISC)
{
for (std::string const& scpHistoryArchive : cfg.SCP_HISTORY_ARCHIVES)
{
CLOG_DEBUG(History, "Downloading SCP history from {}",
scpHistoryArchive);
std::shared_ptr<HistoryArchive> scpArchive =
mApp.getHistoryArchiveManager().getHistoryArchive(
scpHistoryArchive);

auto scpDownloadDir =
std::make_shared<TmpDir>(mApp.getTmpDirManager().tmpDir(
"scp-history-" + scpHistoryArchive));
mSCPDownloadDirs->push_back(scpDownloadDir);
seq.emplace_back(std::make_shared<BestEffortBatchDownloadWork>(
mApp, checkpointRange, HISTORY_FILE_TYPE_SCP, scpDownloadDir,
scpArchive));
}
}
// Never retry the sequence: downloads already have retries, and there's no
// point retrying verification
std::vector<std::shared_ptr<BasicWork>> seq{getLedgers, mVerifyLedgers};
mDownloadVerifyLedgersSeq = addWork<WorkSequence>(
"download-verify-ledgers-seq", seq, BasicWork::RETRY_NEVER);
mCurrentWork = mDownloadVerifyLedgersSeq;
Expand Down Expand Up @@ -305,7 +331,8 @@ CatchupWork::downloadApplyTransactions(CatchupRange const& catchupRange)
auto waitForPublish = mCatchupConfiguration.offline();
auto range = catchupRange.getReplayRange();
mTransactionsVerifyApplySeq = std::make_shared<DownloadApplyTxsWork>(
mApp, *mDownloadDir, range, mLastApplied, waitForPublish, mArchive);
mApp, *mDownloadDir, range, mLastApplied, waitForPublish,
mSCPDownloadDirs, mArchive);
}

BasicWork::State
Expand Down
Loading