Skip to content

Commit

Permalink
Merge branch 'release/0.9.0'
Browse files Browse the repository at this point in the history
ChrisspyB committed Jan 31, 2025

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
2 parents bdc733d + d73b494 commit 9f299cd
Showing 18 changed files with 141 additions and 19 deletions.
13 changes: 13 additions & 0 deletions .github/ci-config-pygribjump.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
dependencies: |
ecmwf/ecbuild
MathisRosenhauer/libaec@master
ecmwf/eccodes
ecmwf/eckit@develop
ecmwf/metkit
ecmwf/fdb
ecmwf/gribjump
dependency_cmake_options: |
ecmwf/gribjump: "-DENABLE_FDB_BUILD_TOOLS=ON"
dependency_branch: develop
parallelism_factor: 8
self_build: false
19 changes: 19 additions & 0 deletions .github/ci-hpc-config-pygribjump.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
build:
python: "3.10"
modules:
- ninja
- aec
dependencies:
- ecmwf/ecbuild@develop
- ecmwf/eccodes@develop
- ecmwf/eckit@develop
- ecmwf/metkit@develop
- ecmwf/fdb@develop
- ecmwf/gribjump@develop
dependency_cmake_options:
- "ecmwf/gribjump: '-DENABLE_FDB_BUILD_TOOLS=ON'"
- "ecmwf/fdb: '-DENABLE_LUSTRE=OFF'"
parallel: 64
env:
- ECCODES_SAMPLES_PATH=$ECCODES_DIR/share/eccodes/samples
- ECCODES_DEFINITION_PATH=$ECCODES_DIR/share/eccodes/definitions
8 changes: 5 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -4,10 +4,10 @@ on:
# Trigger the workflow on push to master or develop, except tag creation
push:
branches:
- 'master'
- 'develop'
- "master"
- "develop"
tags-ignore:
- '**'
- "**"

# Trigger the workflow on pull request
pull_request: ~
@@ -27,6 +27,7 @@ jobs:
uses: ecmwf-actions/downstream-ci/.github/workflows/downstream-ci.yml@main
with:
gribjump: ecmwf/gribjump@${{ github.event.pull_request.head.sha || github.sha }}
pygribjump: pygribjump:ecmwf/gribjump@${{ github.event.pull_request.head.sha || github.sha }}
codecov_upload: true
secrets: inherit

@@ -55,6 +56,7 @@ jobs:
uses: ecmwf-actions/downstream-ci/.github/workflows/downstream-ci-hpc.yml@main
with:
gribjump: ecmwf/gribjump@${{ github.event.pull_request.head.sha || github.sha }}
pygribjump: pygribjump:ecmwf/gribjump@${{ github.event.pull_request.head.sha || github.sha }}
secrets: inherit

# Run CI of private downstream packages on HPC
6 changes: 3 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -13,16 +13,16 @@ list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake")
### dependencies and options

set( PERSISTENT_NAMESPACE "eckit" CACHE INTERNAL "" ) # needed for generating .b files for persistent support
ecbuild_find_package( NAME eckit VERSION 1.25.2 REQUIRED )
ecbuild_find_package( NAME metkit VERSION 1.5 REQUIRED )
ecbuild_find_package( NAME eckit VERSION 1.28.3 REQUIRED )
ecbuild_find_package( NAME metkit VERSION 1.11.22 REQUIRED )

# Set "GRIBJUMP_LOCAL_EXTRACT" to build everything. If it is off, build only minimal clientside functionality.
ecbuild_add_option( FEATURE GRIBJUMP_LOCAL_EXTRACT
DEFAULT ON
DESCRIPTION "Build local extraction and serverside functionality")

if (HAVE_GRIBJUMP_LOCAL_EXTRACT)
ecbuild_find_package( NAME fdb5 VERSION 5.13.1 REQUIRED )
ecbuild_find_package( NAME fdb5 VERSION 5.14.0 REQUIRED )
set(GRIBJUMP_HAVE_FDB 1)

