Commit

Merge pull request #35 from ecmwf/feature/taskreport
Feature/taskreport
ChrisspyB authored Dec 9, 2024
2 parents a17f6ec + c88c79a commit 40eab02
Showing 13 changed files with 221 additions and 156 deletions.
53 changes: 25 additions & 28 deletions src/gribjump/Engine.cc
@@ -104,53 +104,56 @@ filemap_t Engine::buildFileMap(const metkit::mars::MarsRequest& unionrequest, Ex
return filemap;
}

void Engine::scheduleExtractionTasks(filemap_t& filemap){
TaskReport Engine::scheduleExtractionTasks(filemap_t& filemap){

bool forwardExtraction = LibGribJump::instance().config().getBool("forwardExtraction", false);
if (forwardExtraction) {
Forwarder forwarder;
forwarder.extract(filemap);
return;
return forwarder.extract(filemap);
}

bool inefficientExtraction = LibGribJump::instance().config().getBool("inefficientExtraction", false);

size_t counter = 0;
TaskGroup taskGroup;

for (auto& [fname, extractionItems] : filemap) {
if (extractionItems[0]->isRemote()) {
if (inefficientExtraction) {
taskGroup_.enqueueTask(new InefficientFileExtractionTask(taskGroup_, counter++, fname, extractionItems));
taskGroup.enqueueTask<InefficientFileExtractionTask>(fname, extractionItems);
} else {
throw eckit::SeriousBug("Got remote URI from FDB, but forwardExtraction enabled in gribjump config.");
}
} else {
taskGroup_.enqueueTask(new FileExtractionTask(taskGroup_, counter++, fname, extractionItems));
taskGroup.enqueueTask<FileExtractionTask>(fname, extractionItems);
}
}
taskGroup_.waitForTasks();
taskGroup.waitForTasks();
return taskGroup.report();
}

ResultsMap Engine::extract(ExtractionRequests& requests) {
TaskOutcome<ResultsMap> Engine::extract(ExtractionRequests& requests) {

eckit::Timer timer("Engine::extract");

ExItemMap keyToExtractionItem;
metkit::mars::MarsRequest unionreq = buildRequestMap(requests, keyToExtractionItem);

// Build file map
filemap_t filemap = buildFileMap(unionreq, keyToExtractionItem);
MetricsManager::instance().set("elapsed_build_filemap", timer.elapsed());
timer.reset("Gribjump Engine: Built file map");

scheduleExtractionTasks(filemap);
// Schedule tasks
TaskReport report = scheduleExtractionTasks(filemap);
MetricsManager::instance().set("elapsed_tasks", timer.elapsed());
timer.reset("Gribjump Engine: All tasks finished");

// Collect results
ResultsMap results = collectResults(keyToExtractionItem);
MetricsManager::instance().set("elapsed_collect_results", timer.elapsed());

timer.reset("Gribjump Engine: Repackaged results");

return results;
return {std::move(results), std::move(report)};
}

ResultsMap Engine::collectResults(ExItemMap& keyToExtractionItem) {
@@ -165,13 +168,14 @@ ResultsMap Engine::collectResults(ExItemMap& keyToExtractionItem) {
return results;
}

size_t Engine::scan(const MarsRequests& requests, bool byfiles) {
TaskOutcome<size_t> Engine::scan(const MarsRequests& requests, bool byfiles) {

std::vector<eckit::URI> uris = FDBLister::instance().URIs(requests);

/// @todo do we explicitly need this?
if (uris.empty()) {
MetricsManager::instance().set("count_scanned_fields", 0);
return 0;
return {0, TaskReport()};
}

// forwarded scan requests
@@ -188,45 +192,38 @@ size_t Engine::scan(const MarsRequests& requests, bool byfiles) {
}
}

return scan(filemap);
return scheduleScanTasks(filemap);
}

size_t Engine::scan(std::vector<eckit::PathName> files) {
TaskOutcome<size_t> Engine::scan(std::vector<eckit::PathName> files) {

scanmap_t scanmap;
for (auto& fname : files) {
scanmap[fname] = {};
}

return scan(scanmap);
return scheduleScanTasks(scanmap);
}

size_t Engine::scan(const scanmap_t& scanmap) {
TaskOutcome<size_t> Engine::scheduleScanTasks(const scanmap_t& scanmap) {

size_t counter = 0;
std::atomic<size_t> nfields(0);
TaskGroup taskGroup;
for (auto& [uri, offsets] : scanmap) {
taskGroup_.enqueueTask(new FileScanTask(taskGroup_, counter++, uri.path(), offsets, nfields));
taskGroup.enqueueTask<FileScanTask>(uri.path(), offsets, nfields);
}
taskGroup_.waitForTasks();
taskGroup.waitForTasks();

MetricsManager::instance().set("count_scanned_fields", static_cast<size_t>(nfields));

return nfields;
return {nfields, taskGroup.report()};
}

std::map<std::string, std::unordered_set<std::string> > Engine::axes(const std::string& request, int level) {
MetricsManager::instance().set("request", request);
return FDBLister::instance().axes(request, level);
}

void Engine::reportErrors(eckit::Stream& client) {
taskGroup_.reportErrors(client);
}

void Engine::raiseErrors() {
taskGroup_.raiseErrors();
}
//----------------------------------------------------------------------------------------------------------------------

} // namespace gribjump
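With this change the engine no longer holds a member taskGroup_: each scheduling call builds a TaskGroup on the stack, waits for its tasks, and returns a TaskReport for the caller to act on. The following is a minimal, self-contained sketch of that pattern; SimpleTaskGroup, SimpleReport and scheduleWork are illustrative stand-ins (with serial execution) rather than gribjump's actual classes.

#include <functional>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Illustrative stand-ins only, not gribjump's TaskGroup/TaskReport.
class SimpleReport {
public:
    void addError(const std::string& msg) { errors_.push_back(msg); }
    // Mirrors the intent of TaskReport::raiseErrors(): surface failures to the caller.
    void raiseErrors() const {
        if (!errors_.empty()) throw std::runtime_error(errors_.front());
    }
private:
    std::vector<std::string> errors_;
};

class SimpleTaskGroup {
public:
    void enqueue(std::function<void()> task) { tasks_.push_back(std::move(task)); }
    // Runs tasks serially here; the real TaskGroup dispatches to a worker pool.
    void waitForTasks() {
        for (auto& t : tasks_) {
            try { t(); } catch (const std::exception& e) { report_.addError(e.what()); }
        }
    }
    SimpleReport report() const { return report_; }
private:
    std::vector<std::function<void()>> tasks_;
    SimpleReport report_;
};

// Stateless scheduling: the group lives on the stack and only the report escapes.
SimpleReport scheduleWork(const std::vector<std::function<void()>>& work) {
    SimpleTaskGroup group;
    for (const auto& w : work) group.enqueue(w);
    group.waitForTasks();
    return group.report();
}
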
22 changes: 12 additions & 10 deletions src/gribjump/Engine.h
@@ -22,25 +22,29 @@

namespace gribjump {

template <typename T>

struct TaskOutcome {
T result;
TaskReport report;
};

class Engine {
public:

Engine();
~Engine();

ResultsMap extract(ExtractionRequests& requests);
TaskOutcome<ResultsMap> extract(ExtractionRequests& requests);

// byfiles: scan entire file, not just fields matching request
size_t scan(const MarsRequests& requests, bool byfiles = false);
size_t scan(std::vector<eckit::PathName> files);
size_t scan(const scanmap_t& scanmap);
TaskOutcome<size_t> scan(const MarsRequests& requests, bool byfiles = false);
TaskOutcome<size_t> scan(std::vector<eckit::PathName> files);
TaskOutcome<size_t> scheduleScanTasks(const scanmap_t& scanmap);

std::map<std::string, std::unordered_set<std::string> > axes(const std::string& request, int level=3);

void scheduleExtractionTasks(filemap_t& filemap);

void reportErrors(eckit::Stream& client_);
void raiseErrors();
TaskReport scheduleExtractionTasks(filemap_t& filemap);

private:

@@ -50,8 +54,6 @@ class Engine {

private:

TaskGroup taskGroup_; /// @todo Maybe we should be returning the taskGroup, rather than storing it here. /// I would really prefer a stateless engine.

};
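
The new TaskOutcome<T> template above pairs a result with the TaskReport produced while computing it, so callers decide whether to raise or stream any errors. Below is a self-contained sketch of that shape; Report, Outcome and countFields are illustrative names, not the real gribjump types.

#include <cstddef>
#include <stdexcept>
#include <utility>
#include <vector>

// Illustrative stand-ins; Report/Outcome mimic TaskReport/TaskOutcome in shape only.
struct Report {
    bool failed = false;
    void raiseErrors() const { if (failed) throw std::runtime_error("a task failed"); }
};

template <typename T>
struct Outcome {
    T result;
    Report report;
};

// Returns both the payload and the report via aggregate initialisation,
// the same shape as "return {std::move(results), std::move(report)};" above.
Outcome<std::size_t> countFields(const std::vector<int>& fields) {
    Report report;  // in gribjump this would come from TaskGroup::report()
    return {fields.size(), std::move(report)};
}

int main() {
    // Callers unpack with structured bindings, as LocalGribJump does.
    auto [count, report] = countFields({1, 2, 3});
    report.raiseErrors();
    return static_cast<int>(count);
}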


16 changes: 7 additions & 9 deletions src/gribjump/Forwarder.cc
@@ -17,7 +17,6 @@
#include "gribjump/Forwarder.h"
#include "gribjump/ExtractionItem.h"
#include "gribjump/LibGribJump.h"
#include "gribjump/Task.h"

namespace gribjump {

@@ -27,7 +26,7 @@ Forwarder::Forwarder() {
Forwarder::~Forwarder() {
}

size_t Forwarder::scan(const std::vector<eckit::URI>& uris) {
TaskOutcome<size_t> Forwarder::scan(const std::vector<eckit::URI>& uris) {

ASSERT(uris.size() > 0);

@@ -43,26 +42,25 @@ size_t Forwarder::scan(const std::vector<eckit::URI>& uris) {
}

TaskGroup taskGroup;
size_t counter = 0;
std::atomic<size_t> nFields(0);
size_t i = 0;
for (auto& [endpoint, scanmap] : serverfilemaps) {
taskGroup.enqueueTask(new ForwardScanTask(taskGroup, counter++, endpoint, scanmap, nFields));
taskGroup.enqueueTask<ForwardScanTask>(endpoint, scanmap, nFields);
}
taskGroup.waitForTasks();

return nFields;
return {nFields, taskGroup.report()};
}

void Forwarder::extract(filemap_t& filemap) {
TaskReport Forwarder::extract(filemap_t& filemap) {
std::unordered_map<eckit::net::Endpoint, filemap_t> serverfilemaps = serverFileMap(filemap);

TaskGroup taskGroup;
size_t counter = 0;
for (auto& [endpoint, subfilemap] : serverfilemaps) {
taskGroup.enqueueTask(new ForwardExtractionTask(taskGroup, counter++, endpoint, subfilemap));
taskGroup.enqueueTask<ForwardExtractionTask>(endpoint, subfilemap);
}
taskGroup.waitForTasks();

return taskGroup.report();
}
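
Both forwarding paths fan the work out per endpoint and collect it with a local TaskGroup; the scan path additionally aggregates field counts through a shared std::atomic<size_t> that every ForwardScanTask increments. The sketch below shows that aggregation style in isolation, using plain std::thread and invented names (scanAll, fieldsPerServer) purely for illustration.

#include <atomic>
#include <cstddef>
#include <thread>
#include <vector>

// Several concurrent "scan tasks" add their field counts into one shared atomic,
// the same aggregation style used for nFields above.
std::size_t scanAll(const std::vector<std::size_t>& fieldsPerServer) {
    std::atomic<std::size_t> nFields(0);
    std::vector<std::thread> workers;
    for (std::size_t n : fieldsPerServer) {
        workers.emplace_back([n, &nFields] { nFields += n; });
    }
    for (auto& w : workers) {
        w.join();
    }
    return nFields.load();
}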


6 changes: 4 additions & 2 deletions src/gribjump/Forwarder.h
@@ -10,6 +10,8 @@

/// @author Christopher Bradley
#include "gribjump/Types.h"
#include "gribjump/Task.h"
#include "gribjump/Engine.h"
#include "eckit/net/Endpoint.h"

#pragma once
Expand All @@ -25,8 +27,8 @@ class Forwarder {
Forwarder();
~Forwarder();

size_t scan(const std::vector<eckit::URI>& uris);
void extract(filemap_t& filemap);
TaskOutcome<size_t> scan(const std::vector<eckit::URI>& uris);
TaskReport extract(filemap_t& filemap);

private:

30 changes: 12 additions & 18 deletions src/gribjump/LocalGribJump.cc
@@ -49,16 +49,17 @@ LocalGribJump::LocalGribJump(const Config& config): GribJumpBase(config) {
LocalGribJump::~LocalGribJump() {}

size_t LocalGribJump::scan(const std::vector<eckit::PathName>& paths) {
Engine engine;
return engine.scan(paths);
auto [result, report] = Engine().scan(paths);
report.raiseErrors();
return result;
}

size_t LocalGribJump::scan(const std::vector<MarsRequest>& requests, bool byfiles) {
Engine engine;
return engine.scan(requests, byfiles);
auto [result, report] = Engine().scan(requests, byfiles);
report.raiseErrors();
return result;
}


std::vector<std::unique_ptr<ExtractionItem>> LocalGribJump::extract(const eckit::PathName& path, const std::vector<eckit::Offset>& offsets, const std::vector<std::vector<Range>>& ranges) {
// Directly from file, no cache, no queue, no threads

@@ -86,9 +87,8 @@ std::vector<std::unique_ptr<ExtractionItem>> LocalGribJump::extract(const eckit:
/// @todo, change API, remove extraction request
std::vector<std::vector<std::unique_ptr<ExtractionResult>>> LocalGribJump::extract(ExtractionRequests& requests) {

Engine engine;
ResultsMap results = engine.extract(requests);
engine.raiseErrors();
auto [results, report] = Engine().extract(requests);
report.raiseErrors();

std::vector<std::vector<std::unique_ptr<ExtractionResult>>> extractionResults;
for (auto& req : requests) {
@@ -107,25 +107,19 @@
}

ResultsMap LocalGribJump::extract(const std::vector<std::string>& requests, const std::vector<std::vector<Range>>& ranges) {
Engine engine;
ExtractionRequests extractionRequests;

for (size_t i = 0; i < requests.size(); i++) {
extractionRequests.push_back(ExtractionRequest(requests[i], ranges[i]));
}

ResultsMap results = engine.extract(extractionRequests);
engine.raiseErrors();
return results;
auto [results, report] = Engine().extract(extractionRequests);
report.raiseErrors();
return std::move(results);
}

std::map<std::string, std::unordered_set<std::string>> LocalGribJump::axes(const std::string& request, int level) {

// Note: This is likely to be removed from GribJump, and moved to FDB.
// Here for now to support polytope.

Engine engine;
return engine.axes(request, level);
return Engine().axes(request, level);
}

static GribJumpBuilder<LocalGribJump> builder("local");
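
The builder line above relies on a self-registering factory: constructing a static object at load time registers the "local" backend under its name. The actual GribJumpBuilder machinery is not shown in this diff, so the following is only a generic sketch of the idiom, with invented Factory/Builder/Backend names.

#include <functional>
#include <map>
#include <memory>
#include <string>

// Generic sketch of the static self-registration idiom; none of these names
// are gribjump's actual classes.
struct Backend {
    virtual ~Backend() = default;
};

class Factory {
public:
    static Factory& instance() { static Factory f; return f; }
    void add(const std::string& name, std::function<std::unique_ptr<Backend>()> make) {
        makers_[name] = std::move(make);
    }
    std::unique_ptr<Backend> build(const std::string& name) { return makers_.at(name)(); }
private:
    std::map<std::string, std::function<std::unique_ptr<Backend>()>> makers_;
};

template <typename T>
struct Builder {
    explicit Builder(const std::string& name) {
        Factory::instance().add(name, [] { return std::make_unique<T>(); });
    }
};

struct LocalBackend : Backend {};
static Builder<LocalBackend> localBuilder("local");  // registers at static-initialisation time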
