From 5c18872086ba5a1c8a20d5c04feb612c5b3add56 Mon Sep 17 00:00:00 2001 From: Metin Cakircali Date: Fri, 17 Jan 2025 12:32:34 +0100 Subject: [PATCH] feat(remoteFDB): RemoteCatalogue loads schema lazily related to issue of creating empty DBs --- src/fdb5/remote/client/ClientConnection.cc | 4 +- src/fdb5/remote/client/RemoteCatalogue.cc | 59 +++++++++++++--------- src/fdb5/remote/client/RemoteCatalogue.h | 6 ++- src/fdb5/remote/client/RemoteStore.cc | 2 +- 4 files changed, 43 insertions(+), 28 deletions(-) diff --git a/src/fdb5/remote/client/ClientConnection.cc b/src/fdb5/remote/client/ClientConnection.cc index 1a0fe06d9..2501f7b83 100644 --- a/src/fdb5/remote/client/ClientConnection.cc +++ b/src/fdb5/remote/client/ClientConnection.cc @@ -218,14 +218,14 @@ void ClientConnection::dataWrite(Client& client, remote::Message msg, uint32_t r } uint32_t payloadLength = 0; - for (auto payload : payloads) { + for (const auto& payload : payloads) { ASSERT(payload.data); payloadLength += payload.length; } eckit::Buffer buffer{payloadLength}; uint32_t offset = 0; - for (auto payload : payloads) { + for (const auto& payload : payloads) { buffer.copy(payload.data, payload.length, offset); offset += payload.length; } diff --git a/src/fdb5/remote/client/RemoteCatalogue.cc b/src/fdb5/remote/client/RemoteCatalogue.cc index 7a5ce9a6a..319ae9c97 100644 --- a/src/fdb5/remote/client/RemoteCatalogue.cc +++ b/src/fdb5/remote/client/RemoteCatalogue.cc @@ -17,8 +17,10 @@ #include "eckit/filesystem/URI.h" #include "eckit/log/Log.h" #include "eckit/serialisation/MemoryStream.h" +#include "fdb5/rules/Schema.h" #include +#include #include #include @@ -28,18 +30,38 @@ namespace fdb5::remote { //---------------------------------------------------------------------------------------------------------------------- -RemoteCatalogue::RemoteCatalogue(const Key& key, const Config& config): - CatalogueImpl(key, ControlIdentifiers(), config), // xxx what are control identifiers? Setting empty here... - Client(eckit::net::Endpoint(config.getString("host"), config.getInt("port")), ""), - config_(config), schema_(nullptr), numLocations_(0) { +namespace { - loadSchema(); +Schema* fetchSchema(const Key& dbKey, const RemoteCatalogue& catalogue) { + LOG_DEBUG_LIB(LibFdb5) << "Fetching schema from remote catalogue: " << catalogue.controlEndpoint() << std::endl; + + // send dbkey to remote + eckit::Buffer keyBuffer(RemoteCatalogue::defaultBufferSizeKey); + eckit::MemoryStream keyStream(keyBuffer); + keyStream << dbKey; + + const auto requestID = catalogue.generateRequestID(); + + // receive schema from remote + auto recvBuf = catalogue.controlWriteReadResponse(Message::Schema, requestID, keyBuffer, keyStream.position()); + + eckit::MemoryStream schemaStream(recvBuf); + return eckit::Reanimator::reanimate(schemaStream); } +} // namespace + +//---------------------------------------------------------------------------------------------------------------------- + +RemoteCatalogue::RemoteCatalogue(const Key& key, const Config& config) + : CatalogueImpl(key, {}, config), // xxx what are control identifiers? Setting empty here... + Client({config.getString("host"), config.getInt("port")}, ""), + config_(config) { } + // Catalogue(URI, Config) is only used by the Visitors to traverse the catalogue. In the remote, we use the RemoteFDB for catalogue traversal // this ctor is here only to comply with the factory -RemoteCatalogue::RemoteCatalogue(const eckit::URI& uri, const Config& config): - Client(eckit::net::Endpoint(config.getString("host"), config.getInt("port")), ""), config_(config), schema_(nullptr), numLocations_(0) { +RemoteCatalogue::RemoteCatalogue(const eckit::URI& /*uri*/, const Config& config) + : Client({config.getString("host"), config.getInt("port")}, ""), config_(config) { NOTIMP; } @@ -87,7 +109,11 @@ void RemoteCatalogue::deselectIndex() { currentIndexKey_ = Key(); } const Schema& RemoteCatalogue::schema() const { - ASSERT(schema_); + // lazy loading schema + if (!schema_) { + schema_.reset(fetchSchema(dbKey_, *this)); + ASSERT(schema_); + } return *schema_; } @@ -124,7 +150,7 @@ bool RemoteCatalogue::exists() const { eckit::MemoryStream sms(sendBuf); sms << dbKey_; - eckit::Buffer recvBuf = controlWriteReadResponse(Message::Exists, generateRequestID(), sendBuf, sms.position()); + auto recvBuf = controlWriteReadResponse(Message::Exists, generateRequestID(), sendBuf, sms.position()); eckit::MemoryStream rms(recvBuf); rms >> result; @@ -143,20 +169,7 @@ eckit::URI RemoteCatalogue::uri() const { void RemoteCatalogue::loadSchema() { // NB we're at the db level, so get the db schema. We will want to get the master schema beforehand. // (outside of the catalogue) - - if (!schema_) { - LOG_DEBUG_LIB(LibFdb5) << "RemoteCatalogue::loadSchema()" << std::endl; - - // send dbkey to remote. - eckit::Buffer keyBuffer(defaultBufferSizeKey); - eckit::MemoryStream keyStream(keyBuffer); - keyStream << dbKey_; - - eckit::Buffer buf = controlWriteReadResponse(Message::Schema, generateRequestID(), keyBuffer, keyStream.position()); - - eckit::MemoryStream s(buf); - schema_.reset(eckit::Reanimator::reanimate(s)); - } + if (!schema_) { schema_.reset(fetchSchema(dbKey_, *this)); } } bool RemoteCatalogue::handle(Message message, uint32_t requestID) { diff --git a/src/fdb5/remote/client/RemoteCatalogue.h b/src/fdb5/remote/client/RemoteCatalogue.h index b51009780..dca8b7e84 100644 --- a/src/fdb5/remote/client/RemoteCatalogue.h +++ b/src/fdb5/remote/client/RemoteCatalogue.h @@ -6,6 +6,7 @@ #include "fdb5/database/Catalogue.h" #include "fdb5/database/DbStats.h" #include "fdb5/database/Index.h" +#include "fdb5/database/Key.h" #include "fdb5/database/Store.h" #include "fdb5/remote/client/Client.h" @@ -14,6 +15,7 @@ #include #include #include +#include #include #include @@ -89,10 +91,10 @@ class RemoteCatalogue : public CatalogueReader, public CatalogueWriter, public C private: Key currentIndexKey_; - std::unique_ptr schema_; + mutable std::unique_ptr schema_; std::mutex archiveMutex_; - size_t numLocations_; + size_t numLocations_ {0}; }; //---------------------------------------------------------------------------------------------------------------------- diff --git a/src/fdb5/remote/client/RemoteStore.cc b/src/fdb5/remote/client/RemoteStore.cc index 946862239..ea340595a 100644 --- a/src/fdb5/remote/client/RemoteStore.cc +++ b/src/fdb5/remote/client/RemoteStore.cc @@ -251,7 +251,7 @@ bool RemoteStore::exists() const { eckit::MemoryStream sms(sendBuf); sms << dbKey_; - eckit::Buffer recvBuf = controlWriteReadResponse(Message::Exists, generateRequestID(), sendBuf, sms.position()); + auto recvBuf = controlWriteReadResponse(Message::Exists, generateRequestID(), sendBuf, sms.position()); eckit::MemoryStream rms(recvBuf); rms >> result;