Skip to content

Commit

Permalink
addressed PR comments (mutex on flush while evicting a catalogue from…
Browse files Browse the repository at this point in the history
… cache + Client handle simplified)
  • Loading branch information
danovaro committed Dec 18, 2024
1 parent 43ce247 commit 5974d92
Show file tree
Hide file tree
Showing 12 changed files with 58 additions and 57 deletions.
4 changes: 2 additions & 2 deletions src/fdb5/api/RemoteFDB.cc
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ void RemoteFDB::print(std::ostream& s) const {
}

// Client
bool RemoteFDB::handle(remote::Message message, bool control, uint32_t requestID) {
bool RemoteFDB::handle(remote::Message message, uint32_t requestID) {

switch (message) {
case fdb5::remote::Message::Complete: {
Expand Down Expand Up @@ -279,7 +279,7 @@ bool RemoteFDB::handle(remote::Message message, bool control, uint32_t requestID
return false;
}
}
bool RemoteFDB::handle(remote::Message message, bool control, uint32_t requestID, eckit::Buffer&& payload) {
bool RemoteFDB::handle(remote::Message message, uint32_t requestID, eckit::Buffer&& payload) {

switch (message) {
case fdb5::remote::Message::Blob: {
Expand Down
4 changes: 2 additions & 2 deletions src/fdb5/api/RemoteFDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ class RemoteFDB : public LocalFDB, public remote::Client {
FDBStats stats() const override { NOTIMP; }

// Client
bool handle(remote::Message message, bool control, uint32_t requestID) override;
bool handle(remote::Message message, bool control, uint32_t requestID, eckit::Buffer&& payload) override;
bool handle(remote::Message message, uint32_t requestID) override;
bool handle(remote::Message message, uint32_t requestID, eckit::Buffer&& payload) override;

private: // members

Expand Down
58 changes: 32 additions & 26 deletions src/fdb5/database/Archiver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,37 +77,43 @@ void Archiver::selectDatabase(const Key& dbKey) {

static size_t fdbMaxNbDBsOpen = eckit::Resource<size_t>("fdbMaxNbDBsOpen", 64);

if (databases_.size() >= fdbMaxNbDBsOpen) {
bool found = false;
time_t oldest = ::time(0) + 24 * 60 * 60;
Key oldK;
for (auto i = databases_.begin(); i != databases_.end(); ++i) {
if (i->second.time_ <= oldest) {
found = true;
oldK = i->first;
oldest = i->second.time_;
{
std::lock_guard<std::mutex> cacheLock(cacheMutex_);
if (databases_.size() >= fdbMaxNbDBsOpen) {
bool found = false;
time_t oldest = ::time(0) + 24 * 60 * 60;
Key oldK;
for (auto i = databases_.begin(); i != databases_.end(); ++i) {
if (i->second.time_ <= oldest) {
found = true;
oldK = i->first;
oldest = i->second.time_;
}
}
if (found) {
// flushing before evicting from cache
std::lock_guard<std::mutex> lock(flushMutex_);

databases_[oldK].catalogue_->flush(databases_[oldK].store_->flush());

eckit::Log::info() << "Closing database " << *databases_[oldK].catalogue_ << std::endl;
databases_.erase(oldK);
}
}
if (found) {
databases_[oldK].catalogue_->flush(databases_[oldK].store_->flush());

eckit::Log::info() << "Closing database " << *databases_[oldK].catalogue_ << std::endl;
databases_.erase(oldK);
}
}

std::unique_ptr<CatalogueWriter> cat = CatalogueWriterFactory::instance().build(dbKey, dbConfig_);
ASSERT(cat);
std::unique_ptr<CatalogueWriter> cat = CatalogueWriterFactory::instance().build(dbKey, dbConfig_);
ASSERT(cat);

// If this database is locked for writing then this is an error
if (!cat->enabled(ControlIdentifier::Archive)) {
std::ostringstream ss;
ss << "Database " << *cat << " matched for archived is LOCKED against archiving";
throw eckit::UserError(ss.str(), Here());
}
// If this database is locked for writing then this is an error
if (!cat->enabled(ControlIdentifier::Archive)) {
std::ostringstream ss;
ss << "Database " << *cat << " matched for archived is LOCKED against archiving";
throw eckit::UserError(ss.str(), Here());
}

std::unique_ptr<Store> str = cat->buildStore();
db_ = &(databases_[dbKey] = Database{::time(0), std::move(cat), std::move(str)});
std::unique_ptr<Store> str = cat->buildStore();
db_ = &(databases_[dbKey] = Database{::time(0), std::move(cat), std::move(str)});
}
}

void Archiver::print(std::ostream& out) const {
Expand Down
1 change: 1 addition & 0 deletions src/fdb5/database/Archiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ class Archiver : public eckit::NonCopyable {
Database* db_;

std::mutex flushMutex_;
std::mutex cacheMutex_;
const ArchiveCallback& callback_;
};

Expand Down
13 changes: 5 additions & 8 deletions src/fdb5/database/EntryVisitMechanism.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,24 +33,22 @@ class FDBVisitException : public eckit::Exception {

//----------------------------------------------------------------------------------------------------------------------

EntryVisitor::EntryVisitor() : currentCatalogue_(nullptr), currentStore_(nullptr), currentIndex_(nullptr) {}
EntryVisitor::~EntryVisitor() {}

EntryVisitor::~EntryVisitor() {
delete currentStore_;
}
EntryVisitor::EntryVisitor() : currentCatalogue_(nullptr), currentStore_(nullptr), currentIndex_(nullptr) {}

Store& EntryVisitor::store() const {
if (!currentStore_) {
ASSERT(currentCatalogue_);
currentStore_ = currentCatalogue_->buildStore().release();
currentStore_ = currentCatalogue_->buildStore();
ASSERT(currentStore_);
}
return *currentStore_;
}

bool EntryVisitor::visitDatabase(const Catalogue& catalogue) {
currentCatalogue_ = &catalogue;
currentStore_ = nullptr;
currentStore_.reset();
currentIndex_ = nullptr;
rule_ = nullptr;
return true;
Expand All @@ -61,8 +59,7 @@ void EntryVisitor::catalogueComplete(const Catalogue& catalogue) {
ASSERT(currentCatalogue_ == &catalogue);
}
currentCatalogue_ = nullptr;
delete currentStore_;
currentStore_ = nullptr;
currentStore_.reset();
currentIndex_ = nullptr;
rule_ = nullptr;
}
Expand Down
2 changes: 1 addition & 1 deletion src/fdb5/database/EntryVisitMechanism.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class EntryVisitor : public eckit::NonCopyable {
/// Non-owning
const Catalogue* currentCatalogue_ = nullptr;
/// Owned store
mutable Store* currentStore_ = nullptr;
mutable std::unique_ptr<Store> currentStore_ = nullptr;
/// Non-owning
const Index* currentIndex_ = nullptr;
/// Non-owning
Expand Down
4 changes: 2 additions & 2 deletions src/fdb5/remote/client/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ class Client : eckit::NonCopyable {
void dataWrite(remote::Message msg, uint32_t requestID, std::vector<std::pair<const void*, uint32_t>> data={});

// handlers for incoming messages - to be defined in the client class
virtual bool handle(Message message, bool control, uint32_t requestID) = 0;
virtual bool handle(Message message, bool control, uint32_t requestID, eckit::Buffer&& payload) = 0;
virtual bool handle(Message message, uint32_t requestID) = 0;
virtual bool handle(Message message, uint32_t requestID, eckit::Buffer&& payload) = 0;

protected:

Expand Down
8 changes: 4 additions & 4 deletions src/fdb5/remote/client/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -360,10 +360,10 @@ void ClientConnection::listeningControlThreadLoop() {
}

if (hdr.payloadSize == 0) {
handled = client->handle(hdr.message, hdr.control(), hdr.requestID);
handled = client->handle(hdr.message, hdr.requestID);
}
else {
handled = client->handle(hdr.message, hdr.control(), hdr.requestID, std::move(payload));
handled = client->handle(hdr.message, hdr.requestID, std::move(payload));
}
}

Expand Down Expand Up @@ -425,10 +425,10 @@ void ClientConnection::listeningDataThreadLoop() {
ASSERT(client);
ASSERT(!hdr.control());
if (hdr.payloadSize == 0) {
handled = client->handle(hdr.message, hdr.control(), hdr.requestID);
handled = client->handle(hdr.message, hdr.requestID);
}
else {
handled = client->handle(hdr.message, hdr.control(), hdr.requestID, std::move(payload));
handled = client->handle(hdr.message, hdr.requestID, std::move(payload));
}

if (!handled) {
Expand Down
7 changes: 2 additions & 5 deletions src/fdb5/remote/client/RemoteCatalogue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ RemoteCatalogue::RemoteCatalogue(const eckit::URI& uri, const Config& config):
NOTIMP;
}

RemoteCatalogue::~RemoteCatalogue() {}

void RemoteCatalogue::archive(const Key& idxKey, const Key& datumKey, std::shared_ptr<const FieldLocation> fieldLocation) {

ASSERT(!datumKey.empty());
Expand Down Expand Up @@ -137,12 +135,11 @@ void RemoteCatalogue::loadSchema() {
}
}

bool RemoteCatalogue::handle(Message message, bool control, uint32_t requestID) {
bool RemoteCatalogue::handle(Message message, uint32_t requestID) {
Log::warning() << *this << " - Received [message=" << ((uint) message) << ",requestID=" << requestID << "]" << std::endl;
NOTIMP;
return false;
}
bool RemoteCatalogue::handle(Message message, bool control, uint32_t requestID, eckit::Buffer&& payload) {
bool RemoteCatalogue::handle(Message message, uint32_t requestID, eckit::Buffer&& payload) {
LOG_DEBUG_LIB(LibFdb5) << *this << " - Received [message=" << ((uint) message) << ",requestID=" << requestID << ",payloadSize=" << payload.size() << "]" << std::endl;
return false;
}
Expand Down
6 changes: 3 additions & 3 deletions src/fdb5/remote/client/RemoteCatalogue.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class RemoteCatalogue : public CatalogueReader, public CatalogueWriter, public C
RemoteCatalogue(const Key& key, const Config& config);
RemoteCatalogue(const eckit::URI& uri, const Config& config);

~RemoteCatalogue() override;
~RemoteCatalogue() override = default;

// From CatalogueWriter
const Index& currentIndex() override;
Expand Down Expand Up @@ -66,8 +66,8 @@ class RemoteCatalogue : public CatalogueReader, public CatalogueWriter, public C
private:
// From Client
// handlers for incoming messages - to be defined in the client class
bool handle(Message message, bool control, uint32_t requestID) override;
bool handle(Message message, bool control, uint32_t requestID, eckit::Buffer&& payload) override;
bool handle(Message message, uint32_t requestID) override;
bool handle(Message message, uint32_t requestID, eckit::Buffer&& payload) override;

protected:

Expand Down
4 changes: 2 additions & 2 deletions src/fdb5/remote/client/RemoteStore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ void RemoteStore::print(std::ostream &out) const {
out << "RemoteStore(host=" << controlEndpoint() << ")";
}

bool RemoteStore::handle(Message message, bool control, uint32_t requestID) {
bool RemoteStore::handle(Message message, uint32_t requestID) {

switch (message) {
case Message::Complete: {
Expand Down Expand Up @@ -346,7 +346,7 @@ bool RemoteStore::handle(Message message, bool control, uint32_t requestID) {
return false;
}
}
bool RemoteStore::handle(Message message, bool control, uint32_t requestID, eckit::Buffer&& payload) {
bool RemoteStore::handle(Message message, uint32_t requestID, eckit::Buffer&& payload) {

switch (message) {

Expand Down
4 changes: 2 additions & 2 deletions src/fdb5/remote/client/RemoteStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ class RemoteStore : public Store, public Client {
private: // methods

// handlers for incoming messages - to be defined in the client class
bool handle(Message message, bool control, uint32_t requestID) override;
bool handle(Message message, bool control, uint32_t requestID, eckit::Buffer&& payload) override;
bool handle(Message message, uint32_t requestID) override;
bool handle(Message message, uint32_t requestID, eckit::Buffer&& payload) override;

private: // members

Expand Down

0 comments on commit 5974d92

Please sign in to comment.