From 5974d926e682624302eee86bac8a0ef0b1a8d9e3 Mon Sep 17 00:00:00 2001 From: Emanuele Danovaro Date: Wed, 18 Dec 2024 09:41:22 +0000 Subject: [PATCH] addressed PR comments (mutex on flush while evicting a catalogue from cache + Client handle simplified) --- src/fdb5/api/RemoteFDB.cc | 4 +- src/fdb5/api/RemoteFDB.h | 4 +- src/fdb5/database/Archiver.cc | 58 ++++++++++++---------- src/fdb5/database/Archiver.h | 1 + src/fdb5/database/EntryVisitMechanism.cc | 13 ++--- src/fdb5/database/EntryVisitMechanism.h | 2 +- src/fdb5/remote/client/Client.h | 4 +- src/fdb5/remote/client/ClientConnection.cc | 8 +-- src/fdb5/remote/client/RemoteCatalogue.cc | 7 +-- src/fdb5/remote/client/RemoteCatalogue.h | 6 +-- src/fdb5/remote/client/RemoteStore.cc | 4 +- src/fdb5/remote/client/RemoteStore.h | 4 +- 12 files changed, 58 insertions(+), 57 deletions(-) diff --git a/src/fdb5/api/RemoteFDB.cc b/src/fdb5/api/RemoteFDB.cc index d52735005..c2401657f 100644 --- a/src/fdb5/api/RemoteFDB.cc +++ b/src/fdb5/api/RemoteFDB.cc @@ -246,7 +246,7 @@ void RemoteFDB::print(std::ostream& s) const { } // Client -bool RemoteFDB::handle(remote::Message message, bool control, uint32_t requestID) { +bool RemoteFDB::handle(remote::Message message, uint32_t requestID) { switch (message) { case fdb5::remote::Message::Complete: { @@ -279,7 +279,7 @@ bool RemoteFDB::handle(remote::Message message, bool control, uint32_t requestID return false; } } -bool RemoteFDB::handle(remote::Message message, bool control, uint32_t requestID, eckit::Buffer&& payload) { +bool RemoteFDB::handle(remote::Message message, uint32_t requestID, eckit::Buffer&& payload) { switch (message) { case fdb5::remote::Message::Blob: { diff --git a/src/fdb5/api/RemoteFDB.h b/src/fdb5/api/RemoteFDB.h index 2ada336ca..7524896bc 100644 --- a/src/fdb5/api/RemoteFDB.h +++ b/src/fdb5/api/RemoteFDB.h @@ -77,8 +77,8 @@ class RemoteFDB : public LocalFDB, public remote::Client { FDBStats stats() const override { NOTIMP; } // Client - bool handle(remote::Message message, bool control, uint32_t requestID) override; - bool handle(remote::Message message, bool control, uint32_t requestID, eckit::Buffer&& payload) override; + bool handle(remote::Message message, uint32_t requestID) override; + bool handle(remote::Message message, uint32_t requestID, eckit::Buffer&& payload) override; private: // members diff --git a/src/fdb5/database/Archiver.cc b/src/fdb5/database/Archiver.cc index e77411b38..7b49b9cb4 100644 --- a/src/fdb5/database/Archiver.cc +++ b/src/fdb5/database/Archiver.cc @@ -77,37 +77,43 @@ void Archiver::selectDatabase(const Key& dbKey) { static size_t fdbMaxNbDBsOpen = eckit::Resource("fdbMaxNbDBsOpen", 64); - if (databases_.size() >= fdbMaxNbDBsOpen) { - bool found = false; - time_t oldest = ::time(0) + 24 * 60 * 60; - Key oldK; - for (auto i = databases_.begin(); i != databases_.end(); ++i) { - if (i->second.time_ <= oldest) { - found = true; - oldK = i->first; - oldest = i->second.time_; + { + std::lock_guard cacheLock(cacheMutex_); + if (databases_.size() >= fdbMaxNbDBsOpen) { + bool found = false; + time_t oldest = ::time(0) + 24 * 60 * 60; + Key oldK; + for (auto i = databases_.begin(); i != databases_.end(); ++i) { + if (i->second.time_ <= oldest) { + found = true; + oldK = i->first; + oldest = i->second.time_; + } + } + if (found) { + // flushing before evicting from cache + std::lock_guard lock(flushMutex_); + + databases_[oldK].catalogue_->flush(databases_[oldK].store_->flush()); + + eckit::Log::info() << "Closing database " << *databases_[oldK].catalogue_ << std::endl; + databases_.erase(oldK); } } - if (found) { - databases_[oldK].catalogue_->flush(databases_[oldK].store_->flush()); - - eckit::Log::info() << "Closing database " << *databases_[oldK].catalogue_ << std::endl; - databases_.erase(oldK); - } - } - std::unique_ptr cat = CatalogueWriterFactory::instance().build(dbKey, dbConfig_); - ASSERT(cat); + std::unique_ptr cat = CatalogueWriterFactory::instance().build(dbKey, dbConfig_); + ASSERT(cat); - // If this database is locked for writing then this is an error - if (!cat->enabled(ControlIdentifier::Archive)) { - std::ostringstream ss; - ss << "Database " << *cat << " matched for archived is LOCKED against archiving"; - throw eckit::UserError(ss.str(), Here()); - } + // If this database is locked for writing then this is an error + if (!cat->enabled(ControlIdentifier::Archive)) { + std::ostringstream ss; + ss << "Database " << *cat << " matched for archived is LOCKED against archiving"; + throw eckit::UserError(ss.str(), Here()); + } - std::unique_ptr str = cat->buildStore(); - db_ = &(databases_[dbKey] = Database{::time(0), std::move(cat), std::move(str)}); + std::unique_ptr str = cat->buildStore(); + db_ = &(databases_[dbKey] = Database{::time(0), std::move(cat), std::move(str)}); + } } void Archiver::print(std::ostream& out) const { diff --git a/src/fdb5/database/Archiver.h b/src/fdb5/database/Archiver.h index 798986776..e2e525ac6 100644 --- a/src/fdb5/database/Archiver.h +++ b/src/fdb5/database/Archiver.h @@ -81,6 +81,7 @@ class Archiver : public eckit::NonCopyable { Database* db_; std::mutex flushMutex_; + std::mutex cacheMutex_; const ArchiveCallback& callback_; }; diff --git a/src/fdb5/database/EntryVisitMechanism.cc b/src/fdb5/database/EntryVisitMechanism.cc index d174eff78..d022dd509 100644 --- a/src/fdb5/database/EntryVisitMechanism.cc +++ b/src/fdb5/database/EntryVisitMechanism.cc @@ -33,16 +33,14 @@ class FDBVisitException : public eckit::Exception { //---------------------------------------------------------------------------------------------------------------------- -EntryVisitor::EntryVisitor() : currentCatalogue_(nullptr), currentStore_(nullptr), currentIndex_(nullptr) {} +EntryVisitor::~EntryVisitor() {} -EntryVisitor::~EntryVisitor() { - delete currentStore_; -} +EntryVisitor::EntryVisitor() : currentCatalogue_(nullptr), currentStore_(nullptr), currentIndex_(nullptr) {} Store& EntryVisitor::store() const { if (!currentStore_) { ASSERT(currentCatalogue_); - currentStore_ = currentCatalogue_->buildStore().release(); + currentStore_ = currentCatalogue_->buildStore(); ASSERT(currentStore_); } return *currentStore_; @@ -50,7 +48,7 @@ Store& EntryVisitor::store() const { bool EntryVisitor::visitDatabase(const Catalogue& catalogue) { currentCatalogue_ = &catalogue; - currentStore_ = nullptr; + currentStore_.reset(); currentIndex_ = nullptr; rule_ = nullptr; return true; @@ -61,8 +59,7 @@ void EntryVisitor::catalogueComplete(const Catalogue& catalogue) { ASSERT(currentCatalogue_ == &catalogue); } currentCatalogue_ = nullptr; - delete currentStore_; - currentStore_ = nullptr; + currentStore_.reset(); currentIndex_ = nullptr; rule_ = nullptr; } diff --git a/src/fdb5/database/EntryVisitMechanism.h b/src/fdb5/database/EntryVisitMechanism.h index 92139c8ca..512a255c4 100644 --- a/src/fdb5/database/EntryVisitMechanism.h +++ b/src/fdb5/database/EntryVisitMechanism.h @@ -62,7 +62,7 @@ class EntryVisitor : public eckit::NonCopyable { /// Non-owning const Catalogue* currentCatalogue_ = nullptr; /// Owned store - mutable Store* currentStore_ = nullptr; + mutable std::unique_ptr currentStore_ = nullptr; /// Non-owning const Index* currentIndex_ = nullptr; /// Non-owning diff --git a/src/fdb5/remote/client/Client.h b/src/fdb5/remote/client/Client.h index 9e58e5de3..689e42607 100644 --- a/src/fdb5/remote/client/Client.h +++ b/src/fdb5/remote/client/Client.h @@ -50,8 +50,8 @@ class Client : eckit::NonCopyable { void dataWrite(remote::Message msg, uint32_t requestID, std::vector> data={}); // handlers for incoming messages - to be defined in the client class - virtual bool handle(Message message, bool control, uint32_t requestID) = 0; - virtual bool handle(Message message, bool control, uint32_t requestID, eckit::Buffer&& payload) = 0; + virtual bool handle(Message message, uint32_t requestID) = 0; + virtual bool handle(Message message, uint32_t requestID, eckit::Buffer&& payload) = 0; protected: diff --git a/src/fdb5/remote/client/ClientConnection.cc b/src/fdb5/remote/client/ClientConnection.cc index 7fbbd4c20..f85707156 100644 --- a/src/fdb5/remote/client/ClientConnection.cc +++ b/src/fdb5/remote/client/ClientConnection.cc @@ -360,10 +360,10 @@ void ClientConnection::listeningControlThreadLoop() { } if (hdr.payloadSize == 0) { - handled = client->handle(hdr.message, hdr.control(), hdr.requestID); + handled = client->handle(hdr.message, hdr.requestID); } else { - handled = client->handle(hdr.message, hdr.control(), hdr.requestID, std::move(payload)); + handled = client->handle(hdr.message, hdr.requestID, std::move(payload)); } } @@ -425,10 +425,10 @@ void ClientConnection::listeningDataThreadLoop() { ASSERT(client); ASSERT(!hdr.control()); if (hdr.payloadSize == 0) { - handled = client->handle(hdr.message, hdr.control(), hdr.requestID); + handled = client->handle(hdr.message, hdr.requestID); } else { - handled = client->handle(hdr.message, hdr.control(), hdr.requestID, std::move(payload)); + handled = client->handle(hdr.message, hdr.requestID, std::move(payload)); } if (!handled) { diff --git a/src/fdb5/remote/client/RemoteCatalogue.cc b/src/fdb5/remote/client/RemoteCatalogue.cc index f7ac42924..e0a93d854 100644 --- a/src/fdb5/remote/client/RemoteCatalogue.cc +++ b/src/fdb5/remote/client/RemoteCatalogue.cc @@ -35,8 +35,6 @@ RemoteCatalogue::RemoteCatalogue(const eckit::URI& uri, const Config& config): NOTIMP; } -RemoteCatalogue::~RemoteCatalogue() {} - void RemoteCatalogue::archive(const Key& idxKey, const Key& datumKey, std::shared_ptr fieldLocation) { ASSERT(!datumKey.empty()); @@ -137,12 +135,11 @@ void RemoteCatalogue::loadSchema() { } } -bool RemoteCatalogue::handle(Message message, bool control, uint32_t requestID) { +bool RemoteCatalogue::handle(Message message, uint32_t requestID) { Log::warning() << *this << " - Received [message=" << ((uint) message) << ",requestID=" << requestID << "]" << std::endl; - NOTIMP; return false; } -bool RemoteCatalogue::handle(Message message, bool control, uint32_t requestID, eckit::Buffer&& payload) { +bool RemoteCatalogue::handle(Message message, uint32_t requestID, eckit::Buffer&& payload) { LOG_DEBUG_LIB(LibFdb5) << *this << " - Received [message=" << ((uint) message) << ",requestID=" << requestID << ",payloadSize=" << payload.size() << "]" << std::endl; return false; } diff --git a/src/fdb5/remote/client/RemoteCatalogue.h b/src/fdb5/remote/client/RemoteCatalogue.h index d7a93db29..1d10b9716 100644 --- a/src/fdb5/remote/client/RemoteCatalogue.h +++ b/src/fdb5/remote/client/RemoteCatalogue.h @@ -19,7 +19,7 @@ class RemoteCatalogue : public CatalogueReader, public CatalogueWriter, public C RemoteCatalogue(const Key& key, const Config& config); RemoteCatalogue(const eckit::URI& uri, const Config& config); - ~RemoteCatalogue() override; + ~RemoteCatalogue() override = default; // From CatalogueWriter const Index& currentIndex() override; @@ -66,8 +66,8 @@ class RemoteCatalogue : public CatalogueReader, public CatalogueWriter, public C private: // From Client // handlers for incoming messages - to be defined in the client class - bool handle(Message message, bool control, uint32_t requestID) override; - bool handle(Message message, bool control, uint32_t requestID, eckit::Buffer&& payload) override; + bool handle(Message message, uint32_t requestID) override; + bool handle(Message message, uint32_t requestID, eckit::Buffer&& payload) override; protected: diff --git a/src/fdb5/remote/client/RemoteStore.cc b/src/fdb5/remote/client/RemoteStore.cc index 2c0f4d285..d1951f77b 100644 --- a/src/fdb5/remote/client/RemoteStore.cc +++ b/src/fdb5/remote/client/RemoteStore.cc @@ -305,7 +305,7 @@ void RemoteStore::print(std::ostream &out) const { out << "RemoteStore(host=" << controlEndpoint() << ")"; } -bool RemoteStore::handle(Message message, bool control, uint32_t requestID) { +bool RemoteStore::handle(Message message, uint32_t requestID) { switch (message) { case Message::Complete: { @@ -346,7 +346,7 @@ bool RemoteStore::handle(Message message, bool control, uint32_t requestID) { return false; } } -bool RemoteStore::handle(Message message, bool control, uint32_t requestID, eckit::Buffer&& payload) { +bool RemoteStore::handle(Message message, uint32_t requestID, eckit::Buffer&& payload) { switch (message) { diff --git a/src/fdb5/remote/client/RemoteStore.h b/src/fdb5/remote/client/RemoteStore.h index 8bad69941..3446d318e 100644 --- a/src/fdb5/remote/client/RemoteStore.h +++ b/src/fdb5/remote/client/RemoteStore.h @@ -151,8 +151,8 @@ class RemoteStore : public Store, public Client { private: // methods // handlers for incoming messages - to be defined in the client class - bool handle(Message message, bool control, uint32_t requestID) override; - bool handle(Message message, bool control, uint32_t requestID, eckit::Buffer&& payload) override; + bool handle(Message message, uint32_t requestID) override; + bool handle(Message message, uint32_t requestID, eckit::Buffer&& payload) override; private: // members