Skip to content

Commit

Permalink
Merge pull request #38 from ecmwf/feature/monitor
Browse files Browse the repository at this point in the history
Add status logging for monitoring
  • Loading branch information
ChrisspyB authored Jan 16, 2025
2 parents fc689d8 + 1d94546 commit f8f70e3
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 2 deletions.
32 changes: 31 additions & 1 deletion src/gribjump/Task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,12 @@ void Task::notifyCancelled() {
}

// Run the task once. The status is atomically moved from PENDING to EXECUTING;
// if the task is no longer pending (i.e. it was cancelled) nothing is done.
void Task::execute() {
    Status expected = Status::PENDING;
    const bool claimed = status_.compare_exchange_strong(expected, Status::EXECUTING);

    if (claimed) {
        info();         // publish a description of this task for monitoring
        executeImpl();  // do the actual work
        notify();       // report completion
    }
}
Expand All @@ -85,26 +86,33 @@ void TaskGroup::notify(size_t taskid) {
}

cv_.notify_one();
info();
}

// Account for a cancelled task: it counts towards both the completed and the
// cancelled totals. A waiter on cv_ is woken and the progress status refreshed.
void TaskGroup::notifyCancelled(size_t taskid) {
    const std::lock_guard<std::mutex> guard(m_);

    ++nComplete_;
    ++nCancelledTasks_;

    cv_.notify_one();
    info();
}

// Account for a failed task: store its error message, count it as complete,
// wake a waiter on cv_ and refresh the progress status. When cancelOnFirstError
// is set, the tasks of this group are then cancelled.
void TaskGroup::notifyError(size_t taskid, const std::string& s) {
    const std::lock_guard<std::mutex> guard(m_);

    errors_.push_back(s);
    ++nComplete_;

    cv_.notify_one();
    info();

    if (!cancelOnFirstError) {
        return;
    }

    // cancelTasks() does not lock the mutex itself, so it is called with m_ still held.
    cancelTasks();
}

// Write the group's progress ("<done> of <total> tasks complete") to the status channel.
void TaskGroup::info() const {
    auto& status = eckit::Log::status();
    status << nComplete_ << " of " << tasks_.size() << " tasks complete";
    status << std::endl;
}

// Note: This will only affect tasks that have not yet started. Cancelled tasks will call notifyCancelled() when they are executed.
// NB: We do not lock a mutex as this will be called from notifyError()
void TaskGroup::cancelTasks() {
Expand Down Expand Up @@ -227,6 +235,10 @@ void FileExtractionTask::extract() {
}
}

// Write a one-line description of this task (item count and file name) to the status channel.
void FileExtractionTask::info() const {
    auto& status = eckit::Log::status();
    status << "Extract " << extractionItems_.size() << " items from " << fname_;
    status << std::endl;
}

//----------------------------------------------------------------------------------------------------------------------

// Forward the work to a remote server, and wait for the results.
Expand All @@ -242,6 +254,12 @@ void ForwardExtractionTask::executeImpl(){
remoteGribJump.forwardExtract(filemap_);
}

// Write a one-line description of this task (remote endpoint and number of
// entries in the file map) to the status channel.
void ForwardExtractionTask::info() const {
    // A separator is required before "nfiles=", otherwise the key is glued
    // directly onto the printed endpoint.
    eckit::Log::status() << "Forward extract to " << endpoint_ << ", nfiles=" << filemap_.size() << std::endl;
}

//----------------------------------------------------------------------------------------------------------------------

ForwardScanTask::ForwardScanTask(TaskGroup& taskgroup, const size_t id, eckit::net::Endpoint endpoint, scanmap_t& scanmap, std::atomic<size_t>& nfields):
Task(taskgroup, id),
endpoint_(endpoint),
Expand All @@ -255,6 +273,10 @@ void ForwardScanTask::executeImpl(){
nfields_ += remoteGribJump.forwardScan(scanmap_);
}

