Skip to content

Commit

Permalink
Merge branch 'integrated_node'
Browse files Browse the repository at this point in the history
  • Loading branch information
anatolse committed Dec 6, 2023
2 parents 5017da7 + 4ec35e1 commit 3b2bca2
Show file tree
Hide file tree
Showing 14 changed files with 328 additions and 222 deletions.
15 changes: 14 additions & 1 deletion core/block_crypt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3182,9 +3182,18 @@ namespace beam
m_Total = nTotal;
m_Last_ms = GetTime_ms();
LOG_INFO() << sz;
if (m_pExternal)
m_pExternal->Reset(sz, nTotal);
}

void LongAction::OnProgress(uint64_t pos)
void LongAction::SetTotal(uint64_t nTotal)
{
m_Total = nTotal;
if (m_pExternal)
m_pExternal->SetTotal(nTotal);
}

bool LongAction::OnProgress(uint64_t pos)
{
uint32_t dt_ms = GetTime_ms() - m_Last_ms;

Expand All @@ -3206,6 +3215,10 @@ namespace beam

LOG_INFO() << "\t" << nDone << "%...";
}
if (m_pExternal)
return m_pExternal->OnProgress(pos);

return true;
}

/////////////
Expand Down
24 changes: 17 additions & 7 deletions core/block_crypt.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,28 @@ namespace beam
uint32_t GetTime_ms(); // platform-independent GetTickCount
uint32_t GetTimeNnz_ms(); // guaranteed non-zero

struct LongAction
struct ILongAction
{
uint32_t m_Last_ms;
uint64_t m_Total;
virtual void Reset(const char*, uint64_t nTotal) = 0;
virtual void SetTotal(uint64_t nTotal) = 0;
virtual bool OnProgress(uint64_t pos) = 0;
};
struct LongAction : ILongAction
{
uint32_t m_Last_ms = 0;
uint64_t m_Total = 0;
ILongAction *m_pExternal = nullptr;

LongAction(const char* sz, uint64_t nTotal) {
LongAction(const char* sz, uint64_t nTotal, ILongAction *pExternal = nullptr)
: m_pExternal(pExternal)
{
Reset(sz, nTotal);
}
LongAction() {}
LongAction() = default;

void Reset(const char*, uint64_t nTotal);
void OnProgress(uint64_t pos);
void Reset(const char*, uint64_t nTotal) final;
void SetTotal(uint64_t nTotal) final;
bool OnProgress(uint64_t pos) final;
};

void HeightAdd(Height& trg, Height val); // saturates if overflow
Expand Down
14 changes: 11 additions & 3 deletions node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1007,7 +1007,7 @@ void Node::Initialize(IExternalPOW* externalPOW)
m_Processor.m_ExecutorMT.set_Threads(std::max<uint32_t>(m_Cfg.m_VerificationThreads, 1U));

m_Processor.m_Horizon = m_Cfg.m_Horizon;
m_Processor.Initialize(m_Cfg.m_sPathLocal.c_str(), m_Cfg.m_ProcessorParams);
m_Processor.Initialize(m_Cfg.m_sPathLocal.c_str(), m_Cfg.m_ProcessorParams, m_Cfg.m_Observer ? m_Cfg.m_Observer->GetLongActionHandler() : nullptr);

if (m_Cfg.m_ProcessorParams.m_EraseSelfID)
{
Expand Down Expand Up @@ -1188,7 +1188,7 @@ void Node::AccountRefreshCtx::AddAccount(const Key::IPKdf::Ptr& pOwner, Key::IPK
{
if (d.m_pOwner && d.m_pOwner->IsSame(*pOwner))
{
d.m_pOwner = pOwner; // perfer this instance
d.m_pOwner = pOwner; // prefer this instance
return;
}

Expand Down Expand Up @@ -1335,7 +1335,15 @@ void Node::RefreshAccounts()
assert(nAdd <= accs.size());
LOG_INFO() << "Owned accounts added: " << nAdd;

m_Processor.RescanAccounts(nAdd);
try
{
m_Processor.RescanAccounts(nAdd);
}
catch (const std::runtime_error&)
{
m_Processor.RollbackDB();
throw;
}
}

std::ostringstream os;
Expand Down
13 changes: 7 additions & 6 deletions node/node.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,15 @@ struct Node
virtual void OnStateChanged() {}
virtual void OnRolledBack(const Block::SystemState::ID& id) {};
virtual void InitializeUtxosProgress(uint64_t done, uint64_t total) {};
virtual ILongAction* GetLongActionHandler() { return nullptr; }

enum Error
{
enum Error
{
Unknown,
TimeDiffToLarge
};
};

virtual void OnSyncError(Error error = Unknown) {}
virtual void OnSyncError(Error error = Unknown) {}
};

struct Config
Expand Down Expand Up @@ -210,7 +211,7 @@ struct Node
} m_SyncStatus;

uint32_t get_AcessiblePeerCount() const; // all the peers with known addresses. Including temporarily banned
const PeerManager::AddrSet& get_AcessiblePeerAddrs() const;
const PeerManager::AddrSet& get_AcessiblePeerAddrs() const;

bool m_UpdatedFromPeers = false;
bool m_PostStartSynced = false;
Expand All @@ -226,7 +227,7 @@ struct Node

uint8_t OnTransaction(Transaction::Ptr&&, std::unique_ptr<Merkle::Hash>&&, const PeerID*, bool bFluff, std::ostream* pExtraInfo);

// for step-by-step tests
// for step-by-step tests
void GenerateFakeBlocks(uint32_t n);

TxPool::Fluff m_TxPool;
Expand Down
71 changes: 61 additions & 10 deletions node/node_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ namespace
auto nodePath = pathFromStdString(nodePathStr);
auto appDataPath = nodePath.parent_path();

if (!boost::filesystem::exists(appDataPath))
if (!boost::filesystem::exists(appDataPath) ||
!boost::filesystem::exists(nodePath))
{
return;
}
Expand Down Expand Up @@ -119,6 +120,11 @@ namespace beam
}
}

