From 651da6cb6e072a3020015f3817675b2e02ea4911 Mon Sep 17 00:00:00 2001 From: Chris Bradley Date: Mon, 9 Sep 2024 20:15:09 +0100 Subject: [PATCH 1/7] Add LRU cache --- src/gribjump/CMakeLists.txt | 1 + src/gribjump/info/InfoCache.cc | 71 ++++++++++++------------ src/gribjump/info/InfoCache.h | 9 ++-- src/gribjump/info/LRUCache.h | 98 ++++++++++++++++++++++++++++++++++ tests/CMakeLists.txt | 9 ++++ tests/test_misc_units.cc | 68 +++++++++++++++++++++++ 6 files changed, 214 insertions(+), 42 deletions(-) create mode 100644 src/gribjump/info/LRUCache.h create mode 100644 tests/test_misc_units.cc diff --git a/src/gribjump/CMakeLists.txt b/src/gribjump/CMakeLists.txt index 35d6049..b7b0391 100644 --- a/src/gribjump/CMakeLists.txt +++ b/src/gribjump/CMakeLists.txt @@ -78,6 +78,7 @@ if( HAVE_GRIBJUMP_LOCAL_EXTRACT ) info/InfoAggregator.cc info/InfoCache.cc info/InfoCache.h + info/LRUCache.h info/UnsupportedInfo.h info/UnsupportedInfo.cc diff --git a/src/gribjump/info/InfoCache.cc b/src/gribjump/info/InfoCache.cc index 51f1c41..84d2d5b 100644 --- a/src/gribjump/info/InfoCache.cc +++ b/src/gribjump/info/InfoCache.cc @@ -39,7 +39,9 @@ InfoCache& InfoCache::instance() { InfoCache::~InfoCache() { } -InfoCache::InfoCache(): cacheDir_(eckit::PathName()) { +InfoCache::InfoCache(): + cacheDir_(eckit::PathName()), + cache_(eckit::Resource("gribjumpCacheSize", LibGribJump::instance().config().getInt("cache.size", 128))) { const Config& config = LibGribJump::instance().config(); @@ -78,18 +80,21 @@ eckit::PathName InfoCache::cacheFilePath(const eckit::PathName& path) const { return cacheDir_ / path.baseName() + file_ext; } -FileCache& InfoCache::getFileCache(const eckit::PathName& path, bool load) { +std::shared_ptr InfoCache::getFileCache(const eckit::PathName& path, bool load) { std::lock_guard lock(mutex_); + const filename_t& f = path.baseName(); - auto it = cache_.find(f); - if(it != cache_.end()) return *(it->second); - + + if (cache_.exists(f)) { + return cache_.get(f); + } + eckit::PathName cachePath = cacheFilePath(path); LOG_DEBUG_LIB(LibGribJump) << "New InfoCache entry for file " << f << " at " << cachePath << std::endl; - std::unique_ptr filecache(new FileCache(cachePath, load)); - cache_.insert(std::make_pair(f, std::move(filecache))); - return *cache_[f]; + auto filecache = std::make_shared(cachePath, load); + cache_.put(f, filecache); + return filecache; } std::shared_ptr InfoCache::get(const eckit::URI& uri) { @@ -102,12 +107,12 @@ std::shared_ptr InfoCache::get(const eckit::URI& uri) { std::shared_ptr InfoCache::get(const eckit::PathName& path, const eckit::Offset offset) { - FileCache& filecache = getFileCache(path); - filecache.load(); + std::shared_ptr filecache = getFileCache(path); + filecache->load(); // return it if in memory cache { - std::shared_ptr info = filecache.find(offset); + std::shared_ptr info = filecache->find(offset); if (info) return info; LOG_DEBUG_LIB(LibGribJump) << "InfoCache file " << path << " does not contain JumpInfo for field at offset " << offset << std::endl; @@ -119,20 +124,20 @@ std::shared_ptr InfoCache::get(const eckit::PathName& path, const ecki InfoExtractor extractor; std::shared_ptr info = extractor.extract(path, offset); - filecache.insert(offset, info); + filecache->insert(offset, info); return info; } std::vector> InfoCache::get(const eckit::PathName& path, const eckit::OffsetList& offsets) { - FileCache& filecache = getFileCache(path); - filecache.load(); + std::shared_ptr filecache = getFileCache(path); + filecache->load(); std::vector missingOffsets; for (const auto& offset : offsets) { - if (!filecache.find(offset)) { + if (!filecache->find(offset)) { missingOffsets.push_back(offset); } } @@ -144,14 +149,14 @@ std::vector> InfoCache::get(const eckit::PathName& pat std::vector> infos = extractor.extract(path, missingOffsets); for (size_t i = 0; i < infos.size(); i++) { - filecache.insert(missingOffsets[i], std::move(infos[i])); + filecache->insert(missingOffsets[i], std::move(infos[i])); } } std::vector> result; for (const auto& offset : offsets) { - std::shared_ptr info = filecache.find(offset); + std::shared_ptr info = filecache->find(offset); ASSERT(info); result.push_back(info); } @@ -167,17 +172,10 @@ std::vector> InfoCache::get(const eckit::PathName& pat void InfoCache::insert(const eckit::PathName& path, const eckit::Offset offset, std::shared_ptr info) { LOG_DEBUG_LIB(LibGribJump) << "GribJumpCache inserting " << path << ":" << offset << std::endl; - FileCache& filecache = getFileCache(path, false); - filecache.insert(offset, info); + std::shared_ptr filecache = getFileCache(path, false); + filecache->insert(offset, info); } - -// void InfoCache::insert(const eckit::PathName& path, std::vector> infos) { -// LOG_DEBUG_LIB(LibGribJump) << "GribJumpCache inserting " << path << "" << infos.size() << " fields" << std::endl; -// FileCache& filecache = getFileCache(path, false); -// filecache.insert(infos); -// } - void InfoCache::flush(bool append) { std::lock_guard lock(mutex_); for (auto& [filename, filecache] : cache_) { @@ -199,14 +197,14 @@ void InfoCache::scan(const eckit::PathName& fdbpath, const std::vector filecache = getFileCache(fdbpath); + filecache->load(); // Find which offsets are not already in file cache std::vector newOffsets; for (const auto& offset : offsets) { - if(!filecache.find(offset)) { + if(!filecache->find(offset)) { newOffsets.push_back(offset); } } @@ -223,34 +221,33 @@ void InfoCache::scan(const eckit::PathName& fdbpath, const std::vector> infos; // infos.reserve(uinfos.size()); // std::move(uinfos.begin(), uinfos.end(), std::back_inserter(infos)); - // filecache.insert(infos); + // filecache->insert(infos); for (size_t i = 0; i < uinfos.size(); i++) { - filecache.insert(newOffsets[i], std::move(uinfos[i])); + filecache->insert(newOffsets[i], std::move(uinfos[i])); } if (persistentCache_) { - filecache.write(); + filecache->write(); } } void InfoCache::scan(const eckit::PathName& fdbpath) { - LOG_DEBUG_LIB(LibGribJump) << "Scanning whole file " << fdbpath << std::endl; // if cache exists load so we can merge with memory cache - FileCache& filecache = getFileCache(fdbpath); - filecache.load(); + std::shared_ptr filecache = getFileCache(fdbpath); + filecache->load(); InfoExtractor extractor; std::vector>> uinfos = extractor.extract(fdbpath); /* This needs to give use the offsets too*/ for (size_t i = 0; i < uinfos.size(); i++) { - filecache.insert(uinfos[i].first, std::move(uinfos[i].second)); + filecache->insert(uinfos[i].first, std::move(uinfos[i].second)); } if (persistentCache_) { - filecache.write(); + filecache->write(); } } diff --git a/src/gribjump/info/InfoCache.h b/src/gribjump/info/InfoCache.h index 8ee7e50..4c7eae3 100644 --- a/src/gribjump/info/InfoCache.h +++ b/src/gribjump/info/InfoCache.h @@ -19,8 +19,8 @@ #include "eckit/filesystem/URI.h" #include "eckit/serialisation/FileStream.h" - #include "gribjump/info/JumpInfo.h" +#include "gribjump/info/LRUCache.h" #include "gribjump/LibGribJump.h" namespace gribjump { @@ -30,8 +30,8 @@ class InfoCache { private: // types - using filename_t = std::string; //< key is fieldlocation's path basename - using cache_t = std::map>; //< map fieldlocation's to gribinfo + using filename_t = std::string; //< key is fieldlocation's path basename + using cache_t = LRUCache>; //< map fieldlocation's to gribinfo public: @@ -61,14 +61,13 @@ class InfoCache { void print(std::ostream& s) const; - FileCache& getFileCache(const eckit::PathName& f, bool load=true); - private: // methods InfoCache(); ~InfoCache(); + std::shared_ptr getFileCache(const eckit::PathName& f, bool load=true); eckit::PathName cacheFilePath(const eckit::PathName& path) const; diff --git a/src/gribjump/info/LRUCache.h b/src/gribjump/info/LRUCache.h new file mode 100644 index 0000000..8054949 --- /dev/null +++ b/src/gribjump/info/LRUCache.h @@ -0,0 +1,98 @@ +/* + * (C) Copyright 2024- ECMWF. + * + * This software is licensed under the terms of the Apache Licence Version 2.0 + * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. + * In applying this licence, ECMWF does not waive the privileges and immunities + * granted to it by virtue of its status as an intergovernmental organisation nor + * does it submit to any jurisdiction. + */ + +/// @author Christopher Bradley + +#pragma once + +#include +#include +#include "eckit/exception/Exceptions.h" + +namespace gribjump { + +// Note: not thread safe, use an external lock if needed +template +class LRUCache { +public: + LRUCache(size_t capacity) : capacity_(capacity) {} + + void put(const K& key, const V& value) { + if (map_.find(key) == map_.end()) { + if (list_.size() == capacity_) { + auto last = list_.back(); + list_.pop_back(); + map_.erase(last); + } + list_.push_front(key); + } else { + list_.remove(key); + list_.push_front(key); + } + map_[key] = value; + } + + // Remember that put may receive a unique_ptr + void put(const K& key, V&& value) { + if (map_.find(key) == map_.end()) { + if (list_.size() == capacity_) { + auto last = list_.back(); + list_.pop_back(); + map_.erase(last); + } + list_.push_front(key); + } else { + list_.remove(key); + list_.push_front(key); + } + map_[key] = std::move(value); + } + + V& get(const K& key) { + if (map_.find(key) == map_.end()) { + throw eckit::BadValue("Key does not exist"); + } + list_.remove(key); + list_.push_front(key); + return map_[key]; + } + + + + bool exists(const K& key) { + return map_.find(key) != map_.end(); + } + + void print(std::ostream& s) { + for (auto& key : list_) { + s << key << " -> " << map_[key] << std::endl; + } + } + + typename std::unordered_map::const_iterator begin() const { + return map_.begin(); + } + + typename std::unordered_map::const_iterator end() const { + return map_.end(); + } + + void clear() { + list_.clear(); + map_.clear(); + } + +private: + size_t capacity_; + std::list list_; + std::unordered_map map_; +}; + +} // namespace gribjump diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 7fd8cfa..6c55815 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -120,6 +120,15 @@ ecbuild_add_test( LIBS gribjump ) +ecbuild_add_test( + TARGET "gribjump_test_misc_units" + SOURCES "test_misc_units.cc" + INCLUDES "${ECKIT_INCLUDE_DIRS}" + ENVIRONMENT "${gribjump_env}" + NO_AS_NEEDED + LIBS gribjump +) + if (ENABLE_FDB_BUILD_TOOLS) add_subdirectory(tools) endif() \ No newline at end of file diff --git a/tests/test_misc_units.cc b/tests/test_misc_units.cc new file mode 100644 index 0000000..35cf804 --- /dev/null +++ b/tests/test_misc_units.cc @@ -0,0 +1,68 @@ +/* + * (C) Copyright 2024- ECMWF. + * + * This software is licensed under the terms of the Apache Licence Version 2.0 + * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. + * In applying this licence, ECMWF does not waive the privileges and immunities + * granted to it by virtue of its status as an intergovernmental organisation + * nor does it submit to any jurisdiction. + */ + +#include +#include + +#include "eckit/testing/Test.h" +#include "gribjump/info/LRUCache.h" + + +#include "metkit/mars/MarsParser.h" +#include "metkit/mars/MarsExpension.h" +using namespace eckit::testing; + +namespace gribjump { +namespace test { + +// Miscellanous unit tests +//----------------------------------------------------------------------------- +CASE( "test_lru" ){ + + LRUCache cache(3); + + // Check basic functionality + cache.put("a", 1); + cache.put("b", 2); + cache.put("c", 3); + + EXPECT(cache.get("a") == 1); + EXPECT(cache.get("b") == 2); + EXPECT(cache.get("c") == 3); + + cache.put("d", 4); + + EXPECT_THROWS_AS(cache.get("a"), eckit::BadValue); + EXPECT(cache.get("d") == 4); + + // Check recency is updated with get + cache.put("x", 1); + cache.put("y", 2); + cache.put("z", 3); + + EXPECT(cache.get("z") == 3); + EXPECT(cache.get("y") == 2); + EXPECT(cache.get("x") == 1); + + // z should now be the least recently used + cache.put("w", 1); + + EXPECT_THROWS_AS(cache.get("z"), eckit::BadValue); +} +//----------------------------------------------------------------------------- + +} // namespace test +} // namespace gribjump + + +int main(int argc, char **argv) +{ + return run_tests ( argc, argv ); +} From be75c5d6265b22dea803641031e34b5a1bc53dbe Mon Sep 17 00:00:00 2001 From: Chris Bradley Date: Tue, 10 Sep 2024 15:19:16 +0100 Subject: [PATCH 2/7] Tidy up --- src/gribjump/info/InfoCache.cc | 2 +- src/gribjump/info/LRUCache.h | 25 +------------------------ 2 files changed, 2 insertions(+), 25 deletions(-) diff --git a/src/gribjump/info/InfoCache.cc b/src/gribjump/info/InfoCache.cc index 84d2d5b..f433899 100644 --- a/src/gribjump/info/InfoCache.cc +++ b/src/gribjump/info/InfoCache.cc @@ -41,7 +41,7 @@ InfoCache::~InfoCache() { InfoCache::InfoCache(): cacheDir_(eckit::PathName()), - cache_(eckit::Resource("gribjumpCacheSize", LibGribJump::instance().config().getInt("cache.size", 128))) { + cache_(eckit::Resource("gribjumpCacheSize", LibGribJump::instance().config().getInt("cache.size", 64))) { const Config& config = LibGribJump::instance().config(); diff --git a/src/gribjump/info/LRUCache.h b/src/gribjump/info/LRUCache.h index 8054949..b4d22b4 100644 --- a/src/gribjump/info/LRUCache.h +++ b/src/gribjump/info/LRUCache.h @@ -18,7 +18,7 @@ namespace gribjump { -// Note: not thread safe, use an external lock if needed +// Note: not a thread safe container, use an external lock if needed template class LRUCache { public: @@ -39,21 +39,6 @@ class LRUCache { map_[key] = value; } - // Remember that put may receive a unique_ptr - void put(const K& key, V&& value) { - if (map_.find(key) == map_.end()) { - if (list_.size() == capacity_) { - auto last = list_.back(); - list_.pop_back(); - map_.erase(last); - } - list_.push_front(key); - } else { - list_.remove(key); - list_.push_front(key); - } - map_[key] = std::move(value); - } V& get(const K& key) { if (map_.find(key) == map_.end()) { @@ -64,18 +49,10 @@ class LRUCache { return map_[key]; } - - bool exists(const K& key) { return map_.find(key) != map_.end(); } - void print(std::ostream& s) { - for (auto& key : list_) { - s << key << " -> " << map_[key] << std::endl; - } - } - typename std::unordered_map::const_iterator begin() const { return map_.begin(); } From b62b4b2f5149dbe83e3caa1bd78868dbb1d4623c Mon Sep 17 00:00:00 2001 From: Chris Bradley Date: Tue, 10 Sep 2024 19:48:46 +0100 Subject: [PATCH 3/7] Large tidy up and improve use of raw pointers --- src/gribjump/Bitmap.h | 15 --------- src/gribjump/CMakeLists.txt | 1 + src/gribjump/Engine.cc | 60 +++++++++++++++++++++------------- src/gribjump/Engine.h | 17 +++++----- src/gribjump/ExtractionData.h | 3 +- src/gribjump/ExtractionItem.h | 9 +---- src/gribjump/GribJump.cc | 8 ----- src/gribjump/GribJumpBase.h | 9 ++--- src/gribjump/Interval.h | 17 ---------- src/gribjump/Lister.cc | 5 ++- src/gribjump/Lister.h | 7 ++-- src/gribjump/LocalGribJump.cc | 17 ++-------- src/gribjump/LocalGribJump.h | 5 +-- src/gribjump/Task.cc | 7 ++-- src/gribjump/Task.h | 17 ++++++++-- src/gribjump/Types.h | 21 ++++++++++++ src/gribjump/remote/Request.cc | 15 +++++---- src/gribjump/remote/Request.h | 2 +- tests/data.cc | 3 +- tests/test_O1280.cc | 1 - tests/test_api.cc | 1 - tests/test_engine.cc | 13 ++++---- tests/test_gribjump.cc | 3 -- 23 files changed, 118 insertions(+), 138 deletions(-) delete mode 100644 src/gribjump/Bitmap.h delete mode 100644 src/gribjump/Interval.h create mode 100644 src/gribjump/Types.h diff --git a/src/gribjump/Bitmap.h b/src/gribjump/Bitmap.h deleted file mode 100644 index 4967252..0000000 --- a/src/gribjump/Bitmap.h +++ /dev/null @@ -1,15 +0,0 @@ -/* - * (C) Copyright 2023- ECMWF. - * - * This software is licensed under the terms of the Apache Licence Version 2.0 - * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. - * In applying this licence, ECMWF does not waive the privileges and immunities - * granted to it by virtue of its status as an intergovernmental organisation nor - * does it submit to any jurisdiction. - */ - -#pragma once - -#include - -using Bitmap = std::vector; diff --git a/src/gribjump/CMakeLists.txt b/src/gribjump/CMakeLists.txt index b7b0391..f48bafb 100644 --- a/src/gribjump/CMakeLists.txt +++ b/src/gribjump/CMakeLists.txt @@ -44,6 +44,7 @@ list( APPEND gribjump_srcs GribJumpException.h ExtractionData.cc ExtractionData.h + Types.h ) if( HAVE_GRIBJUMP_LOCAL_EXTRACT ) diff --git a/src/gribjump/Engine.cc b/src/gribjump/Engine.cc index 4f739db..70698a6 100644 --- a/src/gribjump/Engine.cc +++ b/src/gribjump/Engine.cc @@ -14,9 +14,7 @@ #include "metkit/mars/MarsExpension.h" - #include "gribjump/Engine.h" -#include "gribjump/Lister.h" #include "gribjump/ExtractionItem.h" #include "gribjump/remote/WorkQueue.h" #include "gribjump/jumper/JumperFactory.h" @@ -129,9 +127,8 @@ Engine::Engine() {} Engine::~Engine() {} -Results Engine::extract(const MarsRequests& requests, const RangesList& ranges, bool flatten) { - typedef std::map keyToExItem_t; - keyToExItem_t keyToExtractionItem; +ExItemMap Engine::buildKeyToExtractionItem(const MarsRequests& requests, const RangesList& ranges, bool flatten){ + ExItemMap keyToExtractionItem; eckit::Timer timer; @@ -151,42 +148,61 @@ Results Engine::extract(const MarsRequests& requests, const RangesList& ranges, LOG_DEBUG_LIB(LibGribJump) << "Built keyToExtractionItem" << std::endl; - const metkit::mars::MarsRequest req = unionRequest(requests); + return keyToExtractionItem; +} + +filemap_t Engine::buildFileMap(const MarsRequests& requests, ExItemMap& keyToExtractionItem) { + // Map files to ExtractionItem + eckit::Timer timer; + const metkit::mars::MarsRequest req = unionRequest(requests); timer.reset("Gribjump Engine: Flattened requests and constructed union request"); - // Map files to ExtractionItem filemap_t filemap = FDBLister::instance().fileMap(req, keyToExtractionItem); timer.reset("Gribjump Engine: Called fdb.list and constructed file map"); - size_t counter = 0; - for (auto& [fname, extractionItems] : filemap) { - if (isRemote(extractionItems[0]->URI())) { - taskGroup_.enqueueTask(new InefficientFileExtractionTask(taskGroup_, counter++, fname, extractionItems)); - } - else { - // Reaching here is an error on the databridge, as it means we think the file is local... - taskGroup_.enqueueTask(new FileExtractionTask(taskGroup_, counter++, fname, extractionItems)); + return filemap; +} + + + +ResultsMap Engine::extract(const MarsRequests& requests, const RangesList& ranges, bool flatten) { + + ExItemMap keyToExtractionItem = buildKeyToExtractionItem(requests, ranges, flatten); // Owns the ExtractionItems + filemap_t filemap = buildFileMap(requests, keyToExtractionItem); + eckit::Timer timer; + + bool remoteExtraction = LibGribJump::instance().config().getBool("remoteExtraction", false); + if (remoteExtraction) { + NOTIMP; + } + else { + size_t counter = 0; + for (auto& [fname, extractionItems] : filemap) { + if (isRemote(extractionItems[0]->URI())) { + taskGroup_.enqueueTask(new InefficientFileExtractionTask(taskGroup_, counter++, fname, extractionItems)); + } + else { + // Reaching here is an error on the databridge, as it means we think the file is local... + taskGroup_.enqueueTask(new FileExtractionTask(taskGroup_, counter++, fname, extractionItems)); + } } } - timer.reset("Gribjump Engine: Enqueued " + std::to_string(filemap.size()) + " file tasks"); taskGroup_.waitForTasks(); - timer.reset("Gribjump Engine: All tasks finished"); - // Create map of base request to vector of extraction items - std::map> reqToExtractionItems; + // Create map of base request to vector of extraction items. Takes ownership of the ExtractionItems + ResultsMap results; for (auto& [key, ex] : keyToExtractionItem) { - reqToExtractionItems[ex->request()].push_back(ex); + results[ex->request()].push_back(std::move(ex)); } timer.reset("Gribjump Engine: Repackaged results"); - return reqToExtractionItems; - + return results; } size_t Engine::scan(const MarsRequests& requests, bool byfiles) { diff --git a/src/gribjump/Engine.h b/src/gribjump/Engine.h index 3b7eb8a..2cea651 100644 --- a/src/gribjump/Engine.h +++ b/src/gribjump/Engine.h @@ -16,24 +16,18 @@ #include "metkit/mars/MarsRequest.h" #include "gribjump/ExtractionItem.h" #include "gribjump/Task.h" +#include "gribjump/Lister.h" +#include "gribjump/Types.h" namespace gribjump { -typedef std::vector MarsRequests; -typedef std::pair Range; -typedef std::vector> RangesList; - -// typedef std::vector Results; -typedef std::map> Results; - - class Engine { public: Engine(); ~Engine(); - Results extract(const MarsRequests& requests, const RangesList& ranges, bool flattenRequests = false); + ResultsMap extract(const MarsRequests& requests, const RangesList& ranges, bool flattenRequests = false); // byfiles: scan entire file, not just fields matching request size_t scan(const MarsRequests& requests, bool byfiles = false); @@ -42,6 +36,11 @@ class Engine { void reportErrors(eckit::Stream& client_); +private: + + filemap_t buildFileMap(const MarsRequests& requests, ExItemMap& keyToExtractionItem); + ExItemMap buildKeyToExtractionItem(const MarsRequests& requests, const RangesList& ranges, bool flatten); + private: TaskGroup taskGroup_; diff --git a/src/gribjump/ExtractionData.h b/src/gribjump/ExtractionData.h index 79d3fac..98e4b6f 100644 --- a/src/gribjump/ExtractionData.h +++ b/src/gribjump/ExtractionData.h @@ -18,8 +18,7 @@ #include "eckit/serialisation/Stream.h" #include "metkit/mars/MarsRequest.h" - -using Range = std::pair; +#include "gribjump/Types.h" namespace gribjump { diff --git a/src/gribjump/ExtractionItem.h b/src/gribjump/ExtractionItem.h index 18292a8..80d9805 100644 --- a/src/gribjump/ExtractionItem.h +++ b/src/gribjump/ExtractionItem.h @@ -13,22 +13,15 @@ #pragma once #include - #include "eckit/filesystem/URI.h" #include "metkit/mars/MarsRequest.h" #include "gribjump/LibGribJump.h" - +#include "gribjump/Types.h" namespace gribjump { -using Ranges = std::vector>; -using ExValues = std::vector>; -using ExMask = std::vector>>; // An object for grouping request, uri and result information together. -// Note, this is a one to one mapping between request and result. -// i.e. the request is assumed to be of cardinality 1. /// No it isn't! It's the base request - class ExtractionItem : public eckit::NonCopyable { public: diff --git a/src/gribjump/GribJump.cc b/src/gribjump/GribJump.cc index 609a117..c70f263 100644 --- a/src/gribjump/GribJump.cc +++ b/src/gribjump/GribJump.cc @@ -57,12 +57,4 @@ void GribJump::stats() { impl_->stats(); } -void GribJump::aggregate(const fdb5::Key& key, const eckit::URI& location) { - impl_->aggregate(key, location); -} - -void GribJump::aggregate(const fdb5::Key& key, const eckit::message::Message& msg) { - impl_->aggregate(key, msg); -} - } // namespace gribjump diff --git a/src/gribjump/GribJumpBase.h b/src/gribjump/GribJumpBase.h index 1d6bc00..9d77bd0 100644 --- a/src/gribjump/GribJumpBase.h +++ b/src/gribjump/GribJumpBase.h @@ -24,6 +24,7 @@ #include "gribjump/Config.h" #include "gribjump/Stats.h" #include "gribjump/LibGribJump.h" +#include "gribjump/Types.h" namespace fdb5 { class Key; @@ -32,7 +33,8 @@ namespace fdb5 { namespace gribjump { -typedef std::pair Range; +using ResultsMap = std::map>>; + class GribJumpBase : public eckit::NonCopyable { public: @@ -47,14 +49,9 @@ class GribJumpBase : public eckit::NonCopyable { virtual std::vector> extract(const eckit::PathName& path, const std::vector& offsets, const std::vector>& ranges) = 0; virtual std::map> axes(const std::string& request) = 0; - virtual void stats(); - // Note: Only implemented if FDB is enabled - virtual void aggregate(const fdb5::Key& key, const eckit::URI& location) {NOTIMP;} - virtual void aggregate(const fdb5::Key& key, const eckit::message::Message& msg) {NOTIMP;} - protected: // members Stats stats_; diff --git a/src/gribjump/Interval.h b/src/gribjump/Interval.h deleted file mode 100644 index f5310a7..0000000 --- a/src/gribjump/Interval.h +++ /dev/null @@ -1,17 +0,0 @@ -/* - * (C) Copyright 2023- ECMWF. - * - * This software is licensed under the terms of the Apache Licence Version 2.0 - * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. - * In applying this licence, ECMWF does not waive the privileges and immunities - * granted to it by virtue of its status as an intergovernmental organisation nor - * does it submit to any jurisdiction. - */ - -#pragma once - -#include -#include - -// NOTE: Ranges are treated as half-open intervals [start, end) -using Interval = std::pair; diff --git a/src/gribjump/Lister.cc b/src/gribjump/Lister.cc index 76c2c29..10e9e58 100644 --- a/src/gribjump/Lister.cc +++ b/src/gribjump/Lister.cc @@ -72,8 +72,7 @@ std::string fdbkeyToStr(const fdb5::Key& key) { } // i.e. do all of the listing work I want... -filemap_t FDBLister::fileMap(const metkit::mars::MarsRequest& unionRequest, const reqToXRR_t& reqToExtractionItem) { - +filemap_t FDBLister::fileMap(const metkit::mars::MarsRequest& unionRequest, const ExItemMap& reqToExtractionItem) { eckit::AutoLock lock(this); filemap_t filemap; @@ -93,7 +92,7 @@ filemap_t FDBLister::fileMap(const metkit::mars::MarsRequest& unionRequest, cons // Set the URI in the ExtractionItem eckit::URI uri = elem.location().fullUri(); - ExtractionItem* extractionItem = reqToExtractionItem.at(key); + ExtractionItem* extractionItem = reqToExtractionItem.at(key).get(); extractionItem->URI(uri); // Add to filemap diff --git a/src/gribjump/Lister.h b/src/gribjump/Lister.h index b527672..a8bf343 100644 --- a/src/gribjump/Lister.h +++ b/src/gribjump/Lister.h @@ -48,9 +48,8 @@ class Lister { }; // ------------------------------------------------------------------ -using reqToXRR_t = std::map; -// We explicitly want this map to be randomly sorted. -using filemap_t = std::unordered_map>; // String is filepath, eckit::PathName is not hashable? +// filemap holds non-owning pointers to ExtractionItems +using filemap_t = std::map; class FDBLister : public Lister { public: @@ -61,7 +60,7 @@ class FDBLister : public Lister { virtual std::map > axes(const std::string& request) override; virtual std::map > axes(const fdb5::FDBToolRequest& request); - filemap_t fileMap(const metkit::mars::MarsRequest& unionRequest, const reqToXRR_t& reqToXRR); // Used during extraction + filemap_t fileMap(const metkit::mars::MarsRequest& unionRequest, const ExItemMap& reqToXRR); // Used during extraction std::map< eckit::PathName, eckit::OffsetList > filesOffsets(std::vector requests); // Used during scan diff --git a/src/gribjump/LocalGribJump.cc b/src/gribjump/LocalGribJump.cc index 64c79ea..d05ce86 100644 --- a/src/gribjump/LocalGribJump.cc +++ b/src/gribjump/LocalGribJump.cc @@ -99,7 +99,7 @@ std::vector> LocalGribJump::extract(std::vector> results = extract(requests, ranges, flatten); + ResultsMap results = extract(requests, ranges, flatten); std::vector> extractionResults; for (auto& req : polyRequest) { @@ -117,10 +117,9 @@ std::vector> LocalGribJump::extract(std::vector> LocalGribJump::extract(const std::vector& requests, const std::vector>& ranges, bool flatten) { +ResultsMap LocalGribJump::extract(const std::vector& requests, const std::vector>& ranges, bool flatten) { Engine engine; - std::map> results = engine.extract(requests, ranges, flatten); - return results; + return engine.extract(requests, ranges, flatten); } std::map> LocalGribJump::axes(const std::string& request) { @@ -132,16 +131,6 @@ std::map> LocalGribJump::axes(const return engine.axes(request); } -// TODO: remove these, plugin should use aggregator directly (which has its own config). -void LocalGribJump::aggregate(const fdb5::Key& key, const eckit::URI& location) { - NOTIMP; -}; - -void LocalGribJump::aggregate(const fdb5::Key& key, const eckit::message::Message& msg) { - NOTIMP; -}; - - static GribJumpBuilder builder("local"); } // namespace gribjump diff --git a/src/gribjump/LocalGribJump.h b/src/gribjump/LocalGribJump.h index f5b9ef6..bb0172a 100644 --- a/src/gribjump/LocalGribJump.h +++ b/src/gribjump/LocalGribJump.h @@ -35,10 +35,7 @@ class LocalGribJump : public GribJumpBase { size_t scan(const std::vector requests, bool byfiles) override; // new API! - std::map> extract(const std::vector& requests, const std::vector>& ranges, bool flatten); - - void aggregate(const fdb5::Key& key, const eckit::URI& location) override; - void aggregate(const fdb5::Key& key, const eckit::message::Message& msg) override; + ResultsMap extract(const std::vector& requests, const std::vector>& ranges, bool flatten); // old API std::vector> extract(const eckit::PathName& path, const std::vector& offsets, const std::vector>& ranges) override; diff --git a/src/gribjump/Task.cc b/src/gribjump/Task.cc index 05c53ff..5b61aa2 100644 --- a/src/gribjump/Task.cc +++ b/src/gribjump/Task.cc @@ -117,7 +117,7 @@ void TaskGroup::reportErrors(eckit::Stream& client_) { //---------------------------------------------------------------------------------------------------------------------- -FileExtractionTask::FileExtractionTask(TaskGroup& taskgroup, const size_t id, const eckit::PathName& fname, std::vector& extractionItems) : +FileExtractionTask::FileExtractionTask(TaskGroup& taskgroup, const size_t id, const eckit::PathName& fname, ExtractionItems& extractionItems) : Task(taskgroup, id), fname_(fname), extractionItems_(extractionItems) { @@ -165,7 +165,10 @@ void FileExtractionTask::extract() { } //---------------------------------------------------------------------------------------------------------------------- -InefficientFileExtractionTask::InefficientFileExtractionTask(TaskGroup& taskgroup, const size_t id, const eckit::PathName& fname, std::vector& extractionItems): + + +//---------------------------------------------------------------------------------------------------------------------- +InefficientFileExtractionTask::InefficientFileExtractionTask(TaskGroup& taskgroup, const size_t id, const eckit::PathName& fname, ExtractionItems& extractionItems): FileExtractionTask(taskgroup, id, fname, extractionItems) { } diff --git a/src/gribjump/Task.h b/src/gribjump/Task.h index 9a86d97..c9308c3 100644 --- a/src/gribjump/Task.h +++ b/src/gribjump/Task.h @@ -106,7 +106,7 @@ class FileExtractionTask : public Task { // Each extraction item is assumed to be for the same file. - FileExtractionTask(TaskGroup& taskgroup, const size_t id, const eckit::PathName& fname, std::vector& extractionItems); + FileExtractionTask(TaskGroup& taskgroup, const size_t id, const eckit::PathName& fname, ExtractionItems& extractionItems); void execute() override; @@ -114,7 +114,7 @@ class FileExtractionTask : public Task { protected: eckit::PathName fname_; - std::vector& extractionItems_; + ExtractionItems& extractionItems_; }; //---------------------------------------------------------------------------------------------------------------------- @@ -125,7 +125,18 @@ class FileExtractionTask : public Task { class InefficientFileExtractionTask : public FileExtractionTask { public: - InefficientFileExtractionTask(TaskGroup& taskgroup, const size_t id, const eckit::PathName& fname, std::vector& extractionItems); + InefficientFileExtractionTask(TaskGroup& taskgroup, const size_t id, const eckit::PathName& fname, ExtractionItems& extractionItems); + + void extract() override; + +}; + +//---------------------------------------------------------------------------------------------------------------------- +// Task that forwards the work to a remote server, based on the URI of the extraction item. +class RemoteFileExtractionTask : public FileExtractionTask { +public: + + RemoteFileExtractionTask(TaskGroup& taskgroup, const size_t id, const eckit::PathName& fname, ExtractionItems& extractionItems); void extract() override; diff --git a/src/gribjump/Types.h b/src/gribjump/Types.h new file mode 100644 index 0000000..cd916e5 --- /dev/null +++ b/src/gribjump/Types.h @@ -0,0 +1,21 @@ +// types.h +#pragma once + +namespace gribjump { + +class ExtractionItem; + +using MarsRequests = std::vector; +using Range = std::pair; +using Interval = std::pair; +using RangesList = std::vector>; +using Bitmap = std::vector; + +using Ranges = std::vector; +using ExValues = std::vector>; +using ExMask = std::vector>>; + +using ExtractionItems = std::vector; // Non-owning pointers +using ExItemMap = std::map>; + +} // namespace gribjump diff --git a/src/gribjump/remote/Request.cc b/src/gribjump/remote/Request.cc index cbea684..890083b 100644 --- a/src/gribjump/remote/Request.cc +++ b/src/gribjump/remote/Request.cc @@ -89,14 +89,14 @@ void ExtractRequest::execute() { results_ = engine_.extract(marsRequests_, ranges_, flatten_); - // tmp, debug - for (auto& pair : results_) { - LOG_DEBUG_LIB(LibGribJump) << pair.first << ": "; - for (auto& item : pair.second) { - item->debug_print(); + if (LibGribJump::instance().debug()) { + for (auto& pair : results_) { + LOG_DEBUG_LIB(LibGribJump) << pair.first << ": "; + for (auto& item : pair.second) { + item->debug_print(); + } } } - } void ExtractRequest::replyToClient() { @@ -113,7 +113,8 @@ void ExtractRequest::replyToClient() { auto it = results_.find(marsRequests_[i]); ASSERT(it != results_.end()); - std::vector items = it->second; + std::vector>& items = it->second; + // ExtractionItems items = it->second; size_t nfields = items.size(); client_ << nfields; for (size_t i = 0; i < nfields; i++) { diff --git a/src/gribjump/remote/Request.h b/src/gribjump/remote/Request.h index 1a0b78d..9c158cd 100644 --- a/src/gribjump/remote/Request.h +++ b/src/gribjump/remote/Request.h @@ -91,7 +91,7 @@ class ExtractRequest : public Request { std::vector marsRequests_; bool flatten_; - std::map> results_; + ResultsMap results_; }; diff --git a/tests/data.cc b/tests/data.cc index 9f83f5e..71449c5 100644 --- a/tests/data.cc +++ b/tests/data.cc @@ -15,8 +15,7 @@ #include "eckit/testing/Test.h" -#include "gribjump/Interval.h" -#include "gribjump/Bitmap.h" +#include "gribjump/Types.h" #include "gribjump/tools/EccodesExtract.h" diff --git a/tests/test_O1280.cc b/tests/test_O1280.cc index 570a0bc..88411ab 100644 --- a/tests/test_O1280.cc +++ b/tests/test_O1280.cc @@ -26,7 +26,6 @@ using namespace eckit::testing; namespace gribjump { namespace test { -using Range = std::pair; const size_t expectedNumberOfValues = 6599680; // O1280 const std::vector o1280_offsets = { diff --git a/tests/test_api.cc b/tests/test_api.cc index ed11826..c776fdb 100644 --- a/tests/test_api.cc +++ b/tests/test_api.cc @@ -113,7 +113,6 @@ CASE( "test_gribjump_api_extract" ) { requests = expand.expand(parsedRequests); } - using Interval = std::pair; std::vector> allIntervals = { { diff --git a/tests/test_engine.cc b/tests/test_engine.cc index be9d069..f233a64 100644 --- a/tests/test_engine.cc +++ b/tests/test_engine.cc @@ -39,7 +39,6 @@ namespace test { //----------------------------------------------------------------------------- -using Interval = std::pair; size_t expectedCount(std::vector> allIntervals){ // count the number of values expected given the intervals @@ -105,7 +104,7 @@ CASE ("test_engine_basic") { }; Engine engine; - std::map> results = engine.extract(requests, allIntervals, false); + ResultsMap results = engine.extract(requests, allIntervals, false); // print contents of map @@ -121,7 +120,9 @@ CASE ("test_engine_basic") { for (size_t i = 0; i < 3; i++) { metkit::mars::MarsRequest req = requests[i]; std::vector intervals = allIntervals[i]; - auto exs = results[req]; + // auto exs = results[req]; + // just get a reference to results[req] to avoid copying + auto& exs = results[req]; auto comparisonValues = eccodesExtract(req, intervals); for (size_t j = 0; j < exs.size(); j++) { for (size_t k = 0; k < comparisonValues[j].size(); k++) { @@ -142,8 +143,8 @@ CASE ("test_engine_basic") { EXPECT(count == 45); // Check missing data has no values. - ExtractionItem* ex = results[requests[3]][0]; - EXPECT(ex->values().size() == 0); + ExtractionItem& ex = *results[requests[3]][0]; + EXPECT(ex.values().size() == 0); // --- Extract (test 2) @@ -180,7 +181,7 @@ CASE ("test_engine_basic") { // compare results metkit::mars::MarsRequest req = requests[0]; - auto exs = results[req]; + auto& exs = results[req]; auto comparisonValues = eccodesExtract(req, allIntervals[0])[0]; // [0] Because each archived field has identical values. count = 0; for (size_t j = 0; j < exs.size(); j++) { diff --git a/tests/test_gribjump.cc b/tests/test_gribjump.cc index 39e9a64..f01073f 100644 --- a/tests/test_gribjump.cc +++ b/tests/test_gribjump.cc @@ -24,9 +24,6 @@ namespace test { //----------------------------------------------------------------------------- - -using Range = std::pair; - void doTest(int i, JumpInfo gribInfo, JumpHandle &dataSource){ EXPECT(gribInfo.ready()); size_t numberOfDataPoints = gribInfo.getNumberOfDataPoints(); From 9b0d930bf0ca5b07efdf4f497d991fc59f195e16 Mon Sep 17 00:00:00 2001 From: Chris Bradley Date: Wed, 11 Sep 2024 11:04:02 +0100 Subject: [PATCH 4/7] Update header --- src/gribjump/Types.h | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/src/gribjump/Types.h b/src/gribjump/Types.h index cd916e5..0384b28 100644 --- a/src/gribjump/Types.h +++ b/src/gribjump/Types.h @@ -1,6 +1,20 @@ -// types.h +/* + * (C) Copyright 2023- ECMWF. + * + * This software is licensed under the terms of the Apache Licence Version 2.0 + * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. + * In applying this licence, ECMWF does not waive the privileges and immunities + * granted to it by virtue of its status as an intergovernmental organisation nor + * does it submit to any jurisdiction. + */ + #pragma once +#include +#include +#include +#include + namespace gribjump { class ExtractionItem; From 57d567eaf98eb91b4cdad1af41a2fc6a28364403 Mon Sep 17 00:00:00 2001 From: Chris Bradley Date: Fri, 13 Sep 2024 16:00:26 +0100 Subject: [PATCH 5/7] Set server homeenv to GRIBJUMP_HOME --- src/tools/gribjump-server.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tools/gribjump-server.cc b/src/tools/gribjump-server.cc index a57c311..2edbdc5 100644 --- a/src/tools/gribjump-server.cc +++ b/src/tools/gribjump-server.cc @@ -37,7 +37,7 @@ namespace gribjump { class GribJumpServerApp : public BaseApp, public GribJumpServer { public: GribJumpServerApp(int argc, char** argv) : - BaseApp(argc, argv), + BaseApp(argc, argv, "GRIBJUMP_HOME"), GribJumpServer(eckit::net::Port( // gribjumpServerPort "gribjumpServer", eckit::Resource("$GRIBJUMP_SERVER_PORT", LibGribJump::instance().config().getInt("server.port", 9777) ))) From 7df1486f8c8630c8b6e6bba17da526f8e23668b7 Mon Sep 17 00:00:00 2001 From: Chris Bradley Date: Fri, 13 Sep 2024 17:09:55 +0100 Subject: [PATCH 6/7] Name timers --- src/gribjump/Engine.cc | 15 ++++++--------- src/gribjump/remote/GribJumpUser.cc | 2 +- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/src/gribjump/Engine.cc b/src/gribjump/Engine.cc index 70698a6..734ecaa 100644 --- a/src/gribjump/Engine.cc +++ b/src/gribjump/Engine.cc @@ -130,8 +130,6 @@ Engine::~Engine() {} ExItemMap Engine::buildKeyToExtractionItem(const MarsRequests& requests, const RangesList& ranges, bool flatten){ ExItemMap keyToExtractionItem; - eckit::Timer timer; - flattenedKeys_t flatKeys = buildFlatKeys(requests, flatten); // Map from base request to {flattened keys} LOG_DEBUG_LIB(LibGribJump) << "Built flat keys" << std::endl; @@ -146,20 +144,15 @@ ExItemMap Engine::buildKeyToExtractionItem(const MarsRequests& requests, const R } } - LOG_DEBUG_LIB(LibGribJump) << "Built keyToExtractionItem" << std::endl; - return keyToExtractionItem; } filemap_t Engine::buildFileMap(const MarsRequests& requests, ExItemMap& keyToExtractionItem) { // Map files to ExtractionItem - eckit::Timer timer; const metkit::mars::MarsRequest req = unionRequest(requests); - timer.reset("Gribjump Engine: Flattened requests and constructed union request"); filemap_t filemap = FDBLister::instance().fileMap(req, keyToExtractionItem); - timer.reset("Gribjump Engine: Called fdb.list and constructed file map"); return filemap; } @@ -168,9 +161,13 @@ filemap_t Engine::buildFileMap(const MarsRequests& requests, ExItemMap& keyToExt ResultsMap Engine::extract(const MarsRequests& requests, const RangesList& ranges, bool flatten) { + eckit::Timer timer("Gribjump Engine: extract"); + ExItemMap keyToExtractionItem = buildKeyToExtractionItem(requests, ranges, flatten); // Owns the ExtractionItems + timer.reset("Gribjump Engine: Key to ExtractionItem map built"); + filemap_t filemap = buildFileMap(requests, keyToExtractionItem); - eckit::Timer timer; + timer.reset("Gribjump Engine: File map built"); bool remoteExtraction = LibGribJump::instance().config().getBool("remoteExtraction", false); if (remoteExtraction) { @@ -188,7 +185,7 @@ ResultsMap Engine::extract(const MarsRequests& requests, const RangesList& range } } } - + timer.reset("Gribjump Engine: All tasks enqueued"); taskGroup_.waitForTasks(); timer.reset("Gribjump Engine: All tasks finished"); diff --git a/src/gribjump/remote/GribJumpUser.cc b/src/gribjump/remote/GribJumpUser.cc index 79947e7..60d34bb 100644 --- a/src/gribjump/remote/GribJumpUser.cc +++ b/src/gribjump/remote/GribJumpUser.cc @@ -33,7 +33,7 @@ void GribJumpUser::serve(eckit::Stream& s, std::istream& in, std::ostream& out) eckit::Log::info() << "Serving new connection" << std::endl; try { - eckit::Timer timer; + eckit::Timer timer("Connection served"); handle_client(s, timer); } catch (std::exception& e) { From 6e2beb9092985ae937c36763e2e0812369646900 Mon Sep 17 00:00:00 2001 From: Chris Bradley Date: Fri, 13 Sep 2024 17:11:34 +0100 Subject: [PATCH 7/7] Bump version --- VERSION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/VERSION b/VERSION index 4b9fcbe..cb0c939 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.5.1 +0.5.2