// Write a one-line description of this task (the remote endpoint) to the status channel.
void ForwardScanTask::info() const {
    auto& status = eckit::Log::status();
    status << "Forward scan to " << endpoint_;
    status << std::endl;
}

//----------------------------------------------------------------------------------------------------------------------
InefficientFileExtractionTask::InefficientFileExtractionTask(TaskGroup& taskgroup, const size_t id, const eckit::PathName& fname, ExtractionItems& extractionItems):
FileExtractionTask(taskgroup, id, fname, extractionItems) {
Expand Down Expand Up @@ -299,6 +321,10 @@ void InefficientFileExtractionTask::extract() {
}
}

// Write a one-line description of this task (item count and file name) to the status channel.
void InefficientFileExtractionTask::info() const {
    auto& status = eckit::Log::status();
    status << "Inefficiently extract " << extractionItems_.size() << " items from " << fname_;
    status << std::endl;
}

//----------------------------------------------------------------------------------------------------------------------


Expand All @@ -325,6 +351,10 @@ void FileScanTask::scan() {
nfields_ += InfoCache::instance().scan(fname_, offsets_);
}

// Write a one-line description of this task (offset count and file name) to the status channel.
void FileScanTask::info() const {
    auto& status = eckit::Log::status();
    status << "Scan " << offsets_.size() << " offsets in " << fname_;
    status << std::endl;
}

