Skip to content

Commit

Permalink
remoteFDB concurrecy
Browse files Browse the repository at this point in the history
  • Loading branch information
danovaro committed Jul 3, 2024
1 parent f1360ae commit 049e11b
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 49 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
5.11.123
5.11.124
1 change: 1 addition & 0 deletions src/fdb5/remote/Connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ void Connection::write(remote::Message msg, bool control, uint32_t clientID, uin
// write(msg, false, clientID, requestID, data);
// }
void Connection::error(const std::string& msg, uint32_t clientID, uint32_t requestID) {
eckit::Log::error() << "[clientID=" << clientID << ",requestID=" << requestID << "] " << msg << std::endl;
write(Message::Error, false, clientID, requestID, std::vector<std::pair<const void*, uint32_t>>{{msg.c_str(), msg.length()}});
}
// void Connection::error(const std::string& msg, const Handler& clientID, uint32_t requestID) {
Expand Down
44 changes: 22 additions & 22 deletions src/fdb5/remote/client/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ ClientConnection::ClientConnection(const eckit::net::Endpoint& controlEndpoint,

void ClientConnection::add(Client& client) {
std::lock_guard<std::mutex> lock(clientsMutex_);
auto it = clients_.find(client.id());
ASSERT(it == clients_.end());
// auto it = clients_.find(client.id());
// ASSERT(it == clients_.end());

clients_[client.id()] = &client;
}
Expand Down Expand Up @@ -197,10 +197,9 @@ eckit::LocalConfiguration ClientConnection::availableFunctionality() const {

std::future<eckit::Buffer> ClientConnection::controlWrite(Client& client, Message msg, uint32_t requestID, bool dataListener, std::vector<std::pair<const void*, uint32_t>> data) {

std::lock_guard<std::mutex> lock(clientsMutex_);

auto it = clients_.find(client.clientId());
ASSERT(it != clients_.end());
// std::lock_guard<std::mutex> lock(clientsMutex_);
// auto it = clients_.find(client.clientId());
// ASSERT(it != clients_.end());

std::future<eckit::Buffer> f;
{
Expand Down Expand Up @@ -381,23 +380,7 @@ void ClientConnection::listeningControlThreadLoop() {
} else {
if (hdr.clientID()) {
bool handled = false;
Client* client = nullptr;
{
std::lock_guard<std::mutex> lock(clientsMutex_);

auto it = clients_.find(hdr.clientID());
if (it == clients_.end()) {
std::stringstream ss;
ss << "ERROR: Received [clientID="<< hdr.clientID() << ",requestID="<< hdr.requestID << ",message=" << hdr.message << ",payload=" << hdr.payloadSize << "]" << std::endl;
ss << "Unexpected answer for clientID recieved (" << hdr.clientID() << "). ABORTING";
eckit::Log::status() << ss.str() << std::endl;
eckit::Log::error() << "Retrieving... " << ss.str() << std::endl;
throw eckit::SeriousBug(ss.str(), Here());
}
client = it->second;
}

ASSERT(client);
ASSERT(hdr.control() || single_);

auto pp = promises_.find(hdr.requestID);
Expand All @@ -412,6 +395,23 @@ void ClientConnection::listeningControlThreadLoop() {
promises_.erase(pp);
handled = true;
} else {

Client* client = nullptr;
{
std::lock_guard<std::mutex> lock(clientsMutex_);

auto it = clients_.find(hdr.clientID());
if (it == clients_.end()) {
std::stringstream ss;
ss << "ERROR: Received [clientID="<< hdr.clientID() << ",requestID="<< hdr.requestID << ",message=" << hdr.message << ",payload=" << hdr.payloadSize << "]" << std::endl;
ss << "Unexpected answer for clientID recieved (" << hdr.clientID() << "). ABORTING";
eckit::Log::status() << ss.str() << std::endl;
eckit::Log::error() << "Retrieving... " << ss.str() << std::endl;
throw eckit::SeriousBug(ss.str(), Here());
}
client = it->second;
}

if (hdr.payloadSize == 0) {
handled = client->handle(hdr.message, hdr.control(), hdr.requestID);
}
Expand Down
76 changes: 55 additions & 21 deletions src/fdb5/remote/server/CatalogueHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ namespace fdb5::remote {
// ***************************************************************************************

CatalogueHandler::CatalogueHandler(eckit::net::TCPSocket& socket, const Config& config):
ServerConnection(socket, config), fdbId_(0), fdbControlConnection_(false), fdbDataConnection_(false) {}
ServerConnection(socket, config), fdbControlConnection_(false), fdbDataConnection_(false) {}

CatalogueHandler::~CatalogueHandler() {}

Expand All @@ -41,8 +41,9 @@ Handled CatalogueHandler::handleControl(Message message, uint32_t clientID, uint
case Message::Schema: // request top-level schema
{
std::lock_guard<std::mutex> lock(handlerMutex_);
if (fdbId_ == 0) {
fdbId_ = clientID;
auto it = fdbs_.find(clientID);
if (it == fdbs_.end()) {
fdbs_[clientID];
fdbControlConnection_ = true;
fdbDataConnection_ = !single_;
numControlConnection_++;
Expand All @@ -62,9 +63,9 @@ Handled CatalogueHandler::handleControl(Message message, uint32_t clientID, uint
archiver();
return Handled::YesAddArchiveListener;

case Message::Flush: // notification that the client has sent all data locations for archival
flush(clientID, requestID, eckit::Buffer{0});
return Handled::Yes;
// case Message::Flush: // notification that the client has sent all data locations for archival
// flush(clientID, requestID, eckit::Buffer{0});
// return Handled::Yes;

default: {
std::stringstream ss;
Expand Down Expand Up @@ -251,8 +252,15 @@ void CatalogueHandler::forwardApiCall(uint32_t clientID, uint32_t requestID, eck
requestID, std::async(std::launch::async, [request, clientID, requestID, helper, this]() {

try {
auto it = fdbs_.find(clientID);
auto iterator = helper.apiCall(it->second, request);
FDB* fdb = nullptr;
{
std::lock_guard<std::mutex> lock(handlerMutex_);
auto it = fdbs_.find(clientID);
ASSERT(it != fdbs_.end());
fdb = &it->second;
ASSERT(fdb);
}
auto iterator = helper.apiCall(*fdb, request);
typename decltype(iterator)::value_type elem;
while (iterator.next(elem)) {
auto encoded(helper.encode(elem, *this));
Expand Down Expand Up @@ -375,24 +383,41 @@ void CatalogueHandler::stores(uint32_t clientID, uint32_t requestID) {


void CatalogueHandler::flush(uint32_t clientID, uint32_t requestID, eckit::Buffer&& payload) {


ASSERT(payload.size() > 0);

size_t numArchived = 0;

if (payload.size() > 0) {
MemoryStream s(payload);
s >> numArchived;
MemoryStream s(payload);
s >> numArchived;

if (numArchived == 0) {
return;
}

auto it = catalogues_.find(clientID);
ASSERT(it != catalogues_.end());

it->second.locationsExpected = numArchived;
it->second.archivalCompleted = it->second.fieldLocationsReceived.get_future();
// std::cout << "flush " << clientID << " archived " << it->second.locationsArchived << " expected " << it->second.locationsExpected << std::endl;

{
std::lock_guard<std::mutex> lock(fieldLocationsMutex_);
it->second.locationsExpected = numArchived; // setting locationsExpected also means that a flush has been requested
it->second.archivalCompleted = it->second.fieldLocationsReceived.get_future();
// std::cout << "flush post lock " << clientID << " archived " << it->second.locationsArchived << " expected " << it->second.locationsExpected << std::endl;
if (it->second.locationsArchived == numArchived) {
it->second.fieldLocationsReceived.set_value(numArchived);
}
}

if (it->second.locationsArchived < numArchived) {
it->second.archivalCompleted.wait();
// std::cout << "flush wait " << clientID << " archived " << it->second.locationsArchived << " expected " << it->second.locationsExpected << std::endl;
it->second.archivalCompleted.wait();
{
std::lock_guard<std::mutex> lock(fieldLocationsMutex_);
it->second.fieldLocationsReceived = std::promise<size_t>{};
it->second.locationsExpected = 0;
it->second.locationsArchived = 0;
}
// std::cout << "flush post wait " << clientID << " archived " << it->second.locationsArchived << " expected " << it->second.locationsExpected << std::endl;

it->second.catalogue->flush(numArchived);

Expand Down Expand Up @@ -427,16 +452,25 @@ void CatalogueHandler::archiveBlob(const uint32_t clientID, const uint32_t reque

it->second.catalogue->selectIndex(idxKey);
it->second.catalogue->archive(idxKey, key, std::move(location));
it->second.locationsArchived++;
if (it->second.archivalCompleted.valid() && it->second.locationsExpected == it->second.locationsArchived) {
it->second.fieldLocationsReceived.set_value(it->second.locationsExpected);
{
// std::cout << "archiveBlob " << clientID << " archived " << it->second.locationsArchived << " expected " << it->second.locationsExpected << std::endl;
std::lock_guard<std::mutex> lock(fieldLocationsMutex_);
// std::cout << "archiveBlob post mutex " << std::endl;
it->second.locationsArchived++;
if (it->second.locationsExpected != 0 && it->second.archivalCompleted.valid() && it->second.locationsExpected == it->second.locationsArchived) {
// std::cout << "archiveBlob set_value " << std::endl;
it->second.fieldLocationsReceived.set_value(it->second.locationsExpected);
}
}
}

bool CatalogueHandler::remove(bool control, uint32_t clientID) {

std::lock_guard<std::mutex> lock(handlerMutex_);
if (clientID == fdbId_) {

// is the client an FDB
auto it = fdbs_.find(clientID);
if (it != fdbs_.end()) {
if (control) {
fdbControlConnection_ = false;
numControlConnection_--;
Expand Down
3 changes: 2 additions & 1 deletion src/fdb5/remote/server/CatalogueHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ class CatalogueHandler : public ServerConnection {
std::map<uint32_t, FDB> fdbs_;

std::mutex fdbMutex_;
std::mutex fieldLocationsMutex_;

uint32_t fdbId_;
// uint32_t fdbId_;
bool fdbControlConnection_;
bool fdbDataConnection_;
};
Expand Down
1 change: 1 addition & 0 deletions src/fdb5/remote/server/ServerConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,7 @@ void ServerConnection::waitForWorkers() {
eckit::Log::error() << "Thread complete" << std::endl;
}

std::lock_guard<std::mutex> lock(readLocationMutex_);
if (readLocationWorker_.joinable()) {
readLocationWorker_.join();
}
Expand Down
1 change: 1 addition & 0 deletions src/fdb5/remote/server/ServerConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ class ServerConnection : public Connection, public Handler {

eckit::SessionID sessionID_;
eckit::LocalConfiguration agreedConf_;
std::mutex readLocationMutex_;
std::thread readLocationWorker_;

std::map<uint32_t, std::future<void>> workerThreads_;
Expand Down
10 changes: 7 additions & 3 deletions src/fdb5/remote/server/StoreHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,11 @@ Handled StoreHandler::handleControl(Message message, uint32_t clientID, uint32_t

void StoreHandler::read(uint32_t clientID, uint32_t requestID, const eckit::Buffer& payload) {

if (!readLocationWorker_.joinable()) {
readLocationWorker_ = std::thread([this] { readLocationThreadLoop(); });
{
std::lock_guard<std::mutex> lock(readLocationMutex_);
if (!readLocationWorker_.joinable()) {
readLocationWorker_ = std::thread([this] { readLocationThreadLoop(); });
}
}

MemoryStream s(payload);
Expand Down Expand Up @@ -188,6 +191,7 @@ void StoreHandler::flush(uint32_t clientID, uint32_t requestID, const eckit::Buf

ASSERT(numArchived == 0 || archiveFuture_.valid());

std::lock_guard<std::mutex> lock(handlerMutex_);
auto it = stores_.find(clientID);
ASSERT(it != stores_.end());
it->second.store->flush();
Expand All @@ -199,7 +203,6 @@ void StoreHandler::flush(uint32_t clientID, uint32_t requestID, const eckit::Buf
bool StoreHandler::remove(bool control, uint32_t clientID) {

std::lock_guard<std::mutex> lock(handlerMutex_);

auto it = stores_.find(clientID);
if (it != stores_.end()) {
if (control) {
Expand All @@ -222,6 +225,7 @@ Store& StoreHandler::store(uint32_t clientID) {
auto it = stores_.find(clientID);
if (it == stores_.end()) {
std::string what("Requested Store has not been loaded id: " + std::to_string(clientID));
Log::error() << what << std::endl;
write(Message::Error, true, 0, 0, what.c_str(), what.length());
throw;
}
Expand Down
1 change: 0 additions & 1 deletion src/fdb5/remote/server/StoreHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ class StoreHandler : public ServerConnection {
void writeToParent(const uint32_t clientID, const uint32_t requestID, std::unique_ptr<eckit::DataHandle> dh);

bool remove(bool control, uint32_t clientID) override;
// bool handlers() override;

Store& store(uint32_t clientID);
Store& store(uint32_t clientID, const Key& dbKey);
Expand Down

0 comments on commit 049e11b

Please sign in to comment.