Skip to content

Commit

Permalink
concurrent remote readers
Browse files Browse the repository at this point in the history
  • Loading branch information
danovaro committed Jul 3, 2024
1 parent 36e7bc9 commit 5df53a6
Showing 2 changed files with 28 additions and 11 deletions.
36 changes: 26 additions & 10 deletions src/fdb5/remote/client/RemoteStore.cc
Original file line number Diff line number Diff line change
@@ -54,7 +54,7 @@ class FDBRemoteDataHandle : public DataHandle {
public: // methods

FDBRemoteDataHandle(uint32_t requestID, Length estimate,
RemoteStore::MessageQueue& queue,
std::shared_ptr<RemoteStore::MessageQueue> queue,
const net::Endpoint& remoteEndpoint) :
requestID_(requestID),
estimate_(estimate),
@@ -98,7 +98,7 @@ class FDBRemoteDataHandle : public DataHandle {

// If we are in the DataHandle, then there MUST be data to read
RemoteStore::StoredMessage msg = std::make_pair(remote::Message{}, eckit::Buffer{0});
ASSERT(queue_.pop(msg) != -1);
ASSERT(queue_->pop(msg) != -1);

// Handle any remote errors communicated from the server
if (msg.first == Message::Error) {
@@ -109,7 +109,7 @@ class FDBRemoteDataHandle : public DataHandle {
// Are we now complete?
if (msg.first == Message::Complete) {
if (overallPosition_ == eckit::Offset(0)) {
ASSERT(queue_.pop(msg) != -1);
ASSERT(queue_->pop(msg) != -1);
} else {
complete_ = true;
return total;
@@ -169,7 +169,7 @@ class FDBRemoteDataHandle : public DataHandle {

uint32_t requestID_;
Length estimate_;
RemoteStore::MessageQueue& queue_;
std::shared_ptr<RemoteStore::MessageQueue> queue_;
net::Endpoint remoteEndpoint_;
size_t pos_;
Offset overallPosition_;
@@ -197,14 +197,14 @@ std::vector<std::pair<eckit::net::Endpoint, std::string>> storeEndpoints(const C
RemoteStore::RemoteStore(const Key& dbKey, const Config& config) :
Client(storeEndpoints(config)),
dbKey_(dbKey), config_(config),
retrieveMessageQueue_(eckit::Resource<size_t>("fdbRemoteRetrieveQueueLength;$FDB_REMOTE_RETRIEVE_QUEUE_LENGTH", 200)),
// retrieveMessageQueue_(eckit::Resource<size_t>("fdbRemoteRetrieveQueueLength;$FDB_REMOTE_RETRIEVE_QUEUE_LENGTH", 200)),
fieldsArchived_(0), locationsReceived_(0) {}

// this is used only in retrieval, with a URI already referring to an accessible Store
RemoteStore::RemoteStore(const eckit::URI& uri, const Config& config) :
Client(eckit::net::Endpoint(uri.hostport()), uri.hostport()),
dbKey_(Key()), config_(config),
retrieveMessageQueue_(eckit::Resource<size_t>("fdbRemoteRetrieveQueueLength;$FDB_REMOTE_RETRIEVE_QUEUE_LENGTH", 200)),
// retrieveMessageQueue_(eckit::Resource<size_t>("fdbRemoteRetrieveQueueLength;$FDB_REMOTE_RETRIEVE_QUEUE_LENGTH", 200)),
fieldsArchived_(0), locationsReceived_(0) {

// no need to set the local_ flag on the read path
@@ -348,7 +348,12 @@ bool RemoteStore::handle(Message message, bool control, uint32_t requestID) {
messageQueues_.erase(it);

} else {
retrieveMessageQueue_.emplace(std::make_pair(message, Buffer(0)));
auto id = retrieveMessageQueues_.find(requestID);
ASSERT (id != retrieveMessageQueues_.end());

id->second->emplace(std::make_pair(message, Buffer(0)));

retrieveMessageQueues_.erase(id);
}
return true;
}
@@ -403,7 +408,9 @@ bool RemoteStore::handle(Message message, bool control, uint32_t requestID, ecki
if (it != messageQueues_.end()) {
it->second->emplace(message, std::move(payload));
} else {
retrieveMessageQueue_.emplace(message, std::move(payload));
auto id = retrieveMessageQueues_.find(requestID);
ASSERT (id != retrieveMessageQueues_.end());
id->second->emplace(std::make_pair(message, std::move(payload)));
}
return true;
}
@@ -425,7 +432,7 @@ bool RemoteStore::handle(Message message, bool control, uint32_t requestID, ecki
if (it != locations_.end()) {
// archiver_->error(std::move(payload), controlEndpoint());
} else {
retrieveMessageQueue_.emplace(message, std::move(payload));
// retrieveMessageQueue_.emplace(message, std::move(payload));
}
}
return true;
@@ -450,9 +457,18 @@ eckit::DataHandle* RemoteStore::dataHandle(const FieldLocation& fieldLocation, c
s << remapKey;

uint32_t id = generateRequestID();

static size_t queueSize = eckit::Resource<size_t>("fdbRemoteRetrieveQueueLength;$FDB_REMOTE_RETRIEVE_QUEUE_LENGTH", 200);
// auto id = retrieveMessageQueues_.find(requestID);
// ASSERT (it != retrieveMessageQueues_.end());
// it->second->emplace(std::make_pair(message, std::move(payload)));

auto entry = retrieveMessageQueues_.emplace(id, std::make_shared<MessageQueue>(queueSize));
ASSERT(entry.second);

controlWriteCheckResponse(fdb5::remote::Message::Read, id, true, encodeBuffer, s.position());

return new FDBRemoteDataHandle(id, fieldLocation.length(), retrieveMessageQueue_, controlEndpoint());
return new FDBRemoteDataHandle(id, fieldLocation.length(), entry.first->second, controlEndpoint());
}

RemoteStore& RemoteStore::get(const eckit::URI& uri) {
3 changes: 2 additions & 1 deletion src/fdb5/remote/client/RemoteStore.h
Original file line number Diff line number Diff line change
@@ -94,7 +94,8 @@ class RemoteStore : public Store, public Client {
// The shared_ptr allows this removal to be asynchronous with the actual task
// cleaning up and returning to the client.
std::map<uint32_t, std::shared_ptr<MessageQueue>> messageQueues_;
MessageQueue retrieveMessageQueue_;
std::map<uint32_t, std::shared_ptr<MessageQueue>> retrieveMessageQueues_;
// MessageQueue retrieveMessageQueue_;

std::mutex locationMutex_;
std::map<uint32_t, std::function<void(const std::unique_ptr<FieldLocation> fieldLocation)>> locations_;

0 comments on commit 5df53a6

Please sign in to comment.