//----------------------------------------------------------------------------------------------------------------------

} // namespace gribjump
17 changes: 16 additions & 1 deletion src/gribjump/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,14 @@ class Task {
/// cancels the task. If execute() is called after this, it will return immediately.
void cancel();

/// Write description of task to eckit::Log::status() for monitoring
virtual void info() const = 0;

protected:
virtual void executeImpl() = 0;

protected:

TaskGroup& taskGroup_; //< Groups like-tasks to be executed in parallel
size_t taskid_; //< Task id within parent request
std::atomic<Status> status_ = Status::PENDING;
Expand Down Expand Up @@ -129,6 +132,8 @@ class TaskGroup {
return errors_.size();
}

void info() const;

private:

void enqueueTask(Task* task);
Expand Down Expand Up @@ -165,6 +170,8 @@ class FileExtractionTask : public Task {

virtual void extract();

virtual void info() const override;

protected:
eckit::PathName fname_;
ExtractionItems& extractionItems_;
Expand All @@ -183,6 +190,8 @@ class InefficientFileExtractionTask : public FileExtractionTask {

void extract() override;

virtual void info() const override;

};

//----------------------------------------------------------------------------------------------------------------------
Expand All @@ -194,6 +203,8 @@ class ForwardExtractionTask : public Task {

void executeImpl() override;

virtual void info() const override;

private:
eckit::net::Endpoint endpoint_;
filemap_t& filemap_;
Expand All @@ -207,6 +218,8 @@ class ForwardScanTask : public Task {

void executeImpl() override;

virtual void info() const override;

private:
eckit::net::Endpoint endpoint_;
scanmap_t& scanmap_;
Expand All @@ -226,6 +239,8 @@ class FileScanTask : public Task {

void scan();

virtual void info() const override;

private:
eckit::PathName fname_;
std::vector<eckit::Offset> offsets_;
Expand Down
1 change: 1 addition & 0 deletions src/gribjump/remote/GribJumpUser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ void GribJumpUser::processRequest(eckit::Stream& s) {
RequestType request(s);
MetricsManager::instance().set("elapsed_receive", timer.elapsed());
timer.reset("Request received");
request.info();

request.execute();
MetricsManager::instance().set("elapsed_execute", timer.elapsed());
Expand Down
19 changes: 19 additions & 0 deletions src/gribjump/remote/Request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ void ScanRequest::replyToClient() {
client_ << nFields_;
}

// Announce this request on the status channel, with the number of requests it carries.
void ScanRequest::info() const {
    auto& status = eckit::Log::status();
    status << "New ScanRequest: nRequests=" << requests_.size();
    status << std::endl;
}

//----------------------------------------------------------------------------------------------------------------------


Expand Down Expand Up @@ -127,6 +131,10 @@ void ExtractRequest::replyToClient() {
LOG_DEBUG_LIB(LibGribJump) << "Sent " << nRequests << " results to client" << std::endl;
}

// Announce this request on the status channel, with the number of requests it carries.
void ExtractRequest::info() const {
    auto& status = eckit::Log::status();
    status << "New ExtractRequest: nRequests=" << requests_.size();
    status << std::endl;
}

//----------------------------------------------------------------------------------------------------------------------

ForwardedExtractRequest::ForwardedExtractRequest(eckit::Stream& stream) : Request(stream) {
Expand Down Expand Up @@ -178,6 +186,9 @@ void ForwardedExtractRequest::replyToClient() {
}
}

// Announce this request on the status channel, with the number of items it carries.
void ForwardedExtractRequest::info() const {
    auto& status = eckit::Log::status();
    status << "New ForwardedExtractRequest: nItems=" << items_.size();
    status << std::endl;
}

//----------------------------------------------------------------------------------------------------------------------

Expand Down Expand Up @@ -210,6 +221,10 @@ void ForwardedScanRequest::execute() {
// Reply to the client by streaming nfields_ to client_.
void ForwardedScanRequest::replyToClient() {
client_ << nfields_;
}

// Announce this request on the status channel, with the number of entries in its scan map.
void ForwardedScanRequest::info() const {
    auto& status = eckit::Log::status();
    status << "New ForwardedScanRequest: nfiles=" << scanmap_.size();
    status << std::endl;
}
//----------------------------------------------------------------------------------------------------------------------

AxesRequest::AxesRequest(eckit::Stream& stream) : Request(stream) {
Expand Down Expand Up @@ -246,6 +261,10 @@ void AxesRequest::replyToClient() {
}
}

// Announce this request on the status channel, printing the request string and level.
void AxesRequest::info() const {
    auto& status = eckit::Log::status();
    status << "New AxesRequest: " << request_ << ", level=" << level_;
    status << std::endl;
}

//----------------------------------------------------------------------------------------------------------------------

} // namespace gribjump
13 changes: 13 additions & 0 deletions src/gribjump/remote/Request.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ class Request {

void reportErrors();

/// Print information about the request to status(), for monitoring
virtual void info() const = 0;

protected: // members

eckit::Stream& client_;
Expand All @@ -63,6 +66,8 @@ class ScanRequest : public Request {

void replyToClient() override;

void info() const override;

private:

std::vector<metkit::mars::MarsRequest> requests_;
Expand All @@ -85,6 +90,8 @@ class ExtractRequest : public Request {

void replyToClient() override;

void info() const override;

private:
std::vector<ExtractionRequest> requests_;

Expand All @@ -105,6 +112,8 @@ class ForwardedExtractRequest : public Request {

void replyToClient() override;

void info() const override;

private:

std::vector<std::unique_ptr<ExtractionItem>> items_;
Expand All @@ -127,6 +136,8 @@ class ForwardedScanRequest : public Request {

void replyToClient() override;

void info() const override;

private:

std::vector<std::unique_ptr<ExtractionItem>> items_;
Expand All @@ -149,6 +160,8 @@ class AxesRequest : public Request {

void replyToClient() override;

void info() const override;

private:

std::string request_; /// @todo why is this a string?
Expand Down
1 change: 1 addition & 0 deletions src/gribjump/remote/WorkQueue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ WorkQueue::WorkQueue() : queue_(eckit::Resource<size_t>("$GRIBJUMP_QUEUESIZE;gri
// GribJump gj = GribJump(); // one per thread

for (;;) {
eckit::Log::status() << "Waiting for job" << std::endl;
WorkItem item;
if (queue_.pop(item) == -1) {
LOG_DEBUG_LIB(LibGribJump) << "Thread " << std::this_thread::get_id() << " stopping (queue closed)" << std::endl;
Expand Down

0 comments on commit f8f70e3

Please sign in to comment.