From aaa728218d0e1fa01ddea778643f87d51d2af75d Mon Sep 17 00:00:00 2001 From: Stephan Hageboeck Date: Thu, 5 Feb 2026 14:07:55 +0100 Subject: [PATCH 1/2] [RDF] Add the total number of events to RSampleInfo. When samples are split in multiple ranges, the progress bar takes very long to find out the total number of events. By adding tree->GetEntries() (and the RNTuple equivalent), the total number of events is known as soon as a file is opened. --- tree/dataframe/inc/ROOT/RDF/RSampleInfo.hxx | 12 +++++++++--- tree/dataframe/inc/ROOT/RDFHelpers.hxx | 3 +-- tree/dataframe/src/RLoopManager.cxx | 4 ++-- tree/dataframe/src/RNTupleDS.cxx | 5 +++-- tree/dataframe/src/RTTreeDS.cxx | 4 ++-- 5 files changed, 17 insertions(+), 11 deletions(-) diff --git a/tree/dataframe/inc/ROOT/RDF/RSampleInfo.hxx b/tree/dataframe/inc/ROOT/RDF/RSampleInfo.hxx index ca7d0458210f8..ac6d89a029d2a 100644 --- a/tree/dataframe/inc/ROOT/RDF/RSampleInfo.hxx +++ b/tree/dataframe/inc/ROOT/RDF/RSampleInfo.hxx @@ -37,6 +37,7 @@ class RSampleInfo { std::pair fEntryRange; const ROOT::RDF::Experimental::RSample *fSample = nullptr; // non-owning + ULong64_t fTotalEntries = 0; // Number of entries in current file (if known). void ThrowIfNoSample() const { @@ -48,8 +49,8 @@ class RSampleInfo { public: RSampleInfo(std::string_view id, std::pair entryRange, - const ROOT::RDF::Experimental::RSample *sample = nullptr) - : fID(id), fEntryRange(entryRange), fSample(sample) + const ROOT::RDF::Experimental::RSample *sample = nullptr, ULong64_t totalEntries = 0) + : fID(id), fEntryRange(entryRange), fSample(sample), fTotalEntries{totalEntries} { } RSampleInfo() = default; @@ -121,9 +122,14 @@ public: /// Multiple multi-threading tasks might process different entry ranges of the same sample. std::pair EntryRange() const { return fEntryRange; } - /// @brief Return the number of entries of this sample that is being taken into consideration. + /// @brief Return the number of entries of this range of the sample. 
ULong64_t NEntries() const { return fEntryRange.second - fEntryRange.first; } + /// Return the total number of entries in the underlying dataset. + /// If the total number of entries is not known, the end of the current range is returned. + /// This can be larger than NEntries() if the sample is split in multiple ranges. + ULong64_t NEntriesTotal() const { return std::max(fTotalEntries, fEntryRange.second); } + bool operator==(const RSampleInfo &other) const { return fID == other.fID; } bool operator!=(const RSampleInfo &other) const { return !(*this == other); } }; diff --git a/tree/dataframe/inc/ROOT/RDFHelpers.hxx b/tree/dataframe/inc/ROOT/RDFHelpers.hxx index bbab81bd27355..826c155ab678a 100644 --- a/tree/dataframe/inc/ROOT/RDFHelpers.hxx +++ b/tree/dataframe/inc/ROOT/RDFHelpers.hxx @@ -391,8 +391,7 @@ public: void registerNewSample(unsigned int /*slot*/, const ROOT::RDF::RSampleInfo &id) { std::lock_guard lock(fSampleNameToEventEntriesMutex); - fSampleNameToEventEntries[id.AsString()] = - std::max(id.EntryRange().second, fSampleNameToEventEntries[id.AsString()]); + fSampleNameToEventEntries[id.AsString()] = std::max(id.NEntriesTotal(), fSampleNameToEventEntries[id.AsString()]); } /// Thread-safe callback for RDataFrame. diff --git a/tree/dataframe/src/RLoopManager.cxx b/tree/dataframe/src/RLoopManager.cxx index 3f533303b1409..96986f60b49a4 100644 --- a/tree/dataframe/src/RLoopManager.cxx +++ b/tree/dataframe/src/RLoopManager.cxx @@ -739,11 +739,11 @@ void RLoopManager::UpdateSampleInfo(unsigned int slot, TTreeReader &r) { // If the tree is stored in a subdirectory, treename will be the full path to it starting with the root directory '/' const std::string &id = fname + (treename.rfind('/', 0) == 0 ? 
"" : "/") + treename; if (fSampleMap.empty()) { - fSampleInfos[slot] = RSampleInfo(id, range); + fSampleInfos[slot] = RSampleInfo(id, range, nullptr, tree->GetEntries()); } else { if (fSampleMap.find(id) == fSampleMap.end()) throw std::runtime_error("Full sample identifier '" + id + "' cannot be found in the available samples."); - fSampleInfos[slot] = RSampleInfo(id, range, fSampleMap[id]); + fSampleInfos[slot] = RSampleInfo(id, range, fSampleMap[id], tree->GetEntries()); } } diff --git a/tree/dataframe/src/RNTupleDS.cxx b/tree/dataframe/src/RNTupleDS.cxx index d887bb7f43ce2..db1f3aa17b568 100644 --- a/tree/dataframe/src/RNTupleDS.cxx +++ b/tree/dataframe/src/RNTupleDS.cxx @@ -917,14 +917,15 @@ ROOT::RDF::RSampleInfo ROOT::Internal::RDF::RNTupleDS::CreateSampleInfo( if (sampleMap.empty()) return ROOT::RDF::RSampleInfo( - ntupleID, std::make_pair(fCurrentRanges[rangeIdx].fFirstEntry, fCurrentRanges[rangeIdx].fLastEntry)); + ntupleID, std::make_pair(fCurrentRanges[rangeIdx].fFirstEntry, fCurrentRanges[rangeIdx].fLastEntry), nullptr, + fPrincipalDescriptor.GetNEntries()); if (sampleMap.find(ntupleID) == sampleMap.end()) throw std::runtime_error("Full sample identifier '" + ntupleID + "' cannot be found in the available samples."); return ROOT::RDF::RSampleInfo( ntupleID, std::make_pair(fCurrentRanges[rangeIdx].fFirstEntry, fCurrentRanges[rangeIdx].fLastEntry), - sampleMap.at(ntupleID)); + sampleMap.at(ntupleID), fPrincipalDescriptor.GetNEntries()); } ROOT::RDataFrame ROOT::Internal::RDF::FromRNTuple(std::string_view ntupleName, diff --git a/tree/dataframe/src/RTTreeDS.cxx b/tree/dataframe/src/RTTreeDS.cxx index 824ff39c565a4..1a401ea495810 100644 --- a/tree/dataframe/src/RTTreeDS.cxx +++ b/tree/dataframe/src/RTTreeDS.cxx @@ -378,11 +378,11 @@ ROOT::RDF::RSampleInfo ROOT::Internal::RDF::RTTreeDS::CreateSampleInfo( // If the tree is stored in a subdirectory, treename will be the full path to it starting with the root directory '/' const std::string &id = fname + 
(treename.rfind('/', 0) == 0 ? "" : "/") + treename; if (sampleMap.empty()) { - return RSampleInfo(id, range); + return RSampleInfo(id, range, nullptr, tree->GetEntries()); } else { if (sampleMap.find(id) == sampleMap.end()) throw std::runtime_error("Full sample identifier '" + id + "' cannot be found in the available samples."); - return RSampleInfo(id, range, sampleMap.at(id)); + return RSampleInfo(id, range, sampleMap.at(id), tree->GetEntries()); } } From f67256480858abe4b508a362a2f3d865a9a6597d Mon Sep 17 00:00:00 2001 From: Stephan Hageboeck Date: Tue, 10 Feb 2026 11:52:15 +0100 Subject: [PATCH 2/2] [RDF] Refactor progress bar and shorten its output. This removes redundant information from the line printed to the terminal and shortens the output. Otherwise, the progress bar frequently overflows the terminal. When the progress bar prints to a file, reduce the frequency to every 10 seconds, and limit its width to 60 chars to avoid cluttering the file. Furthermore, significantly improve how completion is estimated. The following two heuristics are employed: - Files that have been opened count as fractionOfFilesAlreadyOpened * eventsProcessed/totalEvents This tracks the progress of all files for which the number of events has been seen. - Files that have not been opened count as 1/totalFiles until they have been opened. This means that the progress bar e.g. can't reach 50% if half of the files haven't been opened yet. This change significantly reduces the jumps of the progress bar when a new file is opened. Finally, the code was refactored: - Handle locking logic for updates with a single RAII, update locks to C++17. - Remove a lock and mutex that didn't have any effect. - Reduce repeated function calls to functions that hold locks. - Simplify computation of average number of events. - Relax memory order of the atomics to what's necessary. - Outline as many functions as possible. 
Since RDFHelpers.hxx goes through JITting and pcms, it gets compiled frequently, so outlining is probably beneficial. - Make a lot of members const to avoid unintentional modifications. This makes it clear that no mutex needs to be locked to read these. - Collect helper functions in one anonymous namespace. - Remove a constructor argument that was ignored. --- tree/dataframe/inc/ROOT/RDFHelpers.hxx | 133 +++------- tree/dataframe/src/RDFHelpers.cxx | 323 +++++++++++++++---------- 2 files changed, 232 insertions(+), 224 deletions(-) diff --git a/tree/dataframe/inc/ROOT/RDFHelpers.hxx b/tree/dataframe/inc/ROOT/RDFHelpers.hxx index 826c155ab678a..65575a79ca7c3 100644 --- a/tree/dataframe/inc/ROOT/RDFHelpers.hxx +++ b/tree/dataframe/inc/ROOT/RDFHelpers.hxx @@ -310,8 +310,6 @@ void AddProgressBar(ROOT::RDataFrame df); /// @param nThread Number of threads that share a TH3D. void ThreadsPerTH3(unsigned int nThread = 1); -class ProgressBarAction; - /// RDF progress helper. /// This class provides callback functions to the RDataFrame. 
The event statistics /// (including elapsed time, currently processed file, currently processed events, the rate of event processing @@ -333,131 +331,62 @@ class ProgressBarAction; /// ~~~ class ProgressHelper { private: + std::size_t ComputeTotalEvents() const; double EvtPerSec() const; + void PrintProgressAndStats(std::ostream &stream, std::size_t currentEventCount, + std::chrono::seconds totalElapsedSeconds) const; std::pair RecordEvtCountAndTime(); - void PrintStats(std::ostream &stream, std::size_t currentEventCount, std::chrono::seconds totalElapsedSeconds) const; - void PrintStatsFinal(std::ostream &stream, std::chrono::seconds totalElapsedSeconds) const; - void PrintProgressBar(std::ostream &stream, std::size_t currentEventCount) const; + void Update(); - std::chrono::time_point fBeginTime = std::chrono::system_clock::now(); - std::chrono::time_point fLastPrintTime = fBeginTime; - std::chrono::seconds fPrintInterval{1}; + bool const fIsTTY; + bool const fUseShellColours; std::atomic fProcessedEvents{0}; std::size_t fLastProcessedEvents{0}; - std::size_t fIncrement; - - mutable std::mutex fSampleNameToEventEntriesMutex; - std::map fSampleNameToEventEntries; // Filename, events in the file + std::size_t const fIncrement; + unsigned int const fNColumns; + unsigned int const fTotalFiles; - std::array fEventsPerSecondStatistics; - std::size_t fEventsPerSecondStatisticsIndex{0}; + std::array fEventsPerSecondStatistics; + unsigned int fEventsPerSecondStatisticsCounter{0}; - unsigned int fBarWidth; - unsigned int fTotalFiles; + std::chrono::time_point const fBeginTime = std::chrono::system_clock::now(); + std::chrono::time_point fLastPrintTime = fBeginTime; + std::chrono::seconds const fPrintInterval; - std::mutex fPrintMutex; - bool fIsTTY; - bool fUseShellColours; + // Mutex to ensure that only one thread updates the progress bar. 
+ // Lock this mutex to update any of the members above: + std::mutex fUpdateMutex; - std::shared_ptr fTree{nullptr}; + mutable std::mutex fSampleNameToEventEntriesMutex; // Mutex to protect access to the below map + std::map fSampleNameToEventEntries; // Filename, events in the file public: /// Create a progress helper. /// \param increment RDF callbacks are called every `n` events. Pass this `n` here. - /// \param totalFiles read total number of files in the RDF. - /// \param progressBarWidth Number of characters the progress bar will occupy. - /// \param printInterval Update every stats every `n` seconds. + /// \param totalFiles number of files read in the RDF. + /// \param printInterval Update stats every `n` seconds. /// \param useColors Use shell colour codes to colour the output. Automatically disabled when /// we are not writing to a tty. - ProgressHelper(std::size_t increment, unsigned int totalFiles = 1, unsigned int progressBarWidth = 40, - unsigned int printInterval = 1, bool useColors = true); - + ProgressHelper(std::size_t increment, unsigned int totalFiles, unsigned int printInterval = 0, + bool useColors = true); + ProgressHelper(ProgressHelper const &) = delete; // The mutexes and atomics won't allow copy/move + ProgressHelper(ProgressHelper &&) = delete; ~ProgressHelper() = default; + ProgressHelper &operator=(ProgressHelper const &) = delete; + ProgressHelper &operator=(ProgressHelper &&) = delete; - friend class ProgressBarAction; - - /// Register a new sample for completion statistics. - /// \see ROOT::RDF::RInterface::DefinePerSample(). - /// The *id.AsString()* refers to the name of the currently processed file. - /// The idea is to populate the event entries in the *fSampleNameToEventEntries* map - /// by selecting the greater of the two values: - /// *id.EntryRange().second* which is the upper event entry range of the processed sample - /// and the current value of the event entries in the *fSampleNameToEventEntries* map. 
- /// In the single threaded case, the two numbers are the same as the entry range corresponds - /// to the number of events in an individual file (each sample is simply a single file). - /// In the multithreaded case, the idea is to accumulate the higher event entry value until - /// the total number of events in a given file is reached. - void registerNewSample(unsigned int /*slot*/, const ROOT::RDF::RSampleInfo &id) - { - std::lock_guard lock(fSampleNameToEventEntriesMutex); - fSampleNameToEventEntries[id.AsString()] = std::max(id.NEntriesTotal(), fSampleNameToEventEntries[id.AsString()]); - } + void RegisterNewSample(unsigned int /*slot*/, const ROOT::RDF::RSampleInfo &id); /// Thread-safe callback for RDataFrame. /// It will record elapsed times and event statistics, and print a progress bar every n seconds (set by the /// fPrintInterval). \param slot Ignored. \param value Ignored. template - void operator()(unsigned int /*slot*/, T &value) - { - operator()(value); - } - // clang-format off - /// Thread-safe callback for RDataFrame. - /// It will record elapsed times and event statistics, and print a progress bar every n seconds (set by the fPrintInterval). - /// \param value Ignored. - // clang-format on - template - void operator()(T & /*value*/) - { - using namespace std::chrono; - // *************************************************** - // Warning: Here, everything needs to be thread safe: - // *************************************************** - fProcessedEvents += fIncrement; - - // We only print every n seconds. 
- if (duration_cast(system_clock::now() - fLastPrintTime) < fPrintInterval) { - return; - } - - // *************************************************** - // Protected by lock from here: - // *************************************************** - if (!fPrintMutex.try_lock()) - return; - std::lock_guard lockGuard(fPrintMutex, std::adopt_lock); - - std::size_t eventCount; - seconds elapsedSeconds; - std::tie(eventCount, elapsedSeconds) = RecordEvtCountAndTime(); - - if (fIsTTY) - std::cout << "\r"; - - PrintProgressBar(std::cout, eventCount); - PrintStats(std::cout, eventCount, elapsedSeconds); - - if (fIsTTY) - std::cout << std::flush; - else - std::cout << std::endl; - } - - std::size_t ComputeNEventsSoFar() const - { - std::unique_lock lock(fSampleNameToEventEntriesMutex); - std::size_t result = 0; - for (const auto &item : fSampleNameToEventEntries) - result += item.second; - return result; - } - - unsigned int ComputeCurrentFileIdx() const + void operator()(unsigned int /*slot*/, T & /*value*/) { - std::unique_lock lock(fSampleNameToEventEntriesMutex); - return fSampleNameToEventEntries.size(); + Update(); } + void PrintStatsFinal() const; }; } // namespace Experimental } // namespace RDF diff --git a/tree/dataframe/src/RDFHelpers.cxx b/tree/dataframe/src/RDFHelpers.cxx index 8ab92ea4ca5c4..f285a03de1ebd 100644 --- a/tree/dataframe/src/RDFHelpers.cxx +++ b/tree/dataframe/src/RDFHelpers.cxx @@ -32,11 +32,7 @@ #include #include -// TODO, this function should be part of core libraries #include -#if (!defined(_WIN32)) && (!defined(_WIN64)) -#include -#endif #if defined(_WIN32) || defined(_WIN64) #define WIN32_LEAN_AND_MEAN @@ -45,11 +41,14 @@ #include #else #include +#include #endif class TTreeReader; +namespace { // Get terminal size for progress bar +// TODO: Put this in core libraries? int get_tty_size() { #if defined(_WIN32) || defined(_WIN64) @@ -69,6 +68,30 @@ int get_tty_size() #endif } +/// Restore an output stream to its previous state using RAII. 
+struct RestoreStreamState { + RestoreStreamState(std::ostream &stream) : fStream(stream) { fPreservedState.copyfmt(stream); } + ~RestoreStreamState() { fStream.copyfmt(fPreservedState); } + + std::ostream &fStream; + std::ios fPreservedState{nullptr}; +}; +} // namespace + +/// Format std::chrono::seconds as `1:30m`. +std::ostream &operator<<(std::ostream &stream, std::chrono::seconds elapsedSeconds) +{ + RestoreStreamState restore(stream); + auto h = std::chrono::duration_cast(elapsedSeconds); + auto m = std::chrono::duration_cast(elapsedSeconds - h); + auto s = (elapsedSeconds - h - m).count(); + + if (h.count() > 0) + stream << h.count() << ':' << std::setw(2) << std::right << std::setfill('0'); + stream << m.count() << ':' << std::setw(2) << std::right << std::setfill('0') << s; + return stream << (h.count() > 0 ? 'h' : 'm'); +} + using ROOT::RDF::RResultHandle; unsigned int ROOT::RDF::RunGraphs(std::vector handles) @@ -148,40 +171,73 @@ void ThreadsPerTH3(unsigned int N) ROOT::Internal::RDF::NThreadPerTH3() = N; } -ProgressHelper::ProgressHelper(std::size_t increment, unsigned int totalFiles, unsigned int progressBarWidth, - unsigned int printInterval, bool useColors) - : fPrintInterval(printInterval), - fIncrement{increment}, - fBarWidth{progressBarWidth = int(get_tty_size() / 4)}, - fTotalFiles{totalFiles}, +ProgressHelper::ProgressHelper(std::size_t increment, unsigned int totalFiles, unsigned int printInterval, + bool useColors) + : #if defined(_WIN32) || defined(_WIN64) fIsTTY{_isatty(_fileno(stdout)) != 0}, - fUseShellColours{false && useColors} + fUseShellColours{false && useColors}, #else fIsTTY{isatty(fileno(stdout)) == 1}, - fUseShellColours{useColors && fIsTTY} // Control characters only with terminals. + fUseShellColours{useColors && fIsTTY}, // Control characters only with terminals. #endif + fIncrement{increment}, + fNColumns(fIsTTY ? (get_tty_size() == 0 ? 
60 : get_tty_size()) : 50), + fTotalFiles{totalFiles}, + fPrintInterval{printInterval == 0 ? (fIsTTY ? 1 : 10) : printInterval} { + std::fill(fEventsPerSecondStatistics.begin(), fEventsPerSecondStatistics.end(), -1.); +} + +/// Register a new sample for completion statistics. +/// \see ROOT::RDF::RInterface::DefinePerSample(). +/// The *id.AsString()* refers to the name of the currently processed file. +/// The idea is to populate the event entries in the *fSampleNameToEventEntries* map +/// by selecting the greater of the two values: +/// *id.NEntriesTotal()* which is the total number of entries (or the end of the processed range if unknown) +/// and the current value of the event entries in the *fSampleNameToEventEntries* map. +/// In the single threaded case, the two numbers are the same as the entry range corresponds +/// to the number of events in an individual file (each sample is simply a single file). +/// In the multithreaded case, the idea is to accumulate the higher event entry value until +/// the total number of events in a given file is reached. +void ProgressHelper::RegisterNewSample(unsigned int /*slot*/, const ROOT::RDF::RSampleInfo &id) +{ + std::scoped_lock lock(fSampleNameToEventEntriesMutex); + fSampleNameToEventEntries[id.AsString()] = std::max(id.NEntriesTotal(), fSampleNameToEventEntries[id.AsString()]); } /// Compute a running mean of events/s. double ProgressHelper::EvtPerSec() const { - if (fEventsPerSecondStatisticsIndex < fEventsPerSecondStatistics.size()) - return std::accumulate(fEventsPerSecondStatistics.begin(), - fEventsPerSecondStatistics.begin() + fEventsPerSecondStatisticsIndex, 0.) / - fEventsPerSecondStatisticsIndex; - else - return std::accumulate(fEventsPerSecondStatistics.begin(), fEventsPerSecondStatistics.end(), 0.) / - fEventsPerSecondStatistics.size(); + double sum = 0; + unsigned int n = 0; + for (auto item : fEventsPerSecondStatistics) { + if (item >= 0.) { + sum += item; + ++n; + } + } + + return n > 0 ? 
sum / n : 0.; +} + +/// Compute total events in all open files. +std::size_t ProgressHelper::ComputeTotalEvents() const +{ + std::scoped_lock lock(fSampleNameToEventEntriesMutex); + std::size_t result = 0; + for (const auto &item : fSampleNameToEventEntries) + result += item.second; + return result; } /// Record current event counts and time stamp, populate evts/s statistics array. +/// The function assumes that a lock on the update mutex is held. std::pair ProgressHelper::RecordEvtCountAndTime() { using namespace std::chrono; - auto currentEventCount = fProcessedEvents.load(); + auto currentEventCount = fProcessedEvents.load(std::memory_order_acquire); auto eventsPerTimeInterval = currentEventCount - fLastProcessedEvents; fLastProcessedEvents = currentEventCount; @@ -190,97 +246,123 @@ std::pair ProgressHelper::RecordEvtCountAndTi fLastPrintTime = newPrintTime; duration secondsCurrentInterval = newPrintTime - oldPrintTime; - fEventsPerSecondStatistics[fEventsPerSecondStatisticsIndex++ % fEventsPerSecondStatistics.size()] = + fEventsPerSecondStatisticsCounter = (fEventsPerSecondStatisticsCounter + 1) % fEventsPerSecondStatistics.size(); + fEventsPerSecondStatistics[fEventsPerSecondStatisticsCounter] = eventsPerTimeInterval / secondsCurrentInterval.count(); return {currentEventCount, duration_cast(newPrintTime - fBeginTime)}; } -namespace { - -struct RestoreStreamState { - RestoreStreamState(std::ostream &stream) : fStream(stream) { - fPreservedState.copyfmt(stream); - } - ~RestoreStreamState() +/// Print event and time statistics. 
+void ProgressHelper::PrintProgressAndStats(std::ostream &stream, std::size_t currentEventCount, + std::chrono::seconds elapsedSeconds) const +{ + std::ostringstream buffer; + auto evtpersec = EvtPerSec(); + auto totalEventsInOpenFiles = ComputeTotalEvents(); + std::size_t currentFileIdx; { - fStream.copyfmt(fPreservedState); + std::scoped_lock lock(fSampleNameToEventEntriesMutex); + currentFileIdx = fSampleNameToEventEntries.size(); } - std::ostream &fStream; - std::ios fPreservedState{nullptr}; -}; - -/// Format std::chrono::seconds as `1:30m`. -std::ostream &operator<<(std::ostream &stream, std::chrono::seconds elapsedSeconds) -{ - RestoreStreamState restore(stream); - auto h = std::chrono::duration_cast(elapsedSeconds); - auto m = std::chrono::duration_cast(elapsedSeconds - h); - auto s = (elapsedSeconds - h - m).count(); + if (totalEventsInOpenFiles == 0) + return; - if (h.count() > 0) - stream << h.count() << ':' << std::setw(2) << std::right << std::setfill('0'); - stream << m.count() << ':' << std::setw(2) << std::right << std::setfill('0') << s; - return stream << (h.count() > 0 ? 'h' : 'm'); -} + double completion = 0.; + if (currentFileIdx < fTotalFiles) { + const double fractionSeenFiles = double(currentFileIdx) / fTotalFiles; + completion = fractionSeenFiles * (double(currentEventCount) / totalEventsInOpenFiles); + } else { + completion = double(currentEventCount) / totalEventsInOpenFiles; + } -} // namespace + // Print the bar + { + const auto barWidth = fNColumns / 4; + unsigned int nBar = std::min(completion, 1.) * barWidth; + std::string bars(std::max(nBar, 1u), '='); + bars.back() = (nBar == barWidth) ? '=' : '>'; -/// Print event and time statistics. 
-void ProgressHelper::PrintStats(std::ostream &stream, std::size_t currentEventCount, - std::chrono::seconds elapsedSeconds) const -{ - RestoreStreamState restore(stream); - auto evtpersec = EvtPerSec(); - auto GetNEventsOfCurrentFile = ComputeNEventsSoFar(); - auto currentFileIdx = ComputeCurrentFileIdx(); - auto totalFiles = fTotalFiles; + if (fUseShellColours) + buffer << "\033[33m"; + buffer << '|' << std::setfill(' ') << std::setw(barWidth) << std::left << bars << "| "; + if (fUseShellColours) + buffer << "\033[0m"; + } + // Elapsed time + buffer << "["; if (fUseShellColours) - stream << "\033[35m"; - stream << "[" - << "Elapsed time: " << elapsedSeconds << " "; + buffer << "\033[35m"; + buffer << "Elapsed: " << elapsedSeconds << " "; if (fUseShellColours) - stream << "\033[0m"; - stream << "processing file: " << currentFileIdx << " / " << totalFiles << " "; + buffer << "\033[0m"; + buffer << "files: " << currentFileIdx << " / " << fTotalFiles << " "; // Event counts: if (fUseShellColours) - stream << "\033[32m"; - - stream << "processed evts: " << currentEventCount; - if (GetNEventsOfCurrentFile != 0) { - stream << " / " << std::scientific << std::setprecision(2) << GetNEventsOfCurrentFile; + buffer << "\033[32m"; + + buffer << "events: " << currentEventCount; + if (totalEventsInOpenFiles != 0) { + buffer << " / " << std::scientific << std::setprecision(2); + if (currentFileIdx == fTotalFiles) + buffer << totalEventsInOpenFiles; + else + buffer << "(" << totalEventsInOpenFiles << " + x)"; } - stream << " "; + buffer << " "; if (fUseShellColours) - stream << "\033[0m"; + buffer << "\033[0m"; // events/s - stream << std::scientific << std::setprecision(2) << evtpersec << " evt/s"; + buffer << std::scientific << std::setprecision(2) << evtpersec << " evt/s"; // Time statistics: - if (GetNEventsOfCurrentFile != 0) { + // As long as not all files have been opened, estimate when 100% completion will be reached + // based on current completion elapsed time. 
(This assumes that unopened files have about the + // same size as the files that have been seen.) + // Once the total event count is known, use "missing events / evt/s". + if (totalEventsInOpenFiles != 0) { if (fUseShellColours) - stream << "\033[35m"; - std::chrono::seconds remainingSeconds( - static_cast((ComputeNEventsSoFar() - currentEventCount) / evtpersec)); - stream << " " << remainingSeconds << " " - << " remaining time (per file being processed)"; + buffer << "\033[35m"; + + std::chrono::seconds remainingSeconds; + if (currentFileIdx == fTotalFiles) { + remainingSeconds = + std::chrono::seconds{static_cast((ComputeTotalEvents() - currentEventCount) / evtpersec)}; + } else { + remainingSeconds = + std::chrono::seconds{static_cast(elapsedSeconds.count() / completion - elapsedSeconds.count())}; + } + buffer << " remaining ca.: " << remainingSeconds; + if (fUseShellColours) - stream << "\033[0m"; + buffer << "\033[0m"; } - stream << "] "; + buffer << "]"; + + RestoreStreamState restore(stream); + stream << std::left << std::setw(fNColumns - 1) << buffer.str(); } -void ProgressHelper::PrintStatsFinal(std::ostream &stream, std::chrono::seconds elapsedSeconds) const +void ProgressHelper::PrintStatsFinal() const { + auto &stream = std::cout; RestoreStreamState restore(stream); - auto totalEvents = ComputeNEventsSoFar(); - auto totalFiles = fTotalFiles; + const auto elapsedSeconds = + std::chrono::duration_cast(std::chrono::system_clock::now() - fBeginTime); + const auto totalEvents = ComputeTotalEvents(); + + // The next line resets the current line output in the terminal. + // Brings the cursor at the beginning ('\r'), prints whitespace with the + // same length as the terminal size, then resets the cursor again so we + // can print the final stats on a clean line. 
+ if (fIsTTY) + stream << '\r' << std::string(fNColumns, ' ') << '\r'; if (fUseShellColours) stream << "\033[35m"; @@ -288,45 +370,58 @@ void ProgressHelper::PrintStatsFinal(std::ostream &stream, std::chrono::seconds << "Total elapsed time: " << elapsedSeconds << " "; if (fUseShellColours) stream << "\033[0m"; - stream << "processed files: " << totalFiles << " / " << totalFiles << " "; + stream << "processed files: " << fTotalFiles << " "; // Event counts: if (fUseShellColours) stream << "\033[32m"; - stream << "processed evts: " << totalEvents; - if (totalEvents != 0) { - stream << " / " << std::scientific << std::setprecision(2) << totalEvents; - } + stream << "processed events: " << totalEvents; if (fUseShellColours) stream << "\033[0m"; - stream << "] "; + stream << " " << std::scientific << std::setprecision(2) << (double)totalEvents / elapsedSeconds.count() + << " evt/s"; + + stream << "]\n"; } -/// Print a progress bar of width `ProgressHelper::fBarWidth` if `fGetNEventsOfCurrentFile` is known. -void ProgressHelper::PrintProgressBar(std::ostream &stream, std::size_t currentEventCount) const +/// Record number of events processed and update progress bar. +/// This function will atomically record elapsed times and event statistics, and one thread will update the progress bar +/// every n seconds (set by the fPrintInterval). +void ProgressHelper::Update() { - auto GetNEventsOfCurrentFile = ComputeNEventsSoFar(); - if (GetNEventsOfCurrentFile == 0) + using namespace std::chrono; + // *************************************************** + // Warning: Here, everything needs to be thread safe: + // *************************************************** + fProcessedEvents.fetch_add(fIncrement, std::memory_order_relaxed); + + // We only print every n seconds. 
+ if (duration_cast(system_clock::now() - fLastPrintTime) < fPrintInterval) { return; + } - RestoreStreamState restore(stream); + // ****************************************************** + // Update the progress bar. Only one thread can proceed. + // ****************************************************** + std::unique_lock lockGuard(fUpdateMutex, std::try_to_lock); + if (!lockGuard) + return; - double completion = double(currentEventCount) / GetNEventsOfCurrentFile; - unsigned int nBar = std::min(completion, 1.) * fBarWidth; + auto const [eventCount, elapsedSeconds] = RecordEvtCountAndTime(); - std::string bars(std::max(nBar, 1u), '='); - bars.back() = (nBar == fBarWidth) ? '=' : '>'; + if (fIsTTY) + std::cout << "\r"; - if (fUseShellColours) - stream << "\033[33m"; - stream << '|' << std::setfill(' ') << std::setw(fBarWidth) << std::left << bars << "| "; - if (fUseShellColours) - stream << "\033[0m"; + PrintProgressAndStats(std::cout, eventCount, elapsedSeconds); + + if (fIsTTY) + std::cout << std::flush; + else + std::cout << std::endl; } -//*/ class ProgressBarAction final : public ROOT::Detail::RDF::RActionImpl { public: @@ -346,22 +441,7 @@ class ProgressBarAction final : public ROOT::Detail::RDF::RActionImpl lockGuard(fPrintMutex, std::adopt_lock); - const auto &[eventCount, elapsedSeconds] = fHelper->RecordEvtCountAndTime(); - - // The next line resets the current line output in the terminal. - // Brings the cursor at the beginning ('\r'), prints whitespace with the - // same length as the terminal size, then resets the cursor again so we - // can print the final stats on a clean line. 
- std::cout << '\r' << std::string(get_tty_size(), ' ') << '\r'; - fHelper->PrintStatsFinal(std::cout, elapsedSeconds); - std::cout << '\n'; - } + void Finalize() { fHelper->PrintStatsFinal(); } std::string GetActionName() { return "ProgressBar"; } // dummy implementation of PartialUpdate @@ -369,20 +449,19 @@ class ProgressBarAction final : public ROOT::Detail::RDF::RActionImplfHelper->registerNewSample(slot, id); - return this->fHelper->ComputeNEventsSoFar(); - }; + return + [this](unsigned int slot, const ROOT::RDF::RSampleInfo &id) { this->fHelper->RegisterNewSample(slot, id); }; } }; void AddProgressBar(ROOT::RDF::RNode node) { + constexpr std::size_t callbackEveryNEvents = 1000; auto total_files = node.GetNFiles(); - auto progress = std::make_shared(1000, total_files); + auto progress = std::make_shared(callbackEveryNEvents, total_files); ProgressBarAction c(progress); auto r = node.Book<>(c); - r.OnPartialResultSlot(1000, [progress](unsigned int slot, auto &&arg) { (*progress)(slot, arg); }); + r.OnPartialResultSlot(callbackEveryNEvents, [progress](unsigned int slot, auto &&arg) { (*progress)(slot, arg); }); } void AddProgressBar(ROOT::RDataFrame dataframe)