Skip to content

Add callback interface class for serverside callback registration #63

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions src/fdb5/api/FDB.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ FDB::FDB(const Config &config) :
internal_(FDBFactory::instance().build(config)),
dirty_(false),
reportStats_(config.getBool("statistics", false)) {
LibFdb5::instance().constructorCallback()(*this);
LibFdb5::instance().constructorCallback()(*internal_);
}


Expand Down Expand Up @@ -290,7 +290,6 @@ void FDB::flush() {
timer.start();

internal_->flush();
flushCallback_();
dirty_ = false;

timer.stop();
Expand Down Expand Up @@ -328,12 +327,12 @@ bool FDB::enabled(const ControlIdentifier& controlIdentifier) const {
return internal_->enabled(controlIdentifier);
}

void FDB::registerArchiveCallback(ArchiveCallback callback) { // todo rename
void FDB::registerArchiveCallback(ArchiveCallback callback) {
internal_->registerArchiveCallback(callback);
}

void FDB::registerFlushCallback(FlushCallback callback) { // todo rename
flushCallback_ = callback;
void FDB::registerFlushCallback(FlushCallback callback) {
internal_->registerFlushCallback(callback);
}

//----------------------------------------------------------------------------------------------------------------------
Expand Down
2 changes: 0 additions & 2 deletions src/fdb5/api/FDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,6 @@ class FDB {
bool reportStats_;

FDBStats stats_;

FlushCallback flushCallback_ = CALLBACK_FLUSH_NOOP;
};

//----------------------------------------------------------------------------------------------------------------------
Expand Down
5 changes: 1 addition & 4 deletions src/fdb5/api/FDBFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class FDBToolRequest;

/// The base class that FDB implementations are derived from

class FDBBase : private eckit::NonCopyable {
class FDBBase : private eckit::NonCopyable, public CallbackInterface {

public: // methods

Expand Down Expand Up @@ -95,8 +95,6 @@ class FDBBase : private eckit::NonCopyable {

virtual AxesIterator axesIterator(const FDBToolRequest& request, int axes) = 0;

void registerArchiveCallback(ArchiveCallback callback) {callback_ = callback;}

// -------------- API management ----------------------------

/// ID used for hashing in the Rendezvous hash. Should be unique amongst those used
Expand Down Expand Up @@ -133,7 +131,6 @@ class FDBBase : private eckit::NonCopyable {

bool disabled_;

ArchiveCallback callback_ = CALLBACK_NOOP;
};

//----------------------------------------------------------------------------------------------------------------------
Expand Down
3 changes: 2 additions & 1 deletion src/fdb5/api/LocalFDB.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ void LocalFDB::archive(const Key& key, const void* data, size_t length) {

if (!archiver_) {
LOG_DEBUG_LIB(LibFdb5) << *this << ": Constructing new archiver" << std::endl;
archiver_.reset(new Archiver(config_, callback_));
archiver_.reset(new Archiver(config_, archiveCallback_));
}

archiver_->archive(key, data, length);
Expand Down Expand Up @@ -133,6 +133,7 @@ AxesIterator LocalFDB::axesIterator(const FDBToolRequest& request, int level) {
void LocalFDB::flush() {
if (archiver_) {
archiver_->flush();
flushCallback_();
}
}

Expand Down
23 changes: 20 additions & 3 deletions src/fdb5/api/helpers/Callback.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,30 @@
namespace fdb5 {

class FDB;
class CallbackInterface;

using ArchiveCallback = std::function<void(const Key& key, const void* data, size_t length, std::future<std::shared_ptr<const FieldLocation>>)>;
using FlushCallback = std::function<void()>;
using ConstructorCallback = std::function<void(FDB&)>;
using ConstructorCallback = std::function<void(CallbackInterface&)>;

static const ArchiveCallback CALLBACK_NOOP = [](const Key& key, const void* data, size_t length, std::future<std::shared_ptr<const FieldLocation>>) {};
static const ArchiveCallback CALLBACK_ARCHIVE_NOOP = [](const Key& key, const void* data, size_t length, std::future<std::shared_ptr<const FieldLocation>>) {};
static const FlushCallback CALLBACK_FLUSH_NOOP = []() {};
static const ConstructorCallback CALLBACK_CONSTRUCTOR_NOOP = [](FDB&) {};
static const ConstructorCallback CALLBACK_CONSTRUCTOR_NOOP = [](CallbackInterface&) {};

// -------------------------------------------------------------------------------------------------

// This class provides a common interface for registering callbacks with an FDB object or a Store/Catalogue Handler.
class CallbackInterface {
public:

void registerFlushCallback(FlushCallback callback) {flushCallback_ = callback;}
void registerArchiveCallback(ArchiveCallback callback) {archiveCallback_ = callback;}

protected:

FlushCallback flushCallback_ = CALLBACK_FLUSH_NOOP;
ArchiveCallback archiveCallback_ = CALLBACK_ARCHIVE_NOOP;

};

} // namespace fdb5
2 changes: 1 addition & 1 deletion src/fdb5/database/ArchiveVisitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class ArchiveVisitor : public BaseArchiveVisitor {

public: // methods

ArchiveVisitor(Archiver& owner, const Key& dataKey, const void* data, size_t size, const ArchiveCallback& callback = CALLBACK_NOOP);
ArchiveVisitor(Archiver& owner, const Key& dataKey, const void* data, size_t size, const ArchiveCallback& callback = CALLBACK_ARCHIVE_NOOP);

protected: // methods

Expand Down
2 changes: 1 addition & 1 deletion src/fdb5/database/Archiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class Archiver : public eckit::NonCopyable {

public: // methods

Archiver(const Config& dbConfig = Config().expandConfig(), const ArchiveCallback& callback = CALLBACK_NOOP);
Archiver(const Config& dbConfig = Config().expandConfig(), const ArchiveCallback& callback = CALLBACK_ARCHIVE_NOOP);

virtual ~Archiver();

Expand Down
20 changes: 17 additions & 3 deletions src/fdb5/remote/server/StoreHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ using metkit::mars::MarsRequest;
namespace fdb5::remote {

StoreHandler::StoreHandler(eckit::net::TCPSocket& socket, const Config& config):
ServerConnection(socket, config) {}
ServerConnection(socket, config) {
LibFdb5::instance().constructorCallback()(*this);
}

StoreHandler::~StoreHandler() {}

Expand Down Expand Up @@ -169,9 +171,19 @@ void StoreHandler::archiveBlob(const uint32_t clientID, const uint32_t requestID

Store& ss = store(clientID, dbKey);

std::unique_ptr<const FieldLocation> location = ss.archive(idxKey, charData + s.position(), length - s.position());
std::shared_ptr<const FieldLocation> location = ss.archive(idxKey, charData + s.position(), length - s.position());

std::promise<std::shared_ptr<const FieldLocation>> promise;
promise.set_value(location);

eckit::StringDict dict = dbKey.keyDict();
dict.insert(idxKey.keyDict().begin(), idxKey.keyDict().end());
const Key fullkey(dict); /// @note: we do not have the third level of the key.

archiveCallback_(fullkey, charData + s.position(), length - s.position(), promise.get_future());

Log::status() << "Archiving done: " << ss_key.str() << std::endl;

eckit::Buffer buffer(16 * 1024);
MemoryStream stream(buffer);
stream << (*location);
Expand All @@ -196,6 +208,8 @@ void StoreHandler::flush(uint32_t clientID, uint32_t requestID, const eckit::Buf
auto it = stores_.find(clientID);
ASSERT(it != stores_.end());
it->second.store->flush();

flushCallback_();
}

Log::info() << "Flush complete" << std::endl;
Expand Down
3 changes: 2 additions & 1 deletion src/fdb5/remote/server/StoreHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#pragma once

#include "fdb5/api/helpers/Callback.h"
#include "fdb5/database/Store.h"
#include "fdb5/remote/server/ServerConnection.h"

Expand All @@ -29,7 +30,7 @@ struct StoreHelper {
};

//----------------------------------------------------------------------------------------------------------------------
class StoreHandler : public ServerConnection {
class StoreHandler : public ServerConnection, public CallbackInterface {
public: // methods

StoreHandler(eckit::net::TCPSocket& socket, const Config& config);
Expand Down
Loading