From 59f6bc917c2e2a06ae159faefb4e94ad16a0e0ca Mon Sep 17 00:00:00 2001 From: Metin Cakircali Date: Wed, 15 Jan 2025 15:51:08 +0100 Subject: [PATCH] fix(remoteFDB): Message class type payload and other improvements --- src/fdb5/remote/Connection.cc | 49 +++++++++--------- src/fdb5/remote/Connection.h | 21 ++++---- src/fdb5/remote/Messages.cc | 4 +- src/fdb5/remote/Messages.h | 59 ++++++++++++---------- src/fdb5/remote/client/Client.cc | 36 +++++++------ src/fdb5/remote/client/Client.h | 8 ++- src/fdb5/remote/client/ClientConnection.cc | 32 ++++++------ src/fdb5/remote/client/ClientConnection.h | 6 +-- src/fdb5/remote/client/RemoteCatalogue.cc | 43 ++++++++++++---- src/fdb5/remote/client/RemoteCatalogue.h | 16 ++++-- src/fdb5/remote/client/RemoteStore.cc | 50 ++++++++++++------ src/fdb5/remote/server/CatalogueHandler.cc | 19 ++++--- src/fdb5/remote/server/CatalogueHandler.h | 7 ++- src/fdb5/remote/server/ServerConnection.cc | 10 ++-- src/fdb5/remote/server/StoreHandler.cc | 10 ++-- 15 files changed, 222 insertions(+), 148 deletions(-) diff --git a/src/fdb5/remote/Connection.cc b/src/fdb5/remote/Connection.cc index bfacf9e34..796b423e7 100644 --- a/src/fdb5/remote/Connection.cc +++ b/src/fdb5/remote/Connection.cc @@ -4,6 +4,9 @@ #include "fdb5/LibFdb5.h" #include "fdb5/remote/Connection.h" #include "fdb5/remote/Messages.h" +#include +#include +#include namespace fdb5::remote { @@ -32,7 +35,7 @@ void Connection::teardown() { //---------------------------------------------------------------------------------------------------------------------- -void Connection::writeUnsafe(const bool control, const void* data, const size_t length) const { +void Connection::writeUnsafe(const bool control, const void* const data, const size_t length) const { long written = 0; if (control || single_) { written = controlSocket().write(data, length); @@ -70,14 +73,14 @@ void Connection::readUnsafe(bool control, void* data, size_t length) const { } } -eckit::Buffer Connection::read(bool control, MessageHeader& hdr) const { +eckit::Buffer Connection::read(const bool control, MessageHeader& hdr) const { eckit::FixedString<4> tail; std::lock_guard lock((control || single_) ? readControlMutex_ : readDataMutex_); readUnsafe(control, &hdr, sizeof(hdr)); - ASSERT(hdr.marker == StartMarker); - ASSERT(hdr.version == CurrentVersion); + ASSERT(hdr.marker == MessageHeader::StartMarker); + ASSERT(hdr.version == MessageHeader::currentVersion); ASSERT(single_ || hdr.control() == control); eckit::Buffer payload{hdr.payloadSize}; @@ -86,7 +89,7 @@ eckit::Buffer Connection::read(bool control, MessageHeader& hdr) const { } // Ensure we have consumed exactly the correct amount from the socket. readUnsafe(control, &tail, sizeof(tail)); - ASSERT(tail == EndMarker); + ASSERT(tail == MessageHeader::EndMarker); if (hdr.message == Message::Error) { @@ -99,38 +102,36 @@ eckit::Buffer Connection::read(bool control, MessageHeader& hdr) const { return payload; } -void Connection::write(Message msg, const bool control, const uint32_t clientID, const uint32_t requestID, Payload data) const { +void Connection::write(const Message msg, + const bool control, + const uint32_t clientID, + const uint32_t requestID, + const PayloadList payloads) const { uint32_t payloadLength = 0; - for (auto d: data) { - ASSERT(d.first); - payloadLength += d.second; + for (const auto& payload : payloads) { + ASSERT(payload.data); + payloadLength += payload.length; } MessageHeader message{msg, control, clientID, requestID, payloadLength}; - LOG_DEBUG_LIB(LibFdb5) << "Connection::write [message=" << msg << ",clientID=" << message.clientID() << ",control=" << control << ",requestID=" << requestID << ",data=" << data.size() << ",payload=" << payloadLength << "]" << std::endl; + LOG_DEBUG_LIB(LibFdb5) << "Connection::write [message=" << msg << ",clientID=" << message.clientID() + << ",control=" << control << ",requestID=" << requestID << ",payloadsSize=" << payloads.size() + << ",payloadLength=" << payloadLength << "]" << std::endl; std::lock_guard lock((control || single_) ? controlMutex_ : dataMutex_); + writeUnsafe(control, &message, sizeof(message)); - for (auto d: data) { - writeUnsafe(control, d.first, d.second); - } - writeUnsafe(control, &EndMarker, sizeof(EndMarker)); -} -void Connection::write(Message msg, - const bool control, - const uint32_t clientID, - const uint32_t requestID, - const void* data, - const uint32_t length) const { - write(msg, control, clientID, requestID, {{data, length}}); + for (const auto& payload : payloads) { writeUnsafe(control, payload.data, payload.length); } + + writeUnsafe(control, &MessageHeader::EndMarker, MessageHeader::markerBytes); } -void Connection::error(const std::string& msg, uint32_t clientID, uint32_t requestID) const { +void Connection::error(std::string_view msg, uint32_t clientID, uint32_t requestID) const { eckit::Log::error() << "[clientID=" << clientID << ",requestID=" << requestID << "] " << msg << std::endl; - write(Message::Error, false, clientID, requestID, {{msg.c_str(), msg.length()}}); + write(Message::Error, false, clientID, requestID, msg.data(), msg.length()); } eckit::Buffer Connection::readControl(MessageHeader& hdr) const { diff --git a/src/fdb5/remote/Connection.h b/src/fdb5/remote/Connection.h index 10393cc0f..5dad425c0 100644 --- a/src/fdb5/remote/Connection.h +++ b/src/fdb5/remote/Connection.h @@ -10,16 +10,18 @@ #pragma once +#include "eckit/serialisation/MemoryStream.h" #include "fdb5/remote/Messages.h" #include "eckit/exception/Exceptions.h" #include "eckit/net/TCPSocket.h" #include "eckit/os/BackTrace.h" +#include #include #include #include -#include +#include #include namespace eckit { @@ -47,18 +49,20 @@ class TCPException : public eckit::Exception { class Connection : eckit::NonCopyable { public: // types - using Payload = std::vector>; + using PayloadList = std::vector; public: // methods Connection(); virtual ~Connection() = default; - void write(Message msg, bool control, uint32_t clientID, uint32_t requestID, Payload data = {}) const; + void write(Message msg, bool control, uint32_t clientID, uint32_t requestID, PayloadList payloads = {}) const; - void write(Message msg, bool control, uint32_t clientID, uint32_t requestID, const void* data, uint32_t length) const; + void write(Message msg, bool control, uint32_t clientID, uint32_t requestID, const void* data, uint32_t length) const { + write(msg, control, clientID, requestID, {{length, data}}); + } - void error(const std::string& msg, uint32_t clientID, uint32_t requestID) const; + void error(std::string_view msg, uint32_t clientID, uint32_t requestID) const; eckit::Buffer readControl(MessageHeader& hdr) const; @@ -66,7 +70,7 @@ class Connection : eckit::NonCopyable { void teardown(); -private: // methods +private: // methods eckit::Buffer read(bool control, MessageHeader& hdr) const; void writeUnsafe(bool control, const void* data, size_t length) const; @@ -77,11 +81,10 @@ class Connection : eckit::NonCopyable { virtual const eckit::net::TCPSocket& dataSocket() const = 0; -protected: // members - +protected: // members bool single_; -private: // members +private: // members mutable std::mutex controlMutex_; mutable std::mutex dataMutex_; mutable std::mutex readControlMutex_; diff --git a/src/fdb5/remote/Messages.cc b/src/fdb5/remote/Messages.cc index 5e96fdab6..04e316312 100644 --- a/src/fdb5/remote/Messages.cc +++ b/src/fdb5/remote/Messages.cc @@ -15,8 +15,6 @@ #include "fdb5/remote/Messages.h" -using namespace eckit; - namespace fdb5::remote { //---------------------------------------------------------------------------------------------------------------------- @@ -62,7 +60,7 @@ std::ostream& operator<<(std::ostream& s, const Message& m) { MessageHeader::MessageHeader(Message message, bool control, uint32_t clientID, uint32_t requestID, uint32_t payloadSize) : marker(StartMarker), - version(CurrentVersion), + version(currentVersion), message(message), clientID_((clientID<<1) + (control ? 1 : 0)), requestID(requestID), diff --git a/src/fdb5/remote/Messages.h b/src/fdb5/remote/Messages.h index ce203b39b..6f3a2ee1d 100644 --- a/src/fdb5/remote/Messages.h +++ b/src/fdb5/remote/Messages.h @@ -18,8 +18,9 @@ #pragma once -#include #include +#include +#include #include "eckit/types/FixedString.h" @@ -31,10 +32,12 @@ namespace fdb5::remote { //---------------------------------------------------------------------------------------------------------------------- -const static eckit::FixedString<4> StartMarker {"SFDB"}; -const static eckit::FixedString<4> EndMarker {"EFDB"}; +struct Payload { + Payload(std::size_t length, const void* data) : length {length}, data {data} { } -constexpr uint16_t CurrentVersion = 12; + std::size_t length {0}; + const void* data {nullptr}; +}; enum class Message : uint16_t { @@ -76,21 +79,31 @@ enum class Message : uint16_t { std::ostream& operator<<(std::ostream& s, const Message& m); +//---------------------------------------------------------------------------------------------------------------------- + // Header used for all messages class MessageHeader { -public: // methods +public: // types + constexpr static uint16_t currentVersion = 12; + + constexpr static const auto hashBytes = 16; + + constexpr static const auto markerBytes = 4; + + using MarkerType = eckit::FixedString; + + using HashType = eckit::FixedString; - MessageHeader() : - version(CurrentVersion), - message(Message::None), - clientID_(0), - requestID(0), - payloadSize(0) {} + inline static const MarkerType StartMarker {"SFDB"}; + inline static const MarkerType EndMarker {"EFDB"}; + +public: // methods + MessageHeader() = default; MessageHeader(Message message, bool control, uint32_t clientID, uint32_t requestID, uint32_t payloadSize); - + bool control() const { return ((clientID_ & 0x00000001) == 1); } @@ -99,21 +112,13 @@ class MessageHeader { } public: - - eckit::FixedString<4> marker; // 4 bytes --> 4 - - uint16_t version; // 2 bytes --> 6 - - Message message; // 2 bytes --> 8 - - uint32_t clientID_; // 4 bytes --> 12 - - uint32_t requestID; // 4 bytes --> 16 - - uint32_t payloadSize; // 4 bytes --> 20 - - eckit::FixedString<16> hash; // 16 bytes --> 36 - + MarkerType marker; // 4 bytes --> 4 + uint16_t version {currentVersion}; // 2 bytes --> 6 + Message message {Message::None}; // 2 bytes --> 8 + uint32_t clientID_ {0}; // 4 bytes --> 12 + uint32_t requestID {0}; // 4 bytes --> 16 + uint32_t payloadSize {0}; // 4 bytes --> 20 + HashType hash; // 16 bytes --> 36 }; //---------------------------------------------------------------------------------------------------------------------- diff --git a/src/fdb5/remote/client/Client.cc b/src/fdb5/remote/client/Client.cc index 123d7ccfa..44257edb9 100644 --- a/src/fdb5/remote/client/Client.cc +++ b/src/fdb5/remote/client/Client.cc @@ -55,45 +55,43 @@ Client::~Client() { connection_.remove(id_); } -void Client::controlWriteCheckResponse(Message msg, - uint32_t requestID, - bool dataListener, - const void* payload, - uint32_t payloadLength) const { +void Client::controlWriteCheckResponse(const Message msg, + const uint32_t requestID, + const bool dataListener, + const void* const payload, + const uint32_t payloadLength) const { ASSERT(requestID); ASSERT(!(!payloadLength ^ !payload)); std::lock_guard lock(blockingRequestMutex_); - Connection::Payload data; - if (payloadLength != 0) { data.push_back(std::make_pair(payload, payloadLength)); } + PayloadList payloads; + if (payloadLength > 0) { payloads.emplace_back(payloadLength, payload); } - auto f = connection_.controlWrite(*this, msg, requestID, dataListener, data); + auto f = connection_.controlWrite(*this, msg, requestID, dataListener, payloads); f.wait(); ASSERT(f.get().size() == 0); } -eckit::Buffer Client::controlWriteReadResponse(Message msg, - uint32_t requestID, - const void* payload, - uint32_t payloadLength) const { +eckit::Buffer Client::controlWriteReadResponse(const Message msg, + const uint32_t requestID, + const void* const payload, + const uint32_t payloadLength) const { ASSERT(requestID); ASSERT(!(!payloadLength ^ !payload)); std::lock_guard lock(blockingRequestMutex_); - Connection::Payload data {}; - if (payloadLength) { - data.push_back(std::make_pair(payload, payloadLength)); - } + PayloadList payloads; + if (payloadLength > 0) { payloads.emplace_back(payloadLength, payload); } - std::future f = connection_.controlWrite(*this, msg, requestID, false, data); + auto f = connection_.controlWrite(*this, msg, requestID, false, payloads); f.wait(); return eckit::Buffer{f.get()}; } -void Client::dataWrite(remote::Message msg, uint32_t requestID, Connection::Payload data) { - connection_.dataWrite(*this, msg, requestID, data); +void Client::dataWrite(remote::Message msg, uint32_t requestID, PayloadList payloads) { + connection_.dataWrite(*this, msg, requestID, std::move(payloads)); } } // namespace fdb5::remote diff --git a/src/fdb5/remote/client/Client.h b/src/fdb5/remote/client/Client.h index 4e1bc4e6f..360daa72b 100644 --- a/src/fdb5/remote/client/Client.h +++ b/src/fdb5/remote/client/Client.h @@ -17,6 +17,10 @@ #include "fdb5/remote/Messages.h" #include "fdb5/remote/client/ClientConnection.h" +#include +#include // std::pair +#include + namespace fdb5::remote { //---------------------------------------------------------------------------------------------------------------------- @@ -30,6 +34,8 @@ class RemoteFDBException : public eckit::RemoteException { //---------------------------------------------------------------------------------------------------------------------- class Client : eckit::NonCopyable { + using PayloadList = Connection::PayloadList; + public: Client(const eckit::net::Endpoint& endpoint, const std::string& defaultEndpoint); @@ -59,7 +65,7 @@ class Client : eckit::NonCopyable { const void* payload = nullptr, uint32_t payloadLength = 0) const; - void dataWrite(Message msg, uint32_t requestID, Connection::Payload data = {}); + void dataWrite(Message msg, uint32_t requestID, PayloadList payloads = {}); // handlers for incoming messages - to be defined in the client class virtual bool handle(Message message, uint32_t requestID) = 0; diff --git a/src/fdb5/remote/client/ClientConnection.cc b/src/fdb5/remote/client/ClientConnection.cc index 689444edb..1a0fe06d9 100644 --- a/src/fdb5/remote/client/ClientConnection.cc +++ b/src/fdb5/remote/client/ClientConnection.cc @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -175,17 +176,17 @@ eckit::LocalConfiguration ClientConnection::availableFunctionality() const { //---------------------------------------------------------------------------------------------------------------------- std::future ClientConnection::controlWrite(const Client& client, - Message msg, + const Message msg, const uint32_t requestID, const bool /*dataListener*/, - Payload data) const { + const PayloadList payloads) const { std::future f; { std::lock_guard lock(promisesMutex_); auto pp = promises_.emplace(requestID, std::promise{}).first; f = pp->second.get_future(); } - Connection::write(msg, true, client.clientId(), requestID, data); + Connection::write(msg, true, client.clientId(), requestID, payloads); return f; } @@ -195,42 +196,43 @@ void ClientConnection::dataWrite(DataWriteRequest& request) const { request.data_.size()); } -void ClientConnection::dataWrite(Client& client, remote::Message msg, uint32_t requestID, Payload data) { +void ClientConnection::dataWrite(Client& client, remote::Message msg, uint32_t requestID, PayloadList payloads) { static size_t maxQueueLength = eckit::Resource("fdbDataWriteQueueLength;$FDB_DATA_WRITE_QUEUE_LENGTH", 320); + { // retrieve or add client to the list - std::lock_guard lock(clientsMutex_); - auto it = clients_.find(client.clientId()); - ASSERT(it != clients_.end()); + std::lock_guard lock(clientsMutex_); + ASSERT(clients_.find(client.clientId()) != clients_.end()); } + { std::lock_guard lock(dataWriteMutex_); if (!dataWriteThread_.joinable()) { // Reset the queue after previous done/errors ASSERT(!dataWriteQueue_); - dataWriteQueue_.reset(new eckit::Queue{maxQueueLength}); + dataWriteQueue_ = std::make_unique>(maxQueueLength); dataWriteThread_ = std::thread([this] { dataWriteThreadLoop(); }); } } + uint32_t payloadLength = 0; - for (auto d: data) { - ASSERT(d.first); - payloadLength += d.second; + for (auto payload : payloads) { + ASSERT(payload.data); + payloadLength += payload.length; } eckit::Buffer buffer{payloadLength}; uint32_t offset = 0; - for (auto d: data) { - buffer.copy(d.first, d.second, offset); - offset += d.second; + for (auto payload : payloads) { + buffer.copy(payload.data, payload.length, offset); + offset += payload.length; } dataWriteQueue_->emplace(&client, msg, requestID, std::move(buffer)); } - void ClientConnection::dataWriteThreadLoop() { eckit::Timer timer; diff --git a/src/fdb5/remote/client/ClientConnection.h b/src/fdb5/remote/client/ClientConnection.h index 9a5bb870f..d2e362fc6 100644 --- a/src/fdb5/remote/client/ClientConnection.h +++ b/src/fdb5/remote/client/ClientConnection.h @@ -40,10 +40,10 @@ class ClientConnection : protected Connection { std::future controlWrite(const Client& client, Message msg, uint32_t requestID, - bool startDataListener, - Payload data = {}) const; + bool /*dataListener*/, + PayloadList payload = {}) const; - void dataWrite(Client& client, Message msg, uint32_t requestID, Payload data = {}); + void dataWrite(Client& client, Message msg, uint32_t requestID, PayloadList payloads = {}); void add(Client& client); bool remove(uint32_t clientID); diff --git a/src/fdb5/remote/client/RemoteCatalogue.cc b/src/fdb5/remote/client/RemoteCatalogue.cc index e0a93d854..0b813102b 100644 --- a/src/fdb5/remote/client/RemoteCatalogue.cc +++ b/src/fdb5/remote/client/RemoteCatalogue.cc @@ -8,18 +8,35 @@ * does it submit to any jurisdiction. */ -#include "eckit/config/Resource.h" -#include "eckit/log/Log.h" -#include "eckit/serialisation/MemoryStream.h" +#include "fdb5/remote/client/RemoteCatalogue.h" #include "fdb5/LibFdb5.h" -#include "fdb5/remote/client/RemoteCatalogue.h" +#include "fdb5/database/Key.h" +#include "fdb5/remote/Messages.h" + +#include "eckit/filesystem/URI.h" +#include "eckit/log/Log.h" +#include "eckit/serialisation/MemoryStream.h" -#include +#include +#include +#include using namespace eckit; + namespace fdb5::remote { +//---------------------------------------------------------------------------------------------------------------------- + +namespace { + +constexpr size_t archivePayloadSize = 8192; +constexpr size_t keyPayloadSize = 4096; + +} + +//---------------------------------------------------------------------------------------------------------------------- + RemoteCatalogue::RemoteCatalogue(const Key& key, const Config& config): CatalogueImpl(key, ControlIdentifiers(), config), // xxx what are control identifiers? Setting empty here... Client(eckit::net::Endpoint(config.getString("host"), config.getInt("port")), ""), @@ -49,14 +66,14 @@ void RemoteCatalogue::archive(const Key& idxKey, const Key& datumKey, std::share numLocations_++; } - Buffer buffer(8192); + Buffer buffer(archivePayloadSize); MemoryStream stream(buffer); stream << idxKey; stream << datumKey; stream << *fieldLocation; - std::vector> payloads; - payloads.push_back(std::pair{buffer, stream.position()}); + std::vector payloads; + payloads.emplace_back(stream.position(), buffer.data()); dataWrite(Message::Blob, id, payloads); @@ -124,7 +141,7 @@ void RemoteCatalogue::loadSchema() { LOG_DEBUG_LIB(LibFdb5) << "RemoteCatalogue::loadSchema()" << std::endl; // send dbkey to remote. - eckit::Buffer keyBuffer(4096); + eckit::Buffer keyBuffer(keyPayloadSize); eckit::MemoryStream keyStream(keyBuffer); keyStream << dbKey_; @@ -139,6 +156,7 @@ bool RemoteCatalogue::handle(Message message, uint32_t requestID) { Log::warning() << *this << " - Received [message=" << ((uint) message) << ",requestID=" << requestID << "]" << std::endl; return false; } + bool RemoteCatalogue::handle(Message message, uint32_t requestID, eckit::Buffer&& payload) { LOG_DEBUG_LIB(LibFdb5) << *this << " - Received [message=" << ((uint) message) << ",requestID=" << requestID << ",payloadSize=" << payload.size() << "]" << std::endl; return false; @@ -162,14 +180,19 @@ void RemoteCatalogue::print( std::ostream &out ) const { out << "RemoteCatalogue(endpoint=" << controlEndpoint() << ",clientID=" << clientId() << ")"; } - std::string RemoteCatalogue::type() const { return "remote"; } + bool RemoteCatalogue::open() { return true; } +//---------------------------------------------------------------------------------------------------------------------- + static CatalogueReaderBuilder reader("remote"); static CatalogueWriterBuilder writer("remote"); + +//---------------------------------------------------------------------------------------------------------------------- + } // namespace fdb5::remote diff --git a/src/fdb5/remote/client/RemoteCatalogue.h b/src/fdb5/remote/client/RemoteCatalogue.h index 1d10b9716..29631dab5 100644 --- a/src/fdb5/remote/client/RemoteCatalogue.h +++ b/src/fdb5/remote/client/RemoteCatalogue.h @@ -1,12 +1,22 @@ #pragma once -#include "fdb5/api/FDBStats.h" +#include "fdb5/api/helpers/ControlIterator.h" +#include "fdb5/config/Config.h" #include "fdb5/database/Catalogue.h" +#include "fdb5/database/DbStats.h" #include "fdb5/database/Index.h" #include "fdb5/database/Store.h" #include "fdb5/remote/client/Client.h" +#include "eckit/filesystem/URI.h" + +#include +#include +#include +#include +#include + namespace fdb5::remote { // class RemoteCatalogueArchiver; @@ -29,8 +39,8 @@ class RemoteCatalogue : public CatalogueReader, public CatalogueWriter, public C void reconsolidate() override; //From CatalogueReader - DbStats stats() const override { return DbStats(); } - bool retrieve(const Key& key, Field& field) const override { return false; } + DbStats stats() const override { return {}; } + bool retrieve(const Key& /*key*/, Field& /*field*/) const override { return false; } // From Catalogue bool selectIndex(const Key& idxKey) override; diff --git a/src/fdb5/remote/client/RemoteStore.cc b/src/fdb5/remote/client/RemoteStore.cc index e86d4a55c..4f2fcec2d 100644 --- a/src/fdb5/remote/client/RemoteStore.cc +++ b/src/fdb5/remote/client/RemoteStore.cc @@ -8,26 +8,44 @@ * does it submit to any jurisdiction. */ -#include -#include - -#include "eckit/log/Timer.h" - -#include "eckit/config/Resource.h" -#include "eckit/io/AIOHandle.h" -#include "eckit/io/EmptyHandle.h" -#include "eckit/runtime/Main.h" -#include "eckit/serialisation/MemoryStream.h" +#include "fdb5/remote/client/RemoteStore.h" #include "fdb5/LibFdb5.h" +#include "fdb5/database/Field.h" #include "fdb5/database/FieldLocation.h" -#include "fdb5/io/FDBFileHandle.h" +#include "fdb5/database/Store.h" #include "fdb5/remote/Connection.h" +#include "fdb5/remote/Messages.h" #include "fdb5/remote/RemoteFieldLocation.h" -#include "fdb5/remote/client/RemoteStore.h" +#include "fdb5/remote/client/Client.h" #include "fdb5/rules/Rule.h" -#include +#include "eckit/exception/Exceptions.h" +#include "eckit/filesystem/URI.h" +#include "eckit/io/Length.h" +#include "eckit/io/Offset.h" +#include "eckit/log/Log.h" +#include "eckit/net/Endpoint.h" +#include "eckit/runtime/Main.h" +#include "eckit/serialisation/MemoryStream.h" +#include "eckit/serialisation/Reanimator.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include using namespace eckit; @@ -252,9 +270,9 @@ void RemoteStore::archive(const Key& key, const void *data, eckit::Length length keyStream << dbKey_; keyStream << key; - Connection::Payload payloads; - payloads.push_back(std::pair{keyBuffer, keyStream.position()}); - payloads.push_back(std::pair{data, length}); + std::vector payloads; + payloads.emplace_back(keyStream.position(), keyBuffer.data()); + payloads.emplace_back(length, data); dataWrite(Message::Blob, id, payloads); diff --git a/src/fdb5/remote/server/CatalogueHandler.cc b/src/fdb5/remote/server/CatalogueHandler.cc index 22cccf76b..9a3dc8f80 100644 --- a/src/fdb5/remote/server/CatalogueHandler.cc +++ b/src/fdb5/remote/server/CatalogueHandler.cc @@ -8,16 +8,22 @@ * does it submit to any jurisdiction. */ -#include "eckit/config/Resource.h" +#include "fdb5/remote/server/CatalogueHandler.h" +#include "fdb5/LibFdb5.h" +#include "fdb5/api/helpers/FDBToolRequest.h" +#include "fdb5/remote/Connection.h" +#include "fdb5/remote/Messages.h" +#include "fdb5/remote/server/ServerConnection.h" + #include "eckit/net/NetMask.h" +#include "eckit/net/TCPSocket.h" #include "eckit/serialisation/MemoryStream.h" -#include "fdb5/LibFdb5.h" -#include "fdb5/api/helpers/FDBToolRequest.h" -#include "fdb5/remote/server/CatalogueHandler.h" +#include +#include +#include using namespace eckit; -using metkit::mars::MarsRequest; namespace fdb5::remote { @@ -161,6 +167,7 @@ struct BaseHelper { struct ListHelper : public BaseHelper { ListIterator apiCall(FDB& fdb, const FDBToolRequest& request) const { + /// @todo remember to add level_ to this helper return fdb.list(request); } }; @@ -226,7 +233,7 @@ void CatalogueHandler::forwardApiCall(uint32_t clientID, uint32_t requestID, eck typename decltype(iterator)::value_type elem; while (iterator.next(elem)) { auto encoded(helper.encode(elem, *this)); - write(Message::Blob, false, clientID, requestID, std::vector>{{encoded.buf, encoded.position}}); + write(Message::Blob, false, clientID, requestID, encoded.buf, encoded.position); } write(Message::Complete, false, clientID, requestID); } diff --git a/src/fdb5/remote/server/CatalogueHandler.h b/src/fdb5/remote/server/CatalogueHandler.h index f63055b3a..33fdf825e 100644 --- a/src/fdb5/remote/server/CatalogueHandler.h +++ b/src/fdb5/remote/server/CatalogueHandler.h @@ -10,8 +10,11 @@ #pragma once -#include "fdb5/remote/server/ServerConnection.h" #include "fdb5/api/FDB.h" +#include "fdb5/database/Catalogue.h" +#include "fdb5/remote/server/ServerConnection.h" + +#include namespace fdb5::remote { @@ -24,7 +27,7 @@ struct CatalogueArchiver { bool controlConnection; bool dataConnection; - + std::unique_ptr catalogue; size_t locationsExpected; size_t locationsArchived; diff --git a/src/fdb5/remote/server/ServerConnection.cc b/src/fdb5/remote/server/ServerConnection.cc index 1a6d540a3..85d24e25b 100644 --- a/src/fdb5/remote/server/ServerConnection.cc +++ b/src/fdb5/remote/server/ServerConnection.cc @@ -278,7 +278,7 @@ void ServerConnection::initialiseConnections() { LOG_DEBUG_LIB(LibFdb5) << "Protocol negotiation - configuration: " << agreedConf_ <>{{startupBuffer.data(), s.position()}}); + write(Message::Startup, true, 0, 0, startupBuffer.data(), s.position()); if (!single_) { ASSERT(dataSocketFuture.valid()); @@ -292,7 +292,7 @@ void ServerConnection::initialiseConnections() { MessageHeader dataHdr; eckit::Buffer payload2 = readData(dataHdr); - ASSERT(dataHdr.version == CurrentVersion); + ASSERT(dataHdr.version == MessageHeader::currentVersion); ASSERT(dataHdr.message == Message::Startup); ASSERT(dataHdr.requestID == 0); @@ -355,9 +355,9 @@ size_t ServerConnection::archiveThreadLoop() { const void* payloadData = charData; charData += hdr->payloadSize; - const decltype(EndMarker)* e = static_cast(static_cast(charData)); - ASSERT(*e == EndMarker); - charData += sizeof(EndMarker); + const auto* e = static_cast(static_cast(charData)); + ASSERT(*e == MessageHeader::EndMarker); + charData += MessageHeader::markerBytes; archiveBlob(elem.clientID_, elem.requestID_, payloadData, hdr->payloadSize); totalArchived += 1; diff --git a/src/fdb5/remote/server/StoreHandler.cc b/src/fdb5/remote/server/StoreHandler.cc index 39a9cbbec..1baa108b0 100644 --- a/src/fdb5/remote/server/StoreHandler.cc +++ b/src/fdb5/remote/server/StoreHandler.cc @@ -34,7 +34,7 @@ Handled StoreHandler::handleControl(Message message, uint32_t clientID, uint32_t case Message::Store: // notification that the client is starting to send data for archival archiver(); return Handled::YesAddArchiveListener; - + default: { std::stringstream ss; ss << "ERROR: Unexpected message recieved (" << message << "). ABORTING"; @@ -132,7 +132,7 @@ void StoreHandler::writeToParent(const uint32_t clientID, const uint32_t request << std::endl; while ((dataRead = dh->read(writeBuffer, writeBuffer.size())) != 0) { - write(Message::Blob, false, clientID, requestID, std::vector>{{writeBuffer, dataRead}}); + write(Message::Blob, false, clientID, requestID, writeBuffer, dataRead); } // And when we are done, add a complete message. @@ -157,7 +157,7 @@ void StoreHandler::writeToParent(const uint32_t clientID, const uint32_t request void StoreHandler::archiveBlob(const uint32_t clientID, const uint32_t requestID, const void* data, size_t length) { - + MemoryStream s(data, length); fdb5::Key dbKey(s); @@ -168,7 +168,7 @@ void StoreHandler::archiveBlob(const uint32_t clientID, const uint32_t requestID const char* charData = static_cast(data); // To allow pointer arithmetic Log::status() << "Archiving data: " << ss_key.str() << std::endl; - + Store& ss = store(clientID, dbKey); std::shared_ptr location = ss.archive(idxKey, charData + s.position(), length - s.position()); @@ -217,7 +217,7 @@ void StoreHandler::flush(uint32_t clientID, uint32_t requestID, const eckit::Buf } bool StoreHandler::remove(bool control, uint32_t clientID) { - + std::lock_guard lock(handlerMutex_); auto it = stores_.find(clientID); if (it != stores_.end()) {