diff --git a/tree/dataframe/inc/ROOT/RDF/RSampleInfo.hxx b/tree/dataframe/inc/ROOT/RDF/RSampleInfo.hxx index ca7d0458210f8..ac6d89a029d2a 100644 --- a/tree/dataframe/inc/ROOT/RDF/RSampleInfo.hxx +++ b/tree/dataframe/inc/ROOT/RDF/RSampleInfo.hxx @@ -37,6 +37,7 @@ class RSampleInfo { std::pair fEntryRange; const ROOT::RDF::Experimental::RSample *fSample = nullptr; // non-owning + ULong64_t fTotalEntries = 0; // Number of entries in current file (if known). void ThrowIfNoSample() const { @@ -48,8 +49,8 @@ class RSampleInfo { public: RSampleInfo(std::string_view id, std::pair entryRange, - const ROOT::RDF::Experimental::RSample *sample = nullptr) - : fID(id), fEntryRange(entryRange), fSample(sample) + const ROOT::RDF::Experimental::RSample *sample = nullptr, ULong64_t totalEntries = 0) + : fID(id), fEntryRange(entryRange), fSample(sample), fTotalEntries{totalEntries} { } RSampleInfo() = default; @@ -121,9 +122,14 @@ public: /// Multiple multi-threading tasks might process different entry ranges of the same sample. std::pair EntryRange() const { return fEntryRange; } - /// @brief Return the number of entries of this sample that is being taken into consideration. + /// @brief Return the number of entries of this range of the sample. ULong64_t NEntries() const { return fEntryRange.second - fEntryRange.first; } + /// Return the total number of entries in the underlying dataset. + /// If the total number of entries is not known, the end of the current range is returned. + /// This can be larger than NEntries() if the sample is split in multiple ranges. 
+ ULong64_t NEntriesTotal() const { return std::max(fTotalEntries, fEntryRange.second); } + bool operator==(const RSampleInfo &other) const { return fID == other.fID; } bool operator!=(const RSampleInfo &other) const { return !(*this == other); } }; diff --git a/tree/dataframe/inc/ROOT/RDFHelpers.hxx b/tree/dataframe/inc/ROOT/RDFHelpers.hxx index bbab81bd27355..65575a79ca7c3 100644 --- a/tree/dataframe/inc/ROOT/RDFHelpers.hxx +++ b/tree/dataframe/inc/ROOT/RDFHelpers.hxx @@ -310,8 +310,6 @@ void AddProgressBar(ROOT::RDataFrame df); /// @param nThread Number of threads that share a TH3D. void ThreadsPerTH3(unsigned int nThread = 1); -class ProgressBarAction; - /// RDF progress helper. /// This class provides callback functions to the RDataFrame. The event statistics /// (including elapsed time, currently processed file, currently processed events, the rate of event processing @@ -333,132 +331,62 @@ class ProgressBarAction; /// ~~~ class ProgressHelper { private: + std::size_t ComputeTotalEvents() const; double EvtPerSec() const; + void PrintProgressAndStats(std::ostream &stream, std::size_t currentEventCount, + std::chrono::seconds totalElapsedSeconds) const; std::pair RecordEvtCountAndTime(); - void PrintStats(std::ostream &stream, std::size_t currentEventCount, std::chrono::seconds totalElapsedSeconds) const; - void PrintStatsFinal(std::ostream &stream, std::chrono::seconds totalElapsedSeconds) const; - void PrintProgressBar(std::ostream &stream, std::size_t currentEventCount) const; + void Update(); - std::chrono::time_point fBeginTime = std::chrono::system_clock::now(); - std::chrono::time_point fLastPrintTime = fBeginTime; - std::chrono::seconds fPrintInterval{1}; + bool const fIsTTY; + bool const fUseShellColours; std::atomic fProcessedEvents{0}; std::size_t fLastProcessedEvents{0}; - std::size_t fIncrement; - - mutable std::mutex fSampleNameToEventEntriesMutex; - std::map fSampleNameToEventEntries; // Filename, events in the file + std::size_t const 
fIncrement; + unsigned int const fNColumns; + unsigned int const fTotalFiles; - std::array fEventsPerSecondStatistics; - std::size_t fEventsPerSecondStatisticsIndex{0}; + std::array fEventsPerSecondStatistics; + unsigned int fEventsPerSecondStatisticsCounter{0}; - unsigned int fBarWidth; - unsigned int fTotalFiles; + std::chrono::time_point const fBeginTime = std::chrono::system_clock::now(); + std::chrono::time_point fLastPrintTime = fBeginTime; + std::chrono::seconds const fPrintInterval; - std::mutex fPrintMutex; - bool fIsTTY; - bool fUseShellColours; + // Mutex to ensure that only one thread updates the progress bar. + // Lock this mutex to update any of the members above: + std::mutex fUpdateMutex; - std::shared_ptr fTree{nullptr}; + mutable std::mutex fSampleNameToEventEntriesMutex; // Mutex to protect access to the below map + std::map fSampleNameToEventEntries; // Filename, events in the file public: /// Create a progress helper. /// \param increment RDF callbacks are called every `n` events. Pass this `n` here. - /// \param totalFiles read total number of files in the RDF. - /// \param progressBarWidth Number of characters the progress bar will occupy. - /// \param printInterval Update every stats every `n` seconds. + /// \param totalFiles number of files read in the RDF. + /// \param printInterval Update stats every `n` seconds. /// \param useColors Use shell colour codes to colour the output. Automatically disabled when /// we are not writing to a tty. 
- ProgressHelper(std::size_t increment, unsigned int totalFiles = 1, unsigned int progressBarWidth = 40, - unsigned int printInterval = 1, bool useColors = true); - + ProgressHelper(std::size_t increment, unsigned int totalFiles, unsigned int printInterval = 0, + bool useColors = true); + ProgressHelper(ProgressHelper const &) = delete; // The mutexes and atomics won't allow copy/move + ProgressHelper(ProgressHelper &&) = delete; ~ProgressHelper() = default; + ProgressHelper &operator=(ProgressHelper const &) = delete; + ProgressHelper &operator=(ProgressHelper &&) = delete; - friend class ProgressBarAction; - - /// Register a new sample for completion statistics. - /// \see ROOT::RDF::RInterface::DefinePerSample(). - /// The *id.AsString()* refers to the name of the currently processed file. - /// The idea is to populate the event entries in the *fSampleNameToEventEntries* map - /// by selecting the greater of the two values: - /// *id.EntryRange().second* which is the upper event entry range of the processed sample - /// and the current value of the event entries in the *fSampleNameToEventEntries* map. - /// In the single threaded case, the two numbers are the same as the entry range corresponds - /// to the number of events in an individual file (each sample is simply a single file). - /// In the multithreaded case, the idea is to accumulate the higher event entry value until - /// the total number of events in a given file is reached. - void registerNewSample(unsigned int /*slot*/, const ROOT::RDF::RSampleInfo &id) - { - std::lock_guard lock(fSampleNameToEventEntriesMutex); - fSampleNameToEventEntries[id.AsString()] = - std::max(id.EntryRange().second, fSampleNameToEventEntries[id.AsString()]); - } + void RegisterNewSample(unsigned int /*slot*/, const ROOT::RDF::RSampleInfo &id); /// Thread-safe callback for RDataFrame. /// It will record elapsed times and event statistics, and print a progress bar every n seconds (set by the /// fPrintInterval). 
\param slot Ignored. \param value Ignored. template - void operator()(unsigned int /*slot*/, T &value) - { - operator()(value); - } - // clang-format off - /// Thread-safe callback for RDataFrame. - /// It will record elapsed times and event statistics, and print a progress bar every n seconds (set by the fPrintInterval). - /// \param value Ignored. - // clang-format on - template - void operator()(T & /*value*/) - { - using namespace std::chrono; - // *************************************************** - // Warning: Here, everything needs to be thread safe: - // *************************************************** - fProcessedEvents += fIncrement; - - // We only print every n seconds. - if (duration_cast(system_clock::now() - fLastPrintTime) < fPrintInterval) { - return; - } - - // *************************************************** - // Protected by lock from here: - // *************************************************** - if (!fPrintMutex.try_lock()) - return; - std::lock_guard lockGuard(fPrintMutex, std::adopt_lock); - - std::size_t eventCount; - seconds elapsedSeconds; - std::tie(eventCount, elapsedSeconds) = RecordEvtCountAndTime(); - - if (fIsTTY) - std::cout << "\r"; - - PrintProgressBar(std::cout, eventCount); - PrintStats(std::cout, eventCount, elapsedSeconds); - - if (fIsTTY) - std::cout << std::flush; - else - std::cout << std::endl; - } - - std::size_t ComputeNEventsSoFar() const - { - std::unique_lock lock(fSampleNameToEventEntriesMutex); - std::size_t result = 0; - for (const auto &item : fSampleNameToEventEntries) - result += item.second; - return result; - } - - unsigned int ComputeCurrentFileIdx() const + void operator()(unsigned int /*slot*/, T & /*value*/) { - std::unique_lock lock(fSampleNameToEventEntriesMutex); - return fSampleNameToEventEntries.size(); + Update(); } + void PrintStatsFinal() const; }; } // namespace Experimental } // namespace RDF diff --git a/tree/dataframe/src/RDFHelpers.cxx b/tree/dataframe/src/RDFHelpers.cxx index 
8ab92ea4ca5c4..f285a03de1ebd 100644 --- a/tree/dataframe/src/RDFHelpers.cxx +++ b/tree/dataframe/src/RDFHelpers.cxx @@ -32,11 +32,7 @@ #include #include -// TODO, this function should be part of core libraries #include -#if (!defined(_WIN32)) && (!defined(_WIN64)) -#include -#endif #if defined(_WIN32) || defined(_WIN64) #define WIN32_LEAN_AND_MEAN @@ -45,11 +41,14 @@ #include #else #include +#include #endif class TTreeReader; +namespace { // Get terminal size for progress bar +// TODO: Put this in core libraries? int get_tty_size() { #if defined(_WIN32) || defined(_WIN64) @@ -69,6 +68,30 @@ int get_tty_size() #endif } +/// Restore an output stream to its previous state using RAII. +struct RestoreStreamState { + RestoreStreamState(std::ostream &stream) : fStream(stream) { fPreservedState.copyfmt(stream); } + ~RestoreStreamState() { fStream.copyfmt(fPreservedState); } + + std::ostream &fStream; + std::ios fPreservedState{nullptr}; +}; +} // namespace + +/// Format std::chrono::seconds as `1:30m`. +std::ostream &operator<<(std::ostream &stream, std::chrono::seconds elapsedSeconds) +{ + RestoreStreamState restore(stream); + auto h = std::chrono::duration_cast(elapsedSeconds); + auto m = std::chrono::duration_cast(elapsedSeconds - h); + auto s = (elapsedSeconds - h - m).count(); + + if (h.count() > 0) + stream << h.count() << ':' << std::setw(2) << std::right << std::setfill('0'); + stream << m.count() << ':' << std::setw(2) << std::right << std::setfill('0') << s; + return stream << (h.count() > 0 ? 
'h' : 'm'); +} + using ROOT::RDF::RResultHandle; unsigned int ROOT::RDF::RunGraphs(std::vector handles) @@ -148,40 +171,73 @@ void ThreadsPerTH3(unsigned int N) ROOT::Internal::RDF::NThreadPerTH3() = N; } -ProgressHelper::ProgressHelper(std::size_t increment, unsigned int totalFiles, unsigned int progressBarWidth, - unsigned int printInterval, bool useColors) - : fPrintInterval(printInterval), - fIncrement{increment}, - fBarWidth{progressBarWidth = int(get_tty_size() / 4)}, - fTotalFiles{totalFiles}, +ProgressHelper::ProgressHelper(std::size_t increment, unsigned int totalFiles, unsigned int printInterval, + bool useColors) + : #if defined(_WIN32) || defined(_WIN64) fIsTTY{_isatty(_fileno(stdout)) != 0}, - fUseShellColours{false && useColors} + fUseShellColours{false && useColors}, #else fIsTTY{isatty(fileno(stdout)) == 1}, - fUseShellColours{useColors && fIsTTY} // Control characters only with terminals. + fUseShellColours{useColors && fIsTTY}, // Control characters only with terminals. #endif + fIncrement{increment}, + fNColumns(fIsTTY ? (get_tty_size() == 0 ? 60 : get_tty_size()) : 50), + fTotalFiles{totalFiles}, + fPrintInterval{printInterval == 0 ? (fIsTTY ? 1 : 10) : printInterval} { + std::fill(fEventsPerSecondStatistics.begin(), fEventsPerSecondStatistics.end(), -1.); +} + +/// Register a new sample for completion statistics. +/// \see ROOT::RDF::RInterface::DefinePerSample(). +/// The *id.AsString()* refers to the name of the currently processed file. +/// The idea is to populate the event entries in the *fSampleNameToEventEntries* map +/// by selecting the greater of the two values: +/// *id.EntryRange().second* which is the upper event entry range of the processed sample +/// and the current value of the event entries in the *fSampleNameToEventEntries* map. +/// In the single threaded case, the two numbers are the same as the entry range corresponds +/// to the number of events in an individual file (each sample is simply a single file). 
+/// In the multithreaded case, the idea is to accumulate the higher event entry value until +/// the total number of events in a given file is reached. +void ProgressHelper::RegisterNewSample(unsigned int /*slot*/, const ROOT::RDF::RSampleInfo &id) +{ + std::scoped_lock lock(fSampleNameToEventEntriesMutex); + fSampleNameToEventEntries[id.AsString()] = std::max(id.NEntriesTotal(), fSampleNameToEventEntries[id.AsString()]); } /// Compute a running mean of events/s. double ProgressHelper::EvtPerSec() const { - if (fEventsPerSecondStatisticsIndex < fEventsPerSecondStatistics.size()) - return std::accumulate(fEventsPerSecondStatistics.begin(), - fEventsPerSecondStatistics.begin() + fEventsPerSecondStatisticsIndex, 0.) / - fEventsPerSecondStatisticsIndex; - else - return std::accumulate(fEventsPerSecondStatistics.begin(), fEventsPerSecondStatistics.end(), 0.) / - fEventsPerSecondStatistics.size(); + double sum = 0; + unsigned int n = 0; + for (auto item : fEventsPerSecondStatistics) { + if (item >= 0.) { + sum += item; + ++n; + } + } + + return n > 0 ? sum / n : 0.; +} + +/// Compute total events in all open files. +std::size_t ProgressHelper::ComputeTotalEvents() const +{ + std::scoped_lock lock(fSampleNameToEventEntriesMutex); + std::size_t result = 0; + for (const auto &item : fSampleNameToEventEntries) + result += item.second; + return result; } /// Record current event counts and time stamp, populate evts/s statistics array. +/// The function assumes that a lock on the update mutex is held. 
std::pair ProgressHelper::RecordEvtCountAndTime() { using namespace std::chrono; - auto currentEventCount = fProcessedEvents.load(); + auto currentEventCount = fProcessedEvents.load(std::memory_order_acquire); auto eventsPerTimeInterval = currentEventCount - fLastProcessedEvents; fLastProcessedEvents = currentEventCount; @@ -190,97 +246,123 @@ std::pair ProgressHelper::RecordEvtCountAndTi fLastPrintTime = newPrintTime; duration secondsCurrentInterval = newPrintTime - oldPrintTime; - fEventsPerSecondStatistics[fEventsPerSecondStatisticsIndex++ % fEventsPerSecondStatistics.size()] = + fEventsPerSecondStatisticsCounter = (fEventsPerSecondStatisticsCounter + 1) % fEventsPerSecondStatistics.size(); + fEventsPerSecondStatistics[fEventsPerSecondStatisticsCounter] = eventsPerTimeInterval / secondsCurrentInterval.count(); return {currentEventCount, duration_cast(newPrintTime - fBeginTime)}; } -namespace { - -struct RestoreStreamState { - RestoreStreamState(std::ostream &stream) : fStream(stream) { - fPreservedState.copyfmt(stream); - } - ~RestoreStreamState() +/// Print event and time statistics. +void ProgressHelper::PrintProgressAndStats(std::ostream &stream, std::size_t currentEventCount, + std::chrono::seconds elapsedSeconds) const +{ + std::ostringstream buffer; + auto evtpersec = EvtPerSec(); + auto totalEventsInOpenFiles = ComputeTotalEvents(); + std::size_t currentFileIdx; { - fStream.copyfmt(fPreservedState); + std::scoped_lock lock(fSampleNameToEventEntriesMutex); + currentFileIdx = fSampleNameToEventEntries.size(); } - std::ostream &fStream; - std::ios fPreservedState{nullptr}; -}; - -/// Format std::chrono::seconds as `1:30m`. 
-std::ostream &operator<<(std::ostream &stream, std::chrono::seconds elapsedSeconds) -{ - RestoreStreamState restore(stream); - auto h = std::chrono::duration_cast(elapsedSeconds); - auto m = std::chrono::duration_cast(elapsedSeconds - h); - auto s = (elapsedSeconds - h - m).count(); + if (totalEventsInOpenFiles == 0) + return; - if (h.count() > 0) - stream << h.count() << ':' << std::setw(2) << std::right << std::setfill('0'); - stream << m.count() << ':' << std::setw(2) << std::right << std::setfill('0') << s; - return stream << (h.count() > 0 ? 'h' : 'm'); -} + double completion = 0.; + if (currentFileIdx < fTotalFiles) { + const double fractionSeenFiles = double(currentFileIdx) / fTotalFiles; + completion = fractionSeenFiles * (double(currentEventCount) / totalEventsInOpenFiles); + } else { + completion = double(currentEventCount) / totalEventsInOpenFiles; + } -} // namespace + // Print the bar + { + const auto barWidth = fNColumns / 4; + unsigned int nBar = std::min(completion, 1.) * barWidth; + std::string bars(std::max(nBar, 1u), '='); + bars.back() = (nBar == barWidth) ? '=' : '>'; -/// Print event and time statistics. 
-void ProgressHelper::PrintStats(std::ostream &stream, std::size_t currentEventCount, - std::chrono::seconds elapsedSeconds) const -{ - RestoreStreamState restore(stream); - auto evtpersec = EvtPerSec(); - auto GetNEventsOfCurrentFile = ComputeNEventsSoFar(); - auto currentFileIdx = ComputeCurrentFileIdx(); - auto totalFiles = fTotalFiles; + if (fUseShellColours) + buffer << "\033[33m"; + buffer << '|' << std::setfill(' ') << std::setw(barWidth) << std::left << bars << "| "; + if (fUseShellColours) + buffer << "\033[0m"; + } + // Elapsed time + buffer << "["; if (fUseShellColours) - stream << "\033[35m"; - stream << "[" - << "Elapsed time: " << elapsedSeconds << " "; + buffer << "\033[35m"; + buffer << "Elapsed: " << elapsedSeconds << " "; if (fUseShellColours) - stream << "\033[0m"; - stream << "processing file: " << currentFileIdx << " / " << totalFiles << " "; + buffer << "\033[0m"; + buffer << "files: " << currentFileIdx << " / " << fTotalFiles << " "; // Event counts: if (fUseShellColours) - stream << "\033[32m"; - - stream << "processed evts: " << currentEventCount; - if (GetNEventsOfCurrentFile != 0) { - stream << " / " << std::scientific << std::setprecision(2) << GetNEventsOfCurrentFile; + buffer << "\033[32m"; + + buffer << "events: " << currentEventCount; + if (totalEventsInOpenFiles != 0) { + buffer << " / " << std::scientific << std::setprecision(2); + if (currentFileIdx == fTotalFiles) + buffer << totalEventsInOpenFiles; + else + buffer << "(" << totalEventsInOpenFiles << " + x)"; } - stream << " "; + buffer << " "; if (fUseShellColours) - stream << "\033[0m"; + buffer << "\033[0m"; // events/s - stream << std::scientific << std::setprecision(2) << evtpersec << " evt/s"; + buffer << std::scientific << std::setprecision(2) << evtpersec << " evt/s"; // Time statistics: - if (GetNEventsOfCurrentFile != 0) { + // As long as not all files have been opened, estimate when 100% completion will be reached + // based on current completion elapsed time. 
(This assumes that unopened files have about the + // same size as the files that have been seen.) + // Once the total event count is known, use "missing events / evt/s". + if (totalEventsInOpenFiles != 0) { if (fUseShellColours) - stream << "\033[35m"; - std::chrono::seconds remainingSeconds( - static_cast((ComputeNEventsSoFar() - currentEventCount) / evtpersec)); - stream << " " << remainingSeconds << " " - << " remaining time (per file being processed)"; + buffer << "\033[35m"; + + std::chrono::seconds remainingSeconds; + if (currentFileIdx == fTotalFiles) { + remainingSeconds = + std::chrono::seconds{static_cast((ComputeTotalEvents() - currentEventCount) / evtpersec)}; + } else { + remainingSeconds = + std::chrono::seconds{static_cast(elapsedSeconds.count() / completion - elapsedSeconds.count())}; + } + buffer << " remaining ca.: " << remainingSeconds; + if (fUseShellColours) - stream << "\033[0m"; + buffer << "\033[0m"; } - stream << "] "; + buffer << "]"; + + RestoreStreamState restore(stream); + stream << std::left << std::setw(fNColumns - 1) << buffer.str(); } -void ProgressHelper::PrintStatsFinal(std::ostream &stream, std::chrono::seconds elapsedSeconds) const +void ProgressHelper::PrintStatsFinal() const { + auto &stream = std::cout; RestoreStreamState restore(stream); - auto totalEvents = ComputeNEventsSoFar(); - auto totalFiles = fTotalFiles; + const auto elapsedSeconds = + std::chrono::duration_cast(std::chrono::system_clock::now() - fBeginTime); + const auto totalEvents = ComputeTotalEvents(); + + // The next line resets the current line output in the terminal. + // Brings the cursor at the beginning ('\r'), prints whitespace with the + // same length as the terminal size, then resets the cursor again so we + // can print the final stats on a clean line. 
+ if (fIsTTY) + stream << '\r' << std::string(fNColumns, ' ') << '\r'; if (fUseShellColours) stream << "\033[35m"; @@ -288,45 +370,58 @@ void ProgressHelper::PrintStatsFinal(std::ostream &stream, std::chrono::seconds << "Total elapsed time: " << elapsedSeconds << " "; if (fUseShellColours) stream << "\033[0m"; - stream << "processed files: " << totalFiles << " / " << totalFiles << " "; + stream << "processed files: " << fTotalFiles << " "; // Event counts: if (fUseShellColours) stream << "\033[32m"; - stream << "processed evts: " << totalEvents; - if (totalEvents != 0) { - stream << " / " << std::scientific << std::setprecision(2) << totalEvents; - } + stream << "processed events: " << totalEvents; if (fUseShellColours) stream << "\033[0m"; - stream << "] "; + stream << " " << std::scientific << std::setprecision(2) << (double)totalEvents / elapsedSeconds.count() + << " evt/s"; + + stream << "]\n"; } -/// Print a progress bar of width `ProgressHelper::fBarWidth` if `fGetNEventsOfCurrentFile` is known. -void ProgressHelper::PrintProgressBar(std::ostream &stream, std::size_t currentEventCount) const +/// Record number of events processed and update progress bar. +/// This function will atomically record elapsed times and event statistics, and one thread will update the progress bar +/// every n seconds (set by the fPrintInterval). +void ProgressHelper::Update() { - auto GetNEventsOfCurrentFile = ComputeNEventsSoFar(); - if (GetNEventsOfCurrentFile == 0) + using namespace std::chrono; + // *************************************************** + // Warning: Here, everything needs to be thread safe: + // *************************************************** + fProcessedEvents.fetch_add(fIncrement, std::memory_order_relaxed); + + // We only print every n seconds. 
+ if (duration_cast(system_clock::now() - fLastPrintTime) < fPrintInterval) { return; + } - RestoreStreamState restore(stream); + // ****************************************************** + // Update the progress bar. Only one thread can proceed. + // ****************************************************** + std::unique_lock lockGuard(fUpdateMutex, std::try_to_lock); + if (!lockGuard) + return; - double completion = double(currentEventCount) / GetNEventsOfCurrentFile; - unsigned int nBar = std::min(completion, 1.) * fBarWidth; + auto const [eventCount, elapsedSeconds] = RecordEvtCountAndTime(); - std::string bars(std::max(nBar, 1u), '='); - bars.back() = (nBar == fBarWidth) ? '=' : '>'; + if (fIsTTY) + std::cout << "\r"; - if (fUseShellColours) - stream << "\033[33m"; - stream << '|' << std::setfill(' ') << std::setw(fBarWidth) << std::left << bars << "| "; - if (fUseShellColours) - stream << "\033[0m"; + PrintProgressAndStats(std::cout, eventCount, elapsedSeconds); + + if (fIsTTY) + std::cout << std::flush; + else + std::cout << std::endl; } -//*/ class ProgressBarAction final : public ROOT::Detail::RDF::RActionImpl { public: @@ -346,22 +441,7 @@ class ProgressBarAction final : public ROOT::Detail::RDF::RActionImpl lockGuard(fPrintMutex, std::adopt_lock); - const auto &[eventCount, elapsedSeconds] = fHelper->RecordEvtCountAndTime(); - - // The next line resets the current line output in the terminal. - // Brings the cursor at the beginning ('\r'), prints whitespace with the - // same length as the terminal size, then resets the cursor again so we - // can print the final stats on a clean line. 
- std::cout << '\r' << std::string(get_tty_size(), ' ') << '\r'; - fHelper->PrintStatsFinal(std::cout, elapsedSeconds); - std::cout << '\n'; - } + void Finalize() { fHelper->PrintStatsFinal(); } std::string GetActionName() { return "ProgressBar"; } // dummy implementation of PartialUpdate @@ -369,20 +449,19 @@ class ProgressBarAction final : public ROOT::Detail::RDF::RActionImplfHelper->registerNewSample(slot, id); - return this->fHelper->ComputeNEventsSoFar(); - }; + return + [this](unsigned int slot, const ROOT::RDF::RSampleInfo &id) { this->fHelper->RegisterNewSample(slot, id); }; } }; void AddProgressBar(ROOT::RDF::RNode node) { + constexpr std::size_t callbackEveryNEvents = 1000; auto total_files = node.GetNFiles(); - auto progress = std::make_shared(1000, total_files); + auto progress = std::make_shared(callbackEveryNEvents, total_files); ProgressBarAction c(progress); auto r = node.Book<>(c); - r.OnPartialResultSlot(1000, [progress](unsigned int slot, auto &&arg) { (*progress)(slot, arg); }); + r.OnPartialResultSlot(callbackEveryNEvents, [progress](unsigned int slot, auto &&arg) { (*progress)(slot, arg); }); } void AddProgressBar(ROOT::RDataFrame dataframe) diff --git a/tree/dataframe/src/RLoopManager.cxx b/tree/dataframe/src/RLoopManager.cxx index 3f533303b1409..96986f60b49a4 100644 --- a/tree/dataframe/src/RLoopManager.cxx +++ b/tree/dataframe/src/RLoopManager.cxx @@ -739,11 +739,11 @@ void RLoopManager::UpdateSampleInfo(unsigned int slot, TTreeReader &r) { // If the tree is stored in a subdirectory, treename will be the full path to it starting with the root directory '/' const std::string &id = fname + (treename.rfind('/', 0) == 0 ? 
"" : "/") + treename; if (fSampleMap.empty()) { - fSampleInfos[slot] = RSampleInfo(id, range); + fSampleInfos[slot] = RSampleInfo(id, range, nullptr, tree->GetEntries()); } else { if (fSampleMap.find(id) == fSampleMap.end()) throw std::runtime_error("Full sample identifier '" + id + "' cannot be found in the available samples."); - fSampleInfos[slot] = RSampleInfo(id, range, fSampleMap[id]); + fSampleInfos[slot] = RSampleInfo(id, range, fSampleMap[id], tree->GetEntries()); } } diff --git a/tree/dataframe/src/RNTupleDS.cxx b/tree/dataframe/src/RNTupleDS.cxx index d887bb7f43ce2..db1f3aa17b568 100644 --- a/tree/dataframe/src/RNTupleDS.cxx +++ b/tree/dataframe/src/RNTupleDS.cxx @@ -917,14 +917,15 @@ ROOT::RDF::RSampleInfo ROOT::Internal::RDF::RNTupleDS::CreateSampleInfo( if (sampleMap.empty()) return ROOT::RDF::RSampleInfo( - ntupleID, std::make_pair(fCurrentRanges[rangeIdx].fFirstEntry, fCurrentRanges[rangeIdx].fLastEntry)); + ntupleID, std::make_pair(fCurrentRanges[rangeIdx].fFirstEntry, fCurrentRanges[rangeIdx].fLastEntry), nullptr, + fPrincipalDescriptor.GetNEntries()); if (sampleMap.find(ntupleID) == sampleMap.end()) throw std::runtime_error("Full sample identifier '" + ntupleID + "' cannot be found in the available samples."); return ROOT::RDF::RSampleInfo( ntupleID, std::make_pair(fCurrentRanges[rangeIdx].fFirstEntry, fCurrentRanges[rangeIdx].fLastEntry), - sampleMap.at(ntupleID)); + sampleMap.at(ntupleID), fPrincipalDescriptor.GetNEntries()); } ROOT::RDataFrame ROOT::Internal::RDF::FromRNTuple(std::string_view ntupleName, diff --git a/tree/dataframe/src/RTTreeDS.cxx b/tree/dataframe/src/RTTreeDS.cxx index 824ff39c565a4..1a401ea495810 100644 --- a/tree/dataframe/src/RTTreeDS.cxx +++ b/tree/dataframe/src/RTTreeDS.cxx @@ -378,11 +378,11 @@ ROOT::RDF::RSampleInfo ROOT::Internal::RDF::RTTreeDS::CreateSampleInfo( // If the tree is stored in a subdirectory, treename will be the full path to it starting with the root directory '/' const std::string &id = fname + 
(treename.rfind('/', 0) == 0 ? "" : "/") + treename; if (sampleMap.empty()) { - return RSampleInfo(id, range); + return RSampleInfo(id, range, nullptr, tree->GetEntries()); } else { if (sampleMap.find(id) == sampleMap.end()) throw std::runtime_error("Full sample identifier '" + id + "' cannot be found in the available samples."); - return RSampleInfo(id, range, sampleMap.at(id)); + return RSampleInfo(id, range, sampleMap.at(id), tree->GetEntries()); } }