From 049e11bd59d0418f6a8b5541796920f2eba13d23 Mon Sep 17 00:00:00 2001 From: Emanuele Danovaro Date: Wed, 3 Jul 2024 10:40:32 +0100 Subject: [PATCH] remoteFDB concurrecy --- VERSION | 2 +- src/fdb5/remote/Connection.cc | 1 + src/fdb5/remote/client/ClientConnection.cc | 44 ++++++------- src/fdb5/remote/server/CatalogueHandler.cc | 76 ++++++++++++++++------ src/fdb5/remote/server/CatalogueHandler.h | 3 +- src/fdb5/remote/server/ServerConnection.cc | 1 + src/fdb5/remote/server/ServerConnection.h | 1 + src/fdb5/remote/server/StoreHandler.cc | 10 ++- src/fdb5/remote/server/StoreHandler.h | 1 - 9 files changed, 90 insertions(+), 49 deletions(-) diff --git a/VERSION b/VERSION index 76d55bb22..fccd8bbd0 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -5.11.123 \ No newline at end of file +5.11.124 \ No newline at end of file diff --git a/src/fdb5/remote/Connection.cc b/src/fdb5/remote/Connection.cc index a17661861..f3df2dffc 100644 --- a/src/fdb5/remote/Connection.cc +++ b/src/fdb5/remote/Connection.cc @@ -131,6 +131,7 @@ void Connection::write(remote::Message msg, bool control, uint32_t clientID, uin // write(msg, false, clientID, requestID, data); // } void Connection::error(const std::string& msg, uint32_t clientID, uint32_t requestID) { + eckit::Log::error() << "[clientID=" << clientID << ",requestID=" << requestID << "] " << msg << std::endl; write(Message::Error, false, clientID, requestID, std::vector>{{msg.c_str(), msg.length()}}); } // void Connection::error(const std::string& msg, const Handler& clientID, uint32_t requestID) { diff --git a/src/fdb5/remote/client/ClientConnection.cc b/src/fdb5/remote/client/ClientConnection.cc index 63ce0e753..b5244ca40 100644 --- a/src/fdb5/remote/client/ClientConnection.cc +++ b/src/fdb5/remote/client/ClientConnection.cc @@ -65,8 +65,8 @@ ClientConnection::ClientConnection(const eckit::net::Endpoint& controlEndpoint, void ClientConnection::add(Client& client) { std::lock_guard lock(clientsMutex_); - auto it = clients_.find(client.id()); - ASSERT(it == clients_.end()); + // auto it = clients_.find(client.id()); + // ASSERT(it == clients_.end()); clients_[client.id()] = &client; } @@ -197,10 +197,9 @@ eckit::LocalConfiguration ClientConnection::availableFunctionality() const { std::future ClientConnection::controlWrite(Client& client, Message msg, uint32_t requestID, bool dataListener, std::vector> data) { - std::lock_guard lock(clientsMutex_); - - auto it = clients_.find(client.clientId()); - ASSERT(it != clients_.end()); + // std::lock_guard lock(clientsMutex_); + // auto it = clients_.find(client.clientId()); + // ASSERT(it != clients_.end()); std::future f; { @@ -381,23 +380,7 @@ void ClientConnection::listeningControlThreadLoop() { } else { if (hdr.clientID()) { bool handled = false; - Client* client = nullptr; - { - std::lock_guard lock(clientsMutex_); - auto it = clients_.find(hdr.clientID()); - if (it == clients_.end()) { - std::stringstream ss; - ss << "ERROR: Received [clientID="<< hdr.clientID() << ",requestID="<< hdr.requestID << ",message=" << hdr.message << ",payload=" << hdr.payloadSize << "]" << std::endl; - ss << "Unexpected answer for clientID recieved (" << hdr.clientID() << "). ABORTING"; - eckit::Log::status() << ss.str() << std::endl; - eckit::Log::error() << "Retrieving... " << ss.str() << std::endl; - throw eckit::SeriousBug(ss.str(), Here()); - } - client = it->second; - } - - ASSERT(client); ASSERT(hdr.control() || single_); auto pp = promises_.find(hdr.requestID); @@ -412,6 +395,23 @@ void ClientConnection::listeningControlThreadLoop() { promises_.erase(pp); handled = true; } else { + + Client* client = nullptr; + { + std::lock_guard lock(clientsMutex_); + + auto it = clients_.find(hdr.clientID()); + if (it == clients_.end()) { + std::stringstream ss; + ss << "ERROR: Received [clientID="<< hdr.clientID() << ",requestID="<< hdr.requestID << ",message=" << hdr.message << ",payload=" << hdr.payloadSize << "]" << std::endl; + ss << "Unexpected answer for clientID recieved (" << hdr.clientID() << "). ABORTING"; + eckit::Log::status() << ss.str() << std::endl; + eckit::Log::error() << "Retrieving... " << ss.str() << std::endl; + throw eckit::SeriousBug(ss.str(), Here()); + } + client = it->second; + } + if (hdr.payloadSize == 0) { handled = client->handle(hdr.message, hdr.control(), hdr.requestID); } diff --git a/src/fdb5/remote/server/CatalogueHandler.cc b/src/fdb5/remote/server/CatalogueHandler.cc index 95382a2ef..464702150 100644 --- a/src/fdb5/remote/server/CatalogueHandler.cc +++ b/src/fdb5/remote/server/CatalogueHandler.cc @@ -30,7 +30,7 @@ namespace fdb5::remote { // *************************************************************************************** CatalogueHandler::CatalogueHandler(eckit::net::TCPSocket& socket, const Config& config): - ServerConnection(socket, config), fdbId_(0), fdbControlConnection_(false), fdbDataConnection_(false) {} + ServerConnection(socket, config), fdbControlConnection_(false), fdbDataConnection_(false) {} CatalogueHandler::~CatalogueHandler() {} @@ -41,8 +41,9 @@ Handled CatalogueHandler::handleControl(Message message, uint32_t clientID, uint case Message::Schema: // request top-level schema { std::lock_guard lock(handlerMutex_); - if (fdbId_ == 0) { - fdbId_ = clientID; + auto it = fdbs_.find(clientID); + if (it == fdbs_.end()) { + fdbs_[clientID]; fdbControlConnection_ = true; fdbDataConnection_ = !single_; numControlConnection_++; @@ -62,9 +63,9 @@ Handled CatalogueHandler::handleControl(Message message, uint32_t clientID, uint archiver(); return Handled::YesAddArchiveListener; - case Message::Flush: // notification that the client has sent all data locations for archival - flush(clientID, requestID, eckit::Buffer{0}); - return Handled::Yes; + // case Message::Flush: // notification that the client has sent all data locations for archival + // flush(clientID, requestID, eckit::Buffer{0}); + // return Handled::Yes; default: { std::stringstream ss; @@ -251,8 +252,15 @@ void CatalogueHandler::forwardApiCall(uint32_t clientID, uint32_t requestID, eck requestID, std::async(std::launch::async, [request, clientID, requestID, helper, this]() { try { - auto it = fdbs_.find(clientID); - auto iterator = helper.apiCall(it->second, request); + FDB* fdb = nullptr; + { + std::lock_guard lock(handlerMutex_); + auto it = fdbs_.find(clientID); + ASSERT(it != fdbs_.end()); + fdb = &it->second; + ASSERT(fdb); + } + auto iterator = helper.apiCall(*fdb, request); typename decltype(iterator)::value_type elem; while (iterator.next(elem)) { auto encoded(helper.encode(elem, *this)); @@ -375,24 +383,41 @@ void CatalogueHandler::stores(uint32_t clientID, uint32_t requestID) { void CatalogueHandler::flush(uint32_t clientID, uint32_t requestID, eckit::Buffer&& payload) { - + + ASSERT(payload.size() > 0); + size_t numArchived = 0; - - if (payload.size() > 0) { - MemoryStream s(payload); - s >> numArchived; + MemoryStream s(payload); + s >> numArchived; + + if (numArchived == 0) { + return; } auto it = catalogues_.find(clientID); ASSERT(it != catalogues_.end()); - it->second.locationsExpected = numArchived; - it->second.archivalCompleted = it->second.fieldLocationsReceived.get_future(); + // std::cout << "flush " << clientID << " archived " << it->second.locationsArchived << " expected " << it->second.locationsExpected << std::endl; + + { + std::lock_guard lock(fieldLocationsMutex_); + it->second.locationsExpected = numArchived; // setting locationsExpected also means that a flush has been requested + it->second.archivalCompleted = it->second.fieldLocationsReceived.get_future(); + // std::cout << "flush post lock " << clientID << " archived " << it->second.locationsArchived << " expected " << it->second.locationsExpected << std::endl; + if (it->second.locationsArchived == numArchived) { + it->second.fieldLocationsReceived.set_value(numArchived); + } + } - if (it->second.locationsArchived < numArchived) { - it->second.archivalCompleted.wait(); + // std::cout << "flush wait " << clientID << " archived " << it->second.locationsArchived << " expected " << it->second.locationsExpected << std::endl; + it->second.archivalCompleted.wait(); + { + std::lock_guard lock(fieldLocationsMutex_); it->second.fieldLocationsReceived = std::promise{}; + it->second.locationsExpected = 0; + it->second.locationsArchived = 0; } + // std::cout << "flush post wait " << clientID << " archived " << it->second.locationsArchived << " expected " << it->second.locationsExpected << std::endl; it->second.catalogue->flush(numArchived); @@ -427,16 +452,25 @@ void CatalogueHandler::archiveBlob(const uint32_t clientID, const uint32_t reque it->second.catalogue->selectIndex(idxKey); it->second.catalogue->archive(idxKey, key, std::move(location)); - it->second.locationsArchived++; - if (it->second.archivalCompleted.valid() && it->second.locationsExpected == it->second.locationsArchived) { - it->second.fieldLocationsReceived.set_value(it->second.locationsExpected); + { + // std::cout << "archiveBlob " << clientID << " archived " << it->second.locationsArchived << " expected " << it->second.locationsExpected << std::endl; + std::lock_guard lock(fieldLocationsMutex_); + // std::cout << "archiveBlob post mutex " << std::endl; + it->second.locationsArchived++; + if (it->second.locationsExpected != 0 && it->second.archivalCompleted.valid() && it->second.locationsExpected == it->second.locationsArchived) { + // std::cout << "archiveBlob set_value " << std::endl; + it->second.fieldLocationsReceived.set_value(it->second.locationsExpected); + } } } bool CatalogueHandler::remove(bool control, uint32_t clientID) { std::lock_guard lock(handlerMutex_); - if (clientID == fdbId_) { + + // is the client an FDB + auto it = fdbs_.find(clientID); + if (it != fdbs_.end()) { if (control) { fdbControlConnection_ = false; numControlConnection_--; diff --git a/src/fdb5/remote/server/CatalogueHandler.h b/src/fdb5/remote/server/CatalogueHandler.h index 151752ce2..f60e151b2 100644 --- a/src/fdb5/remote/server/CatalogueHandler.h +++ b/src/fdb5/remote/server/CatalogueHandler.h @@ -73,8 +73,9 @@ class CatalogueHandler : public ServerConnection { std::map fdbs_; std::mutex fdbMutex_; + std::mutex fieldLocationsMutex_; - uint32_t fdbId_; + // uint32_t fdbId_; bool fdbControlConnection_; bool fdbDataConnection_; }; diff --git a/src/fdb5/remote/server/ServerConnection.cc b/src/fdb5/remote/server/ServerConnection.cc index 3058149b9..a72d92d8b 100644 --- a/src/fdb5/remote/server/ServerConnection.cc +++ b/src/fdb5/remote/server/ServerConnection.cc @@ -560,6 +560,7 @@ void ServerConnection::waitForWorkers() { eckit::Log::error() << "Thread complete" << std::endl; } + std::lock_guard lock(readLocationMutex_); if (readLocationWorker_.joinable()) { readLocationWorker_.join(); } diff --git a/src/fdb5/remote/server/ServerConnection.h b/src/fdb5/remote/server/ServerConnection.h index b9d48b0ca..16f10b038 100644 --- a/src/fdb5/remote/server/ServerConnection.h +++ b/src/fdb5/remote/server/ServerConnection.h @@ -141,6 +141,7 @@ class ServerConnection : public Connection, public Handler { eckit::SessionID sessionID_; eckit::LocalConfiguration agreedConf_; + std::mutex readLocationMutex_; std::thread readLocationWorker_; std::map> workerThreads_; diff --git a/src/fdb5/remote/server/StoreHandler.cc b/src/fdb5/remote/server/StoreHandler.cc index 859c87139..c19f7678b 100644 --- a/src/fdb5/remote/server/StoreHandler.cc +++ b/src/fdb5/remote/server/StoreHandler.cc @@ -86,8 +86,11 @@ Handled StoreHandler::handleControl(Message message, uint32_t clientID, uint32_t void StoreHandler::read(uint32_t clientID, uint32_t requestID, const eckit::Buffer& payload) { - if (!readLocationWorker_.joinable()) { - readLocationWorker_ = std::thread([this] { readLocationThreadLoop(); }); + { + std::lock_guard lock(readLocationMutex_); + if (!readLocationWorker_.joinable()) { + readLocationWorker_ = std::thread([this] { readLocationThreadLoop(); }); + } } MemoryStream s(payload); @@ -188,6 +191,7 @@ void StoreHandler::flush(uint32_t clientID, uint32_t requestID, const eckit::Buf ASSERT(numArchived == 0 || archiveFuture_.valid()); + std::lock_guard lock(handlerMutex_); auto it = stores_.find(clientID); ASSERT(it != stores_.end()); it->second.store->flush(); @@ -199,7 +203,6 @@ void StoreHandler::flush(uint32_t clientID, uint32_t requestID, const eckit::Buf bool StoreHandler::remove(bool control, uint32_t clientID) { std::lock_guard lock(handlerMutex_); - auto it = stores_.find(clientID); if (it != stores_.end()) { if (control) { @@ -222,6 +225,7 @@ Store& StoreHandler::store(uint32_t clientID) { auto it = stores_.find(clientID); if (it == stores_.end()) { std::string what("Requested Store has not been loaded id: " + std::to_string(clientID)); + Log::error() << what << std::endl; write(Message::Error, true, 0, 0, what.c_str(), what.length()); throw; } diff --git a/src/fdb5/remote/server/StoreHandler.h b/src/fdb5/remote/server/StoreHandler.h index 0319e6d4b..74b2adec1 100644 --- a/src/fdb5/remote/server/StoreHandler.h +++ b/src/fdb5/remote/server/StoreHandler.h @@ -49,7 +49,6 @@ class StoreHandler : public ServerConnection { void writeToParent(const uint32_t clientID, const uint32_t requestID, std::unique_ptr dh); bool remove(bool control, uint32_t clientID) override; - // bool handlers() override; Store& store(uint32_t clientID); Store& store(uint32_t clientID, const Key& dbKey);