Skip to content

Commit

Permalink
fix callback composition in archival
Browse files Browse the repository at this point in the history
  • Loading branch information
danovaro committed Sep 11, 2024
1 parent 3c7f4ec commit b39ee3d
Show file tree
Hide file tree
Showing 9 changed files with 21 additions and 15 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
5.11.128
5.14.0-rc1
10 changes: 6 additions & 4 deletions src/fdb5/database/ArchiveVisitor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,21 @@ ArchiveVisitor::ArchiveVisitor(Archiver& owner, const Key& initialFieldKey, cons
void ArchiveVisitor::callbacks(fdb5::CatalogueWriter* catalogue, const Key& idxKey, const Key& datumKey, std::promise<std::shared_ptr<const FieldLocation>>* p, std::shared_ptr<const FieldLocation> fieldLocation) {
p->set_value(fieldLocation);
catalogue->archive(idxKey, datumKey, std::move(fieldLocation));
delete(p);
}

bool ArchiveVisitor::selectDatum(const TypedKey& datumKey, const TypedKey& fullComputedKey) {

checkMissingKeys(fullComputedKey);
const Key idxKey = catalogue()->currentIndexKey();

std::promise<std::shared_ptr<const FieldLocation>> p;
std::promise<std::shared_ptr<const FieldLocation>>* p = new std::promise<std::shared_ptr<const FieldLocation>>();

auto c = std::async(std::launch::async, [&p, this] {
callback_(initialFieldKey_, data_, size_, p.get_future());
});
callback_(initialFieldKey_, data_, size_, p->get_future());
});
store()->archive(idxKey, data_, size_,
std::bind(&ArchiveVisitor::callbacks, this, catalogue(), idxKey, datumKey.canonical(), &p, std::placeholders::_1));
std::bind(&ArchiveVisitor::callbacks, this, catalogue(), idxKey, datumKey.canonical(), p, std::placeholders::_1));
c.wait();

