Skip to content

Commit

Permalink
chore(remoteFDB): some improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
mcakircali committed Jan 16, 2025
1 parent 9bf825b commit 17f4378
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 54 deletions.
9 changes: 7 additions & 2 deletions src/fdb5/remote/client/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,17 @@ class RemoteFDBException : public eckit::RemoteException {

class Client : eckit::NonCopyable {
public: // types
using PayloadList = Connection::PayloadList;
using PayloadList = Connection::PayloadList;
using EndpointList = std::vector<std::pair<eckit::net::Endpoint, std::string>>;

static constexpr size_t defaultBufferSizeArchive = 8192;
static constexpr size_t defaultBufferSizeFlush = 1024;
static constexpr size_t defaultBufferSizeKey = 4096;

public: // methods
Client(const eckit::net::Endpoint& endpoint, const std::string& defaultEndpoint);

Client(const std::vector<std::pair<eckit::net::Endpoint, std::string>>& endpoints);
Client(const EndpointList& endpoints);

virtual ~Client();

Expand Down
26 changes: 9 additions & 17 deletions src/fdb5/remote/client/RemoteCatalogue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,6 @@ namespace fdb5::remote {

//----------------------------------------------------------------------------------------------------------------------

namespace {

constexpr size_t archiveBufferSize = 8192;
constexpr size_t keyBufferSize = 4096;
}

//----------------------------------------------------------------------------------------------------------------------

RemoteCatalogue::RemoteCatalogue(const Key& key, const Config& config):
CatalogueImpl(key, ControlIdentifiers(), config), // xxx what are control identifiers? Setting empty here...
Client(eckit::net::Endpoint(config.getString("host"), config.getInt("port")), ""),
Expand Down Expand Up @@ -65,7 +57,7 @@ void RemoteCatalogue::archive(const Key& idxKey, const Key& datumKey, std::share
numLocations_++;
}

Buffer buffer(archiveBufferSize);
Buffer buffer(defaultBufferSizeArchive);
MemoryStream stream(buffer);
stream << idxKey;
stream << datumKey;
Expand Down Expand Up @@ -107,8 +99,8 @@ void RemoteCatalogue::flush(size_t archivedFields) {
// Flush only does anything if there is an ongoing archive();
if (numLocations_ > 0) {

Buffer sendBuf(1024);
MemoryStream s(sendBuf);
eckit::Buffer sendBuf(defaultBufferSizeFlush);
eckit::MemoryStream s(sendBuf);
s << numLocations_;

LOG_DEBUG_LIB(LibFdb5) << " RemoteCatalogue::flush - flushing " << numLocations_ << " fields" << std::endl;
Expand All @@ -126,18 +118,18 @@ void RemoteCatalogue::close() {NOTIMP;}

bool RemoteCatalogue::exists() const {

bool exists = false;
bool result = false;

Buffer sendBuf(keyBufferSize);
MemoryStream sms(sendBuf);
eckit::Buffer sendBuf(defaultBufferSizeKey);
eckit::MemoryStream sms(sendBuf);
sms << dbKey_;

eckit::Buffer recvBuf = controlWriteReadResponse(Message::Exists, generateRequestID(), sendBuf, sms.position());

eckit::MemoryStream rms(recvBuf);
rms >> exists;
rms >> result;

return exists;
return result;
}

void RemoteCatalogue::checkUID() const {
Expand All @@ -156,7 +148,7 @@ void RemoteCatalogue::loadSchema() {
LOG_DEBUG_LIB(LibFdb5) << "RemoteCatalogue::loadSchema()" << std::endl;

// send dbkey to remote.
eckit::Buffer keyBuffer(keyBufferSize);
eckit::Buffer keyBuffer(defaultBufferSizeKey);
eckit::MemoryStream keyStream(keyBuffer);
keyStream << dbKey_;

Expand Down
16 changes: 7 additions & 9 deletions src/fdb5/remote/client/RemoteStore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -196,18 +196,16 @@ class FDBRemoteDataHandle : public DataHandle {
bool complete_;
};

using EndPointList = std::vector<std::pair<eckit::net::Endpoint, std::string>>;

EndPointList storeEndpoints(const Config& config) {
Client::EndpointList storeEndpoints(const Config& config) {

ASSERT(config.has("stores"));
ASSERT(config.has("fieldLocationEndpoints"));
std::vector<std::string> stores = config.getStringVector("stores");
std::vector<std::string> fieldLocationEndpoints = config.getStringVector("fieldLocationEndpoints");
const auto stores = config.getStringVector("stores");
const auto fieldLocationEndpoints = config.getStringVector("fieldLocationEndpoints");

ASSERT(stores.size() == fieldLocationEndpoints.size());

EndPointList out;
Client::EndpointList out;
out.reserve(stores.size());
for (size_t i = 0; i < stores.size(); ++i) {
out.emplace_back(eckit::net::Endpoint {stores.at(i)}, fieldLocationEndpoints.at(i));
Expand Down Expand Up @@ -269,8 +267,8 @@ void RemoteStore::archive(const Key& key, const void *data, eckit::Length length
// store the callback, associated with the request id - to be done BEFORE sending the data
locations_.archive(id, catalogue_archive);

Buffer keyBuffer(4096);
MemoryStream keyStream(keyBuffer);
eckit::Buffer keyBuffer(defaultBufferSizeKey);
eckit::MemoryStream keyStream(keyBuffer);
keyStream << dbKey_;
keyStream << key;

Expand Down Expand Up @@ -300,7 +298,7 @@ size_t RemoteStore::flush() {

size_t locations = complete ? locations_.archived() : locations_.wait();

Buffer sendBuf(1024);
Buffer sendBuf(defaultBufferSizeFlush);
MemoryStream s(sendBuf);
s << locations;

Expand Down
4 changes: 2 additions & 2 deletions src/fdb5/remote/server/CatalogueHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -361,8 +361,8 @@ void CatalogueHandler::exists(uint32_t clientID, uint32_t requestID, eckit::Buff
bool exists = false;

{
MemoryStream stream(payload);
const Key dbKey(stream);
eckit::MemoryStream stream(payload);
const Key dbKey(stream);
exists = CatalogueReaderFactory::instance().build(dbKey, config_)->exists();
}

Expand Down
20 changes: 14 additions & 6 deletions src/fdb5/remote/server/StoreHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,33 @@
* does it submit to any jurisdiction.
*/

#include "eckit/config/Resource.h"
#include "eckit/serialisation/MemoryStream.h"
#include "fdb5/remote/server/StoreHandler.h"

#include "fdb5/LibFdb5.h"
#include "fdb5/database/Key.h"
#include "fdb5/database/Store.h"
#include "fdb5/remote/server/StoreHandler.h"
#include "fdb5/remote/Messages.h"
#include "fdb5/remote/server/ServerConnection.h"

#include "eckit/net/TCPSocket.h"
#include "eckit/serialisation/MemoryStream.h"

#include <cstdint>
#include <memory>
#include <mutex>
#include <utility>

using namespace eckit;
using metkit::mars::MarsRequest;

namespace fdb5::remote {

//----------------------------------------------------------------------------------------------------------------------

StoreHandler::StoreHandler(eckit::net::TCPSocket& socket, const Config& config):
ServerConnection(socket, config) {
LibFdb5::instance().constructorCallback()(*this);
}

StoreHandler::~StoreHandler() {}

Handled StoreHandler::handleControl(Message message, uint32_t clientID, uint32_t requestID) {

try {
Expand Down
42 changes: 24 additions & 18 deletions src/fdb5/remote/server/StoreHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,52 +14,58 @@
#include "fdb5/database/Store.h"
#include "fdb5/remote/server/ServerConnection.h"

namespace fdb5::remote {

//----------------------------------------------------------------------------------------------------------------------

struct StoreHelper {
StoreHelper(bool dataConnection, const Key& dbKey, const Config& config) :
controlConnection(true), dataConnection(dataConnection),
store(StoreFactory::instance().build(dbKey, config)) {}
#include <cstdint>
#include <map>
#include <memory>

bool controlConnection;
bool dataConnection;

std::unique_ptr<Store> store;
};
namespace fdb5::remote {

//----------------------------------------------------------------------------------------------------------------------
class StoreHandler : public ServerConnection, public CallbackRegistry {
public: // methods

StoreHandler(eckit::net::TCPSocket& socket, const Config& config);
~StoreHandler() override;

private: // methods

Handled handleControl(Message message, uint32_t clientID, uint32_t requestID) override;
Handled handleControl(Message message, uint32_t clientID, uint32_t requestID, eckit::Buffer&& payload) override;

void flush(uint32_t clientID, uint32_t requestID, const eckit::Buffer& payload);

void read(uint32_t clientID, uint32_t requestID, const eckit::Buffer& payload);

void archiveBlob(const uint32_t clientID, const uint32_t requestID, const void* data, size_t length) override;
void exists(uint32_t clientID, uint32_t requestID, const eckit::Buffer& payload) const;

void archiveBlob(uint32_t clientID, uint32_t requestID, const void* data, size_t length) override;

void readLocationThreadLoop();
void writeToParent(const uint32_t clientID, const uint32_t requestID, std::unique_ptr<eckit::DataHandle> dh);

void writeToParent(uint32_t clientID, uint32_t requestID, std::unique_ptr<eckit::DataHandle> dh);

bool remove(bool control, uint32_t clientID) override;

Store& store(uint32_t clientID);

Store& store(uint32_t clientID, const Key& dbKey);

private: // members

struct StoreHelper;
// clientID --> Store
std::map<uint32_t, StoreHelper> stores_;
};

//----------------------------------------------------------------------------------------------------------------------

struct StoreHandler::StoreHelper {
StoreHelper(bool dataConnection, const Key& dbKey, const Config& config)
: dataConnection(dataConnection), store(StoreFactory::instance().build(dbKey, config)) { }

bool controlConnection {true};
bool dataConnection {false};

std::unique_ptr<Store> store;
};

//----------------------------------------------------------------------------------------------------------------------

} // namespace fdb5::remote

0 comments on commit 17f4378

Please sign in to comment.