ecbuild_find_package( NAME eccodes VERSION 2.32.1 REQUIRED )
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.8.1
0.9.0
1 change: 0 additions & 1 deletion pygribjump/src/pygribjump/VERSION

This file was deleted.

4 changes: 2 additions & 2 deletions pygribjump/src/pygribjump/_version.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from pathlib import Path
from .pygribjump import *
import importlib.metadata

with open(Path(__file__).parent / "VERSION") as f:
__version__ = f.read().strip()
__version__ = importlib.metadata.version("pygribjump")
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -45,4 +45,7 @@ include-package-data = true
zip-safe = false

[tool.setuptools.package-data]
"pygribjump" = ["VERSION", "pygribjump/src/pygribjump/gribjump_c.h"]
"pygribjump" = [
"VERSION",
"gribjump_c.h"
]
4 changes: 2 additions & 2 deletions src/gribjump/FDBPlugin.cc
Original file line number Diff line number Diff line change
@@ -35,7 +35,7 @@ FDBPlugin& FDBPlugin::instance() {

FDBPlugin::FDBPlugin() {
// NB: Can't access eckit::Resource outside the callback because eckit::main has not finished initialising
fdb5::LibFdb5::instance().registerConstructorCallback([](fdb5::FDB& fdb) {
fdb5::LibFdb5::instance().registerConstructorCallback([](fdb5::CallbackRegistry& fdb) {
static bool enableGribjump = eckit::Resource<bool>("fdbEnableGribjump;$FDB_ENABLE_GRIBJUMP", false);
static bool disableGribjump = eckit::Resource<bool>("fdbDisableGribjump;$FDB_DISABLE_GRIBJUMP", false); // Emergency off-switch
if (enableGribjump && !disableGribjump) {
@@ -44,7 +44,7 @@ FDBPlugin::FDBPlugin() {
});
}

void FDBPlugin::addFDB(fdb5::FDB& fdb) {
void FDBPlugin::addFDB(fdb5::CallbackRegistry& fdb) {

parseConfig();
std::lock_guard<std::mutex> lock(mutex_);
2 changes: 1 addition & 1 deletion src/gribjump/FDBPlugin.h
Original file line number Diff line number Diff line change
@@ -23,7 +23,7 @@ class FDBPlugin {
public:

static FDBPlugin& instance();
void addFDB(fdb5::FDB& fdb);
void addFDB(fdb5::CallbackRegistry& fdb);

private:

41 changes: 37 additions & 4 deletions src/gribjump/Task.cc
Original file line number Diff line number Diff line change
@@ -55,11 +55,12 @@ void Task::notifyCancelled() {
}

void Task::execute() {
// atomically set status to executing, but only if it is currently pending
// atomically set status to executing, but only if it is currently pending (i.e. not cancelled)
Status expected = Status::PENDING;
if (!status_.compare_exchange_strong(expected, Status::EXECUTING)) {
return;
}
info();
executeImpl();
notify();
}
@@ -85,26 +86,33 @@ void TaskGroup::notify(size_t taskid) {
}

cv_.notify_one();
info();
}

void TaskGroup::notifyCancelled(size_t taskid) {
std::lock_guard<std::mutex> lock(m_);
nComplete_++;
nCancelledTasks_++;
cv_.notify_one();
info();
}

void TaskGroup::notifyError(size_t taskid, const std::string& s) {
std::lock_guard<std::mutex> lock(m_);
errors_.push_back(s);
nComplete_++;
cv_.notify_one();
info();

if (cancelOnFirstError) {
cancelTasks();
}
}

void TaskGroup::info() const {
eckit::Log::status() << nComplete_ << " of " << tasks_.size() << " tasks complete" << std::endl;
}

// Note: This will only affect tasks that have not yet started. Cancelled tasks will call notifyCancelled() when they are executed.
// NB: We do not lock a mutex as this will be called from notifyError()
void TaskGroup::cancelTasks() {
@@ -114,9 +122,12 @@ void TaskGroup::cancelTasks() {
}

void TaskGroup::enqueueTask(Task* task) {
std::lock_guard<std::mutex> lock(m_);
tasks_.push_back(std::unique_ptr<Task>(task)); // TaskGroup takes ownership of its tasks
WorkQueue::instance().push(task);
{
std::lock_guard<std::mutex> lock(m_);
tasks_.push_back(std::unique_ptr<Task>(task)); // TaskGroup takes ownership of its tasks
}

WorkQueue::instance().push(task); /// @note Can block, so release the lock first

LOG_DEBUG_LIB(LibGribJump) << "Queued task " << tasks_.size() << std::endl;
}
@@ -224,6 +235,10 @@ void FileExtractionTask::extract() {
}
}

void FileExtractionTask::info() const {
eckit::Log::status() << "Extract " << extractionItems_.size() << " items from " << fname_ << std::endl;
}

//----------------------------------------------------------------------------------------------------------------------

// Forward the work to a remote server, and wait for the results.
@@ -239,6 +254,12 @@ void ForwardExtractionTask::executeImpl(){
remoteGribJump.forwardExtract(filemap_);
}

void ForwardExtractionTask::info() const {
eckit::Log::status() << "Forward extract to " << endpoint_ << "nfiles=" << filemap_.size() << std::endl;
}

//----------------------------------------------------------------------------------------------------------------------

ForwardScanTask::ForwardScanTask(TaskGroup& taskgroup, const size_t id, eckit::net::Endpoint endpoint, scanmap_t& scanmap, std::atomic<size_t>& nfields):
Task(taskgroup, id),
endpoint_(endpoint),
@@ -252,6 +273,10 @@ void ForwardScanTask::executeImpl(){
nfields_ += remoteGribJump.forwardScan(scanmap_);
}

void ForwardScanTask::info() const {
eckit::Log::status() << "Forward scan to " << endpoint_ << std::endl;
}

//----------------------------------------------------------------------------------------------------------------------
InefficientFileExtractionTask::InefficientFileExtractionTask(TaskGroup& taskgroup, const size_t id, const eckit::PathName& fname, ExtractionItems& extractionItems):
FileExtractionTask(taskgroup, id, fname, extractionItems) {
@@ -296,6 +321,10 @@ void InefficientFileExtractionTask::extract() {
}
}

void InefficientFileExtractionTask::info() const {
eckit::Log::status() << "Inefficiently extract " << extractionItems_.size() << " items from " << fname_ << std::endl;
}

//----------------------------------------------------------------------------------------------------------------------


@@ -322,6 +351,10 @@ void FileScanTask::scan() {
nfields_ += InfoCache::instance().scan(fname_, offsets_);
}

void FileScanTask::info() const {
eckit::Log::status() << "Scan " << offsets_.size() << " offsets in " << fname_ << std::endl;
}

//----------------------------------------------------------------------------------------------------------------------

} // namespace gribjump
17 changes: 16 additions & 1 deletion src/gribjump/Task.h
Original file line number Diff line number Diff line change
@@ -58,11 +58,14 @@ class Task {
/// cancels the task. If execute() is called after this, it will return immediately.
void cancel();

/// Write description of task to eckit::Log::status() for monitoring
virtual void info() const = 0;

protected:
virtual void executeImpl() = 0;

protected:

TaskGroup& taskGroup_; //< Groups like-tasks to be executed in parallel
size_t taskid_; //< Task id within parent request
std::atomic<Status> status_ = Status::PENDING;
@@ -129,6 +132,8 @@ class TaskGroup {
return errors_.size();
}

void info() const;

private:

void enqueueTask(Task* task);
@@ -165,6 +170,8 @@ class FileExtractionTask : public Task {

virtual void extract();

virtual void info() const override;

protected:
eckit::PathName fname_;
ExtractionItems& extractionItems_;
@@ -183,6 +190,8 @@ class InefficientFileExtractionTask : public FileExtractionTask {

void extract() override;

virtual void info() const override;

};

//----------------------------------------------------------------------------------------------------------------------
@@ -194,6 +203,8 @@ class ForwardExtractionTask : public Task {

void executeImpl() override;

virtual void info() const override;

private:
eckit::net::Endpoint endpoint_;
filemap_t& filemap_;
@@ -207,6 +218,8 @@ class ForwardScanTask : public Task {

void executeImpl() override;

virtual void info() const override;

private:
eckit::net::Endpoint endpoint_;
scanmap_t& scanmap_;
@@ -226,6 +239,8 @@ class FileScanTask : public Task {

void scan();

virtual void info() const override;

private:
eckit::PathName fname_;
std::vector<eckit::Offset> offsets_;
1 change: 1 addition & 0 deletions src/gribjump/info/InfoExtractor.cc
Original file line number Diff line number Diff line change
@@ -53,6 +53,7 @@ std::vector<std::unique_ptr<JumpInfo>> InfoExtractor::extract(const eckit::PathN

eckit::FileHandle fh(path);
std::vector<std::unique_ptr<JumpInfo>> infos;
infos.reserve(offsets.size());

for (size_t i = 0; i < offsets.size(); i++) {
fh.openForRead();
1 change: 1 addition & 0 deletions src/gribjump/remote/GribJumpUser.cc
Original file line number Diff line number Diff line change
@@ -98,6 +98,7 @@ void GribJumpUser::processRequest(eckit::Stream& s) {
RequestType request(s);
MetricsManager::instance().set("elapsed_receive", timer.elapsed());
timer.reset("Request received");
request.info();

request.execute();
MetricsManager::instance().set("elapsed_execute", timer.elapsed());
19 changes: 19 additions & 0 deletions src/gribjump/remote/Request.cc
Original file line number Diff line number Diff line change
@@ -66,6 +66,10 @@ void ScanRequest::replyToClient() {
client_ << nFields_;
}

void ScanRequest::info() const {
eckit::Log::status() << "New ScanRequest: nRequests=" << requests_.size() << std::endl;
}

//----------------------------------------------------------------------------------------------------------------------


@@ -127,6 +131,10 @@ void ExtractRequest::replyToClient() {
LOG_DEBUG_LIB(LibGribJump) << "Sent " << nRequests << " results to client" << std::endl;
}

void ExtractRequest::info() const {
eckit::Log::status() << "New ExtractRequest: nRequests=" << requests_.size() << std::endl;
}

//----------------------------------------------------------------------------------------------------------------------

ForwardedExtractRequest::ForwardedExtractRequest(eckit::Stream& stream) : Request(stream) {
@@ -178,6 +186,9 @@ void ForwardedExtractRequest::replyToClient() {
}
}

void ForwardedExtractRequest::info() const {
eckit::Log::status() << "New ForwardedExtractRequest: nItems=" << items_.size() << std::endl;
}

//----------------------------------------------------------------------------------------------------------------------

@@ -210,6 +221,10 @@ void ForwardedScanRequest::execute() {
void ForwardedScanRequest::replyToClient() {
client_ << nfields_;
}

void ForwardedScanRequest::info() const {
eckit::Log::status() << "New ForwardedScanRequest: nfiles=" << scanmap_.size() << std::endl;
}
//----------------------------------------------------------------------------------------------------------------------

AxesRequest::AxesRequest(eckit::Stream& stream) : Request(stream) {
@@ -246,6 +261,10 @@ void AxesRequest::replyToClient() {
}
}

void AxesRequest::info() const {
eckit::Log::status() << "New AxesRequest: " << request_ << ", level=" << level_ << std::endl;
}

//----------------------------------------------------------------------------------------------------------------------

} // namespace gribjump
Loading

0 comments on commit 9f299cd

Please sign in to comment.