Skip to content

Commit

Permalink
fix(remoteFDB): Message class type payload
Browse files Browse the repository at this point in the history
and other improvements
  • Loading branch information
mcakircali committed Jan 16, 2025
1 parent e28bde6 commit 59f6bc9
Show file tree
Hide file tree
Showing 15 changed files with 222 additions and 148 deletions.
49 changes: 25 additions & 24 deletions src/fdb5/remote/Connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
#include "fdb5/LibFdb5.h"
#include "fdb5/remote/Connection.h"
#include "fdb5/remote/Messages.h"
#include <cstdint>
#include <mutex>
#include <string_view>

namespace fdb5::remote {

Expand Down Expand Up @@ -32,7 +35,7 @@ void Connection::teardown() {

//----------------------------------------------------------------------------------------------------------------------

void Connection::writeUnsafe(const bool control, const void* data, const size_t length) const {
void Connection::writeUnsafe(const bool control, const void* const data, const size_t length) const {
long written = 0;
if (control || single_) {
written = controlSocket().write(data, length);
Expand Down Expand Up @@ -70,14 +73,14 @@ void Connection::readUnsafe(bool control, void* data, size_t length) const {
}
}

eckit::Buffer Connection::read(bool control, MessageHeader& hdr) const {
eckit::Buffer Connection::read(const bool control, MessageHeader& hdr) const {
eckit::FixedString<4> tail;

std::lock_guard<std::mutex> lock((control || single_) ? readControlMutex_ : readDataMutex_);
readUnsafe(control, &hdr, sizeof(hdr));

ASSERT(hdr.marker == StartMarker);
ASSERT(hdr.version == CurrentVersion);
ASSERT(hdr.marker == MessageHeader::StartMarker);
ASSERT(hdr.version == MessageHeader::currentVersion);
ASSERT(single_ || hdr.control() == control);

eckit::Buffer payload{hdr.payloadSize};
Expand All @@ -86,7 +89,7 @@ eckit::Buffer Connection::read(bool control, MessageHeader& hdr) const {
}
// Ensure we have consumed exactly the correct amount from the socket.
readUnsafe(control, &tail, sizeof(tail));
ASSERT(tail == EndMarker);
ASSERT(tail == MessageHeader::EndMarker);

if (hdr.message == Message::Error) {

Expand All @@ -99,38 +102,36 @@ eckit::Buffer Connection::read(bool control, MessageHeader& hdr) const {
return payload;
}

void Connection::write(Message msg, const bool control, const uint32_t clientID, const uint32_t requestID, Payload data) const {
void Connection::write(const Message msg,
const bool control,
const uint32_t clientID,
const uint32_t requestID,
const PayloadList payloads) const {

uint32_t payloadLength = 0;
for (auto d: data) {
ASSERT(d.first);
payloadLength += d.second;
for (const auto& payload : payloads) {
ASSERT(payload.data);
payloadLength += payload.length;
}

MessageHeader message{msg, control, clientID, requestID, payloadLength};

LOG_DEBUG_LIB(LibFdb5) << "Connection::write [message=" << msg << ",clientID=" << message.clientID() << ",control=" << control << ",requestID=" << requestID << ",data=" << data.size() << ",payload=" << payloadLength << "]" << std::endl;
LOG_DEBUG_LIB(LibFdb5) << "Connection::write [message=" << msg << ",clientID=" << message.clientID()
<< ",control=" << control << ",requestID=" << requestID << ",payloadsSize=" << payloads.size()
<< ",payloadLength=" << payloadLength << "]" << std::endl;

std::lock_guard<std::mutex> lock((control || single_) ? controlMutex_ : dataMutex_);

writeUnsafe(control, &message, sizeof(message));
for (auto d: data) {
writeUnsafe(control, d.first, d.second);
}
writeUnsafe(control, &EndMarker, sizeof(EndMarker));
}

void Connection::write(Message msg,
const bool control,
const uint32_t clientID,
const uint32_t requestID,
const void* data,
const uint32_t length) const {
write(msg, control, clientID, requestID, {{data, length}});
for (const auto& payload : payloads) { writeUnsafe(control, payload.data, payload.length); }

writeUnsafe(control, &MessageHeader::EndMarker, MessageHeader::markerBytes);
}

void Connection::error(const std::string& msg, uint32_t clientID, uint32_t requestID) const {
void Connection::error(std::string_view msg, uint32_t clientID, uint32_t requestID) const {
eckit::Log::error() << "[clientID=" << clientID << ",requestID=" << requestID << "] " << msg << std::endl;
write(Message::Error, false, clientID, requestID, {{msg.c_str(), msg.length()}});
write(Message::Error, false, clientID, requestID, msg.data(), msg.length());
}

eckit::Buffer Connection::readControl(MessageHeader& hdr) const {
Expand Down
21 changes: 12 additions & 9 deletions src/fdb5/remote/Connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,18 @@

#pragma once

#include "eckit/serialisation/MemoryStream.h"
#include "fdb5/remote/Messages.h"

#include "eckit/exception/Exceptions.h"
#include "eckit/net/TCPSocket.h"
#include "eckit/os/BackTrace.h"

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <utility>
#include <string_view>
#include <vector>

namespace eckit {
Expand Down Expand Up @@ -47,26 +49,28 @@ class TCPException : public eckit::Exception {
class Connection : eckit::NonCopyable {

public: // types
using Payload = std::vector<std::pair<const void*, uint32_t>>;
using PayloadList = std::vector<Payload>;

public: // methods
Connection();

virtual ~Connection() = default;

void write(Message msg, bool control, uint32_t clientID, uint32_t requestID, Payload data = {}) const;
void write(Message msg, bool control, uint32_t clientID, uint32_t requestID, PayloadList payloads = {}) const;

void write(Message msg, bool control, uint32_t clientID, uint32_t requestID, const void* data, uint32_t length) const;
void write(Message msg, bool control, uint32_t clientID, uint32_t requestID, const void* data, uint32_t length) const {
write(msg, control, clientID, requestID, {{length, data}});
}

void error(const std::string& msg, uint32_t clientID, uint32_t requestID) const;
void error(std::string_view msg, uint32_t clientID, uint32_t requestID) const;

eckit::Buffer readControl(MessageHeader& hdr) const;

eckit::Buffer readData(MessageHeader& hdr) const;

void teardown();

private: // methods
private: // methods
eckit::Buffer read(bool control, MessageHeader& hdr) const;

void writeUnsafe(bool control, const void* data, size_t length) const;
Expand All @@ -77,11 +81,10 @@ class Connection : eckit::NonCopyable {

virtual const eckit::net::TCPSocket& dataSocket() const = 0;

protected: // members

protected: // members
bool single_;

private: // members
private: // members
mutable std::mutex controlMutex_;
mutable std::mutex dataMutex_;
mutable std::mutex readControlMutex_;
Expand Down
4 changes: 1 addition & 3 deletions src/fdb5/remote/Messages.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@

#include "fdb5/remote/Messages.h"

using namespace eckit;

namespace fdb5::remote {

//----------------------------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -62,7 +60,7 @@ std::ostream& operator<<(std::ostream& s, const Message& m) {

MessageHeader::MessageHeader(Message message, bool control, uint32_t clientID, uint32_t requestID, uint32_t payloadSize) :
marker(StartMarker),
version(CurrentVersion),
version(currentVersion),
message(message),
clientID_((clientID<<1) + (control ? 1 : 0)),
requestID(requestID),
Expand Down
59 changes: 32 additions & 27 deletions src/fdb5/remote/Messages.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@

#pragma once

#include <cstdint>
#include <cmath>
#include <cstddef>
#include <cstdint>

#include "eckit/types/FixedString.h"

Expand All @@ -31,10 +32,12 @@ namespace fdb5::remote {

//----------------------------------------------------------------------------------------------------------------------

const static eckit::FixedString<4> StartMarker {"SFDB"};
const static eckit::FixedString<4> EndMarker {"EFDB"};
struct Payload {
Payload(std::size_t length, const void* data) : length {length}, data {data} { }

constexpr uint16_t CurrentVersion = 12;
std::size_t length {0};
const void* data {nullptr};
};

enum class Message : uint16_t {

Expand Down Expand Up @@ -76,21 +79,31 @@ enum class Message : uint16_t {

std::ostream& operator<<(std::ostream& s, const Message& m);

//----------------------------------------------------------------------------------------------------------------------

// Header used for all messages
class MessageHeader {

public: // methods
public: // types
constexpr static uint16_t currentVersion = 12;

constexpr static const auto hashBytes = 16;

constexpr static const auto markerBytes = 4;

using MarkerType = eckit::FixedString<markerBytes>;

using HashType = eckit::FixedString<hashBytes>;

MessageHeader() :
version(CurrentVersion),
message(Message::None),
clientID_(0),
requestID(0),
payloadSize(0) {}
inline static const MarkerType StartMarker {"SFDB"};

inline static const MarkerType EndMarker {"EFDB"};

public: // methods
MessageHeader() = default;

MessageHeader(Message message, bool control, uint32_t clientID, uint32_t requestID, uint32_t payloadSize);

bool control() const {
return ((clientID_ & 0x00000001) == 1);
}
Expand All @@ -99,21 +112,13 @@ class MessageHeader {
}

public:

eckit::FixedString<4> marker; // 4 bytes --> 4

uint16_t version; // 2 bytes --> 6

Message message; // 2 bytes --> 8

uint32_t clientID_; // 4 bytes --> 12

uint32_t requestID; // 4 bytes --> 16

uint32_t payloadSize; // 4 bytes --> 20

eckit::FixedString<16> hash; // 16 bytes --> 36

MarkerType marker; // 4 bytes --> 4
uint16_t version {currentVersion}; // 2 bytes --> 6
Message message {Message::None}; // 2 bytes --> 8
uint32_t clientID_ {0}; // 4 bytes --> 12
uint32_t requestID {0}; // 4 bytes --> 16
uint32_t payloadSize {0}; // 4 bytes --> 20
HashType hash; // 16 bytes --> 36
};

//----------------------------------------------------------------------------------------------------------------------
Expand Down
36 changes: 17 additions & 19 deletions src/fdb5/remote/client/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,45 +55,43 @@ Client::~Client() {
connection_.remove(id_);
}

void Client::controlWriteCheckResponse(Message msg,
uint32_t requestID,
bool dataListener,
const void* payload,
uint32_t payloadLength) const {
void Client::controlWriteCheckResponse(const Message msg,
const uint32_t requestID,
const bool dataListener,
const void* const payload,
const uint32_t payloadLength) const {

ASSERT(requestID);
ASSERT(!(!payloadLength ^ !payload));
std::lock_guard<std::mutex> lock(blockingRequestMutex_);

Connection::Payload data;
if (payloadLength != 0) { data.push_back(std::make_pair(payload, payloadLength)); }
PayloadList payloads;
if (payloadLength > 0) { payloads.emplace_back(payloadLength, payload); }

auto f = connection_.controlWrite(*this, msg, requestID, dataListener, data);
auto f = connection_.controlWrite(*this, msg, requestID, dataListener, payloads);
f.wait();
ASSERT(f.get().size() == 0);
}

eckit::Buffer Client::controlWriteReadResponse(Message msg,
uint32_t requestID,
const void* payload,
uint32_t payloadLength) const {
eckit::Buffer Client::controlWriteReadResponse(const Message msg,
const uint32_t requestID,
const void* const payload,
const uint32_t payloadLength) const {

ASSERT(requestID);
ASSERT(!(!payloadLength ^ !payload));
std::lock_guard<std::mutex> lock(blockingRequestMutex_);

Connection::Payload data {};
if (payloadLength) {
data.push_back(std::make_pair(payload, payloadLength));
}
PayloadList payloads;
if (payloadLength > 0) { payloads.emplace_back(payloadLength, payload); }

std::future<eckit::Buffer> f = connection_.controlWrite(*this, msg, requestID, false, data);
auto f = connection_.controlWrite(*this, msg, requestID, false, payloads);
f.wait();
return eckit::Buffer{f.get()};
}

void Client::dataWrite(remote::Message msg, uint32_t requestID, Connection::Payload data) {
connection_.dataWrite(*this, msg, requestID, data);
void Client::dataWrite(remote::Message msg, uint32_t requestID, PayloadList payloads) {
connection_.dataWrite(*this, msg, requestID, std::move(payloads));
}

} // namespace fdb5::remote
8 changes: 7 additions & 1 deletion src/fdb5/remote/client/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
#include "fdb5/remote/Messages.h"
#include "fdb5/remote/client/ClientConnection.h"

#include <mutex>
#include <utility> // std::pair
#include <vector>

namespace fdb5::remote {

//----------------------------------------------------------------------------------------------------------------------
Expand All @@ -30,6 +34,8 @@ class RemoteFDBException : public eckit::RemoteException {
//----------------------------------------------------------------------------------------------------------------------

class Client : eckit::NonCopyable {
using PayloadList = Connection::PayloadList;

public:
Client(const eckit::net::Endpoint& endpoint, const std::string& defaultEndpoint);

Expand Down Expand Up @@ -59,7 +65,7 @@ class Client : eckit::NonCopyable {
const void* payload = nullptr,
uint32_t payloadLength = 0) const;

void dataWrite(Message msg, uint32_t requestID, Connection::Payload data = {});
void dataWrite(Message msg, uint32_t requestID, PayloadList payloads = {});

// handlers for incoming messages - to be defined in the client class
virtual bool handle(Message message, uint32_t requestID) = 0;
Expand Down
Loading

0 comments on commit 59f6bc9

Please sign in to comment.