From 93dffce295410f9f1e1e6e4b15f975440975fea4 Mon Sep 17 00:00:00 2001 From: Chris Bradley Date: Thu, 28 Nov 2024 18:29:10 +0000 Subject: [PATCH 1/5] wip --- src/gribjump/Engine.cc | 12 +++++------ src/gribjump/Engine.h | 12 +++++++++-- src/gribjump/Forwarder.cc | 5 +++-- src/gribjump/Forwarder.h | 3 ++- src/gribjump/LocalGribJump.cc | 16 +++++++++----- src/gribjump/Task.h | 38 ++++++++++++++++++++++++++++++++-- src/gribjump/remote/Request.cc | 4 +++- src/gribjump/remote/Request.h | 4 ++-- 8 files changed, 73 insertions(+), 21 deletions(-) diff --git a/src/gribjump/Engine.cc b/src/gribjump/Engine.cc index cb3aa02..2a02ba0 100644 --- a/src/gribjump/Engine.cc +++ b/src/gribjump/Engine.cc @@ -104,13 +104,12 @@ filemap_t Engine::buildFileMap(const metkit::mars::MarsRequest& unionrequest, Ex return filemap; } -void Engine::scheduleExtractionTasks(filemap_t& filemap){ +TaskReport Engine::scheduleExtractionTasks(filemap_t& filemap){ bool forwardExtraction = LibGribJump::instance().config().getBool("forwardExtraction", false); if (forwardExtraction) { Forwarder forwarder; - forwarder.extract(filemap); - return; + return forwarder.extract(filemap); } bool inefficientExtraction = LibGribJump::instance().config().getBool("inefficientExtraction", false); @@ -128,9 +127,10 @@ void Engine::scheduleExtractionTasks(filemap_t& filemap){ } } taskGroup_.waitForTasks(); + return taskGroup_.report(); } -ResultsMap Engine::extract(ExtractionRequests& requests) { +TaskOutcome Engine::extract(ExtractionRequests& requests) { eckit::Timer timer("Engine::extract"); @@ -141,7 +141,7 @@ ResultsMap Engine::extract(ExtractionRequests& requests) { MetricsManager::instance().set("elapsed_build_filemap", timer.elapsed()); timer.reset("Gribjump Engine: Built file map"); - scheduleExtractionTasks(filemap); + TaskReport report = scheduleExtractionTasks(filemap); MetricsManager::instance().set("elapsed_tasks", timer.elapsed()); timer.reset("Gribjump Engine: All tasks finished"); @@ -150,7 +150,7 @@ ResultsMap Engine::extract(ExtractionRequests& requests) { timer.reset("Gribjump Engine: Repackaged results"); - return results; + return {results, report}; } ResultsMap Engine::collectResults(ExItemMap& keyToExtractionItem) { diff --git a/src/gribjump/Engine.h b/src/gribjump/Engine.h index 4521795..3b5f863 100644 --- a/src/gribjump/Engine.h +++ b/src/gribjump/Engine.h @@ -22,13 +22,21 @@ namespace gribjump { +template + +struct TaskOutcome { + T result; + TaskReport report; +}; + class Engine { public: Engine(); ~Engine(); - ResultsMap extract(ExtractionRequests& requests); + // ResultsMap extract(ExtractionRequests& requests); + TaskOutcome extract(ExtractionRequests& requests); // byfiles: scan entire file, not just fields matching request size_t scan(const MarsRequests& requests, bool byfiles = false); @@ -37,7 +45,7 @@ class Engine { std::map > axes(const std::string& request, int level=3); - void scheduleExtractionTasks(filemap_t& filemap); + TaskReport scheduleExtractionTasks(filemap_t& filemap); void reportErrors(eckit::Stream& client_); void raiseErrors(); diff --git a/src/gribjump/Forwarder.cc b/src/gribjump/Forwarder.cc index 5b01e37..da4ab06 100644 --- a/src/gribjump/Forwarder.cc +++ b/src/gribjump/Forwarder.cc @@ -17,7 +17,6 @@ #include "gribjump/Forwarder.h" #include "gribjump/ExtractionItem.h" #include "gribjump/LibGribJump.h" -#include "gribjump/Task.h" namespace gribjump { @@ -54,7 +53,7 @@ size_t Forwarder::scan(const std::vector& uris) { return nFields; } -void Forwarder::extract(filemap_t& filemap) { +TaskReport Forwarder::extract(filemap_t& filemap) { std::unordered_map serverfilemaps = serverFileMap(filemap); TaskGroup taskGroup; @@ -63,6 +62,8 @@ void Forwarder::extract(filemap_t& filemap) { taskGroup.enqueueTask(new ForwardExtractionTask(taskGroup, counter++, endpoint, subfilemap)); } taskGroup.waitForTasks(); + + return taskGroup.report(); } diff --git a/src/gribjump/Forwarder.h b/src/gribjump/Forwarder.h index 3cea3f4..fb47594 100644 --- a/src/gribjump/Forwarder.h +++ b/src/gribjump/Forwarder.h @@ -10,6 +10,7 @@ /// @author Christopher Bradley #include "gribjump/Types.h" +#include "gribjump/Task.h" #include "eckit/net/Endpoint.h" #pragma once @@ -26,7 +27,7 @@ class Forwarder { ~Forwarder(); size_t scan(const std::vector& uris); - void extract(filemap_t& filemap); + TaskReport extract(filemap_t& filemap); private: diff --git a/src/gribjump/LocalGribJump.cc b/src/gribjump/LocalGribJump.cc index 5365078..004d9cd 100644 --- a/src/gribjump/LocalGribJump.cc +++ b/src/gribjump/LocalGribJump.cc @@ -87,8 +87,12 @@ std::vector> LocalGribJump::extract(const eckit: std::vector>> LocalGribJump::extract(ExtractionRequests& requests) { Engine engine; - ResultsMap results = engine.extract(requests); - engine.raiseErrors(); + // ResultsMap results = engine.extract(requests); + auto x = engine.extract(requests); + ResultsMap results = x.result; + // engine.raiseErrors(); + x.report.raiseErrors(); + std::vector>> extractionResults; for (auto& req : requests) { @@ -114,9 +118,11 @@ ResultsMap LocalGribJump::extract(const std::vector& requests, cons extractionRequests.push_back(ExtractionRequest(requests[i], ranges[i])); } - ResultsMap results = engine.extract(extractionRequests); - engine.raiseErrors(); - return results; + // ResultsMap results = engine.extract(extractionRequests); + auto x = engine.extract(extractionRequests); + // engine.raiseErrors(); + x.report.raiseErrors(); + return x.result; } std::map> LocalGribJump::axes(const std::string& request, int level) { diff --git a/src/gribjump/Task.h b/src/gribjump/Task.h index 2824db2..3bb7278 100644 --- a/src/gribjump/Task.h +++ b/src/gribjump/Task.h @@ -57,6 +57,39 @@ class Task { //---------------------------------------------------------------------------------------------------------------------- +// TaskReport contains error messages and other information produced by a TaskGroup, and methods to either +// report them to a client or raise an exception. +// TaskGroup will return a TaskReport object to calling code. +class TaskReport { + +public: + TaskReport() {} + TaskReport(std::vector&& errors) : errors_(std::move(errors)) {} + + void reportErrors(eckit::Stream& client) const { + client << errors_.size(); + for (const auto& s : errors_) { + client << s; + } + } + + void raiseErrors() const { + if (errors_.size() > 0) { + std::stringstream ss; + ss << "Encountered " << errors_.size() << " error(s) during task execution:" << std::endl; + for (const auto& s : errors_) { + ss << s << std::endl; + } + throw eckit::SeriousBug(ss.str()); + } + } + +private: + std::vector errors_; //< stores error messages, empty if no errors +}; + +//---------------------------------------------------------------------------------------------------------------------- +// class TaskGroup { public: @@ -78,9 +111,10 @@ class TaskGroup { /// Wait for all queued tasks to be executed void waitForTasks(); - void reportErrors(eckit::Stream& client); - void raiseErrors(); + void reportErrors(eckit::Stream& client); // remove this: TaskReport + void raiseErrors(); // XX remove this: TaskReport + TaskReport report() {return TaskReport(std::move(errors_)); } std::mutex debugMutex_; diff --git a/src/gribjump/remote/Request.cc b/src/gribjump/remote/Request.cc index fa5434c..a2d6aad 100644 --- a/src/gribjump/remote/Request.cc +++ b/src/gribjump/remote/Request.cc @@ -86,7 +86,9 @@ ExtractRequest::ExtractRequest(eckit::Stream& stream) : Request(stream) { void ExtractRequest::execute() { - results_ = engine_.extract(requests_); + auto x = engine_.extract(requests_); + results_ = x.result; + TaskReport report = x.report; if (LibGribJump::instance().debug()) { for (auto& pair : results_) { diff --git a/src/gribjump/remote/Request.h b/src/gribjump/remote/Request.h index 7483f7d..22f9413 100644 --- a/src/gribjump/remote/Request.h +++ b/src/gribjump/remote/Request.h @@ -45,8 +45,8 @@ class Request { protected: // members eckit::Stream& client_; - Engine engine_; //< Engine and schedule tasks based on request - + Engine engine_; //< Engine and schedule tasks based on request // XXX Can we now remove this? + TaskReport report_; // uint64_t id_; }; From c620b78c895cedf827bd548d70d5633312f863d6 Mon Sep 17 00:00:00 2001 From: Chris Bradley Date: Fri, 29 Nov 2024 15:10:26 +0000 Subject: [PATCH 2/5] Make engine return TaskReport, allowing it to be stateless --- src/gribjump/Engine.cc | 43 +++++++++++++++++----------------- src/gribjump/Engine.h | 11 +++------ src/gribjump/Forwarder.cc | 4 ++-- src/gribjump/Forwarder.h | 3 ++- src/gribjump/LocalGribJump.cc | 32 ++++++++++--------------- src/gribjump/Task.cc | 18 +++++++------- src/gribjump/Task.h | 32 +++++-------------------- src/gribjump/remote/Request.cc | 21 ++++++++++------- tests/test_engine.cc | 4 ++-- 9 files changed, 68 insertions(+), 100 deletions(-) diff --git a/src/gribjump/Engine.cc b/src/gribjump/Engine.cc index 2a02ba0..067399f 100644 --- a/src/gribjump/Engine.cc +++ b/src/gribjump/Engine.cc @@ -115,19 +115,21 @@ TaskReport Engine::scheduleExtractionTasks(filemap_t& filemap){ bool inefficientExtraction = LibGribJump::instance().config().getBool("inefficientExtraction", false); size_t counter = 0; + TaskGroup taskGroup; + for (auto& [fname, extractionItems] : filemap) { if (extractionItems[0]->isRemote()) { if (inefficientExtraction) { - taskGroup_.enqueueTask(new InefficientFileExtractionTask(taskGroup_, counter++, fname, extractionItems)); + taskGroup.enqueueTask(new InefficientFileExtractionTask(taskGroup, counter++, fname, extractionItems)); } else { throw eckit::SeriousBug("Got remote URI from FDB, but forwardExtraction enabled in gribjump config."); } } else { - taskGroup_.enqueueTask(new FileExtractionTask(taskGroup_, counter++, fname, extractionItems)); + taskGroup.enqueueTask(new FileExtractionTask(taskGroup, counter++, fname, extractionItems)); } } - taskGroup_.waitForTasks(); - return taskGroup_.report(); + taskGroup.waitForTasks(); + return taskGroup.report(); } TaskOutcome Engine::extract(ExtractionRequests& requests) { @@ -137,20 +139,22 @@ TaskOutcome Engine::extract(ExtractionRequests& requests) { ExItemMap keyToExtractionItem; metkit::mars::MarsRequest unionreq = buildRequestMap(requests, keyToExtractionItem); + // Build file map filemap_t filemap = buildFileMap(unionreq, keyToExtractionItem); MetricsManager::instance().set("elapsed_build_filemap", timer.elapsed()); timer.reset("Gribjump Engine: Built file map"); + // Schedule tasks TaskReport report = scheduleExtractionTasks(filemap); MetricsManager::instance().set("elapsed_tasks", timer.elapsed()); timer.reset("Gribjump Engine: All tasks finished"); + // Collect results ResultsMap results = collectResults(keyToExtractionItem); MetricsManager::instance().set("elapsed_collect_results", timer.elapsed()); - timer.reset("Gribjump Engine: Repackaged results"); - return {results, report}; + return {std::move(results), std::move(report)}; } ResultsMap Engine::collectResults(ExItemMap& keyToExtractionItem) { @@ -165,13 +169,14 @@ ResultsMap Engine::collectResults(ExItemMap& keyToExtractionItem) { return results; } -size_t Engine::scan(const MarsRequests& requests, bool byfiles) { +TaskOutcome Engine::scan(const MarsRequests& requests, bool byfiles) { std::vector uris = FDBLister::instance().URIs(requests); + // XXX do we explicitly need this? if (uris.empty()) { MetricsManager::instance().set("count_scanned_fields", 0); - return 0; + return {0, TaskReport()}; } // forwarded scan requests @@ -188,31 +193,32 @@ size_t Engine::scan(const MarsRequests& requests, bool byfiles) { } } - return scan(filemap); + return scheduleScanTasks(filemap); } -size_t Engine::scan(std::vector files) { +TaskOutcome Engine::scan(std::vector files) { scanmap_t scanmap; for (auto& fname : files) { scanmap[fname] = {}; } - return scan(scanmap); + return scheduleScanTasks(scanmap); } -size_t Engine::scan(const scanmap_t& scanmap) { +TaskOutcome Engine::scheduleScanTasks(const scanmap_t& scanmap) { size_t counter = 0; std::atomic nfields(0); + TaskGroup taskGroup; for (auto& [uri, offsets] : scanmap) { - taskGroup_.enqueueTask(new FileScanTask(taskGroup_, counter++, uri.path(), offsets, nfields)); + taskGroup.enqueueTask(new FileScanTask(taskGroup, counter++, uri.path(), offsets, nfields)); } - taskGroup_.waitForTasks(); + taskGroup.waitForTasks(); MetricsManager::instance().set("count_scanned_fields", static_cast(nfields)); - return nfields; + return {nfields, taskGroup.report()}; } std::map > Engine::axes(const std::string& request, int level) { @@ -220,13 +226,6 @@ std::map > Engine::axes(const std:: return FDBLister::instance().axes(request, level); } -void Engine::reportErrors(eckit::Stream& client) { - taskGroup_.reportErrors(client); -} - -void Engine::raiseErrors() { - taskGroup_.raiseErrors(); -} //---------------------------------------------------------------------------------------------------------------------- } // namespace gribjump diff --git a/src/gribjump/Engine.h b/src/gribjump/Engine.h index 3b5f863..1034d61 100644 --- a/src/gribjump/Engine.h +++ b/src/gribjump/Engine.h @@ -39,17 +39,14 @@ class Engine { TaskOutcome extract(ExtractionRequests& requests); // byfiles: scan entire file, not just fields matching request - size_t scan(const MarsRequests& requests, bool byfiles = false); - size_t scan(std::vector files); - size_t scan(const scanmap_t& scanmap); + TaskOutcome scan(const MarsRequests& requests, bool byfiles = false); + TaskOutcome scan(std::vector files); + TaskOutcome scheduleScanTasks(const scanmap_t& scanmap); std::map > axes(const std::string& request, int level=3); TaskReport scheduleExtractionTasks(filemap_t& filemap); - void reportErrors(eckit::Stream& client_); - void raiseErrors(); - private: filemap_t buildFileMap(const metkit::mars::MarsRequest& unionrequest, ExItemMap& keyToExtractionItem); @@ -58,8 +55,6 @@ class Engine { private: - TaskGroup taskGroup_; /// @todo Maybe we should be returning the taskGroup, rather than storing it here. /// I would really prefer a stateless engine. - }; diff --git a/src/gribjump/Forwarder.cc b/src/gribjump/Forwarder.cc index da4ab06..e2088e9 100644 --- a/src/gribjump/Forwarder.cc +++ b/src/gribjump/Forwarder.cc @@ -26,7 +26,7 @@ Forwarder::Forwarder() { Forwarder::~Forwarder() { } -size_t Forwarder::scan(const std::vector& uris) { +TaskOutcome Forwarder::scan(const std::vector& uris) { ASSERT(uris.size() > 0); @@ -50,7 +50,7 @@ size_t Forwarder::scan(const std::vector& uris) { } taskGroup.waitForTasks(); - return nFields; + return {nFields, taskGroup.report()}; } TaskReport Forwarder::extract(filemap_t& filemap) { diff --git a/src/gribjump/Forwarder.h b/src/gribjump/Forwarder.h index fb47594..8258875 100644 --- a/src/gribjump/Forwarder.h +++ b/src/gribjump/Forwarder.h @@ -11,6 +11,7 @@ /// @author Christopher Bradley #include "gribjump/Types.h" #include "gribjump/Task.h" +#include "gribjump/Engine.h" #include "eckit/net/Endpoint.h" #pragma once @@ -26,7 +27,7 @@ class Forwarder { Forwarder(); ~Forwarder(); - size_t scan(const std::vector& uris); + TaskOutcome scan(const std::vector& uris); TaskReport extract(filemap_t& filemap); private: diff --git a/src/gribjump/LocalGribJump.cc b/src/gribjump/LocalGribJump.cc index 004d9cd..9842879 100644 --- a/src/gribjump/LocalGribJump.cc +++ b/src/gribjump/LocalGribJump.cc @@ -49,16 +49,17 @@ LocalGribJump::LocalGribJump(const Config& config): GribJumpBase(config) { LocalGribJump::~LocalGribJump() {} size_t LocalGribJump::scan(const std::vector& paths) { - Engine engine; - return engine.scan(paths); + auto [result, report] = Engine().scan(paths); + report.raiseErrors(); + return result; } size_t LocalGribJump::scan(const std::vector& requests, bool byfiles) { - Engine engine; - return engine.scan(requests, byfiles); + auto [result, report] = Engine().scan(requests, byfiles); + report.raiseErrors(); + return result; } - std::vector> LocalGribJump::extract(const eckit::PathName& path, const std::vector& offsets, const std::vector>& ranges) { // Directly from file, no cache, no queue, no threads @@ -86,13 +87,8 @@ std::vector> LocalGribJump::extract(const eckit: /// @todo, change API, remove extraction request std::vector>> LocalGribJump::extract(ExtractionRequests& requests) { - Engine engine; - // ResultsMap results = engine.extract(requests); - auto x = engine.extract(requests); - ResultsMap results = x.result; - // engine.raiseErrors(); - x.report.raiseErrors(); - + auto [results, report] = Engine().extract(requests); + report.raiseErrors(); std::vector>> extractionResults; for (auto& req : requests) { @@ -111,18 +107,15 @@ std::vector>> LocalGribJump::extra } ResultsMap LocalGribJump::extract(const std::vector& requests, const std::vector>& ranges) { - Engine engine; ExtractionRequests extractionRequests; for (size_t i = 0; i < requests.size(); i++) { extractionRequests.push_back(ExtractionRequest(requests[i], ranges[i])); } - // ResultsMap results = engine.extract(extractionRequests); - auto x = engine.extract(extractionRequests); - // engine.raiseErrors(); - x.report.raiseErrors(); - return x.result; + auto [results, report] = Engine().extract(extractionRequests); + report.raiseErrors(); + return std::move(results); } std::map> LocalGribJump::axes(const std::string& request, int level) { @@ -130,8 +123,7 @@ std::map> LocalGribJump::axes(const // Note: This is likely to be removed from GribJump, and moved to FDB. // Here for now to support polytope. - Engine engine; - return engine.axes(request, level); + return Engine().axes(request, level); } static GribJumpBuilder builder("local"); diff --git a/src/gribjump/Task.cc b/src/gribjump/Task.cc index 69fd3ec..da450b6 100644 --- a/src/gribjump/Task.cc +++ b/src/gribjump/Task.cc @@ -54,14 +54,6 @@ void Task::notifyError(const std::string& s) { //---------------------------------------------------------------------------------------------------------------------- -TaskGroup::TaskGroup() {} - -TaskGroup::~TaskGroup() { - for (auto& t : tasks_) { - delete t; - } -} - void TaskGroup::notify(size_t taskid) { std::lock_guard lock(m_); taskStatus_[taskid] = Task::Status::DONE; @@ -118,14 +110,20 @@ void TaskGroup::waitForTasks() { } } -void TaskGroup::reportErrors(eckit::Stream& client) { +//---------------------------------------------------------------------------------------------------------------------- + +TaskReport::TaskReport() {} + +TaskReport::TaskReport(std::vector&& errors) : errors_(std::move(errors)) {} + +void TaskReport::reportErrors(eckit::Stream& client) const { client << errors_.size(); for (const auto& s : errors_) { client << s; } } -void TaskGroup::raiseErrors() { +void TaskReport::raiseErrors() const { if (errors_.size() > 0) { std::stringstream ss; ss << "Encountered " << eckit::Plural(errors_.size(), "error") << " during task execution:" << std::endl; diff --git a/src/gribjump/Task.h b/src/gribjump/Task.h index 3bb7278..e44e84c 100644 --- a/src/gribjump/Task.h +++ b/src/gribjump/Task.h @@ -63,26 +63,11 @@ class Task { class TaskReport { public: - TaskReport() {} - TaskReport(std::vector&& errors) : errors_(std::move(errors)) {} - - void reportErrors(eckit::Stream& client) const { - client << errors_.size(); - for (const auto& s : errors_) { - client << s; - } - } + TaskReport(); + TaskReport(std::vector&& errors); - void raiseErrors() const { - if (errors_.size() > 0) { - std::stringstream ss; - ss << "Encountered " << errors_.size() << " error(s) during task execution:" << std::endl; - for (const auto& s : errors_) { - ss << s << std::endl; - } - throw eckit::SeriousBug(ss.str()); - } - } + void reportErrors(eckit::Stream& client) const; + void raiseErrors() const; private: std::vector errors_; //< stores error messages, empty if no errors @@ -93,9 +78,7 @@ class TaskReport { class TaskGroup { public: - TaskGroup(); - - ~TaskGroup(); + TaskGroup() = default; /// Notify that a task has been completed /// potentially completing all the work for this request @@ -111,9 +94,6 @@ class TaskGroup { /// Wait for all queued tasks to be executed void waitForTasks(); - void reportErrors(eckit::Stream& client); // remove this: TaskReport - void raiseErrors(); // XX remove this: TaskReport - TaskReport report() {return TaskReport(std::move(errors_)); } std::mutex debugMutex_; @@ -138,7 +118,7 @@ class TaskGroup { mutable std::mutex m_; std::condition_variable cv_; - std::vector tasks_; //< stores tasks status, must be initialised by derived class + std::vector> tasks_; //< stores tasks status, must be initialised by derived class std::vector taskStatus_; std::vector errors_; //< stores error messages, empty if no errors diff --git a/src/gribjump/remote/Request.cc b/src/gribjump/remote/Request.cc index a2d6aad..3477fb1 100644 --- a/src/gribjump/remote/Request.cc +++ b/src/gribjump/remote/Request.cc @@ -32,7 +32,7 @@ Request::Request(eckit::Stream& stream) : client_(stream) { } void Request::reportErrors() { - engine_.reportErrors(client_); + report_.reportErrors(client_); } //---------------------------------------------------------------------------------------------------------------------- @@ -57,7 +57,9 @@ ScanRequest::ScanRequest(eckit::Stream& stream) : Request(stream) { } void ScanRequest::execute() { - nFields_ = engine_.scan(requests_, byfiles_); + auto [nfields, report] = engine_.scan(requests_, byfiles_); + nFields_ = nfields; + report_ = std::move(report); } void ScanRequest::replyToClient() { @@ -86,10 +88,10 @@ ExtractRequest::ExtractRequest(eckit::Stream& stream) : Request(stream) { void ExtractRequest::execute() { - auto x = engine_.extract(requests_); - results_ = x.result; - TaskReport report = x.report; - + auto [results, report] = engine_.extract(requests_); + results_ = std::move(results); + report_ = std::move(report); + if (LibGribJump::instance().debug()) { for (auto& pair : results_) { LOG_DEBUG_LIB(LibGribJump) << pair.first << ": "; @@ -160,7 +162,7 @@ ForwardedExtractRequest::ForwardedExtractRequest(eckit::Stream& stream) : Reques } void ForwardedExtractRequest::execute() { - engine_.scheduleExtractionTasks(filemap_); + report_ = engine_.scheduleExtractionTasks(filemap_); } void ForwardedExtractRequest::replyToClient() { @@ -186,7 +188,6 @@ ForwardedScanRequest::ForwardedScanRequest(eckit::Stream& stream) : Request(stre client_ >> nFiles; LOG_DEBUG_LIB(LibGribJump) << "ForwardedScanRequest: nFiles=" << nFiles << std::endl; - size_t count = 0; for (size_t i = 0; i < nFiles; i++) { std::string fname; @@ -201,7 +202,9 @@ ForwardedScanRequest::ForwardedScanRequest(eckit::Stream& stream) : Request(stre } void ForwardedScanRequest::execute() { - nfields_ = engine_.scan(scanmap_); + auto [nfields, report] = engine_.scheduleScanTasks(scanmap_); + nfields_ = nfields; + report_ = std::move(report); } void ForwardedScanRequest::replyToClient() { diff --git a/tests/test_engine.cc b/tests/test_engine.cc index bdcf7a1..b785727 100644 --- a/tests/test_engine.cc +++ b/tests/test_engine.cc @@ -126,8 +126,8 @@ CASE ("Engine: Basic extraction") { // drop the final request exRequests.pop_back(); - ResultsMap results = engine.extract(exRequests); - EXPECT_NO_THROW(engine.raiseErrors()); + auto [results, report] = engine.extract(exRequests); + EXPECT_NO_THROW(report.raiseErrors()); // print contents of map for (auto& [req, exs] : results) { From 94bd63ef3d691e08449273378f8ee2e6bb4a64ba Mon Sep 17 00:00:00 2001 From: Chris Bradley Date: Fri, 29 Nov 2024 18:36:28 +0000 Subject: [PATCH 3/5] Refactor tasks and make cancellable --- src/gribjump/Engine.cc | 8 +-- src/gribjump/Forwarder.cc | 7 +-- src/gribjump/Task.cc | 98 ++++++++++++++++++++------------ src/gribjump/Task.h | 75 ++++++++++++++++-------- src/gribjump/remote/WorkItem.h | 3 +- src/gribjump/remote/WorkQueue.cc | 3 +- src/gribjump/remote/WorkQueue.h | 3 +- 7 files changed, 123 insertions(+), 74 deletions(-) diff --git a/src/gribjump/Engine.cc b/src/gribjump/Engine.cc index 067399f..7d07b6f 100644 --- a/src/gribjump/Engine.cc +++ b/src/gribjump/Engine.cc @@ -114,18 +114,17 @@ TaskReport Engine::scheduleExtractionTasks(filemap_t& filemap){ bool inefficientExtraction = LibGribJump::instance().config().getBool("inefficientExtraction", false); - size_t counter = 0; TaskGroup taskGroup; for (auto& [fname, extractionItems] : filemap) { if (extractionItems[0]->isRemote()) { if (inefficientExtraction) { - taskGroup.enqueueTask(new InefficientFileExtractionTask(taskGroup, counter++, fname, extractionItems)); + taskGroup.enqueueTask(fname, extractionItems); } else { throw eckit::SeriousBug("Got remote URI from FDB, but forwardExtraction enabled in gribjump config."); } } else { - taskGroup.enqueueTask(new FileExtractionTask(taskGroup, counter++, fname, extractionItems)); + taskGroup.enqueueTask(fname, extractionItems); } } taskGroup.waitForTasks(); @@ -208,11 +207,10 @@ TaskOutcome Engine::scan(std::vector files) { TaskOutcome Engine::scheduleScanTasks(const scanmap_t& scanmap) { - size_t counter = 0; std::atomic nfields(0); TaskGroup taskGroup; for (auto& [uri, offsets] : scanmap) { - taskGroup.enqueueTask(new FileScanTask(taskGroup, counter++, uri.path(), offsets, nfields)); + taskGroup.enqueueTask(uri.path(), offsets, nfields); } taskGroup.waitForTasks(); diff --git a/src/gribjump/Forwarder.cc b/src/gribjump/Forwarder.cc index e2088e9..a6dd261 100644 --- a/src/gribjump/Forwarder.cc +++ b/src/gribjump/Forwarder.cc @@ -42,11 +42,9 @@ TaskOutcome Forwarder::scan(const std::vector& uris) { } TaskGroup taskGroup; - size_t counter = 0; std::atomic nFields(0); - size_t i = 0; for (auto& [endpoint, scanmap] : serverfilemaps) { - taskGroup.enqueueTask(new ForwardScanTask(taskGroup, counter++, endpoint, scanmap, nFields)); + taskGroup.enqueueTask(endpoint, scanmap, nFields); } taskGroup.waitForTasks(); @@ -57,9 +55,8 @@ TaskReport Forwarder::extract(filemap_t& filemap) { std::unordered_map serverfilemaps = serverFileMap(filemap); TaskGroup taskGroup; - size_t counter = 0; for (auto& [endpoint, subfilemap] : serverfilemaps) { - taskGroup.enqueueTask(new ForwardExtractionTask(taskGroup, counter++, endpoint, subfilemap)); + taskGroup.enqueueTask(endpoint, subfilemap); } taskGroup.waitForTasks(); diff --git a/src/gribjump/Task.cc b/src/gribjump/Task.cc index da450b6..54bc4fb 100644 --- a/src/gribjump/Task.cc +++ b/src/gribjump/Task.cc @@ -30,12 +30,7 @@ namespace gribjump { -static std::string thread_id_str() { - auto id = std::this_thread::get_id(); - std::stringstream ss; - ss << id; - return ss.str(); -} +constexpr bool cancelOnFirstError = true; ///@todo make this configurable //---------------------------------------------------------------------------------------------------------------------- @@ -45,24 +40,46 @@ Task::Task(TaskGroup& taskGroup, size_t taskid) : taskGroup_(taskGroup), taskid_ Task::~Task() {} void Task::notify() { + status_ = Status::DONE; taskGroup_.notify(id()); } void Task::notifyError(const std::string& s) { + status_ = Status::FAILED; taskGroup_.notifyError(id(), s); } +void Task::notifyCancelled() { + status_ = Status::CANCELLED; + taskGroup_.notifyCancelled(id()); +} + +void Task::execute() { + // atomically set status to executing, but only if it is currently pending + Status expected = Status::PENDING; + if (!status_.compare_exchange_strong(expected, Status::EXECUTING)) { + return; + } + executeImpl(); + notify(); +} + +void Task::cancel() { + // atomically set status to cancelled, but only if it is pending + Status expected = Status::PENDING; + status_.compare_exchange_strong(expected, Status::CANCELLED); +} + //---------------------------------------------------------------------------------------------------------------------- void TaskGroup::notify(size_t taskid) { std::lock_guard lock(m_); - taskStatus_[taskid] = Task::Status::DONE; - counter_++; + nComplete_++; // Logging progress if (waiting_) { - if (counter_ == logcounter_) { - eckit::Log::info() << "Gribjump Progress: " << counter_ << " of " << taskStatus_.size() << " tasks complete" << std::endl; + if (nComplete_ == logcounter_) { + eckit::Log::info() << "Gribjump Progress: " << nComplete_ << " of " << tasks_.size() << " tasks complete" << std::endl; logcounter_ += logincrement_; } } @@ -70,40 +87,59 @@ void TaskGroup::notify(size_t taskid) { cv_.notify_one(); } +void TaskGroup::notifyCancelled(size_t taskid) { + std::lock_guard lock(m_); + nComplete_++; + nCancelledTasks_++; + cv_.notify_one(); +} void TaskGroup::notifyError(size_t taskid, const std::string& s) { std::lock_guard lock(m_); - taskStatus_[taskid] = Task::Status::FAILED; errors_.push_back(s); - counter_++; + nComplete_++; cv_.notify_one(); + + if (cancelOnFirstError) { + cancelTasks(); + } +} + +// Note: This will only affect tasks that have not yet started. Cancelled tasks will call notifyCancelled() when they are executed. +// NB: We do not lock a mutex as this will be called from notifyError() +void TaskGroup::cancelTasks() { + for (auto& task : tasks_) { + task->cancel(); + } } void TaskGroup::enqueueTask(Task* task) { - taskStatus_.push_back(Task::Status::PENDING); - WorkItem w(task); - WorkQueue& queue = WorkQueue::instance(); - queue.push(w); + std::lock_guard lock(m_); + tasks_.push_back(std::unique_ptr(task)); // TaskGroup takes ownership of its tasks + WorkQueue::instance().push(task); + LOG_DEBUG_LIB(LibGribJump) << "Queued task " << tasks_.size() << std::endl; } void TaskGroup::waitForTasks() { - ASSERT(taskStatus_.size() > 0); // todo Might want to allow for "no tasks" case, though be careful with the lock / counter. - LOG_DEBUG_LIB(LibGribJump) << "Waiting for " << eckit::Plural(taskStatus_.size(), "task") << "..." << std::endl; std::unique_lock lock(m_); + ASSERT(tasks_.size() > 0); + LOG_DEBUG_LIB(LibGribJump) << "Waiting for " << eckit::Plural(tasks_.size(), "task") << "..." << std::endl; waiting_ = true; - logincrement_ = taskStatus_.size() / 10; + logincrement_ = tasks_.size() / 10; if (logincrement_ == 0) { logincrement_ = 1; } - cv_.wait(lock, [&]{return counter_ == taskStatus_.size();}); + cv_.wait(lock, [&]{return nComplete_ == tasks_.size();}); waiting_ = false; + done_ = true; LOG_DEBUG_LIB(LibGribJump) << "All tasks complete" << std::endl; - MetricsManager::instance().set("count_tasks", taskStatus_.size()); + MetricsManager::instance().set("count_tasks", tasks_.size()); MetricsManager::instance().set("count_failed_tasks", errors_.size()); + MetricsManager::instance().set("count_cancelled_tasks", nCancelledTasks_); if (errors_.size() > 0) { MetricsManager::instance().set("first_error", errors_[0]); @@ -143,9 +179,7 @@ FileExtractionTask::FileExtractionTask(TaskGroup& taskgroup, const size_t id, co { } -void FileExtractionTask::execute() { - const std::string thread_id = thread_id_str(); - eckit::Timer full_timer("Thread total time. Thread: " + thread_id, eckit::Log::debug()); +void FileExtractionTask::executeImpl() { // Sort extractionItems_ by offset std::sort(extractionItems_.begin(), extractionItems_.end(), [](const ExtractionItem* a, const ExtractionItem* b) { @@ -153,8 +187,6 @@ void FileExtractionTask::execute() { }); extract(); - - notify(); } void FileExtractionTask::extract() { @@ -201,12 +233,10 @@ ForwardExtractionTask::ForwardExtractionTask(TaskGroup& taskgroup, const size_t filemap_(filemap) {} -void ForwardExtractionTask::execute(){ +void ForwardExtractionTask::executeImpl(){ RemoteGribJump remoteGribJump(endpoint_); remoteGribJump.forwardExtract(filemap_); - - notify(); } ForwardScanTask::ForwardScanTask(TaskGroup& taskgroup, const size_t id, eckit::net::Endpoint endpoint, scanmap_t& scanmap, std::atomic& nfields): @@ -216,11 +246,10 @@ ForwardScanTask::ForwardScanTask(TaskGroup& taskgroup, const size_t id, eckit::n nfields_(nfields) { } -void ForwardScanTask::execute(){ +void ForwardScanTask::executeImpl(){ RemoteGribJump remoteGribJump(endpoint_); nfields_ += remoteGribJump.forwardScan(scanmap_); - notify(); } //---------------------------------------------------------------------------------------------------------------------- @@ -277,15 +306,10 @@ FileScanTask::FileScanTask(TaskGroup& taskgroup, const size_t id, const eckit::P nfields_(nfields){ } -void FileScanTask::execute() { - eckit::Timer timer; - eckit::Timer full_timer("Thread total time. Thread: " + thread_id_str()); +void FileScanTask::executeImpl() { std::sort(offsets_.begin(), offsets_.end()); - scan(); - - notify(); } void FileScanTask::scan() { diff --git a/src/gribjump/Task.h b/src/gribjump/Task.h index e44e84c..18bd30d 100644 --- a/src/gribjump/Task.h +++ b/src/gribjump/Task.h @@ -30,8 +30,10 @@ class Task { enum Status { DONE = 0, - PENDING = 1, - FAILED = 2 + PENDING, + FAILED, + EXECUTING, + CANCELLED, }; Task(TaskGroup& taskGroup, size_t id); @@ -41,18 +43,28 @@ class Task { size_t id() const { return taskid_; } /// executes the task to completion - virtual void execute() = 0; + virtual void execute() final; /// notifies the completion of the task - virtual void notify(); + void notify(); + + /// notifies that the task was cancelled before execution e.g. because of an error in a related task + void notifyCancelled(); /// notifies the error in execution of the task - virtual void notifyError(const std::string& s); + void notifyError(const std::string& s); + + /// cancels the task. If execute() is called after this, it will return immediately. + void cancel(); + +protected: + virtual void executeImpl() = 0; protected: TaskGroup& taskGroup_; //< Groups like-tasks to be executed in parallel size_t taskid_; //< Task id within parent request + std::atomic status_ = Status::PENDING; }; //---------------------------------------------------------------------------------------------------------------------- @@ -81,45 +93,60 @@ class TaskGroup { TaskGroup() = default; /// Notify that a task has been completed - /// potentially completing all the work for this request void notify(size_t taskid); /// Notify that a task has finished with error - /// potentially completing all the work for this request void notifyError(size_t taskid, const std::string& s); - /// Enqueue tasks to be executed to complete this request - void enqueueTask(Task* task); - + /// Notify that a task was cancelled + void notifyCancelled(size_t taskid); + + /// Enqueue tasks on the global task queue + template + void enqueueTask(Args&&... args) { + enqueueTask(new TaskType(*this, tasks_.size(), std::forward(args)...)); + } + /// Wait for all queued tasks to be executed void waitForTasks(); - TaskReport report() {return TaskReport(std::move(errors_)); } - - std::mutex debugMutex_; + /// Report on errors and other status information about executed tasks. + /// Calling code may use this to report to a client or raise an exception. + TaskReport report() { + std::lock_guard lock(m_); + ASSERT(done_); + return TaskReport(std::move(errors_)); + } size_t nTasks() const { std::lock_guard lock(m_); - return taskStatus_.size(); + return tasks_.size(); } + size_t nErrors() const { std::lock_guard lock(m_); return errors_.size(); } - + +private: + + void enqueueTask(Task* task); + + void cancelTasks(); + private: - int counter_ = 0; //< incremented by notify() or notifyError() + int nComplete_ = 0; //< incremented when a task completes + int nCancelledTasks_ = 0; //< incremented by notifyCancelled() int logcounter_ = 1; //< used to log progress int logincrement_ = 1; //< used to log progress - bool waiting_ = false; - + bool waiting_ = false; //< true if waiting for tasks to complete + bool done_ = false; //< true if all tasks have completed mutable std::mutex m_; std::condition_variable cv_; - std::vector> tasks_; //< stores tasks status, must be initialised by derived class - std::vector taskStatus_; + std::vector> tasks_; std::vector errors_; //< stores error messages, empty if no errors }; @@ -133,7 +160,7 @@ class FileExtractionTask : public Task { FileExtractionTask(TaskGroup& taskgroup, const size_t id, const eckit::PathName& fname, ExtractionItems& extractionItems); - void execute() override; + void executeImpl() override; virtual void extract(); @@ -164,7 +191,7 @@ class ForwardExtractionTask : public Task { ForwardExtractionTask(TaskGroup& taskgroup, const size_t id, eckit::net::Endpoint endpoint, filemap_t& filemap); - void execute() override; + void executeImpl() override; private: eckit::net::Endpoint endpoint_; @@ -177,7 +204,7 @@ class ForwardScanTask : public Task { ForwardScanTask(TaskGroup& taskgroup, const size_t id, eckit::net::Endpoint endpoint, scanmap_t& scanmap, std::atomic& nfields_); - void execute() override; + void executeImpl() override; private: eckit::net::Endpoint endpoint_; @@ -194,7 +221,7 @@ class FileScanTask : public Task { FileScanTask(TaskGroup& taskgroup, const size_t id, const eckit::PathName& fname, const std::vector& offsets, std::atomic& nfields); - void execute() override; + void executeImpl() override; void scan(); diff --git a/src/gribjump/remote/WorkItem.h b/src/gribjump/remote/WorkItem.h index c4582c4..c8bd638 100644 --- a/src/gribjump/remote/WorkItem.h +++ b/src/gribjump/remote/WorkItem.h @@ -34,7 +34,8 @@ class WorkItem { void error(const std::string& s); private: - Task* task_; + + Task* task_; //< non-owning pointer }; //---------------------------------------------------------------------------------------------------------------------- diff --git a/src/gribjump/remote/WorkQueue.cc b/src/gribjump/remote/WorkQueue.cc index 5a5dd36..f18e24a 100644 --- a/src/gribjump/remote/WorkQueue.cc +++ b/src/gribjump/remote/WorkQueue.cc @@ -65,7 +65,8 @@ WorkQueue::WorkQueue() : queue_(eckit::Resource("$GRIBJUMP_QUEUESIZE;gri } } -void WorkQueue::push(WorkItem& item) { +void WorkQueue::push(Task* task) { + WorkItem item(task); queue_.push(item); } diff --git a/src/gribjump/remote/WorkQueue.h b/src/gribjump/remote/WorkQueue.h index 323056d..0580b9f 100644 --- a/src/gribjump/remote/WorkQueue.h +++ b/src/gribjump/remote/WorkQueue.h @@ -18,6 +18,7 @@ #include "gribjump/ExtractionData.h" #include "gribjump/remote/WorkItem.h" +#include "gribjump/Task.h" namespace gribjump { @@ -30,7 +31,7 @@ class WorkQueue : private eckit::NonCopyable { ~WorkQueue(); - void push(WorkItem& item); + void push(Task* task); protected: WorkQueue(); From d910428f42f4f65979a92630c97dd7a63a49f8b9 Mon Sep 17 00:00:00 2001 From: Chris Bradley Date: Tue, 3 Dec 2024 09:17:08 +0100 Subject: [PATCH 4/5] Add header --- src/gribjump/Task.h | 1 + 1 file changed, 1 insertion(+) diff --git a/src/gribjump/Task.h b/src/gribjump/Task.h index 18bd30d..960da5d 100644 --- a/src/gribjump/Task.h +++ b/src/gribjump/Task.h @@ -11,6 +11,7 @@ #pragma once #include +#include #include #include "eckit/serialisation/Stream.h" From c88c79a866bd7ec3b46ad896e98a7b16b09b08c9 Mon Sep 17 00:00:00 2001 From: Chris Bradley Date: Mon, 9 Dec 2024 14:26:07 +0000 Subject: [PATCH 5/5] Minor tidy up --- src/gribjump/Engine.cc | 2 +- src/gribjump/Engine.h | 1 - src/gribjump/LocalGribJump.cc | 4 ---- src/gribjump/remote/Request.h | 6 +++--- 4 files changed, 4 insertions(+), 9 deletions(-) diff --git a/src/gribjump/Engine.cc b/src/gribjump/Engine.cc index 7d07b6f..174afdc 100644 --- a/src/gribjump/Engine.cc +++ b/src/gribjump/Engine.cc @@ -172,7 +172,7 @@ TaskOutcome Engine::scan(const MarsRequests& requests, bool byfiles) { std::vector uris = FDBLister::instance().URIs(requests); - // XXX do we explicitly need this? + /// @todo do we explicitly need this? if (uris.empty()) { MetricsManager::instance().set("count_scanned_fields", 0); return {0, TaskReport()}; diff --git a/src/gribjump/Engine.h b/src/gribjump/Engine.h index 1034d61..a395c18 100644 --- a/src/gribjump/Engine.h +++ b/src/gribjump/Engine.h @@ -35,7 +35,6 @@ class Engine { Engine(); ~Engine(); - // ResultsMap extract(ExtractionRequests& requests); TaskOutcome extract(ExtractionRequests& requests); // byfiles: scan entire file, not just fields matching request diff --git a/src/gribjump/LocalGribJump.cc b/src/gribjump/LocalGribJump.cc index 9842879..806b0fd 100644 --- a/src/gribjump/LocalGribJump.cc +++ b/src/gribjump/LocalGribJump.cc @@ -119,10 +119,6 @@ ResultsMap LocalGribJump::extract(const std::vector& requests, cons } std::map> LocalGribJump::axes(const std::string& request, int level) { - - // Note: This is likely to be removed from GribJump, and moved to FDB. - // Here for now to support polytope. - return Engine().axes(request, level); } diff --git a/src/gribjump/remote/Request.h b/src/gribjump/remote/Request.h index 22f9413..a8e8b4b 100644 --- a/src/gribjump/remote/Request.h +++ b/src/gribjump/remote/Request.h @@ -28,7 +28,7 @@ namespace gribjump { //---------------------------------------------------------------------------------------------------------------------- class Request { -public: // methods +public: Request(eckit::Stream& stream); @@ -45,8 +45,8 @@ class Request { protected: // members eckit::Stream& client_; - Engine engine_; //< Engine and schedule tasks based on request // XXX Can we now remove this? - TaskReport report_; // + Engine engine_; + TaskReport report_; uint64_t id_; };