Skip to content

Commit

Permalink
feat(remoteFDB): RemoteCatalogue loads schema lazily
Browse files Browse the repository at this point in the history
related to issue of creating empty DBs
  • Loading branch information
mcakircali committed Jan 17, 2025
1 parent 77b365c commit 5c18872
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 28 deletions.
4 changes: 2 additions & 2 deletions src/fdb5/remote/client/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -218,14 +218,14 @@ void ClientConnection::dataWrite(Client& client, remote::Message msg, uint32_t r
}

uint32_t payloadLength = 0;
for (auto payload : payloads) {
for (const auto& payload : payloads) {
ASSERT(payload.data);
payloadLength += payload.length;
}

eckit::Buffer buffer{payloadLength};
uint32_t offset = 0;
for (auto payload : payloads) {
for (const auto& payload : payloads) {
buffer.copy(payload.data, payload.length, offset);
offset += payload.length;
}
Expand Down
59 changes: 36 additions & 23 deletions src/fdb5/remote/client/RemoteCatalogue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
#include "eckit/filesystem/URI.h"
#include "eckit/log/Log.h"
#include "eckit/serialisation/MemoryStream.h"
#include "fdb5/rules/Schema.h"

#include <cstddef>
#include <memory>
#include <mutex>
#include <vector>

Expand All @@ -28,18 +30,38 @@ namespace fdb5::remote {

//----------------------------------------------------------------------------------------------------------------------

RemoteCatalogue::RemoteCatalogue(const Key& key, const Config& config):
CatalogueImpl(key, ControlIdentifiers(), config), // xxx what are control identifiers? Setting empty here...
Client(eckit::net::Endpoint(config.getString("host"), config.getInt("port")), ""),
config_(config), schema_(nullptr), numLocations_(0) {
namespace {

loadSchema();
/// Requests the schema associated with @p dbKey from the remote catalogue server.
///
/// The key is serialised into a request buffer, sent as a Message::Schema request through the
/// catalogue's control connection, and the reply is decoded with eckit::Reanimator.
///
/// @param dbKey      database key whose schema is requested
/// @param catalogue  remote catalogue client used for the request/response exchange
/// @return pointer produced by eckit::Reanimator<Schema>::reanimate; the callers in this file
///         pass it straight to std::unique_ptr::reset
Schema* fetchSchema(const Key& dbKey, const RemoteCatalogue& catalogue) {
    LOG_DEBUG_LIB(LibFdb5) << "Fetching schema from remote catalogue: " << catalogue.controlEndpoint() << std::endl;

    // encode the database key as the request payload
    eckit::Buffer request(RemoteCatalogue::defaultBufferSizeKey);
    eckit::MemoryStream encoder(request);
    encoder << dbKey;

    const auto id = catalogue.generateRequestID();

    // send the request and collect the reply, which carries the serialised schema
    auto reply = catalogue.controlWriteReadResponse(Message::Schema, id, request, encoder.position());

    eckit::MemoryStream decoder(reply);
    return eckit::Reanimator<Schema>::reanimate(decoder);
}

} // namespace

//----------------------------------------------------------------------------------------------------------------------

/// Constructs a remote catalogue client for the database identified by @p key.
///
/// The server endpoint is built from the "host" and "port" entries of @p config and passed to the
/// Client base. The constructor body is empty: no schema request is issued here.
///
/// @param key     database key forwarded to CatalogueImpl
/// @param config  configuration; must provide "host" and "port"
RemoteCatalogue::RemoteCatalogue(const Key& key, const Config& config)
: CatalogueImpl(key, {}, config), // xxx what are control identifiers? Setting empty here...
Client({config.getString("host"), config.getInt("port")}, ""),
config_(config) { }

// Catalogue(URI, Config) is only used by the Visitors to traverse the catalogue. In the remote case, the RemoteFDB
// handles catalogue traversal, so this ctor exists only to comply with the factory.
RemoteCatalogue::RemoteCatalogue(const eckit::URI& uri, const Config& config):
Client(eckit::net::Endpoint(config.getString("host"), config.getInt("port")), ""), config_(config), schema_(nullptr), numLocations_(0) {
/// URI-based constructor; not implemented.
///
/// The URI argument is ignored. The Client base is initialised from the "host" and "port" entries
/// of @p config, after which the body invokes NOTIMP (presumably eckit's not-implemented macro,
/// which is expected to throw — confirm against eckit/exception/Exceptions.h).
RemoteCatalogue::RemoteCatalogue(const eckit::URI& /*uri*/, const Config& config)
: Client({config.getString("host"), config.getInt("port")}, ""), config_(config) {
NOTIMP;
}

Expand Down Expand Up @@ -87,7 +109,11 @@ void RemoteCatalogue::deselectIndex() {
currentIndexKey_ = Key();
}
/// Returns the schema of this remote database, fetching it from the server on first use.
///
/// The first call requests the schema via fetchSchema() and caches it in schema_ (a mutable
/// member, so this const accessor can populate it); later calls return the cached object.
/// There is deliberately no assertion before the null check: asserting on schema_ up front
/// would fail on the very first access and make the lazy-load branch unreachable.
///
/// NOTE(review): the lazy initialisation is not guarded by a lock — confirm that callers do not
/// invoke schema() concurrently on the same catalogue.
///
/// @return reference to the cached schema; asserts if the fetch produced a null pointer
const Schema& RemoteCatalogue::schema() const {
    // lazy loading schema
    if (!schema_) {
        schema_.reset(fetchSchema(dbKey_, *this));
        ASSERT(schema_);
    }
    return *schema_;
}

Expand Down Expand Up @@ -124,7 +150,7 @@ bool RemoteCatalogue::exists() const {
eckit::MemoryStream sms(sendBuf);
sms << dbKey_;

eckit::Buffer recvBuf = controlWriteReadResponse(Message::Exists, generateRequestID(), sendBuf, sms.position());
auto recvBuf = controlWriteReadResponse(Message::Exists, generateRequestID(), sendBuf, sms.position());

eckit::MemoryStream rms(recvBuf);
rms >> result;
Expand All @@ -143,20 +169,7 @@ eckit::URI RemoteCatalogue::uri() const {
/// Ensures the database-level schema has been fetched from the remote server.
///
/// Does nothing when schema_ is already populated; otherwise delegates to fetchSchema(), the same
/// helper used by schema(), so the request encoding lives in a single place.
void RemoteCatalogue::loadSchema() {
    // NB we're at the db level, so get the db schema. We will want to get the master schema beforehand.
    // (outside of the catalogue)
    if (!schema_) {
        schema_.reset(fetchSchema(dbKey_, *this));
    }
}

bool RemoteCatalogue::handle(Message message, uint32_t requestID) {
Expand Down
6 changes: 4 additions & 2 deletions src/fdb5/remote/client/RemoteCatalogue.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "fdb5/database/Catalogue.h"
#include "fdb5/database/DbStats.h"
#include "fdb5/database/Index.h"
#include "fdb5/database/Key.h"
#include "fdb5/database/Store.h"
#include "fdb5/remote/client/Client.h"

Expand All @@ -14,6 +15,7 @@
#include <cstddef>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <vector>

Expand Down Expand Up @@ -89,10 +91,10 @@ class RemoteCatalogue : public CatalogueReader, public CatalogueWriter, public C
private:

Key currentIndexKey_;
std::unique_ptr<Schema> schema_;
mutable std::unique_ptr<Schema> schema_;

std::mutex archiveMutex_;
size_t numLocations_;
size_t numLocations_ {0};
};

//----------------------------------------------------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion src/fdb5/remote/client/RemoteStore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ bool RemoteStore::exists() const {
eckit::MemoryStream sms(sendBuf);
sms << dbKey_;

eckit::Buffer recvBuf = controlWriteReadResponse(Message::Exists, generateRequestID(), sendBuf, sms.position());
auto recvBuf = controlWriteReadResponse(Message::Exists, generateRequestID(), sendBuf, sms.position());

eckit::MemoryStream rms(recvBuf);
rms >> result;
Expand Down

0 comments on commit 5c18872

Please sign in to comment.