diff --git a/VERSION b/VERSION index a3df0a6..6f4eebd 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.8.0 +0.8.1 diff --git a/src/gribjump/Engine.cc b/src/gribjump/Engine.cc index cb3aa02..174afdc 100644 --- a/src/gribjump/Engine.cc +++ b/src/gribjump/Engine.cc @@ -104,53 +104,56 @@ filemap_t Engine::buildFileMap(const metkit::mars::MarsRequest& unionrequest, Ex return filemap; } -void Engine::scheduleExtractionTasks(filemap_t& filemap){ +TaskReport Engine::scheduleExtractionTasks(filemap_t& filemap){ bool forwardExtraction = LibGribJump::instance().config().getBool("forwardExtraction", false); if (forwardExtraction) { Forwarder forwarder; - forwarder.extract(filemap); - return; + return forwarder.extract(filemap); } bool inefficientExtraction = LibGribJump::instance().config().getBool("inefficientExtraction", false); - size_t counter = 0; + TaskGroup taskGroup; + for (auto& [fname, extractionItems] : filemap) { if (extractionItems[0]->isRemote()) { if (inefficientExtraction) { - taskGroup_.enqueueTask(new InefficientFileExtractionTask(taskGroup_, counter++, fname, extractionItems)); + taskGroup.enqueueTask(fname, extractionItems); } else { throw eckit::SeriousBug("Got remote URI from FDB, but forwardExtraction enabled in gribjump config."); } } else { - taskGroup_.enqueueTask(new FileExtractionTask(taskGroup_, counter++, fname, extractionItems)); + taskGroup.enqueueTask(fname, extractionItems); } } - taskGroup_.waitForTasks(); + taskGroup.waitForTasks(); + return taskGroup.report(); } -ResultsMap Engine::extract(ExtractionRequests& requests) { +TaskOutcome Engine::extract(ExtractionRequests& requests) { eckit::Timer timer("Engine::extract"); ExItemMap keyToExtractionItem; metkit::mars::MarsRequest unionreq = buildRequestMap(requests, keyToExtractionItem); + // Build file map filemap_t filemap = buildFileMap(unionreq, keyToExtractionItem); MetricsManager::instance().set("elapsed_build_filemap", timer.elapsed()); timer.reset("Gribjump Engine: Built file map"); - scheduleExtractionTasks(filemap); + // Schedule tasks + TaskReport report = scheduleExtractionTasks(filemap); MetricsManager::instance().set("elapsed_tasks", timer.elapsed()); timer.reset("Gribjump Engine: All tasks finished"); + // Collect results ResultsMap results = collectResults(keyToExtractionItem); MetricsManager::instance().set("elapsed_collect_results", timer.elapsed()); - timer.reset("Gribjump Engine: Repackaged results"); - return results; + return {std::move(results), std::move(report)}; } ResultsMap Engine::collectResults(ExItemMap& keyToExtractionItem) { @@ -165,13 +168,14 @@ ResultsMap Engine::collectResults(ExItemMap& keyToExtractionItem) { return results; } -size_t Engine::scan(const MarsRequests& requests, bool byfiles) { +TaskOutcome Engine::scan(const MarsRequests& requests, bool byfiles) { std::vector uris = FDBLister::instance().URIs(requests); + /// @todo do we explicitly need this? if (uris.empty()) { MetricsManager::instance().set("count_scanned_fields", 0); - return 0; + return {0, TaskReport()}; } // forwarded scan requests @@ -188,31 +192,31 @@ size_t Engine::scan(const MarsRequests& requests, bool byfiles) { } } - return scan(filemap); + return scheduleScanTasks(filemap); } -size_t Engine::scan(std::vector files) { +TaskOutcome Engine::scan(std::vector files) { scanmap_t scanmap; for (auto& fname : files) { scanmap[fname] = {}; } - return scan(scanmap); + return scheduleScanTasks(scanmap); } -size_t Engine::scan(const scanmap_t& scanmap) { +TaskOutcome Engine::scheduleScanTasks(const scanmap_t& scanmap) { - size_t counter = 0; std::atomic nfields(0); + TaskGroup taskGroup; for (auto& [uri, offsets] : scanmap) { - taskGroup_.enqueueTask(new FileScanTask(taskGroup_, counter++, uri.path(), offsets, nfields)); + taskGroup.enqueueTask(uri.path(), offsets, nfields); } - taskGroup_.waitForTasks(); + taskGroup.waitForTasks(); MetricsManager::instance().set("count_scanned_fields", static_cast(nfields)); - return nfields; + return {nfields, taskGroup.report()}; } std::map > Engine::axes(const std::string& request, int level) { @@ -220,13 +224,6 @@ std::map > Engine::axes(const std:: return FDBLister::instance().axes(request, level); } -void Engine::reportErrors(eckit::Stream& client) { - taskGroup_.reportErrors(client); -} - -void Engine::raiseErrors() { - taskGroup_.raiseErrors(); -} //---------------------------------------------------------------------------------------------------------------------- } // namespace gribjump diff --git a/src/gribjump/Engine.h b/src/gribjump/Engine.h index 4521795..a395c18 100644 --- a/src/gribjump/Engine.h +++ b/src/gribjump/Engine.h @@ -22,25 +22,29 @@ namespace gribjump { +template + +struct TaskOutcome { + T result; + TaskReport report; +}; + class Engine { public: Engine(); ~Engine(); - ResultsMap extract(ExtractionRequests& requests); + TaskOutcome extract(ExtractionRequests& requests); // byfiles: scan entire file, not just fields matching request - size_t scan(const MarsRequests& requests, bool byfiles = false); - size_t scan(std::vector files); - size_t scan(const scanmap_t& scanmap); + TaskOutcome scan(const MarsRequests& requests, bool byfiles = false); + TaskOutcome scan(std::vector files); + TaskOutcome scheduleScanTasks(const scanmap_t& scanmap); std::map > axes(const std::string& request, int level=3); - void scheduleExtractionTasks(filemap_t& filemap); - - void reportErrors(eckit::Stream& client_); - void raiseErrors(); + TaskReport scheduleExtractionTasks(filemap_t& filemap); private: @@ -50,8 +54,6 @@ class Engine { private: - TaskGroup taskGroup_; /// @todo Maybe we should be returning the taskGroup, rather than storing it here. /// I would really prefer a stateless engine. - }; diff --git a/src/gribjump/Forwarder.cc b/src/gribjump/Forwarder.cc index 5b01e37..a6dd261 100644 --- a/src/gribjump/Forwarder.cc +++ b/src/gribjump/Forwarder.cc @@ -17,7 +17,6 @@ #include "gribjump/Forwarder.h" #include "gribjump/ExtractionItem.h" #include "gribjump/LibGribJump.h" -#include "gribjump/Task.h" namespace gribjump { @@ -27,7 +26,7 @@ Forwarder::Forwarder() { Forwarder::~Forwarder() { } -size_t Forwarder::scan(const std::vector& uris) { +TaskOutcome Forwarder::scan(const std::vector& uris) { ASSERT(uris.size() > 0); @@ -43,26 +42,25 @@ size_t Forwarder::scan(const std::vector& uris) { } TaskGroup taskGroup; - size_t counter = 0; std::atomic nFields(0); - size_t i = 0; for (auto& [endpoint, scanmap] : serverfilemaps) { - taskGroup.enqueueTask(new ForwardScanTask(taskGroup, counter++, endpoint, scanmap, nFields)); + taskGroup.enqueueTask(endpoint, scanmap, nFields); } taskGroup.waitForTasks(); - return nFields; + return {nFields, taskGroup.report()}; } -void Forwarder::extract(filemap_t& filemap) { +TaskReport Forwarder::extract(filemap_t& filemap) { std::unordered_map serverfilemaps = serverFileMap(filemap); TaskGroup taskGroup; - size_t counter = 0; for (auto& [endpoint, subfilemap] : serverfilemaps) { - taskGroup.enqueueTask(new ForwardExtractionTask(taskGroup, counter++, endpoint, subfilemap)); + taskGroup.enqueueTask(endpoint, subfilemap); } taskGroup.waitForTasks(); + + return taskGroup.report(); } diff --git a/src/gribjump/Forwarder.h b/src/gribjump/Forwarder.h index 3cea3f4..8258875 100644 --- a/src/gribjump/Forwarder.h +++ b/src/gribjump/Forwarder.h @@ -10,6 +10,8 @@ /// @author Christopher Bradley #include "gribjump/Types.h" +#include "gribjump/Task.h" +#include "gribjump/Engine.h" #include "eckit/net/Endpoint.h" #pragma once @@ -25,8 +27,8 @@ class Forwarder { Forwarder(); ~Forwarder(); - size_t scan(const std::vector& uris); - void extract(filemap_t& filemap); + TaskOutcome scan(const std::vector& uris); + TaskReport extract(filemap_t& filemap); private: diff --git a/src/gribjump/LocalGribJump.cc b/src/gribjump/LocalGribJump.cc index 5365078..806b0fd 100644 --- a/src/gribjump/LocalGribJump.cc +++ b/src/gribjump/LocalGribJump.cc @@ -49,16 +49,17 @@ LocalGribJump::LocalGribJump(const Config& config): GribJumpBase(config) { LocalGribJump::~LocalGribJump() {} size_t LocalGribJump::scan(const std::vector& paths) { - Engine engine; - return engine.scan(paths); + auto [result, report] = Engine().scan(paths); + report.raiseErrors(); + return result; } size_t LocalGribJump::scan(const std::vector& requests, bool byfiles) { - Engine engine; - return engine.scan(requests, byfiles); + auto [result, report] = Engine().scan(requests, byfiles); + report.raiseErrors(); + return result; } - std::vector> LocalGribJump::extract(const eckit::PathName& path, const std::vector& offsets, const std::vector>& ranges) { // Directly from file, no cache, no queue, no threads @@ -86,9 +87,8 @@ std::vector> LocalGribJump::extract(const eckit: /// @todo, change API, remove extraction request std::vector>> LocalGribJump::extract(ExtractionRequests& requests) { - Engine engine; - ResultsMap results = engine.extract(requests); - engine.raiseErrors(); + auto [results, report] = Engine().extract(requests); + report.raiseErrors(); std::vector>> extractionResults; for (auto& req : requests) { @@ -107,25 +107,19 @@ std::vector>> LocalGribJump::extra } ResultsMap LocalGribJump::extract(const std::vector& requests, const std::vector>& ranges) { - Engine engine; ExtractionRequests extractionRequests; for (size_t i = 0; i < requests.size(); i++) { extractionRequests.push_back(ExtractionRequest(requests[i], ranges[i])); } - ResultsMap results = engine.extract(extractionRequests); - engine.raiseErrors(); - return results; + auto [results, report] = Engine().extract(extractionRequests); + report.raiseErrors(); + return std::move(results); } std::map> LocalGribJump::axes(const std::string& request, int level) { - - // Note: This is likely to be removed from GribJump, and moved to FDB. - // Here for now to support polytope. - - Engine engine; - return engine.axes(request, level); + return Engine().axes(request, level); } static GribJumpBuilder builder("local"); diff --git a/src/gribjump/Task.cc b/src/gribjump/Task.cc index 69fd3ec..54bc4fb 100644 --- a/src/gribjump/Task.cc +++ b/src/gribjump/Task.cc @@ -30,12 +30,7 @@ namespace gribjump { -static std::string thread_id_str() { - auto id = std::this_thread::get_id(); - std::stringstream ss; - ss << id; - return ss.str(); -} +constexpr bool cancelOnFirstError = true; ///@todo make this configurable //---------------------------------------------------------------------------------------------------------------------- @@ -45,32 +40,46 @@ Task::Task(TaskGroup& taskGroup, size_t taskid) : taskGroup_(taskGroup), taskid_ Task::~Task() {} void Task::notify() { + status_ = Status::DONE; taskGroup_.notify(id()); } void Task::notifyError(const std::string& s) { + status_ = Status::FAILED; taskGroup_.notifyError(id(), s); } -//---------------------------------------------------------------------------------------------------------------------- - -TaskGroup::TaskGroup() {} +void Task::notifyCancelled() { + status_ = Status::CANCELLED; + taskGroup_.notifyCancelled(id()); +} -TaskGroup::~TaskGroup() { - for (auto& t : tasks_) { - delete t; +void Task::execute() { + // atomically set status to executing, but only if it is currently pending + Status expected = Status::PENDING; + if (!status_.compare_exchange_strong(expected, Status::EXECUTING)) { + return; } + executeImpl(); + notify(); +} + +void Task::cancel() { + // atomically set status to cancelled, but only if it is pending + Status expected = Status::PENDING; + status_.compare_exchange_strong(expected, Status::CANCELLED); } +//---------------------------------------------------------------------------------------------------------------------- + void TaskGroup::notify(size_t taskid) { std::lock_guard lock(m_); - taskStatus_[taskid] = Task::Status::DONE; - counter_++; + nComplete_++; // Logging progress if (waiting_) { - if (counter_ == logcounter_) { - eckit::Log::info() << "Gribjump Progress: " << counter_ << " of " << taskStatus_.size() << " tasks complete" << std::endl; + if (nComplete_ == logcounter_) { + eckit::Log::info() << "Gribjump Progress: " << nComplete_ << " of " << tasks_.size() << " tasks complete" << std::endl; logcounter_ += logincrement_; } } @@ -78,54 +87,79 @@ void TaskGroup::notify(size_t taskid) { cv_.notify_one(); } +void TaskGroup::notifyCancelled(size_t taskid) { + std::lock_guard lock(m_); + nComplete_++; + nCancelledTasks_++; + cv_.notify_one(); +} void TaskGroup::notifyError(size_t taskid, const std::string& s) { std::lock_guard lock(m_); - taskStatus_[taskid] = Task::Status::FAILED; errors_.push_back(s); - counter_++; + nComplete_++; cv_.notify_one(); + + if (cancelOnFirstError) { + cancelTasks(); + } +} + +// Note: This will only affect tasks that have not yet started. Cancelled tasks will call notifyCancelled() when they are executed. +// NB: We do not lock a mutex as this will be called from notifyError() +void TaskGroup::cancelTasks() { + for (auto& task : tasks_) { + task->cancel(); + } } void TaskGroup::enqueueTask(Task* task) { - taskStatus_.push_back(Task::Status::PENDING); - WorkItem w(task); - WorkQueue& queue = WorkQueue::instance(); - queue.push(w); + std::lock_guard lock(m_); + tasks_.push_back(std::unique_ptr(task)); // TaskGroup takes ownership of its tasks + WorkQueue::instance().push(task); + LOG_DEBUG_LIB(LibGribJump) << "Queued task " << tasks_.size() << std::endl; } void TaskGroup::waitForTasks() { - ASSERT(taskStatus_.size() > 0); // todo Might want to allow for "no tasks" case, though be careful with the lock / counter. - LOG_DEBUG_LIB(LibGribJump) << "Waiting for " << eckit::Plural(taskStatus_.size(), "task") << "..." << std::endl; std::unique_lock lock(m_); + ASSERT(tasks_.size() > 0); + LOG_DEBUG_LIB(LibGribJump) << "Waiting for " << eckit::Plural(tasks_.size(), "task") << "..." << std::endl; waiting_ = true; - logincrement_ = taskStatus_.size() / 10; + logincrement_ = tasks_.size() / 10; if (logincrement_ == 0) { logincrement_ = 1; } - cv_.wait(lock, [&]{return counter_ == taskStatus_.size();}); + cv_.wait(lock, [&]{return nComplete_ == tasks_.size();}); waiting_ = false; + done_ = true; LOG_DEBUG_LIB(LibGribJump) << "All tasks complete" << std::endl; - MetricsManager::instance().set("count_tasks", taskStatus_.size()); + MetricsManager::instance().set("count_tasks", tasks_.size()); MetricsManager::instance().set("count_failed_tasks", errors_.size()); + MetricsManager::instance().set("count_cancelled_tasks", nCancelledTasks_); if (errors_.size() > 0) { MetricsManager::instance().set("first_error", errors_[0]); } } -void TaskGroup::reportErrors(eckit::Stream& client) { +//---------------------------------------------------------------------------------------------------------------------- + +TaskReport::TaskReport() {} + +TaskReport::TaskReport(std::vector&& errors) : errors_(std::move(errors)) {} + +void TaskReport::reportErrors(eckit::Stream& client) const { client << errors_.size(); for (const auto& s : errors_) { client << s; } } -void TaskGroup::raiseErrors() { +void TaskReport::raiseErrors() const { if (errors_.size() > 0) { std::stringstream ss; ss << "Encountered " << eckit::Plural(errors_.size(), "error") << " during task execution:" << std::endl; @@ -145,9 +179,7 @@ FileExtractionTask::FileExtractionTask(TaskGroup& taskgroup, const size_t id, co { } -void FileExtractionTask::execute() { - const std::string thread_id = thread_id_str(); - eckit::Timer full_timer("Thread total time. Thread: " + thread_id, eckit::Log::debug()); +void FileExtractionTask::executeImpl() { // Sort extractionItems_ by offset std::sort(extractionItems_.begin(), extractionItems_.end(), [](const ExtractionItem* a, const ExtractionItem* b) { @@ -155,8 +187,6 @@ void FileExtractionTask::execute() { }); extract(); - - notify(); } void FileExtractionTask::extract() { @@ -203,12 +233,10 @@ ForwardExtractionTask::ForwardExtractionTask(TaskGroup& taskgroup, const size_t filemap_(filemap) {} -void ForwardExtractionTask::execute(){ +void ForwardExtractionTask::executeImpl(){ RemoteGribJump remoteGribJump(endpoint_); remoteGribJump.forwardExtract(filemap_); - - notify(); } ForwardScanTask::ForwardScanTask(TaskGroup& taskgroup, const size_t id, eckit::net::Endpoint endpoint, scanmap_t& scanmap, std::atomic& nfields): @@ -218,11 +246,10 @@ ForwardScanTask::ForwardScanTask(TaskGroup& taskgroup, const size_t id, eckit::n nfields_(nfields) { } -void ForwardScanTask::execute(){ +void ForwardScanTask::executeImpl(){ RemoteGribJump remoteGribJump(endpoint_); nfields_ += remoteGribJump.forwardScan(scanmap_); - notify(); } //---------------------------------------------------------------------------------------------------------------------- @@ -279,15 +306,10 @@ FileScanTask::FileScanTask(TaskGroup& taskgroup, const size_t id, const eckit::P nfields_(nfields){ } -void FileScanTask::execute() { - eckit::Timer timer; - eckit::Timer full_timer("Thread total time. Thread: " + thread_id_str()); +void FileScanTask::executeImpl() { std::sort(offsets_.begin(), offsets_.end()); - scan(); - - notify(); } void FileScanTask::scan() { diff --git a/src/gribjump/Task.h b/src/gribjump/Task.h index 2824db2..960da5d 100644 --- a/src/gribjump/Task.h +++ b/src/gribjump/Task.h @@ -11,6 +11,7 @@ #pragma once #include +#include #include #include "eckit/serialisation/Stream.h" @@ -30,8 +31,10 @@ class Task { enum Status { DONE = 0, - PENDING = 1, - FAILED = 2 + PENDING, + FAILED, + EXECUTING, + CANCELLED, }; Task(TaskGroup& taskGroup, size_t id); @@ -41,71 +44,110 @@ class Task { size_t id() const { return taskid_; } /// executes the task to completion - virtual void execute() = 0; + virtual void execute() final; /// notifies the completion of the task - virtual void notify(); + void notify(); + + /// notifies that the task was cancelled before execution e.g. because of an error in a related task + void notifyCancelled(); /// notifies the error in execution of the task - virtual void notifyError(const std::string& s); + void notifyError(const std::string& s); + + /// cancels the task. If execute() is called after this, it will return immediately. + void cancel(); + +protected: + virtual void executeImpl() = 0; protected: TaskGroup& taskGroup_; //< Groups like-tasks to be executed in parallel size_t taskid_; //< Task id within parent request + std::atomic status_ = Status::PENDING; }; //---------------------------------------------------------------------------------------------------------------------- -class TaskGroup { +// TaskReport contains error messages and other information produced by a TaskGroup, and methods to either +// report them to a client or raise an exception. +// TaskGroup will return a TaskReport object to calling code. +class TaskReport { + public: + TaskReport(); + TaskReport(std::vector&& errors); + + void reportErrors(eckit::Stream& client) const; + void raiseErrors() const; + +private: + std::vector errors_; //< stores error messages, empty if no errors +}; - TaskGroup(); +//---------------------------------------------------------------------------------------------------------------------- +// +class TaskGroup { +public: - ~TaskGroup(); + TaskGroup() = default; /// Notify that a task has been completed - /// potentially completing all the work for this request void notify(size_t taskid); /// Notify that a task has finished with error - /// potentially completing all the work for this request void notifyError(size_t taskid, const std::string& s); - /// Enqueue tasks to be executed to complete this request - void enqueueTask(Task* task); - - /// Wait for all queued tasks to be executed - void waitForTasks(); + /// Notify that a task was cancelled + void notifyCancelled(size_t taskid); - void reportErrors(eckit::Stream& client); - void raiseErrors(); + /// Enqueue tasks on the global task queue + template + void enqueueTask(Args&&... args) { + enqueueTask(new TaskType(*this, tasks_.size(), std::forward(args)...)); + } + /// Wait for all queued tasks to be executed + void waitForTasks(); - std::mutex debugMutex_; + /// Report on errors and other status information about executed tasks. + /// Calling code may use this to report to a client or raise an exception. + TaskReport report() { + std::lock_guard lock(m_); + ASSERT(done_); + return TaskReport(std::move(errors_)); + } size_t nTasks() const { std::lock_guard lock(m_); - return taskStatus_.size(); + return tasks_.size(); } + size_t nErrors() const { std::lock_guard lock(m_); return errors_.size(); } - + +private: + + void enqueueTask(Task* task); + + void cancelTasks(); + private: - int counter_ = 0; //< incremented by notify() or notifyError() + int nComplete_ = 0; //< incremented when a task completes + int nCancelledTasks_ = 0; //< incremented by notifyCancelled() int logcounter_ = 1; //< used to log progress int logincrement_ = 1; //< used to log progress - bool waiting_ = false; - + bool waiting_ = false; //< true if waiting for tasks to complete + bool done_ = false; //< true if all tasks have completed mutable std::mutex m_; std::condition_variable cv_; - std::vector tasks_; //< stores tasks status, must be initialised by derived class - std::vector taskStatus_; + std::vector> tasks_; std::vector errors_; //< stores error messages, empty if no errors }; @@ -119,7 +161,7 @@ class FileExtractionTask : public Task { FileExtractionTask(TaskGroup& taskgroup, const size_t id, const eckit::PathName& fname, ExtractionItems& extractionItems); - void execute() override; + void executeImpl() override; virtual void extract(); @@ -150,7 +192,7 @@ class ForwardExtractionTask : public Task { ForwardExtractionTask(TaskGroup& taskgroup, const size_t id, eckit::net::Endpoint endpoint, filemap_t& filemap); - void execute() override; + void executeImpl() override; private: eckit::net::Endpoint endpoint_; @@ -163,7 +205,7 @@ class ForwardScanTask : public Task { ForwardScanTask(TaskGroup& taskgroup, const size_t id, eckit::net::Endpoint endpoint, scanmap_t& scanmap, std::atomic& nfields_); - void execute() override; + void executeImpl() override; private: eckit::net::Endpoint endpoint_; @@ -180,7 +222,7 @@ class FileScanTask : public Task { FileScanTask(TaskGroup& taskgroup, const size_t id, const eckit::PathName& fname, const std::vector& offsets, std::atomic& nfields); - void execute() override; + void executeImpl() override; void scan(); diff --git a/src/gribjump/remote/Request.cc b/src/gribjump/remote/Request.cc index fa5434c..3477fb1 100644 --- a/src/gribjump/remote/Request.cc +++ b/src/gribjump/remote/Request.cc @@ -32,7 +32,7 @@ Request::Request(eckit::Stream& stream) : client_(stream) { } void Request::reportErrors() { - engine_.reportErrors(client_); + report_.reportErrors(client_); } //---------------------------------------------------------------------------------------------------------------------- @@ -57,7 +57,9 @@ ScanRequest::ScanRequest(eckit::Stream& stream) : Request(stream) { } void ScanRequest::execute() { - nFields_ = engine_.scan(requests_, byfiles_); + auto [nfields, report] = engine_.scan(requests_, byfiles_); + nFields_ = nfields; + report_ = std::move(report); } void ScanRequest::replyToClient() { @@ -86,8 +88,10 @@ ExtractRequest::ExtractRequest(eckit::Stream& stream) : Request(stream) { void ExtractRequest::execute() { - results_ = engine_.extract(requests_); - + auto [results, report] = engine_.extract(requests_); + results_ = std::move(results); + report_ = std::move(report); + if (LibGribJump::instance().debug()) { for (auto& pair : results_) { LOG_DEBUG_LIB(LibGribJump) << pair.first << ": "; @@ -158,7 +162,7 @@ ForwardedExtractRequest::ForwardedExtractRequest(eckit::Stream& stream) : Reques } void ForwardedExtractRequest::execute() { - engine_.scheduleExtractionTasks(filemap_); + report_ = engine_.scheduleExtractionTasks(filemap_); } void ForwardedExtractRequest::replyToClient() { @@ -184,7 +188,6 @@ ForwardedScanRequest::ForwardedScanRequest(eckit::Stream& stream) : Request(stre client_ >> nFiles; LOG_DEBUG_LIB(LibGribJump) << "ForwardedScanRequest: nFiles=" << nFiles << std::endl; - size_t count = 0; for (size_t i = 0; i < nFiles; i++) { std::string fname; @@ -199,7 +202,9 @@ ForwardedScanRequest::ForwardedScanRequest(eckit::Stream& stream) : Request(stre } void ForwardedScanRequest::execute() { - nfields_ = engine_.scan(scanmap_); + auto [nfields, report] = engine_.scheduleScanTasks(scanmap_); + nfields_ = nfields; + report_ = std::move(report); } void ForwardedScanRequest::replyToClient() { diff --git a/src/gribjump/remote/Request.h b/src/gribjump/remote/Request.h index 7483f7d..a8e8b4b 100644 --- a/src/gribjump/remote/Request.h +++ b/src/gribjump/remote/Request.h @@ -28,7 +28,7 @@ namespace gribjump { //---------------------------------------------------------------------------------------------------------------------- class Request { -public: // methods +public: Request(eckit::Stream& stream); @@ -45,8 +45,8 @@ class Request { protected: // members eckit::Stream& client_; - Engine engine_; //< Engine and schedule tasks based on request - + Engine engine_; + TaskReport report_; uint64_t id_; }; diff --git a/src/gribjump/remote/WorkItem.h b/src/gribjump/remote/WorkItem.h index c4582c4..c8bd638 100644 --- a/src/gribjump/remote/WorkItem.h +++ b/src/gribjump/remote/WorkItem.h @@ -34,7 +34,8 @@ class WorkItem { void error(const std::string& s); private: - Task* task_; + + Task* task_; //< non-owning pointer }; //---------------------------------------------------------------------------------------------------------------------- diff --git a/src/gribjump/remote/WorkQueue.cc b/src/gribjump/remote/WorkQueue.cc index 5a5dd36..f18e24a 100644 --- a/src/gribjump/remote/WorkQueue.cc +++ b/src/gribjump/remote/WorkQueue.cc @@ -65,7 +65,8 @@ WorkQueue::WorkQueue() : queue_(eckit::Resource("$GRIBJUMP_QUEUESIZE;gri } } -void WorkQueue::push(WorkItem& item) { +void WorkQueue::push(Task* task) { + WorkItem item(task); queue_.push(item); } diff --git a/src/gribjump/remote/WorkQueue.h b/src/gribjump/remote/WorkQueue.h index 323056d..0580b9f 100644 --- a/src/gribjump/remote/WorkQueue.h +++ b/src/gribjump/remote/WorkQueue.h @@ -18,6 +18,7 @@ #include "gribjump/ExtractionData.h" #include "gribjump/remote/WorkItem.h" +#include "gribjump/Task.h" namespace gribjump { @@ -30,7 +31,7 @@ class WorkQueue : private eckit::NonCopyable { ~WorkQueue(); - void push(WorkItem& item); + void push(Task* task); protected: WorkQueue(); diff --git a/tests/test_engine.cc b/tests/test_engine.cc index bdcf7a1..b785727 100644 --- a/tests/test_engine.cc +++ b/tests/test_engine.cc @@ -126,8 +126,8 @@ CASE ("Engine: Basic extraction") { // drop the final request exRequests.pop_back(); - ResultsMap results = engine.extract(exRequests); - EXPECT_NO_THROW(engine.raiseErrors()); + auto [results, report] = engine.extract(exRequests); + EXPECT_NO_THROW(report.raiseErrors()); // print contents of map for (auto& [req, exs] : results) {