Skip to content

Commit

Permalink
fix single connection + archival ack
Browse files (browse the repository at this point in the history)
  • Loading branch information
danovaro committed Apr 2, 2024
1 parent e97ebc8 commit e8baf96
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 17 deletions.
6 changes: 5 additions & 1 deletion src/fdb5/io/FieldHandle.cc
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,11 @@ void FieldHandle::openCurrent() {
read += len;
}

ASSERT(read == currentSize);
if (read != currentSize) {
std::stringstream ss;
ss << "Error reading from " << *current_ << " - read " << read << ", expected " << currentSize;
throw eckit::ReadError(ss.str());
}
current_ = new eckit::MemoryHandle(buffer_, currentSize);
current_->openForRead();
currentMemoryHandle_ = true;
Expand Down
2 changes: 1 addition & 1 deletion src/fdb5/remote/Connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ eckit::Buffer Connection::read(bool control, MessageHeader& hdr) {

ASSERT(hdr.marker == StartMarker);
ASSERT(hdr.version == CurrentVersion);
ASSERT(hdr.control() == control);
ASSERT(single_ || hdr.control() == control);

eckit::Buffer payload{hdr.payloadSize};
if (hdr.payloadSize > 0) {
Expand Down
23 changes: 13 additions & 10 deletions src/fdb5/remote/client/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,16 @@ bool ClientConnection::connect(bool singleAttempt) {
writeControlStartupMessage();
eckit::SessionID serverSession = verifyServerStartupResponse();

// Connect to the specified data port
LOG_DEBUG_LIB(LibFdb5) << "Received data endpoint from host: " << dataEndpoint_ << std::endl;
dataClient_.connect(dataEndpoint_, fdbMaxConnectRetries, fdbConnectTimeout);
writeDataStartupMessage(serverSession);
if (!single_) {
// Connect to the specified data port
LOG_DEBUG_LIB(LibFdb5) << "Received data endpoint from host: " << dataEndpoint_ << std::endl;
dataClient_.connect(dataEndpoint_, fdbMaxConnectRetries, fdbConnectTimeout);
writeDataStartupMessage(serverSession);

listeningDataThread_ = std::thread([this] { listeningDataThreadLoop(); });
}

// And the connections are set up. Let everything start up!
listeningControlThread_ = std::thread([this] { listeningControlThreadLoop(); });
listeningDataThread_ = std::thread([this] { listeningDataThreadLoop(); });

connected_ = true;
return true;
Expand Down Expand Up @@ -181,6 +183,9 @@ eckit::LocalConfiguration ClientConnection::availableFunctionality() const {
eckit::LocalConfiguration conf;
std::vector<int> remoteFieldLocationVersions = {1};
conf.set("RemoteFieldLocation", remoteFieldLocationVersions);
std::vector<int> numberOfConnections = {1,2};
conf.set("NumberOfConnections", numberOfConnections);
conf.set("PreferSingleConnection", false);
return conf;
}

Expand All @@ -202,7 +207,7 @@ void ClientConnection::dataWrite(DataWriteRequest& r) {

void ClientConnection::dataWrite(Client& client, remote::Message msg, uint32_t requestID, std::vector<std::pair<const void*, uint32_t>> data) {

static size_t maxQueueLength = eckit::Resource<size_t>("fdbDataWriteQueueLength;$FDB_DATA_WRITE_QUEUE_LENGTH", 2);
static size_t maxQueueLength = eckit::Resource<size_t>("fdbDataWriteQueueLength;$FDB_DATA_WRITE_QUEUE_LENGTH", 32);
auto it = clients_.find(client.clientId());
ASSERT(it != clients_.end());

Expand Down Expand Up @@ -347,13 +352,12 @@ void ClientConnection::listeningControlThreadLoop() {
try {

MessageHeader hdr;
eckit::FixedString<4> tail;

while (true) {

eckit::Buffer payload = Connection::readControl(hdr);

LOG_DEBUG_LIB(LibFdb5) << "ClientConnection::listeningControlThreadLoop - got [message=" << hdr.message << ",requestID=" << hdr.requestID << ",payload=" << hdr.payloadSize << "]" << std::endl;
LOG_DEBUG_LIB(LibFdb5) << "ClientConnection::listeningControlThreadLoop - got [message=" << hdr.message << ",clientID=" << hdr.clientID() << ",control=" << hdr.control() << ",requestID=" << hdr.requestID << ",payload=" << hdr.payloadSize << "]" << std::endl;

if (hdr.message == Message::Exit) {
return;
Expand Down Expand Up @@ -416,7 +420,6 @@ void ClientConnection::listeningDataThreadLoop() {
try {

MessageHeader hdr;
eckit::FixedString<4> tail;

while (true) {

Expand Down
8 changes: 6 additions & 2 deletions src/fdb5/remote/client/RemoteStore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,10 @@ void RemoteStore::archive(const Key& key, const void *data, eckit::Length length
}
}

locations_[id] = catalogue_archive;
{
std::lock_guard<std::mutex> lock(locationMutex_);
locations_[id] = catalogue_archive;
}

Buffer keyBuffer(4096);
MemoryStream keyStream(keyBuffer);
Expand Down Expand Up @@ -365,8 +368,9 @@ bool RemoteStore::handle(Message message, bool control, uint32_t requestID, ecki
switch (message) {

case Message::Store: { // received a Field location from the remote store, can forward to the archiver for the indexing
std::lock_guard<std::mutex> lock(locationMutex_);

auto it = locations_.find(requestID);
ASSERT(it != locations_.end());
if (it != locations_.end()) {
MemoryStream s(payload);
std::unique_ptr<FieldLocation> location(eckit::Reanimator<FieldLocation>::reanimate(s));
Expand Down
1 change: 1 addition & 0 deletions src/fdb5/remote/client/RemoteStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ class RemoteStore : public Store, public Client {
std::map<uint32_t, std::shared_ptr<MessageQueue>> messageQueues_;
MessageQueue retrieveMessageQueue_;

std::mutex locationMutex_;
std::map<uint32_t, std::function<void(const std::unique_ptr<FieldLocation> fieldLocation)>> locations_;
FDBStats internalStats_;
FDBStats archivalStats_;
Expand Down
6 changes: 3 additions & 3 deletions src/fdb5/remote/server/ServerConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ Handled ServerConnection::handleData(Message message, uint32_t clientID, uint32_
case Message::Blob:
case Message::MultiBlob:
queue(message, clientID, requestID, std::move(payload));
return Handled::Yes;
return Handled::Replied;

default: {
std::stringstream ss;
Expand All @@ -142,7 +142,7 @@ eckit::LocalConfiguration ServerConnection::availableFunctionality() const {
// Add to the configuration all the components that require to be versioned, as in the following example, with a vector of supported version numbers
std::vector<int> remoteFieldLocationVersions = {1};
conf.set("RemoteFieldLocation", remoteFieldLocationVersions);
std::vector<int> numberOfConnections = {2};
std::vector<int> numberOfConnections = {1,2};
conf.set("NumberOfConnections", numberOfConnections);
return conf;
}
Expand Down Expand Up @@ -480,7 +480,7 @@ void ServerConnection::handle() {
{
std::lock_guard<std::mutex> lock(handlerMutex_);
dataListener_++;
if (dataListener_ == 1) {
if (dataListener_ == 1 && !single_) {
listeningThreadData = std::thread([this] { listeningThreadLoopData(); });
}
}
Expand Down

0 comments on commit e8baf96

Please sign in to comment.