return true;
Expand Down
2 changes: 1 addition & 1 deletion src/fdb5/message/MessageArchiver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ eckit::Length MessageArchiver::archive(eckit::DataHandle& source) {
messageToKey(msg, key);

LOG_DEBUG_LIB(LibFdb5) << "Archiving message "
<< " key: " << key_ << " data: " << msg.data() << " length:" << msg.length()
<< " key: " << key << " length:" << msg.length()
<< std::endl;

ASSERT(key.match(key_));
Expand Down
2 changes: 1 addition & 1 deletion src/fdb5/remote/Connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ void Connection::write(remote::Message msg, bool control, uint32_t clientID, uin

// std::cout << "WRITE [" << "endpoint=" << ((control || single_) ? controlSocket() : dataSocket()).remotePort() << ",message=" << message.message << ",clientID=" << message.clientID() << ",requestID=" << message.requestID << ",payload=" << message.payloadSize << "]" << std::endl;

LOG_DEBUG_LIB(LibFdb5) << "Connection::write [message=" << msg << ",clientID=" << message.clientID() << ",requestID=" << requestID << ",data=" << data.size() << ",payload=" << payloadLength << "]" << std::endl;
LOG_DEBUG_LIB(LibFdb5) << "Connection::write [message=" << msg << ",clientID=" << message.clientID() << ",control=" << control << ",requestID=" << requestID << ",data=" << data.size() << ",payload=" << payloadLength << "]" << std::endl;

std::lock_guard<std::mutex> lock((control || single_) ? controlMutex_ : dataMutex_);
writeUnsafe(control, &message, sizeof(message));
Expand Down
1 change: 1 addition & 0 deletions src/fdb5/remote/Connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class Connection : eckit::NonCopyable {

std::mutex controlMutex_;
std::mutex dataMutex_;

};

//----------------------------------------------------------------------------------------------------------------------
Expand Down
10 changes: 7 additions & 3 deletions src/fdb5/remote/client/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ void Client::controlWriteCheckResponse(Message msg, uint32_t requestID, bool dat
data.push_back(std::make_pair(payload, payloadLength));
}

eckit::Buffer buf = connection_.controlWrite(*this, msg, requestID, dataListener, data).get();
ASSERT(buf.size() == 0);
std::future<eckit::Buffer> f = connection_.controlWrite(*this, msg, requestID, dataListener, data);
f.wait();
ASSERT(f.get().size() == 0);
}

eckit::Buffer Client::controlWriteReadResponse(Message msg, uint32_t requestID, const void* payload, uint32_t payloadLength) {
Expand All @@ -69,7 +70,10 @@ eckit::Buffer Client::controlWriteReadResponse(Message msg, uint32_t requestID,
if (payloadLength) {
data.push_back(std::make_pair(payload, payloadLength));
}
return eckit::Buffer{std::move(connection_.controlWrite(*this, msg, requestID, false, data).get())};

std::future<eckit::Buffer> f = connection_.controlWrite(*this, msg, requestID, false, data);
f.wait();
return eckit::Buffer{f.get()};
}

void Client::dataWrite(remote::Message msg, uint32_t requestID, std::vector<std::pair<const void*, uint32_t>> data) {
Expand Down
3 changes: 1 addition & 2 deletions src/fdb5/remote/client/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ void ClientConnection::dataWrite(DataWriteRequest& r) {

void ClientConnection::dataWrite(Client& client, remote::Message msg, uint32_t requestID, std::vector<std::pair<const void*, uint32_t>> data) {

static size_t maxQueueLength = eckit::Resource<size_t>("fdbDataWriteQueueLength;$FDB_DATA_WRITE_QUEUE_LENGTH", 32);
static size_t maxQueueLength = 320; // eckit::Resource<size_t>("fdbDataWriteQueueLength;$FDB_DATA_WRITE_QUEUE_LENGTH", 320);
auto it = clients_.find(client.clientId());
ASSERT(it != clients_.end());

Expand Down Expand Up @@ -402,7 +402,6 @@ void ClientConnection::listeningControlThreadLoop() {
promises_.erase(pp);
handled = true;
} else {

Client* client = nullptr;
{
std::lock_guard<std::mutex> lock(clientsMutex_);
Expand Down
2 changes: 1 addition & 1 deletion src/fdb5/remote/client/RemoteStore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ eckit::DataHandle* RemoteStore::dataHandle(const FieldLocation& fieldLocation, c

uint32_t id = generateRequestID();

static size_t queueSize = eckit::Resource<size_t>("fdbRemoteRetrieveQueueLength;$FDB_REMOTE_RETRIEVE_QUEUE_LENGTH", 200);
static size_t queueSize = 320; // eckit::Resource<size_t>("fdbRemoteRetrieveQueueLength;$FDB_REMOTE_RETRIEVE_QUEUE_LENGTH", 200);
// auto id = retrieveMessageQueues_.find(requestID);
// ASSERT (it != retrieveMessageQueues_.end());
// it->second->emplace(std::make_pair(message, std::move(payload)));
Expand Down
4 changes: 2 additions & 2 deletions tests/fdb/type/test_toKey.cc
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,8 @@ CASE( "Date - string ctor - expansion" ) {
eckit::Translator<long, std::string> t;

EXPECT(key.canonicalValue("date") == t(now.yyyymmdd()));
std::cout << key.valuesToString() << std::endl;
std::cout << ("od:0001:oper:ofb:"+t(now.yyyymmdd())+":0000:mhs:3001") << std::endl;
// std::cout << key.valuesToString() << std::endl;
// std::cout << ("od:0001:oper:ofb:"+t(now.yyyymmdd())+":0000:mhs:3001") << std::endl;

EXPECT(key.valuesToString() == "od:0001:oper:ofb:"+t(now.yyyymmdd())+":0000:mhs:3001");

Expand Down

0 comments on commit b39ee3d

Please sign in to comment.