From 17f437887f60dea56c489a3097e98d7b456393a1 Mon Sep 17 00:00:00 2001 From: Metin Cakircali Date: Thu, 16 Jan 2025 15:37:21 +0100 Subject: [PATCH] chore(remoteFDB): some improvements --- src/fdb5/remote/client/Client.h | 9 +++-- src/fdb5/remote/client/RemoteCatalogue.cc | 26 +++++--------- src/fdb5/remote/client/RemoteStore.cc | 16 ++++----- src/fdb5/remote/server/CatalogueHandler.cc | 4 +-- src/fdb5/remote/server/StoreHandler.cc | 20 +++++++---- src/fdb5/remote/server/StoreHandler.h | 42 ++++++++++++---------- 6 files changed, 63 insertions(+), 54 deletions(-) diff --git a/src/fdb5/remote/client/Client.h b/src/fdb5/remote/client/Client.h index 72594a405..dfaa5829b 100644 --- a/src/fdb5/remote/client/Client.h +++ b/src/fdb5/remote/client/Client.h @@ -35,12 +35,17 @@ class RemoteFDBException : public eckit::RemoteException { class Client : eckit::NonCopyable { public: // types - using PayloadList = Connection::PayloadList; + using PayloadList = Connection::PayloadList; + using EndpointList = std::vector>; + + static constexpr size_t defaultBufferSizeArchive = 8192; + static constexpr size_t defaultBufferSizeFlush = 1024; + static constexpr size_t defaultBufferSizeKey = 4096; public: // methods Client(const eckit::net::Endpoint& endpoint, const std::string& defaultEndpoint); - Client(const std::vector>& endpoints); + Client(const EndpointList& endpoints); virtual ~Client(); diff --git a/src/fdb5/remote/client/RemoteCatalogue.cc b/src/fdb5/remote/client/RemoteCatalogue.cc index 0725419eb..7a5ce9a6a 100644 --- a/src/fdb5/remote/client/RemoteCatalogue.cc +++ b/src/fdb5/remote/client/RemoteCatalogue.cc @@ -28,14 +28,6 @@ namespace fdb5::remote { //---------------------------------------------------------------------------------------------------------------------- -namespace { - -constexpr size_t archiveBufferSize = 8192; -constexpr size_t keyBufferSize = 4096; -} - -//---------------------------------------------------------------------------------------------------------------------- - RemoteCatalogue::RemoteCatalogue(const Key& key, const Config& config): CatalogueImpl(key, ControlIdentifiers(), config), // xxx what are control identifiers? Setting empty here... Client(eckit::net::Endpoint(config.getString("host"), config.getInt("port")), ""), @@ -65,7 +57,7 @@ void RemoteCatalogue::archive(const Key& idxKey, const Key& datumKey, std::share numLocations_++; } - Buffer buffer(archiveBufferSize); + Buffer buffer(defaultBufferSizeArchive); MemoryStream stream(buffer); stream << idxKey; stream << datumKey; @@ -107,8 +99,8 @@ void RemoteCatalogue::flush(size_t archivedFields) { // Flush only does anything if there is an ongoing archive(); if (numLocations_ > 0) { - Buffer sendBuf(1024); - MemoryStream s(sendBuf); + eckit::Buffer sendBuf(defaultBufferSizeFlush); + eckit::MemoryStream s(sendBuf); s << numLocations_; LOG_DEBUG_LIB(LibFdb5) << " RemoteCatalogue::flush - flushing " << numLocations_ << " fields" << std::endl; @@ -126,18 +118,18 @@ void RemoteCatalogue::close() {NOTIMP;} bool RemoteCatalogue::exists() const { - bool exists = false; + bool result = false; - Buffer sendBuf(keyBufferSize); - MemoryStream sms(sendBuf); + eckit::Buffer sendBuf(defaultBufferSizeKey); + eckit::MemoryStream sms(sendBuf); sms << dbKey_; eckit::Buffer recvBuf = controlWriteReadResponse(Message::Exists, generateRequestID(), sendBuf, sms.position()); eckit::MemoryStream rms(recvBuf); - rms >> exists; + rms >> result; - return exists; + return result; } void RemoteCatalogue::checkUID() const { @@ -156,7 +148,7 @@ void RemoteCatalogue::loadSchema() { LOG_DEBUG_LIB(LibFdb5) << "RemoteCatalogue::loadSchema()" << std::endl; // send dbkey to remote. - eckit::Buffer keyBuffer(keyBufferSize); + eckit::Buffer keyBuffer(defaultBufferSizeKey); eckit::MemoryStream keyStream(keyBuffer); keyStream << dbKey_; diff --git a/src/fdb5/remote/client/RemoteStore.cc b/src/fdb5/remote/client/RemoteStore.cc index d1e545f99..8df0bb25e 100644 --- a/src/fdb5/remote/client/RemoteStore.cc +++ b/src/fdb5/remote/client/RemoteStore.cc @@ -196,18 +196,16 @@ class FDBRemoteDataHandle : public DataHandle { bool complete_; }; -using EndPointList = std::vector>; - -EndPointList storeEndpoints(const Config& config) { +Client::EndpointList storeEndpoints(const Config& config) { ASSERT(config.has("stores")); ASSERT(config.has("fieldLocationEndpoints")); - std::vector stores = config.getStringVector("stores"); - std::vector fieldLocationEndpoints = config.getStringVector("fieldLocationEndpoints"); + const auto stores = config.getStringVector("stores"); + const auto fieldLocationEndpoints = config.getStringVector("fieldLocationEndpoints"); ASSERT(stores.size() == fieldLocationEndpoints.size()); - EndPointList out; + Client::EndpointList out; out.reserve(stores.size()); for (size_t i = 0; i < stores.size(); ++i) { out.emplace_back(eckit::net::Endpoint {stores.at(i)}, fieldLocationEndpoints.at(i)); @@ -269,8 +267,8 @@ void RemoteStore::archive(const Key& key, const void *data, eckit::Length length // store the callback, associated with the request id - to be done BEFORE sending the data locations_.archive(id, catalogue_archive); - Buffer keyBuffer(4096); - MemoryStream keyStream(keyBuffer); + eckit::Buffer keyBuffer(defaultBufferSizeKey); + eckit::MemoryStream keyStream(keyBuffer); keyStream << dbKey_; keyStream << key; @@ -300,7 +298,7 @@ size_t RemoteStore::flush() { size_t locations = complete ? locations_.archived() : locations_.wait(); - Buffer sendBuf(1024); + Buffer sendBuf(defaultBufferSizeFlush); MemoryStream s(sendBuf); s << locations; diff --git a/src/fdb5/remote/server/CatalogueHandler.cc b/src/fdb5/remote/server/CatalogueHandler.cc index 46a504bac..75ac5c01a 100644 --- a/src/fdb5/remote/server/CatalogueHandler.cc +++ b/src/fdb5/remote/server/CatalogueHandler.cc @@ -361,8 +361,8 @@ void CatalogueHandler::exists(uint32_t clientID, uint32_t requestID, eckit::Buff bool exists = false; { - MemoryStream stream(payload); - const Key dbKey(stream); + eckit::MemoryStream stream(payload); + const Key dbKey(stream); exists = CatalogueReaderFactory::instance().build(dbKey, config_)->exists(); } diff --git a/src/fdb5/remote/server/StoreHandler.cc b/src/fdb5/remote/server/StoreHandler.cc index 1baa108b0..e11d4e491 100644 --- a/src/fdb5/remote/server/StoreHandler.cc +++ b/src/fdb5/remote/server/StoreHandler.cc @@ -8,25 +8,33 @@ * does it submit to any jurisdiction. */ -#include "eckit/config/Resource.h" -#include "eckit/serialisation/MemoryStream.h" +#include "fdb5/remote/server/StoreHandler.h" #include "fdb5/LibFdb5.h" +#include "fdb5/database/Key.h" #include "fdb5/database/Store.h" -#include "fdb5/remote/server/StoreHandler.h" +#include "fdb5/remote/Messages.h" +#include "fdb5/remote/server/ServerConnection.h" + +#include "eckit/net/TCPSocket.h" +#include "eckit/serialisation/MemoryStream.h" + +#include +#include +#include +#include using namespace eckit; -using metkit::mars::MarsRequest; namespace fdb5::remote { +//---------------------------------------------------------------------------------------------------------------------- + StoreHandler::StoreHandler(eckit::net::TCPSocket& socket, const Config& config): ServerConnection(socket, config) { LibFdb5::instance().constructorCallback()(*this); } -StoreHandler::~StoreHandler() {} - Handled StoreHandler::handleControl(Message message, uint32_t clientID, uint32_t requestID) { try { diff --git a/src/fdb5/remote/server/StoreHandler.h b/src/fdb5/remote/server/StoreHandler.h index 40362b09a..4f342589a 100644 --- a/src/fdb5/remote/server/StoreHandler.h +++ b/src/fdb5/remote/server/StoreHandler.h @@ -14,27 +14,16 @@ #include "fdb5/database/Store.h" #include "fdb5/remote/server/ServerConnection.h" -namespace fdb5::remote { - -//---------------------------------------------------------------------------------------------------------------------- - -struct StoreHelper { - StoreHelper(bool dataConnection, const Key& dbKey, const Config& config) : - controlConnection(true), dataConnection(dataConnection), - store(StoreFactory::instance().build(dbKey, config)) {} +#include +#include +#include - bool controlConnection; - bool dataConnection; - - std::unique_ptr store; -}; +namespace fdb5::remote { //---------------------------------------------------------------------------------------------------------------------- class StoreHandler : public ServerConnection, public CallbackRegistry { public: // methods - StoreHandler(eckit::net::TCPSocket& socket, const Config& config); - ~StoreHandler() override; private: // methods @@ -42,24 +31,41 @@ class StoreHandler : public ServerConnection, public CallbackRegistry { Handled handleControl(Message message, uint32_t clientID, uint32_t requestID, eckit::Buffer&& payload) override; void flush(uint32_t clientID, uint32_t requestID, const eckit::Buffer& payload); + void read(uint32_t clientID, uint32_t requestID, const eckit::Buffer& payload); - void archiveBlob(const uint32_t clientID, const uint32_t requestID, const void* data, size_t length) override; + void exists(uint32_t clientID, uint32_t requestID, const eckit::Buffer& payload) const; + + void archiveBlob(uint32_t clientID, uint32_t requestID, const void* data, size_t length) override; void readLocationThreadLoop(); - void writeToParent(const uint32_t clientID, const uint32_t requestID, std::unique_ptr dh); + + void writeToParent(uint32_t clientID, uint32_t requestID, std::unique_ptr dh); bool remove(bool control, uint32_t clientID) override; Store& store(uint32_t clientID); + Store& store(uint32_t clientID, const Key& dbKey); private: // members - + struct StoreHelper; // clientID --> Store std::map stores_; }; //---------------------------------------------------------------------------------------------------------------------- +struct StoreHandler::StoreHelper { + StoreHelper(bool dataConnection, const Key& dbKey, const Config& config) + : dataConnection(dataConnection), store(StoreFactory::instance().build(dbKey, config)) { } + + bool controlConnection {true}; + bool dataConnection {false}; + + std::unique_ptr store; +}; + +//---------------------------------------------------------------------------------------------------------------------- + } // namespace fdb5::remote