From d0cbd50a0b71ecdd64a12acd33b6ee24647179a2 Mon Sep 17 00:00:00 2001 From: Metin Cakircali Date: Tue, 14 Jan 2025 16:57:56 +0100 Subject: [PATCH 1/4] fix(remoteFDB): consted connection read/write --- src/fdb5/remote/Connection.cc | 37 ++++---- src/fdb5/remote/Connection.h | 47 +++++++---- src/fdb5/remote/client/Client.cc | 42 +++++++--- src/fdb5/remote/client/Client.h | 33 +++++--- src/fdb5/remote/client/ClientConnection.cc | 47 +++++++---- src/fdb5/remote/client/ClientConnection.h | 34 ++++---- src/fdb5/remote/client/RemoteStore.cc | 9 +- src/fdb5/remote/server/ServerConnection.cc | 98 ++++++++++++++-------- src/fdb5/remote/server/ServerConnection.h | 23 ++--- 9 files changed, 231 insertions(+), 139 deletions(-) diff --git a/src/fdb5/remote/Connection.cc b/src/fdb5/remote/Connection.cc index f4468f85f..bfacf9e34 100644 --- a/src/fdb5/remote/Connection.cc +++ b/src/fdb5/remote/Connection.cc @@ -3,13 +3,13 @@ #include "fdb5/LibFdb5.h" #include "fdb5/remote/Connection.h" +#include "fdb5/remote/Messages.h" namespace fdb5::remote { //---------------------------------------------------------------------------------------------------------------------- -Connection::Connection() : single_(false) {} -Connection::~Connection() {} +Connection::Connection() : single_(false) { } void Connection::teardown() { @@ -19,20 +19,20 @@ void Connection::teardown() { // all done - disconnecting Connection::write(Message::Exit, false, 0, 0); } catch(...) { - // if connection is already down, no need to escalate + // if connection is already down, no need to escalate } } try { // all done - disconnecting Connection::write(Message::Exit, true, 0, 0); } catch(...) { - // if connection is already down, no need to escalate + // if connection is already down, no need to escalate } } //---------------------------------------------------------------------------------------------------------------------- -void Connection::writeUnsafe(bool control, const void* data, size_t length) { +void Connection::writeUnsafe(const bool control, const void* data, const size_t length) const { long written = 0; if (control || single_) { written = controlSocket().write(data, length); @@ -51,7 +51,7 @@ void Connection::writeUnsafe(bool control, const void* data, size_t length) { } } -void Connection::readUnsafe(bool control, void* data, size_t length) { +void Connection::readUnsafe(bool control, void* data, size_t length) const { long read = 0; if (control || single_) { read = controlSocket().read(data, length); @@ -70,7 +70,7 @@ void Connection::readUnsafe(bool control, void* data, size_t length) { } } -eckit::Buffer Connection::read(bool control, MessageHeader& hdr) { +eckit::Buffer Connection::read(bool control, MessageHeader& hdr) const { eckit::FixedString<4> tail; std::lock_guard lock((control || single_) ? readControlMutex_ : readDataMutex_); @@ -99,11 +99,7 @@ eckit::Buffer Connection::read(bool control, MessageHeader& hdr) { return payload; } -void Connection::write(remote::Message msg, bool control, uint32_t clientID, uint32_t requestID, const void* data, uint32_t length) { - write(msg, control, clientID, requestID, std::vector>{{data, length}}); -} - -void Connection::write(remote::Message msg, bool control, uint32_t clientID, uint32_t requestID, std::vector> data) { +void Connection::write(Message msg, const bool control, const uint32_t clientID, const uint32_t requestID, Payload data) const { uint32_t payloadLength = 0; for (auto d: data) { @@ -123,16 +119,25 @@ void Connection::write(remote::Message msg, bool control, uint32_t clientID, uin writeUnsafe(control, &EndMarker, sizeof(EndMarker)); } -void Connection::error(const std::string& msg, uint32_t clientID, uint32_t requestID) { +void Connection::write(Message msg, + const bool control, + const uint32_t clientID, + const uint32_t requestID, + const void* data, + const uint32_t length) const { + write(msg, control, clientID, requestID, {{data, length}}); +} + +void Connection::error(const std::string& msg, uint32_t clientID, uint32_t requestID) const { eckit::Log::error() << "[clientID=" << clientID << ",requestID=" << requestID << "] " << msg << std::endl; - write(Message::Error, false, clientID, requestID, std::vector>{{msg.c_str(), msg.length()}}); + write(Message::Error, false, clientID, requestID, {{msg.c_str(), msg.length()}}); } -eckit::Buffer Connection::readControl(MessageHeader& hdr) { +eckit::Buffer Connection::readControl(MessageHeader& hdr) const { return read(true, hdr); } -eckit::Buffer Connection::readData(MessageHeader& hdr) { +eckit::Buffer Connection::readData(MessageHeader& hdr) const { return read(false, hdr); } diff --git a/src/fdb5/remote/Connection.h b/src/fdb5/remote/Connection.h index 560f8b01c..10393cc0f 100644 --- a/src/fdb5/remote/Connection.h +++ b/src/fdb5/remote/Connection.h @@ -10,11 +10,17 @@ #pragma once +#include "fdb5/remote/Messages.h" + #include "eckit/exception/Exceptions.h" #include "eckit/net/TCPSocket.h" #include "eckit/os/BackTrace.h" -#include "fdb5/remote/Messages.h" +#include +#include +#include +#include +#include namespace eckit { @@ -40,41 +46,46 @@ class TCPException : public eckit::Exception { class Connection : eckit::NonCopyable { +public: // types + using Payload = std::vector>; + public: // methods Connection(); - virtual ~Connection(); - void write(Message msg, bool control, uint32_t clientID, uint32_t requestID, const void* data, uint32_t length); - void write(Message msg, bool control, uint32_t clientID, uint32_t requestID, std::vector> data = {}); + virtual ~Connection() = default; + + void write(Message msg, bool control, uint32_t clientID, uint32_t requestID, Payload data = {}) const; - void error(const std::string& msg, uint32_t clientID, uint32_t requestID); + void write(Message msg, bool control, uint32_t clientID, uint32_t requestID, const void* data, uint32_t length) const; - eckit::Buffer readControl(MessageHeader& hdr); - eckit::Buffer readData(MessageHeader& hdr); + void error(const std::string& msg, uint32_t clientID, uint32_t requestID) const; + + eckit::Buffer readControl(MessageHeader& hdr) const; + + eckit::Buffer readData(MessageHeader& hdr) const; void teardown(); private: // methods + eckit::Buffer read(bool control, MessageHeader& hdr) const; + + void writeUnsafe(bool control, const void* data, size_t length) const; - eckit::Buffer read(bool control, MessageHeader& hdr); + void readUnsafe(bool control, void* data, size_t length) const; - void writeUnsafe(bool control, const void* data, size_t length); - void readUnsafe(bool control, void* data, size_t length); + virtual const eckit::net::TCPSocket& controlSocket() const = 0; - virtual eckit::net::TCPSocket& controlSocket() = 0; - virtual eckit::net::TCPSocket& dataSocket() = 0; + virtual const eckit::net::TCPSocket& dataSocket() const = 0; protected: // members bool single_; private: // members - - std::mutex controlMutex_; - std::mutex dataMutex_; - std::mutex readControlMutex_; - std::mutex readDataMutex_; - + mutable std::mutex controlMutex_; + mutable std::mutex dataMutex_; + mutable std::mutex readControlMutex_; + mutable std::mutex readDataMutex_; }; //---------------------------------------------------------------------------------------------------------------------- diff --git a/src/fdb5/remote/client/Client.cc b/src/fdb5/remote/client/Client.cc index d443e37ea..123d7ccfa 100644 --- a/src/fdb5/remote/client/Client.cc +++ b/src/fdb5/remote/client/Client.cc @@ -8,12 +8,23 @@ * does it submit to any jurisdiction. */ - -#include "fdb5/LibFdb5.h" - #include "fdb5/remote/client/Client.h" + +#include "fdb5/remote/Connection.h" +#include "fdb5/remote/Messages.h" #include "fdb5/remote/client/ClientConnectionRouter.h" +#include "eckit/exception/Exceptions.h" +#include "eckit/io/Buffer.h" +#include "eckit/net/Endpoint.h" + +#include +#include +#include +#include +#include +#include + namespace fdb5::remote { //---------------------------------------------------------------------------------------------------------------------- @@ -44,29 +55,34 @@ Client::~Client() { connection_.remove(id_); } -void Client::controlWriteCheckResponse(Message msg, uint32_t requestID, bool dataListener, const void* payload, uint32_t payloadLength) { +void Client::controlWriteCheckResponse(Message msg, + uint32_t requestID, + bool dataListener, + const void* payload, + uint32_t payloadLength) const { ASSERT(requestID); ASSERT(!(!payloadLength ^ !payload)); std::lock_guard lock(blockingRequestMutex_); - std::vector> data; - if (payloadLength) { - data.push_back(std::make_pair(payload, payloadLength)); - } + Connection::Payload data; + if (payloadLength != 0) { data.push_back(std::make_pair(payload, payloadLength)); } - std::future f = connection_.controlWrite(*this, msg, requestID, dataListener, data); + auto f = connection_.controlWrite(*this, msg, requestID, dataListener, data); f.wait(); ASSERT(f.get().size() == 0); } -eckit::Buffer Client::controlWriteReadResponse(Message msg, uint32_t requestID, const void* payload, uint32_t payloadLength) { +eckit::Buffer Client::controlWriteReadResponse(Message msg, + uint32_t requestID, + const void* payload, + uint32_t payloadLength) const { ASSERT(requestID); ASSERT(!(!payloadLength ^ !payload)); std::lock_guard lock(blockingRequestMutex_); - - std::vector> data{}; + + Connection::Payload data {}; if (payloadLength) { data.push_back(std::make_pair(payload, payloadLength)); } @@ -76,7 +92,7 @@ eckit::Buffer Client::controlWriteReadResponse(Message msg, uint32_t requestID, return eckit::Buffer{f.get()}; } -void Client::dataWrite(remote::Message msg, uint32_t requestID, std::vector> data) { +void Client::dataWrite(remote::Message msg, uint32_t requestID, Connection::Payload data) { connection_.dataWrite(*this, msg, requestID, data); } diff --git a/src/fdb5/remote/client/Client.h b/src/fdb5/remote/client/Client.h index 689e42607..4e1bc4e6f 100644 --- a/src/fdb5/remote/client/Client.h +++ b/src/fdb5/remote/client/Client.h @@ -10,11 +10,10 @@ #pragma once -#include "eckit/config/Configuration.h" #include "eckit/memory/NonCopyable.h" #include "eckit/net/Endpoint.h" -#include "fdb5/database/Key.h" +#include "fdb5/remote/Connection.h" #include "fdb5/remote/Messages.h" #include "fdb5/remote/client/ClientConnection.h" @@ -33,28 +32,40 @@ class RemoteFDBException : public eckit::RemoteException { class Client : eckit::NonCopyable { public: Client(const eckit::net::Endpoint& endpoint, const std::string& defaultEndpoint); + Client(const std::vector>& endpoints); + virtual ~Client(); uint32_t clientId() const { return id_; } + uint32_t id() const { return id_; } + const eckit::net::Endpoint& controlEndpoint() const { return connection_.controlEndpoint(); } + const std::string& defaultEndpoint() const { return connection_.defaultEndpoint(); } - uint32_t generateRequestID() { return connection_.generateRequestID(); } + uint32_t generateRequestID() const { return connection_.generateRequestID(); } // blocking requests - void controlWriteCheckResponse(Message msg, uint32_t requestID, bool dataListener, const void* payload=nullptr, uint32_t payloadLength=0); - eckit::Buffer controlWriteReadResponse (Message msg, uint32_t requestID, const void* payload=nullptr, uint32_t payloadLength=0); + void controlWriteCheckResponse(Message msg, + uint32_t requestID, + bool dataListener, + const void* payload = nullptr, + uint32_t payloadLength = 0) const; + + eckit::Buffer controlWriteReadResponse(Message msg, + uint32_t requestID, + const void* payload = nullptr, + uint32_t payloadLength = 0) const; + + void dataWrite(Message msg, uint32_t requestID, Connection::Payload data = {}); - void dataWrite(remote::Message msg, uint32_t requestID, std::vector> data={}); - // handlers for incoming messages - to be defined in the client class - virtual bool handle(Message message, uint32_t requestID) = 0; + virtual bool handle(Message message, uint32_t requestID) = 0; virtual bool handle(Message message, uint32_t requestID, eckit::Buffer&& payload) = 0; protected: - ClientConnection& connection_; private: @@ -63,7 +74,7 @@ class Client : eckit::NonCopyable { private: uint32_t id_; - std::mutex blockingRequestMutex_; + mutable std::mutex blockingRequestMutex_; }; -} \ No newline at end of file +} // namespace fdb5::remote diff --git a/src/fdb5/remote/client/ClientConnection.cc b/src/fdb5/remote/client/ClientConnection.cc index f85707156..689444edb 100644 --- a/src/fdb5/remote/client/ClientConnection.cc +++ b/src/fdb5/remote/client/ClientConnection.cc @@ -1,23 +1,33 @@ - -#include -#include +#include "fdb5/remote/client/ClientConnection.h" +#include "fdb5/LibFdb5.h" +#include "fdb5/remote/Connection.h" +#include "fdb5/remote/Messages.h" +#include "fdb5/remote/client/ClientConnectionRouter.h" #include "eckit/config/LocalConfiguration.h" #include "eckit/config/Resource.h" +#include "eckit/container/Queue.h" +#include "eckit/exception/Exceptions.h" #include "eckit/io/Buffer.h" #include "eckit/log/Bytes.h" +#include "eckit/log/CodeLocation.h" #include "eckit/log/Log.h" -#include "eckit/message/Message.h" -#include "eckit/runtime/Main.h" +#include "eckit/net/Endpoint.h" +#include "eckit/runtime/SessionID.h" #include "eckit/serialisation/MemoryStream.h" -#include "eckit/utils/Translator.h" -#include "fdb5/LibFdb5.h" -#include "fdb5/remote/Messages.h" -#include "fdb5/remote/RemoteFieldLocation.h" -#include "fdb5/remote/client/ClientConnection.h" -#include "fdb5/remote/client/ClientConnectionRouter.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include namespace fdb5::remote { @@ -162,9 +172,13 @@ eckit::LocalConfiguration ClientConnection::availableFunctionality() const { return conf; } -// ----------------------------------------------------------------------------------------------------- +//---------------------------------------------------------------------------------------------------------------------- -std::future ClientConnection::controlWrite(Client& client, Message msg, uint32_t requestID, bool dataListener, std::vector> data) { +std::future ClientConnection::controlWrite(const Client& client, + Message msg, + const uint32_t requestID, + const bool /*dataListener*/, + Payload data) const { std::future f; { std::lock_guard lock(promisesMutex_); @@ -176,11 +190,12 @@ std::future ClientConnection::controlWrite(Client& client, Messag return f; } -void ClientConnection::dataWrite(DataWriteRequest& r) { - Connection::write(r.msg_, false, r.client_->clientId(), r.id_, r.data_.data(), r.data_.size()); +void ClientConnection::dataWrite(DataWriteRequest& request) const { + Connection::write(request.msg_, false, request.client_->clientId(), request.id_, request.data_.data(), + request.data_.size()); } -void ClientConnection::dataWrite(Client& client, remote::Message msg, uint32_t requestID, std::vector> data) { +void ClientConnection::dataWrite(Client& client, remote::Message msg, uint32_t requestID, Payload data) { static size_t maxQueueLength = eckit::Resource("fdbDataWriteQueueLength;$FDB_DATA_WRITE_QUEUE_LENGTH", 320); { diff --git a/src/fdb5/remote/client/ClientConnection.h b/src/fdb5/remote/client/ClientConnection.h index f24cb8112..9a5bb870f 100644 --- a/src/fdb5/remote/client/ClientConnection.h +++ b/src/fdb5/remote/client/ClientConnection.h @@ -10,19 +10,19 @@ #pragma once -#include -#include +#include "fdb5/remote/Connection.h" +#include "fdb5/remote/Messages.h" #include "eckit/config/LocalConfiguration.h" #include "eckit/container/Queue.h" #include "eckit/io/Buffer.h" -#include "eckit/io/Length.h" +#include "eckit/net/Endpoint.h" #include "eckit/net/TCPClient.h" -#include "eckit/net/TCPStream.h" +#include "eckit/net/TCPSocket.h" #include "eckit/runtime/SessionID.h" -#include "fdb5/remote/Messages.h" -#include "fdb5/remote/Connection.h" +#include +#include namespace fdb5::remote { @@ -35,11 +35,15 @@ class DataWriteRequest; class ClientConnection : protected Connection { public: // methods + ~ClientConnection() override; - virtual ~ClientConnection(); + std::future controlWrite(const Client& client, + Message msg, + uint32_t requestID, + bool startDataListener, + Payload data = {}) const; - std::future controlWrite(Client& client, Message msg, uint32_t requestID, bool startDataListener, std::vector> data={}); - void dataWrite(Client& client, Message msg, uint32_t requestID, std::vector> data={}); + void dataWrite(Client& client, Message msg, uint32_t requestID, Payload data = {}); void add(Client& client); bool remove(uint32_t clientID); @@ -57,7 +61,7 @@ class ClientConnection : protected Connection { ClientConnection(const eckit::net::Endpoint& controlEndpoint, const std::string& defaultEndpoint); - void dataWrite(DataWriteRequest& dataWriteRequest); + void dataWrite(DataWriteRequest& request) const; // construct dictionary for protocol negotiation - to be defined in the client class eckit::LocalConfiguration availableFunctionality() const; @@ -73,8 +77,9 @@ class ClientConnection : protected Connection { void listeningDataThreadLoop(); void dataWriteThreadLoop(); - eckit::net::TCPSocket& controlSocket() override { return controlClient_; } - eckit::net::TCPSocket& dataSocket() override { return dataClient_; } + const eckit::net::TCPSocket& controlSocket() const override { return controlClient_; } + + const eckit::net::TCPSocket& dataSocket() const override { return dataClient_; } private: // members @@ -104,8 +109,9 @@ class ClientConnection : protected Connection { bool controlStopping_; bool dataStopping_; - std::mutex promisesMutex_; - std::map> promises_; + mutable std::mutex promisesMutex_; + + mutable std::map> promises_; std::mutex dataWriteMutex_; std::unique_ptr> dataWriteQueue_; diff --git a/src/fdb5/remote/client/RemoteStore.cc b/src/fdb5/remote/client/RemoteStore.cc index d1951f77b..e86d4a55c 100644 --- a/src/fdb5/remote/client/RemoteStore.cc +++ b/src/fdb5/remote/client/RemoteStore.cc @@ -20,11 +20,12 @@ #include "eckit/serialisation/MemoryStream.h" #include "fdb5/LibFdb5.h" -#include "fdb5/rules/Rule.h" #include "fdb5/database/FieldLocation.h" -#include "fdb5/remote/client/RemoteStore.h" -#include "fdb5/remote/RemoteFieldLocation.h" #include "fdb5/io/FDBFileHandle.h" +#include "fdb5/remote/Connection.h" +#include "fdb5/remote/RemoteFieldLocation.h" +#include "fdb5/remote/client/RemoteStore.h" +#include "fdb5/rules/Rule.h" #include @@ -251,7 +252,7 @@ void RemoteStore::archive(const Key& key, const void *data, eckit::Length length keyStream << dbKey_; keyStream << key; - std::vector> payloads; + Connection::Payload payloads; payloads.push_back(std::pair{keyBuffer, keyStream.position()}); payloads.push_back(std::pair{data, length}); diff --git a/src/fdb5/remote/server/ServerConnection.cc b/src/fdb5/remote/server/ServerConnection.cc index 8eaf0c5fc..01d3c845c 100644 --- a/src/fdb5/remote/server/ServerConnection.cc +++ b/src/fdb5/remote/server/ServerConnection.cc @@ -13,33 +13,51 @@ * (Project ID: 671951) www.nextgenio.eu */ -#include -#include +#include "fdb5/remote/server/ServerConnection.h" #include "eckit/config/Resource.h" -#include "eckit/maths/Functions.h" -#include "eckit/net/Endpoint.h" -#include "eckit/runtime/Main.h" -#include "eckit/runtime/SessionID.h" -#include "eckit/serialisation/MemoryStream.h" -#include "eckit/log/Log.h" - #include "fdb5/LibFdb5.h" -#include "fdb5/fdb5_version.h" #include "fdb5/api/helpers/FDBToolRequest.h" -#include "fdb5/database/Key.h" -#include "fdb5/remote/server/AvailablePortList.h" +#include "fdb5/fdb5_version.h" +#include "fdb5/remote/Connection.h" #include "fdb5/remote/Messages.h" -#include "fdb5/remote/RemoteFieldLocation.h" -#include "fdb5/api/FDB.h" +#include "fdb5/remote/server/AvailablePortList.h" -#include "fdb5/remote/server/ServerConnection.h" +#include "eckit/config/LocalConfiguration.h" +#include "eckit/exception/Exceptions.h" +#include "eckit/log/CodeLocation.h" +#include "eckit/log/Log.h" +#include "eckit/net/Endpoint.h" +#include "eckit/net/TCPServer.h" +#include "eckit/net/TCPSocket.h" +#include "eckit/runtime/SessionID.h" +#include "eckit/serialisation/MemoryStream.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include namespace fdb5::remote { +//---------------------------------------------------------------------------------------------------------------------- // helpers + namespace { +constexpr const auto defaultRetrieveQueueSize = 10000; +constexpr const auto defaultArchiveQueueSize = 320; + std::vector intersection(const eckit::LocalConfiguration& c1, const eckit::LocalConfiguration& c2, const std::string& field){ std::vector v1 = c1.getIntVector(field); @@ -57,13 +75,14 @@ std::vector intersection(const eckit::LocalConfiguration& c1, const eckit:: } // namespace -ServerConnection::ServerConnection(eckit::net::TCPSocket& socket, const Config& config) : - Connection(), config_(config), - dataListenHostname_(config.getString("dataListenHostname", "")), - readLocationQueue_(eckit::Resource("fdbRetrieveQueueSize", 10000)), - archiveQueue_(eckit::Resource("fdbServerMaxQueueSize", 320)), - controlSocket_(socket), numControlConnection_(0), numDataConnection_(0), - dataSocket_(nullptr), dataListener_(0) { +//---------------------------------------------------------------------------------------------------------------------- + +ServerConnection::ServerConnection(eckit::net::TCPSocket& socket, const Config& config) + : config_(config), + dataListenHostname_(config.getString("dataListenHostname", "")), + readLocationQueue_(eckit::Resource("fdbRetrieveQueueSize", defaultRetrieveQueueSize)), + archiveQueue_(eckit::Resource("fdbServerMaxQueueSize", defaultArchiveQueueSize)), + controlSocket_(socket) { LOG_DEBUG_LIB(LibFdb5) << "ServerConnection::ServerConnection initialized" << std::endl; } @@ -75,10 +94,22 @@ ServerConnection::~ServerConnection() { if (archiveFuture_.valid()) { archiveFuture_.wait(); } - + eckit::Log::info() << "Done" << std::endl; } +// //---------------------------------------------------------------------------------------------------------------------- +// +// uint32_t ServerConnection::writeSocketControl(const void* buf, const uint32_t length) { +// return controlSocket_.write(buf, length); +// } +// +// uint32_t ServerConnection::writeSocketData(const void* buf, const uint32_t length) { +// ASSERT(dataSocket_); +// return dataSocket_->write(buf, length); +// } + +//---------------------------------------------------------------------------------------------------------------------- Handled ServerConnection::handleData(Message message, uint32_t clientID, uint32_t requestID) { try { @@ -187,7 +218,6 @@ void ServerConnection::initialiseConnections() { agreedConf_ = eckit::LocalConfiguration(); bool compatibleProtocol = true; - std::vector rflCommon = intersection(clientAvailableFunctionality, serverConf, "RemoteFieldLocation"); if (rflCommon.size() > 0) { LOG_DEBUG_LIB(LibFdb5) << "Protocol negotiation - RemoteFieldLocation version " << rflCommon.back() << std::endl; @@ -291,7 +321,7 @@ void ServerConnection::initialiseConnections() { std::stringstream ss; ss << "Session IDs do not match: " << serverSession << " != " << sessionID_; throw eckit::BadValue(ss.str(), Here()); - } + } } if (!errorMsg.empty()) { @@ -360,8 +390,6 @@ size_t ServerConnection::archiveThreadLoop() { return totalArchived; } - - void ServerConnection::listeningThreadLoopData() { MessageHeader hdr; @@ -382,7 +410,7 @@ void ServerConnection::listeningThreadLoopData() { break; } else { - + Handled handled; if (payload.size() == 0) { handled = handleData(hdr.message, hdr.clientID(), hdr.requestID); @@ -396,10 +424,8 @@ void ServerConnection::listeningThreadLoopData() { case Handled::YesRemoveReadListener: { std::lock_guard lock(handlerMutex_); - dataListener_--; - if (dataListener_ == 0) { - return; - } + numDataListener_--; + if (numDataListener_ == 0) { return; } break; } case Handled::Replied: // nothing to do @@ -428,7 +454,7 @@ void ServerConnection::listeningThreadLoopData() { void ServerConnection::handle() { initialiseConnections(); - + std::thread listeningThreadData; MessageHeader hdr; @@ -469,7 +495,6 @@ void ServerConnection::handle() { handled = handleData(hdr.message, hdr.clientID(), hdr.requestID); } } - switch (handled) { @@ -479,8 +504,8 @@ void ServerConnection::handle() { case Handled::YesAddReadListener: { std::lock_guard lock(handlerMutex_); - dataListener_++; - if (dataListener_ == 1 && !single_) { + numDataListener_++; + if (numDataListener_ == 1 && !single_) { listeningThreadData = std::thread([this] { listeningThreadLoopData(); }); } } @@ -540,7 +565,6 @@ void ServerConnection::tidyWorkers() { } } - void ServerConnection::archiver() { // Ensure that we aren't already running a catalogue/store diff --git a/src/fdb5/remote/server/ServerConnection.h b/src/fdb5/remote/server/ServerConnection.h index 66ef81f56..6a4ba27e6 100644 --- a/src/fdb5/remote/server/ServerConnection.h +++ b/src/fdb5/remote/server/ServerConnection.h @@ -19,8 +19,8 @@ #include #include -#include +#include "eckit/container/Queue.h" #include "eckit/io/Buffer.h" #include "eckit/io/DataHandle.h" #include "eckit/net/TCPServer.h" @@ -66,7 +66,7 @@ struct readLocationElem { std::unique_ptr readLocation; readLocationElem() : clientID(0), requestID(0), readLocation(nullptr) {} - + readLocationElem(uint32_t clientID, uint32_t requestID, std::unique_ptr readLocation) : clientID(clientID), requestID(requestID), readLocation(std::move(readLocation)) {} }; @@ -105,14 +105,15 @@ class ServerConnection : public Connection, public Handler { // socket methods int selectDataPort(); eckit::LocalConfiguration availableFunctionality() const; - + // Worker functionality void tidyWorkers(); void waitForWorkers(); // archival thread size_t archiveThreadLoop(); - virtual void archiveBlob(const uint32_t clientID, const uint32_t requestID, const void* data, size_t length) = 0; + + virtual void archiveBlob(uint32_t clientID, uint32_t requestID, const void* data, size_t length) = 0; // archival helper methods void archiver(); @@ -124,8 +125,9 @@ class ServerConnection : public Connection, public Handler { void listeningThreadLoopData(); - eckit::net::TCPSocket& controlSocket() override { return controlSocket_; } - eckit::net::TCPSocket& dataSocket() override { + const eckit::net::TCPSocket& controlSocket() const override { return controlSocket_; } + + const eckit::net::TCPSocket& dataSocket() const override { ASSERT(dataSocket_); return *dataSocket_; } @@ -143,7 +145,7 @@ class ServerConnection : public Connection, public Handler { eckit::LocalConfiguration agreedConf_; std::mutex readLocationMutex_; std::thread readLocationWorker_; - + std::map> workerThreads_; eckit::Queue archiveQueue_; std::future archiveFuture_; @@ -151,8 +153,8 @@ class ServerConnection : public Connection, public Handler { eckit::net::TCPSocket controlSocket_; std::mutex handlerMutex_; - size_t numControlConnection_; - size_t numDataConnection_; + size_t numControlConnection_ {0}; + size_t numDataConnection_ {0}; private: @@ -160,7 +162,8 @@ class ServerConnection : public Connection, public Handler { // data connection std::unique_ptr dataSocket_; - size_t dataListener_; + + size_t numDataListener_ {0}; }; //---------------------------------------------------------------------------------------------------------------------- From e28bde68dcf478b993b5e214165ab5fab66f8eff Mon Sep 17 00:00:00 2001 From: Metin Cakircali Date: Tue, 14 Jan 2025 20:03:02 +0100 Subject: [PATCH 2/4] chore(remoteFDB): removed commented code --- src/fdb5/remote/server/ServerConnection.cc | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/src/fdb5/remote/server/ServerConnection.cc b/src/fdb5/remote/server/ServerConnection.cc index 01d3c845c..1a6d540a3 100644 --- a/src/fdb5/remote/server/ServerConnection.cc +++ b/src/fdb5/remote/server/ServerConnection.cc @@ -98,17 +98,6 @@ ServerConnection::~ServerConnection() { eckit::Log::info() << "Done" << std::endl; } -// //---------------------------------------------------------------------------------------------------------------------- -// -// uint32_t ServerConnection::writeSocketControl(const void* buf, const uint32_t length) { -// return controlSocket_.write(buf, length); -// } -// -// uint32_t ServerConnection::writeSocketData(const void* buf, const uint32_t length) { -// ASSERT(dataSocket_); -// return dataSocket_->write(buf, length); -// } - //---------------------------------------------------------------------------------------------------------------------- Handled ServerConnection::handleData(Message message, uint32_t clientID, uint32_t requestID) { From 59f6bc917c2e2a06ae159faefb4e94ad16a0e0ca Mon Sep 17 00:00:00 2001 From: Metin Cakircali Date: Wed, 15 Jan 2025 15:51:08 +0100 Subject: [PATCH 3/4] fix(remoteFDB): Message class type payload and other improvements --- src/fdb5/remote/Connection.cc | 49 +++++++++--------- src/fdb5/remote/Connection.h | 21 ++++---- src/fdb5/remote/Messages.cc | 4 +- src/fdb5/remote/Messages.h | 59 ++++++++++++---------- src/fdb5/remote/client/Client.cc | 36 +++++++------ src/fdb5/remote/client/Client.h | 8 ++- src/fdb5/remote/client/ClientConnection.cc | 32 ++++++------ src/fdb5/remote/client/ClientConnection.h | 6 +-- src/fdb5/remote/client/RemoteCatalogue.cc | 43 ++++++++++++---- src/fdb5/remote/client/RemoteCatalogue.h | 16 ++++-- src/fdb5/remote/client/RemoteStore.cc | 50 ++++++++++++------ src/fdb5/remote/server/CatalogueHandler.cc | 19 ++++--- src/fdb5/remote/server/CatalogueHandler.h | 7 ++- src/fdb5/remote/server/ServerConnection.cc | 10 ++-- src/fdb5/remote/server/StoreHandler.cc | 10 ++-- 15 files changed, 222 insertions(+), 148 deletions(-) diff --git a/src/fdb5/remote/Connection.cc b/src/fdb5/remote/Connection.cc index bfacf9e34..796b423e7 100644 --- a/src/fdb5/remote/Connection.cc +++ b/src/fdb5/remote/Connection.cc @@ -4,6 +4,9 @@ #include "fdb5/LibFdb5.h" #include "fdb5/remote/Connection.h" #include "fdb5/remote/Messages.h" +#include +#include +#include namespace fdb5::remote { @@ -32,7 +35,7 @@ void Connection::teardown() { //---------------------------------------------------------------------------------------------------------------------- -void Connection::writeUnsafe(const bool control, const void* data, const size_t length) const { +void Connection::writeUnsafe(const bool control, const void* const data, const size_t length) const { long written = 0; if (control || single_) { written = controlSocket().write(data, length); @@ -70,14 +73,14 @@ void Connection::readUnsafe(bool control, void* data, size_t length) const { } } -eckit::Buffer Connection::read(bool control, MessageHeader& hdr) const { +eckit::Buffer Connection::read(const bool control, MessageHeader& hdr) const { eckit::FixedString<4> tail; std::lock_guard lock((control || single_) ? readControlMutex_ : readDataMutex_); readUnsafe(control, &hdr, sizeof(hdr)); - ASSERT(hdr.marker == StartMarker); - ASSERT(hdr.version == CurrentVersion); + ASSERT(hdr.marker == MessageHeader::StartMarker); + ASSERT(hdr.version == MessageHeader::currentVersion); ASSERT(single_ || hdr.control() == control); eckit::Buffer payload{hdr.payloadSize}; @@ -86,7 +89,7 @@ eckit::Buffer Connection::read(bool control, MessageHeader& hdr) const { } // Ensure we have consumed exactly the correct amount from the socket. readUnsafe(control, &tail, sizeof(tail)); - ASSERT(tail == EndMarker); + ASSERT(tail == MessageHeader::EndMarker); if (hdr.message == Message::Error) { @@ -99,38 +102,36 @@ eckit::Buffer Connection::read(bool control, MessageHeader& hdr) const { return payload; } -void Connection::write(Message msg, const bool control, const uint32_t clientID, const uint32_t requestID, Payload data) const { +void Connection::write(const Message msg, + const bool control, + const uint32_t clientID, + const uint32_t requestID, + const PayloadList payloads) const { uint32_t payloadLength = 0; - for (auto d: data) { - ASSERT(d.first); - payloadLength += d.second; + for (const auto& payload : payloads) { + ASSERT(payload.data); + payloadLength += payload.length; } MessageHeader message{msg, control, clientID, requestID, payloadLength}; - LOG_DEBUG_LIB(LibFdb5) << "Connection::write [message=" << msg << ",clientID=" << message.clientID() << ",control=" << control << ",requestID=" << requestID << ",data=" << data.size() << ",payload=" << payloadLength << "]" << std::endl; + LOG_DEBUG_LIB(LibFdb5) << "Connection::write [message=" << msg << ",clientID=" << message.clientID() + << ",control=" << control << ",requestID=" << requestID << ",payloadsSize=" << payloads.size() + << ",payloadLength=" << payloadLength << "]" << std::endl; std::lock_guard lock((control || single_) ? controlMutex_ : dataMutex_); + writeUnsafe(control, &message, sizeof(message)); - for (auto d: data) { - writeUnsafe(control, d.first, d.second); - } - writeUnsafe(control, &EndMarker, sizeof(EndMarker)); -} -void Connection::write(Message msg, - const bool control, - const uint32_t clientID, - const uint32_t requestID, - const void* data, - const uint32_t length) const { - write(msg, control, clientID, requestID, {{data, length}}); + for (const auto& payload : payloads) { writeUnsafe(control, payload.data, payload.length); } + + writeUnsafe(control, &MessageHeader::EndMarker, MessageHeader::markerBytes); } -void Connection::error(const std::string& msg, uint32_t clientID, uint32_t requestID) const { +void Connection::error(std::string_view msg, uint32_t clientID, uint32_t requestID) const { eckit::Log::error() << "[clientID=" << clientID << ",requestID=" << requestID << "] " << msg << std::endl; - write(Message::Error, false, clientID, requestID, {{msg.c_str(), msg.length()}}); + write(Message::Error, false, clientID, requestID, msg.data(), msg.length()); } eckit::Buffer Connection::readControl(MessageHeader& hdr) const { diff --git a/src/fdb5/remote/Connection.h b/src/fdb5/remote/Connection.h index 10393cc0f..5dad425c0 100644 --- a/src/fdb5/remote/Connection.h +++ b/src/fdb5/remote/Connection.h @@ -10,16 +10,18 @@ #pragma once +#include "eckit/serialisation/MemoryStream.h" #include "fdb5/remote/Messages.h" #include "eckit/exception/Exceptions.h" #include "eckit/net/TCPSocket.h" #include "eckit/os/BackTrace.h" +#include #include #include #include -#include +#include #include namespace eckit { @@ -47,18 +49,20 @@ class TCPException : public eckit::Exception { class Connection : eckit::NonCopyable { public: // types - using Payload = std::vector>; + using PayloadList = std::vector; public: // methods Connection(); virtual ~Connection() = default; - void write(Message msg, bool control, uint32_t clientID, uint32_t requestID, Payload data = {}) const; + void write(Message msg, bool control, uint32_t clientID, uint32_t requestID, PayloadList payloads = {}) const; - void write(Message msg, bool control, uint32_t clientID, uint32_t requestID, const void* data, uint32_t length) const; + void write(Message msg, bool control, uint32_t clientID, uint32_t requestID, const void* data, uint32_t length) const { + write(msg, control, clientID, requestID, {{length, data}}); + } - void error(const std::string& msg, uint32_t clientID, uint32_t requestID) const; + void error(std::string_view msg, uint32_t clientID, uint32_t requestID) const; eckit::Buffer readControl(MessageHeader& hdr) const; @@ -66,7 +70,7 @@ class Connection : eckit::NonCopyable { void teardown(); -private: // methods +private: // methods eckit::Buffer read(bool control, MessageHeader& hdr) const; void writeUnsafe(bool control, const void* data, size_t length) const; @@ -77,11 +81,10 @@ class Connection : eckit::NonCopyable { virtual const eckit::net::TCPSocket& dataSocket() const = 0; -protected: // members - +protected: // members bool single_; -private: // members +private: // members mutable std::mutex controlMutex_; mutable std::mutex dataMutex_; mutable std::mutex readControlMutex_; diff --git a/src/fdb5/remote/Messages.cc b/src/fdb5/remote/Messages.cc index 5e96fdab6..04e316312 100644 --- a/src/fdb5/remote/Messages.cc +++ b/src/fdb5/remote/Messages.cc @@ -15,8 +15,6 @@ #include "fdb5/remote/Messages.h" -using namespace eckit; - namespace fdb5::remote { //---------------------------------------------------------------------------------------------------------------------- @@ -62,7 +60,7 @@ std::ostream& operator<<(std::ostream& s, const Message& m) { MessageHeader::MessageHeader(Message message, bool control, uint32_t clientID, uint32_t requestID, uint32_t payloadSize) : marker(StartMarker), - version(CurrentVersion), + version(currentVersion), message(message), clientID_((clientID<<1) + (control ? 1 : 0)), requestID(requestID), diff --git a/src/fdb5/remote/Messages.h b/src/fdb5/remote/Messages.h index ce203b39b..6f3a2ee1d 100644 --- a/src/fdb5/remote/Messages.h +++ b/src/fdb5/remote/Messages.h @@ -18,8 +18,9 @@ #pragma once -#include #include +#include +#include #include "eckit/types/FixedString.h" @@ -31,10 +32,12 @@ namespace fdb5::remote { //---------------------------------------------------------------------------------------------------------------------- -const static eckit::FixedString<4> StartMarker {"SFDB"}; -const static eckit::FixedString<4> EndMarker {"EFDB"}; +struct Payload { + Payload(std::size_t length, const void* data) : length {length}, data {data} { } -constexpr uint16_t CurrentVersion = 12; + std::size_t length {0}; + const void* data {nullptr}; +}; enum class Message : uint16_t { @@ -76,21 +79,31 @@ enum class Message : uint16_t { std::ostream& operator<<(std::ostream& s, const Message& m); +//---------------------------------------------------------------------------------------------------------------------- + // Header used for all messages class MessageHeader { -public: // methods +public: // types + constexpr static uint16_t currentVersion = 12; + + constexpr static const auto hashBytes = 16; + + constexpr static const auto markerBytes = 4; + + using MarkerType = eckit::FixedString; + + using HashType = eckit::FixedString; - MessageHeader() : - version(CurrentVersion), - message(Message::None), - clientID_(0), - requestID(0), - payloadSize(0) {} + inline static const MarkerType StartMarker {"SFDB"}; + inline static const MarkerType EndMarker {"EFDB"}; + +public: // methods + MessageHeader() = default; MessageHeader(Message message, bool control, uint32_t clientID, uint32_t requestID, uint32_t payloadSize); - + bool control() const { return ((clientID_ & 0x00000001) == 1); } @@ -99,21 +112,13 @@ class MessageHeader { } public: - - eckit::FixedString<4> marker; // 4 bytes --> 4 - - uint16_t version; // 2 bytes --> 6 - - Message message; // 2 bytes --> 8 - - uint32_t clientID_; // 4 bytes --> 12 - - uint32_t requestID; // 4 bytes --> 16 - - uint32_t payloadSize; // 4 bytes --> 20 - - eckit::FixedString<16> hash; // 16 bytes --> 36 - + MarkerType marker; // 4 bytes --> 4 + uint16_t version {currentVersion}; // 2 bytes --> 6 + Message message {Message::None}; // 2 bytes --> 8 + uint32_t clientID_ {0}; // 4 bytes --> 12 + uint32_t requestID {0}; // 4 bytes --> 16 + uint32_t payloadSize {0}; // 4 bytes --> 20 + HashType hash; // 16 bytes --> 36 }; //---------------------------------------------------------------------------------------------------------------------- diff --git a/src/fdb5/remote/client/Client.cc b/src/fdb5/remote/client/Client.cc index 123d7ccfa..44257edb9 100644 --- a/src/fdb5/remote/client/Client.cc +++ b/src/fdb5/remote/client/Client.cc @@ -55,45 +55,43 @@ Client::~Client() { connection_.remove(id_); } -void Client::controlWriteCheckResponse(Message msg, - uint32_t requestID, - bool dataListener, - const void* payload, - uint32_t payloadLength) const { +void Client::controlWriteCheckResponse(const Message msg, + const uint32_t requestID, + const bool dataListener, + const void* const payload, + const uint32_t payloadLength) const { ASSERT(requestID); ASSERT(!(!payloadLength ^ !payload)); std::lock_guard lock(blockingRequestMutex_); - Connection::Payload data; - if (payloadLength != 0) { data.push_back(std::make_pair(payload, payloadLength)); } + PayloadList payloads; + if (payloadLength > 0) { payloads.emplace_back(payloadLength, payload); } - auto f = connection_.controlWrite(*this, msg, requestID, dataListener, data); + auto f = connection_.controlWrite(*this, msg, requestID, dataListener, payloads); f.wait(); ASSERT(f.get().size() == 0); } -eckit::Buffer Client::controlWriteReadResponse(Message msg, - uint32_t requestID, - const void* payload, - uint32_t payloadLength) const { +eckit::Buffer Client::controlWriteReadResponse(const Message msg, + const uint32_t requestID, + const void* const payload, + const uint32_t payloadLength) const { ASSERT(requestID); ASSERT(!(!payloadLength ^ !payload)); std::lock_guard lock(blockingRequestMutex_); - Connection::Payload data {}; - if (payloadLength) { - data.push_back(std::make_pair(payload, payloadLength)); - } + PayloadList payloads; + if (payloadLength > 0) { payloads.emplace_back(payloadLength, payload); } - std::future f = connection_.controlWrite(*this, msg, requestID, false, data); + auto f = connection_.controlWrite(*this, msg, requestID, false, payloads); f.wait(); return eckit::Buffer{f.get()}; } -void Client::dataWrite(remote::Message msg, uint32_t requestID, Connection::Payload data) { - connection_.dataWrite(*this, msg, requestID, data); +void Client::dataWrite(remote::Message msg, uint32_t requestID, PayloadList payloads) { + connection_.dataWrite(*this, msg, requestID, std::move(payloads)); } } // namespace fdb5::remote diff --git a/src/fdb5/remote/client/Client.h b/src/fdb5/remote/client/Client.h index 4e1bc4e6f..360daa72b 100644 --- a/src/fdb5/remote/client/Client.h +++ b/src/fdb5/remote/client/Client.h @@ -17,6 +17,10 @@ #include "fdb5/remote/Messages.h" #include "fdb5/remote/client/ClientConnection.h" +#include +#include // std::pair +#include + namespace fdb5::remote { //---------------------------------------------------------------------------------------------------------------------- @@ -30,6 +34,8 @@ class RemoteFDBException : public eckit::RemoteException { //---------------------------------------------------------------------------------------------------------------------- class Client : eckit::NonCopyable { + using PayloadList = Connection::PayloadList; + public: Client(const eckit::net::Endpoint& endpoint, const std::string& defaultEndpoint); @@ -59,7 +65,7 @@ class Client : eckit::NonCopyable { const void* payload = nullptr, uint32_t payloadLength = 0) const; - void dataWrite(Message msg, uint32_t requestID, Connection::Payload data = {}); + void dataWrite(Message msg, uint32_t requestID, PayloadList payloads = {}); // handlers for incoming messages - to be defined in the client class virtual bool handle(Message message, uint32_t requestID) = 0; diff --git a/src/fdb5/remote/client/ClientConnection.cc b/src/fdb5/remote/client/ClientConnection.cc index 689444edb..1a0fe06d9 100644 --- a/src/fdb5/remote/client/ClientConnection.cc +++ b/src/fdb5/remote/client/ClientConnection.cc @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -175,17 +176,17 @@ eckit::LocalConfiguration ClientConnection::availableFunctionality() const { //---------------------------------------------------------------------------------------------------------------------- std::future ClientConnection::controlWrite(const Client& client, - Message msg, + const Message msg, const uint32_t requestID, const bool /*dataListener*/, - Payload data) const { + const PayloadList payloads) const { std::future f; { std::lock_guard lock(promisesMutex_); auto pp = promises_.emplace(requestID, std::promise{}).first; f = pp->second.get_future(); } - Connection::write(msg, true, client.clientId(), requestID, data); + Connection::write(msg, true, client.clientId(), requestID, payloads); return f; } @@ -195,42 +196,43 @@ void ClientConnection::dataWrite(DataWriteRequest& request) const { request.data_.size()); } -void ClientConnection::dataWrite(Client& client, remote::Message msg, uint32_t requestID, Payload data) { +void ClientConnection::dataWrite(Client& client, remote::Message msg, uint32_t requestID, PayloadList payloads) { static size_t maxQueueLength = eckit::Resource("fdbDataWriteQueueLength;$FDB_DATA_WRITE_QUEUE_LENGTH", 320); + { // retrieve or add client to the list - std::lock_guard lock(clientsMutex_); - auto it = clients_.find(client.clientId()); - ASSERT(it != clients_.end()); + std::lock_guard lock(clientsMutex_); + ASSERT(clients_.find(client.clientId()) != clients_.end()); } + { std::lock_guard lock(dataWriteMutex_); if (!dataWriteThread_.joinable()) { // Reset the queue after previous done/errors ASSERT(!dataWriteQueue_); - dataWriteQueue_.reset(new eckit::Queue{maxQueueLength}); + dataWriteQueue_ = std::make_unique>(maxQueueLength); dataWriteThread_ = std::thread([this] { dataWriteThreadLoop(); }); } } + uint32_t payloadLength = 0; - for (auto d: data) { - ASSERT(d.first); - payloadLength += d.second; + for (auto payload : payloads) { + ASSERT(payload.data); + payloadLength += payload.length; } eckit::Buffer buffer{payloadLength}; uint32_t offset = 0; - for (auto d: data) { - buffer.copy(d.first, d.second, offset); - offset += d.second; + for (auto payload : payloads) { + buffer.copy(payload.data, payload.length, offset); + offset += payload.length; } dataWriteQueue_->emplace(&client, msg, requestID, std::move(buffer)); } - void ClientConnection::dataWriteThreadLoop() { eckit::Timer timer; diff --git a/src/fdb5/remote/client/ClientConnection.h b/src/fdb5/remote/client/ClientConnection.h index 9a5bb870f..d2e362fc6 100644 --- a/src/fdb5/remote/client/ClientConnection.h +++ b/src/fdb5/remote/client/ClientConnection.h @@ -40,10 +40,10 @@ class ClientConnection : protected Connection { std::future controlWrite(const Client& client, Message msg, uint32_t requestID, - bool startDataListener, - Payload data = {}) const; + bool /*dataListener*/, + PayloadList payload = {}) const; - void dataWrite(Client& client, Message msg, uint32_t requestID, Payload data = {}); + void dataWrite(Client& client, Message msg, uint32_t requestID, PayloadList payloads = {}); void add(Client& client); bool remove(uint32_t clientID); diff --git a/src/fdb5/remote/client/RemoteCatalogue.cc b/src/fdb5/remote/client/RemoteCatalogue.cc index e0a93d854..0b813102b 100644 --- a/src/fdb5/remote/client/RemoteCatalogue.cc +++ b/src/fdb5/remote/client/RemoteCatalogue.cc @@ -8,18 +8,35 @@ * does it submit to any jurisdiction. */ -#include "eckit/config/Resource.h" -#include "eckit/log/Log.h" -#include "eckit/serialisation/MemoryStream.h" +#include "fdb5/remote/client/RemoteCatalogue.h" #include "fdb5/LibFdb5.h" -#include "fdb5/remote/client/RemoteCatalogue.h" +#include "fdb5/database/Key.h" +#include "fdb5/remote/Messages.h" + +#include "eckit/filesystem/URI.h" +#include "eckit/log/Log.h" +#include "eckit/serialisation/MemoryStream.h" -#include +#include +#include +#include using namespace eckit; + namespace fdb5::remote { +//---------------------------------------------------------------------------------------------------------------------- + +namespace { + +constexpr size_t archivePayloadSize = 8192; +constexpr size_t keyPayloadSize = 4096; + +} + +//---------------------------------------------------------------------------------------------------------------------- + RemoteCatalogue::RemoteCatalogue(const Key& key, const Config& config): CatalogueImpl(key, ControlIdentifiers(), config), // xxx what are control identifiers? Setting empty here... Client(eckit::net::Endpoint(config.getString("host"), config.getInt("port")), ""), @@ -49,14 +66,14 @@ void RemoteCatalogue::archive(const Key& idxKey, const Key& datumKey, std::share numLocations_++; } - Buffer buffer(8192); + Buffer buffer(archivePayloadSize); MemoryStream stream(buffer); stream << idxKey; stream << datumKey; stream << *fieldLocation; - std::vector> payloads; - payloads.push_back(std::pair{buffer, stream.position()}); + std::vector payloads; + payloads.emplace_back(stream.position(), buffer.data()); dataWrite(Message::Blob, id, payloads); @@ -124,7 +141,7 @@ void RemoteCatalogue::loadSchema() { LOG_DEBUG_LIB(LibFdb5) << "RemoteCatalogue::loadSchema()" << std::endl; // send dbkey to remote. - eckit::Buffer keyBuffer(4096); + eckit::Buffer keyBuffer(keyPayloadSize); eckit::MemoryStream keyStream(keyBuffer); keyStream << dbKey_; @@ -139,6 +156,7 @@ bool RemoteCatalogue::handle(Message message, uint32_t requestID) { Log::warning() << *this << " - Received [message=" << ((uint) message) << ",requestID=" << requestID << "]" << std::endl; return false; } + bool RemoteCatalogue::handle(Message message, uint32_t requestID, eckit::Buffer&& payload) { LOG_DEBUG_LIB(LibFdb5) << *this << " - Received [message=" << ((uint) message) << ",requestID=" << requestID << ",payloadSize=" << payload.size() << "]" << std::endl; return false; @@ -162,14 +180,19 @@ void RemoteCatalogue::print( std::ostream &out ) const { out << "RemoteCatalogue(endpoint=" << controlEndpoint() << ",clientID=" << clientId() << ")"; } - std::string RemoteCatalogue::type() const { return "remote"; } + bool RemoteCatalogue::open() { return true; } +//---------------------------------------------------------------------------------------------------------------------- + static CatalogueReaderBuilder reader("remote"); static CatalogueWriterBuilder writer("remote"); + +//---------------------------------------------------------------------------------------------------------------------- + } // namespace fdb5::remote diff --git a/src/fdb5/remote/client/RemoteCatalogue.h b/src/fdb5/remote/client/RemoteCatalogue.h index 1d10b9716..29631dab5 100644 --- a/src/fdb5/remote/client/RemoteCatalogue.h +++ b/src/fdb5/remote/client/RemoteCatalogue.h @@ -1,12 +1,22 @@ #pragma once -#include "fdb5/api/FDBStats.h" +#include "fdb5/api/helpers/ControlIterator.h" +#include "fdb5/config/Config.h" #include "fdb5/database/Catalogue.h" +#include "fdb5/database/DbStats.h" #include "fdb5/database/Index.h" #include "fdb5/database/Store.h" #include "fdb5/remote/client/Client.h" +#include "eckit/filesystem/URI.h" + +#include +#include +#include +#include +#include + namespace fdb5::remote { // class RemoteCatalogueArchiver; @@ -29,8 +39,8 @@ class RemoteCatalogue : public CatalogueReader, public CatalogueWriter, public C void reconsolidate() override; //From CatalogueReader - DbStats stats() const override { return DbStats(); } - bool retrieve(const Key& key, Field& field) const override { return false; } + DbStats stats() const override { return {}; } + bool retrieve(const Key& /*key*/, Field& /*field*/) const override { return false; } // From Catalogue bool selectIndex(const Key& idxKey) override; diff --git a/src/fdb5/remote/client/RemoteStore.cc b/src/fdb5/remote/client/RemoteStore.cc index e86d4a55c..4f2fcec2d 100644 --- a/src/fdb5/remote/client/RemoteStore.cc +++ b/src/fdb5/remote/client/RemoteStore.cc @@ -8,26 +8,44 @@ * does it submit to any jurisdiction. */ -#include -#include - -#include "eckit/log/Timer.h" - -#include "eckit/config/Resource.h" -#include "eckit/io/AIOHandle.h" -#include "eckit/io/EmptyHandle.h" -#include "eckit/runtime/Main.h" -#include "eckit/serialisation/MemoryStream.h" +#include "fdb5/remote/client/RemoteStore.h" #include "fdb5/LibFdb5.h" +#include "fdb5/database/Field.h" #include "fdb5/database/FieldLocation.h" -#include "fdb5/io/FDBFileHandle.h" +#include "fdb5/database/Store.h" #include "fdb5/remote/Connection.h" +#include "fdb5/remote/Messages.h" #include "fdb5/remote/RemoteFieldLocation.h" -#include "fdb5/remote/client/RemoteStore.h" +#include "fdb5/remote/client/Client.h" #include "fdb5/rules/Rule.h" -#include +#include "eckit/exception/Exceptions.h" +#include "eckit/filesystem/URI.h" +#include "eckit/io/Length.h" +#include "eckit/io/Offset.h" +#include "eckit/log/Log.h" +#include "eckit/net/Endpoint.h" +#include "eckit/runtime/Main.h" +#include "eckit/serialisation/MemoryStream.h" +#include "eckit/serialisation/Reanimator.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include using namespace eckit; @@ -252,9 +270,9 @@ void RemoteStore::archive(const Key& key, const void *data, eckit::Length length keyStream << dbKey_; keyStream << key; - Connection::Payload payloads; - payloads.push_back(std::pair{keyBuffer, keyStream.position()}); - payloads.push_back(std::pair{data, length}); + std::vector payloads; + payloads.emplace_back(keyStream.position(), keyBuffer.data()); + payloads.emplace_back(length, data); dataWrite(Message::Blob, id, payloads); diff --git a/src/fdb5/remote/server/CatalogueHandler.cc b/src/fdb5/remote/server/CatalogueHandler.cc index 22cccf76b..9a3dc8f80 100644 --- a/src/fdb5/remote/server/CatalogueHandler.cc +++ b/src/fdb5/remote/server/CatalogueHandler.cc @@ -8,16 +8,22 @@ * does it submit to any jurisdiction. */ -#include "eckit/config/Resource.h" +#include "fdb5/remote/server/CatalogueHandler.h" +#include "fdb5/LibFdb5.h" +#include "fdb5/api/helpers/FDBToolRequest.h" +#include "fdb5/remote/Connection.h" +#include "fdb5/remote/Messages.h" +#include "fdb5/remote/server/ServerConnection.h" + #include "eckit/net/NetMask.h" +#include "eckit/net/TCPSocket.h" #include "eckit/serialisation/MemoryStream.h" -#include "fdb5/LibFdb5.h" -#include "fdb5/api/helpers/FDBToolRequest.h" -#include "fdb5/remote/server/CatalogueHandler.h" +#include +#include +#include using namespace eckit; -using metkit::mars::MarsRequest; namespace fdb5::remote { @@ -161,6 +167,7 @@ struct BaseHelper { struct ListHelper : public BaseHelper { ListIterator apiCall(FDB& fdb, const FDBToolRequest& request) const { + /// @todo remember to add level_ to this helper return fdb.list(request); } }; @@ -226,7 +233,7 @@ void CatalogueHandler::forwardApiCall(uint32_t clientID, uint32_t requestID, eck typename decltype(iterator)::value_type elem; while (iterator.next(elem)) { auto encoded(helper.encode(elem, *this)); - write(Message::Blob, false, clientID, requestID, std::vector>{{encoded.buf, encoded.position}}); + write(Message::Blob, false, clientID, requestID, encoded.buf, encoded.position); } write(Message::Complete, false, clientID, requestID); } diff --git a/src/fdb5/remote/server/CatalogueHandler.h b/src/fdb5/remote/server/CatalogueHandler.h index f63055b3a..33fdf825e 100644 --- a/src/fdb5/remote/server/CatalogueHandler.h +++ b/src/fdb5/remote/server/CatalogueHandler.h @@ -10,8 +10,11 @@ #pragma once -#include "fdb5/remote/server/ServerConnection.h" #include "fdb5/api/FDB.h" +#include "fdb5/database/Catalogue.h" +#include "fdb5/remote/server/ServerConnection.h" + +#include namespace fdb5::remote { @@ -24,7 +27,7 @@ struct CatalogueArchiver { bool controlConnection; bool dataConnection; - + std::unique_ptr catalogue; size_t locationsExpected; size_t locationsArchived; diff --git a/src/fdb5/remote/server/ServerConnection.cc b/src/fdb5/remote/server/ServerConnection.cc index 1a6d540a3..85d24e25b 100644 --- a/src/fdb5/remote/server/ServerConnection.cc +++ b/src/fdb5/remote/server/ServerConnection.cc @@ -278,7 +278,7 @@ void ServerConnection::initialiseConnections() { LOG_DEBUG_LIB(LibFdb5) << "Protocol negotiation - configuration: " << agreedConf_ <>{{startupBuffer.data(), s.position()}}); + write(Message::Startup, true, 0, 0, startupBuffer.data(), s.position()); if (!single_) { ASSERT(dataSocketFuture.valid()); @@ -292,7 +292,7 @@ void ServerConnection::initialiseConnections() { MessageHeader dataHdr; eckit::Buffer payload2 = readData(dataHdr); - ASSERT(dataHdr.version == CurrentVersion); + ASSERT(dataHdr.version == MessageHeader::currentVersion); ASSERT(dataHdr.message == Message::Startup); ASSERT(dataHdr.requestID == 0); @@ -355,9 +355,9 @@ size_t ServerConnection::archiveThreadLoop() { const void* payloadData = charData; charData += hdr->payloadSize; - const decltype(EndMarker)* e = static_cast(static_cast(charData)); - ASSERT(*e == EndMarker); - charData += sizeof(EndMarker); + const auto* e = static_cast(static_cast(charData)); + ASSERT(*e == MessageHeader::EndMarker); + charData += MessageHeader::markerBytes; archiveBlob(elem.clientID_, elem.requestID_, payloadData, hdr->payloadSize); totalArchived += 1; diff --git a/src/fdb5/remote/server/StoreHandler.cc b/src/fdb5/remote/server/StoreHandler.cc index 39a9cbbec..1baa108b0 100644 --- a/src/fdb5/remote/server/StoreHandler.cc +++ b/src/fdb5/remote/server/StoreHandler.cc @@ -34,7 +34,7 @@ Handled StoreHandler::handleControl(Message message, uint32_t clientID, uint32_t case Message::Store: // notification that the client is starting to send data for archival archiver(); return Handled::YesAddArchiveListener; - + default: { std::stringstream ss; ss << "ERROR: Unexpected message recieved (" << message << "). ABORTING"; @@ -132,7 +132,7 @@ void StoreHandler::writeToParent(const uint32_t clientID, const uint32_t request << std::endl; while ((dataRead = dh->read(writeBuffer, writeBuffer.size())) != 0) { - write(Message::Blob, false, clientID, requestID, std::vector>{{writeBuffer, dataRead}}); + write(Message::Blob, false, clientID, requestID, writeBuffer, dataRead); } // And when we are done, add a complete message. @@ -157,7 +157,7 @@ void StoreHandler::writeToParent(const uint32_t clientID, const uint32_t request void StoreHandler::archiveBlob(const uint32_t clientID, const uint32_t requestID, const void* data, size_t length) { - + MemoryStream s(data, length); fdb5::Key dbKey(s); @@ -168,7 +168,7 @@ void StoreHandler::archiveBlob(const uint32_t clientID, const uint32_t requestID const char* charData = static_cast(data); // To allow pointer arithmetic Log::status() << "Archiving data: " << ss_key.str() << std::endl; - + Store& ss = store(clientID, dbKey); std::shared_ptr location = ss.archive(idxKey, charData + s.position(), length - s.position()); @@ -217,7 +217,7 @@ void StoreHandler::flush(uint32_t clientID, uint32_t requestID, const eckit::Buf } bool StoreHandler::remove(bool control, uint32_t clientID) { - + std::lock_guard lock(handlerMutex_); auto it = stores_.find(clientID); if (it != stores_.end()) { From 1c573a650d59109b372271903794e31b2a277f33 Mon Sep 17 00:00:00 2001 From: Metin Cakircali Date: Thu, 16 Jan 2025 11:24:18 +0100 Subject: [PATCH 4/4] fix(remoteFDB): cleanup remote/client --- src/fdb5/remote/client/Client.cc | 2 +- src/fdb5/remote/client/Client.h | 3 ++- src/fdb5/remote/client/RemoteCatalogue.cc | 19 +++++++------------ src/fdb5/remote/client/RemoteCatalogue.h | 12 +++++++----- src/fdb5/remote/client/RemoteStore.cc | 2 +- 5 files changed, 18 insertions(+), 20 deletions(-) diff --git a/src/fdb5/remote/client/Client.cc b/src/fdb5/remote/client/Client.cc index 44257edb9..728f8ae9d 100644 --- a/src/fdb5/remote/client/Client.cc +++ b/src/fdb5/remote/client/Client.cc @@ -90,7 +90,7 @@ eckit::Buffer Client::controlWriteReadResponse(const Message msg, return eckit::Buffer{f.get()}; } -void Client::dataWrite(remote::Message msg, uint32_t requestID, PayloadList payloads) { +void Client::dataWrite(Message msg, uint32_t requestID, PayloadList payloads) { connection_.dataWrite(*this, msg, requestID, std::move(payloads)); } diff --git a/src/fdb5/remote/client/Client.h b/src/fdb5/remote/client/Client.h index 360daa72b..72594a405 100644 --- a/src/fdb5/remote/client/Client.h +++ b/src/fdb5/remote/client/Client.h @@ -34,9 +34,10 @@ class RemoteFDBException : public eckit::RemoteException { //---------------------------------------------------------------------------------------------------------------------- class Client : eckit::NonCopyable { +public: // types using PayloadList = Connection::PayloadList; -public: +public: // methods Client(const eckit::net::Endpoint& endpoint, const std::string& defaultEndpoint); Client(const std::vector>& endpoints); diff --git a/src/fdb5/remote/client/RemoteCatalogue.cc b/src/fdb5/remote/client/RemoteCatalogue.cc index 0b813102b..fcde73a82 100644 --- a/src/fdb5/remote/client/RemoteCatalogue.cc +++ b/src/fdb5/remote/client/RemoteCatalogue.cc @@ -30,9 +30,8 @@ namespace fdb5::remote { namespace { -constexpr size_t archivePayloadSize = 8192; -constexpr size_t keyPayloadSize = 4096; - +constexpr size_t archiveBufferSize = 8192; +constexpr size_t keyBufferSize = 4096; } //---------------------------------------------------------------------------------------------------------------------- @@ -66,13 +65,13 @@ void RemoteCatalogue::archive(const Key& idxKey, const Key& datumKey, std::share numLocations_++; } - Buffer buffer(archivePayloadSize); + Buffer buffer(archiveBufferSize); MemoryStream stream(buffer); stream << idxKey; stream << datumKey; stream << *fieldLocation; - std::vector payloads; + PayloadList payloads; payloads.emplace_back(stream.position(), buffer.data()); dataWrite(Message::Blob, id, payloads); @@ -141,7 +140,7 @@ void RemoteCatalogue::loadSchema() { LOG_DEBUG_LIB(LibFdb5) << "RemoteCatalogue::loadSchema()" << std::endl; // send dbkey to remote. - eckit::Buffer keyBuffer(keyPayloadSize); + eckit::Buffer keyBuffer(keyBufferSize); eckit::MemoryStream keyStream(keyBuffer); keyStream << dbKey_; @@ -180,18 +179,14 @@ void RemoteCatalogue::print( std::ostream &out ) const { out << "RemoteCatalogue(endpoint=" << controlEndpoint() << ",clientID=" << clientId() << ")"; } -std::string RemoteCatalogue::type() const { - return "remote"; -} - bool RemoteCatalogue::open() { return true; } //---------------------------------------------------------------------------------------------------------------------- -static CatalogueReaderBuilder reader("remote"); -static CatalogueWriterBuilder writer("remote"); +static CatalogueReaderBuilder reader(RemoteCatalogue::typeName()); +static CatalogueWriterBuilder writer(RemoteCatalogue::typeName()); //---------------------------------------------------------------------------------------------------------------------- diff --git a/src/fdb5/remote/client/RemoteCatalogue.h b/src/fdb5/remote/client/RemoteCatalogue.h index 29631dab5..b51009780 100644 --- a/src/fdb5/remote/client/RemoteCatalogue.h +++ b/src/fdb5/remote/client/RemoteCatalogue.h @@ -24,13 +24,13 @@ namespace fdb5::remote { class RemoteCatalogue : public CatalogueReader, public CatalogueWriter, public CatalogueImpl, public Client { -public: +public: // types + static const char* typeName() { return "remote"; } +public: // methods RemoteCatalogue(const Key& key, const Config& config); RemoteCatalogue(const eckit::URI& uri, const Config& config); - ~RemoteCatalogue() override = default; - // From CatalogueWriter const Index& currentIndex() override; void archive(const Key& idxKey, const Key& datumKey, std::shared_ptr fieldLocation) override; @@ -59,8 +59,10 @@ class RemoteCatalogue : public CatalogueReader, public CatalogueWriter, public C std::vector indexes(bool sorted=false) const override; void maskIndexEntry(const Index& index) const override; void allMasked(std::set>& metadata, std::set& data) const override; - void print( std::ostream &out ) const override; - std::string type() const override; + void print(std::ostream& out) const override; + + std::string type() const override { return typeName(); } + bool open() override; void flush(size_t archivedFields) override; void clean() override; diff --git a/src/fdb5/remote/client/RemoteStore.cc b/src/fdb5/remote/client/RemoteStore.cc index 4f2fcec2d..524a90337 100644 --- a/src/fdb5/remote/client/RemoteStore.cc +++ b/src/fdb5/remote/client/RemoteStore.cc @@ -270,7 +270,7 @@ void RemoteStore::archive(const Key& key, const void *data, eckit::Length length keyStream << dbKey_; keyStream << key; - std::vector payloads; + PayloadList payloads; payloads.emplace_back(keyStream.position(), keyBuffer.data()); payloads.emplace_back(length, data);