void NodeClient::setBeforeStartAction(std::function<void()> action)
{
m_beforeStartAction = std::move(action);
}

void NodeClient::setKdf(beam::Key::IKdf::Ptr kdf)
{
m_pKdf = kdf;
Expand Down Expand Up @@ -156,6 +162,10 @@ namespace beam
{
try
{
if (m_beforeStartAction)
{
m_beforeStartAction();
}
removeNodeDataIfNeeded(m_observer->getLocalNodeStorage());
Rules::Scope scopeRules(m_rules);
auto reactor = io::Reactor::create();
Expand Down Expand Up @@ -291,7 +301,7 @@ namespace beam

LOG_INFO() << "starting a node on " << node.m_Cfg.m_Listen.port() << " port...";

class MyObserver final : public Node::IObserver
class MyObserver final : public Node::IObserver, public ILongAction
{
public:
MyObserver(Node& node, NodeClient& model)
Expand Down Expand Up @@ -321,14 +331,7 @@ namespace beam
m_model.m_observer->onStartedNode();
}

// make sure no overflow during conversion from SyncStatus to int,int.
const auto threshold = static_cast<unsigned int>(std::numeric_limits<int>::max());
while (s.m_Total > threshold)
{
s.m_Total >>= 1;
s.m_Done >>= 1;
}

AdjustProgress(s.m_Done, s.m_Total);
m_model.m_observer->onSyncProgressUpdated(static_cast<int>(s.m_Done), static_cast<int>(s.m_Total));
}

Expand All @@ -342,10 +345,58 @@ namespace beam
m_model.m_observer->onInitProgressUpdated(done, total);
}

ILongAction* GetLongActionHandler() override
{
return this;
}

void Reset(const char* sz, uint64_t nTotal) override
{
SetTotal(nTotal);
m_Last_ms = GetTime_ms();
}

void SetTotal(uint64_t nTotal) override
{
m_Total = nTotal;
}

bool OnProgress(uint64_t pos) override
{
if (m_model.m_shouldTerminateModel)
{
return false;
}
uint32_t dt_ms = GetTime_ms() - m_Last_ms;
const uint32_t nWindow_ms = 1000; // 1 sec
uint32_t n = dt_ms / nWindow_ms;
if (n)
{
m_Last_ms += n * nWindow_ms;
uint64_t total = m_Total;
AdjustProgress(pos, total);
m_model.m_observer->onSyncProgressUpdated(static_cast<int>(pos), static_cast<int>(total));
}
return true;
}
private:
void AdjustProgress(uint64_t& done, uint64_t& total)
{
// make sure no overflow during conversion from SyncStatus to int,int.
constexpr auto threshold = static_cast<unsigned int>(std::numeric_limits<int>::max());
while (total > threshold)
{
total >>= 1;
done >>= 1;
}
}

private:
Node& m_node;
NodeClient& m_model;
Height m_Done0 = MaxHeight;
uint64_t m_Total = 0;
uint32_t m_Last_ms = 0;
bool m_reportedStarted = false;
} obs(node, *this);

Expand Down
3 changes: 2 additions & 1 deletion node/node_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ namespace beam
public:
NodeClient(const Rules& rules, INodeClientObserver* observer);
~NodeClient();

void setBeforeStartAction(std::function<void()> action);
void setKdf(beam::Key::IKdf::Ptr);
void setOwnerKey(beam::Key::IPKdf::Ptr);
void startNode();
Expand All @@ -78,5 +78,6 @@ namespace beam
Key::IKdf::Ptr m_pKdf;
Key::IPKdf::Ptr m_ownerKey;
io::Timer::Ptr m_timer;
std::function<void()> m_beforeStartAction;
};
}
Loading

0 comments on commit 3b2bca2

Please sign in to comment.