Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remoteFDB - constness improvements #67

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 33 additions & 27 deletions src/fdb5/remote/Connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@

#include "fdb5/LibFdb5.h"
#include "fdb5/remote/Connection.h"
#include "fdb5/remote/Messages.h"
#include <cstdint>
#include <mutex>
#include <string_view>

namespace fdb5::remote {

//----------------------------------------------------------------------------------------------------------------------

Connection::Connection() : single_(false) {}
Connection::~Connection() {}
Connection::Connection() : single_(false) { }

void Connection::teardown() {

Expand All @@ -19,20 +22,20 @@ void Connection::teardown() {
// all done - disconnecting
Connection::write(Message::Exit, false, 0, 0);
} catch(...) {
// if connection is already down, no need to escalate
// if connection is already down, no need to escalate
}
}
try {
// all done - disconnecting
Connection::write(Message::Exit, true, 0, 0);
} catch(...) {
// if connection is already down, no need to escalate
// if connection is already down, no need to escalate
}
}

//----------------------------------------------------------------------------------------------------------------------

void Connection::writeUnsafe(bool control, const void* data, size_t length) {
void Connection::writeUnsafe(const bool control, const void* const data, const size_t length) const {
long written = 0;
if (control || single_) {
written = controlSocket().write(data, length);
Expand All @@ -51,7 +54,7 @@ void Connection::writeUnsafe(bool control, const void* data, size_t length) {
}
}

void Connection::readUnsafe(bool control, void* data, size_t length) {
void Connection::readUnsafe(bool control, void* data, size_t length) const {
long read = 0;
if (control || single_) {
read = controlSocket().read(data, length);
Expand All @@ -70,14 +73,14 @@ void Connection::readUnsafe(bool control, void* data, size_t length) {
}
}

eckit::Buffer Connection::read(bool control, MessageHeader& hdr) {
eckit::Buffer Connection::read(const bool control, MessageHeader& hdr) const {
eckit::FixedString<4> tail;

std::lock_guard<std::mutex> lock((control || single_) ? readControlMutex_ : readDataMutex_);
readUnsafe(control, &hdr, sizeof(hdr));

ASSERT(hdr.marker == StartMarker);
ASSERT(hdr.version == CurrentVersion);
ASSERT(hdr.marker == MessageHeader::StartMarker);
ASSERT(hdr.version == MessageHeader::currentVersion);
ASSERT(single_ || hdr.control() == control);

eckit::Buffer payload{hdr.payloadSize};
Expand All @@ -86,7 +89,7 @@ eckit::Buffer Connection::read(bool control, MessageHeader& hdr) {
}
// Ensure we have consumed exactly the correct amount from the socket.
readUnsafe(control, &tail, sizeof(tail));
ASSERT(tail == EndMarker);
ASSERT(tail == MessageHeader::EndMarker);

if (hdr.message == Message::Error) {

Expand All @@ -99,40 +102,43 @@ eckit::Buffer Connection::read(bool control, MessageHeader& hdr) {
return payload;
}

void Connection::write(remote::Message msg, bool control, uint32_t clientID, uint32_t requestID, const void* data, uint32_t length) {
write(msg, control, clientID, requestID, std::vector<std::pair<const void*, uint32_t>>{{data, length}});
}

void Connection::write(remote::Message msg, bool control, uint32_t clientID, uint32_t requestID, std::vector<std::pair<const void*, uint32_t>> data) {
void Connection::write(const Message msg,
const bool control,
const uint32_t clientID,
const uint32_t requestID,
const PayloadList payloads) const {

uint32_t payloadLength = 0;
for (auto d: data) {
ASSERT(d.first);
payloadLength += d.second;
for (const auto& payload : payloads) {
ASSERT(payload.data);
payloadLength += payload.length;
}

MessageHeader message{msg, control, clientID, requestID, payloadLength};

LOG_DEBUG_LIB(LibFdb5) << "Connection::write [message=" << msg << ",clientID=" << message.clientID() << ",control=" << control << ",requestID=" << requestID << ",data=" << data.size() << ",payload=" << payloadLength << "]" << std::endl;
LOG_DEBUG_LIB(LibFdb5) << "Connection::write [message=" << msg << ",clientID=" << message.clientID()
<< ",control=" << control << ",requestID=" << requestID << ",payloadsSize=" << payloads.size()
<< ",payloadLength=" << payloadLength << "]" << std::endl;

std::lock_guard<std::mutex> lock((control || single_) ? controlMutex_ : dataMutex_);

writeUnsafe(control, &message, sizeof(message));
for (auto d: data) {
writeUnsafe(control, d.first, d.second);
}
writeUnsafe(control, &EndMarker, sizeof(EndMarker));

for (const auto& payload : payloads) { writeUnsafe(control, payload.data, payload.length); }

writeUnsafe(control, &MessageHeader::EndMarker, MessageHeader::markerBytes);
}

void Connection::error(const std::string& msg, uint32_t clientID, uint32_t requestID) {
void Connection::error(std::string_view msg, uint32_t clientID, uint32_t requestID) const {
eckit::Log::error() << "[clientID=" << clientID << ",requestID=" << requestID << "] " << msg << std::endl;
write(Message::Error, false, clientID, requestID, std::vector<std::pair<const void*, uint32_t>>{{msg.c_str(), msg.length()}});
write(Message::Error, false, clientID, requestID, msg.data(), msg.length());
}

eckit::Buffer Connection::readControl(MessageHeader& hdr) {
eckit::Buffer Connection::readControl(MessageHeader& hdr) const {
return read(true, hdr);
}

eckit::Buffer Connection::readData(MessageHeader& hdr) {
eckit::Buffer Connection::readData(MessageHeader& hdr) const {
return read(false, hdr);
}

Expand Down
56 changes: 35 additions & 21 deletions src/fdb5/remote/Connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,19 @@

#pragma once

#include "eckit/serialisation/MemoryStream.h"
#include "fdb5/remote/Messages.h"

#include "eckit/exception/Exceptions.h"
#include "eckit/net/TCPSocket.h"
#include "eckit/os/BackTrace.h"

#include "fdb5/remote/Messages.h"
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <string_view>
#include <vector>

namespace eckit {

Expand All @@ -40,41 +48,47 @@ class TCPException : public eckit::Exception {

class Connection : eckit::NonCopyable {

public: // types
using PayloadList = std::vector<Payload>;

public: // methods
Connection();
virtual ~Connection();

void write(Message msg, bool control, uint32_t clientID, uint32_t requestID, const void* data, uint32_t length);
void write(Message msg, bool control, uint32_t clientID, uint32_t requestID, std::vector<std::pair<const void*, uint32_t>> data = {});
virtual ~Connection() = default;

void write(Message msg, bool control, uint32_t clientID, uint32_t requestID, PayloadList payloads = {}) const;

void error(const std::string& msg, uint32_t clientID, uint32_t requestID);
void write(Message msg, bool control, uint32_t clientID, uint32_t requestID, const void* data, uint32_t length) const {
write(msg, control, clientID, requestID, {{length, data}});
}

eckit::Buffer readControl(MessageHeader& hdr);
eckit::Buffer readData(MessageHeader& hdr);
void error(std::string_view msg, uint32_t clientID, uint32_t requestID) const;

void teardown();
eckit::Buffer readControl(MessageHeader& hdr) const;

private: // methods
eckit::Buffer readData(MessageHeader& hdr) const;

eckit::Buffer read(bool control, MessageHeader& hdr);
void teardown();

void writeUnsafe(bool control, const void* data, size_t length);
void readUnsafe(bool control, void* data, size_t length);
private: // methods
eckit::Buffer read(bool control, MessageHeader& hdr) const;

virtual eckit::net::TCPSocket& controlSocket() = 0;
virtual eckit::net::TCPSocket& dataSocket() = 0;
void writeUnsafe(bool control, const void* data, size_t length) const;

protected: // members
void readUnsafe(bool control, void* data, size_t length) const;

bool single_;
virtual const eckit::net::TCPSocket& controlSocket() const = 0;

private: // members
virtual const eckit::net::TCPSocket& dataSocket() const = 0;

std::mutex controlMutex_;
std::mutex dataMutex_;
std::mutex readControlMutex_;
std::mutex readDataMutex_;
protected: // members
bool single_;

private: // members
mutable std::mutex controlMutex_;
mutable std::mutex dataMutex_;
mutable std::mutex readControlMutex_;
mutable std::mutex readDataMutex_;
};

//----------------------------------------------------------------------------------------------------------------------
Expand Down
4 changes: 1 addition & 3 deletions src/fdb5/remote/Messages.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@

#include "fdb5/remote/Messages.h"

using namespace eckit;

namespace fdb5::remote {

//----------------------------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -62,7 +60,7 @@ std::ostream& operator<<(std::ostream& s, const Message& m) {

MessageHeader::MessageHeader(Message message, bool control, uint32_t clientID, uint32_t requestID, uint32_t payloadSize) :
marker(StartMarker),
version(CurrentVersion),
version(currentVersion),
message(message),
clientID_((clientID<<1) + (control ? 1 : 0)),
requestID(requestID),
Expand Down
59 changes: 32 additions & 27 deletions src/fdb5/remote/Messages.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@

#pragma once

#include <cstdint>
#include <cmath>
#include <cstddef>
#include <cstdint>

#include "eckit/types/FixedString.h"

Expand All @@ -31,10 +32,12 @@ namespace fdb5::remote {

//----------------------------------------------------------------------------------------------------------------------

const static eckit::FixedString<4> StartMarker {"SFDB"};
const static eckit::FixedString<4> EndMarker {"EFDB"};
struct Payload {
Payload(std::size_t length, const void* data) : length {length}, data {data} { }

constexpr uint16_t CurrentVersion = 12;
std::size_t length {0};
const void* data {nullptr};
};

enum class Message : uint16_t {

Expand Down Expand Up @@ -76,21 +79,31 @@ enum class Message : uint16_t {

std::ostream& operator<<(std::ostream& s, const Message& m);

//----------------------------------------------------------------------------------------------------------------------

// Header used for all messages
class MessageHeader {

public: // methods
public: // types
constexpr static uint16_t currentVersion = 12;

constexpr static const auto hashBytes = 16;

constexpr static const auto markerBytes = 4;

using MarkerType = eckit::FixedString<markerBytes>;

using HashType = eckit::FixedString<hashBytes>;

MessageHeader() :
version(CurrentVersion),
message(Message::None),
clientID_(0),
requestID(0),
payloadSize(0) {}
inline static const MarkerType StartMarker {"SFDB"};

inline static const MarkerType EndMarker {"EFDB"};

public: // methods
MessageHeader() = default;

MessageHeader(Message message, bool control, uint32_t clientID, uint32_t requestID, uint32_t payloadSize);

bool control() const {
return ((clientID_ & 0x00000001) == 1);
}
Expand All @@ -99,21 +112,13 @@ class MessageHeader {
}

public:

eckit::FixedString<4> marker; // 4 bytes --> 4

uint16_t version; // 2 bytes --> 6

Message message; // 2 bytes --> 8

uint32_t clientID_; // 4 bytes --> 12

uint32_t requestID; // 4 bytes --> 16

uint32_t payloadSize; // 4 bytes --> 20

eckit::FixedString<16> hash; // 16 bytes --> 36

MarkerType marker; // 4 bytes --> 4
uint16_t version {currentVersion}; // 2 bytes --> 6
Message message {Message::None}; // 2 bytes --> 8
uint32_t clientID_ {0}; // 4 bytes --> 12
uint32_t requestID {0}; // 4 bytes --> 16
uint32_t payloadSize {0}; // 4 bytes --> 20
HashType hash; // 16 bytes --> 36
};

//----------------------------------------------------------------------------------------------------------------------
Expand Down
Loading
Loading