diff --git a/VERSION b/VERSION index 4c263eda1..706c863b3 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -5.11.128 \ No newline at end of file +5.14.0-rc1 \ No newline at end of file diff --git a/src/fdb5/database/ArchiveVisitor.cc b/src/fdb5/database/ArchiveVisitor.cc index 4ad2fcce0..61340c62f 100644 --- a/src/fdb5/database/ArchiveVisitor.cc +++ b/src/fdb5/database/ArchiveVisitor.cc @@ -26,6 +26,7 @@ ArchiveVisitor::ArchiveVisitor(Archiver& owner, const Key& initialFieldKey, cons void ArchiveVisitor::callbacks(fdb5::CatalogueWriter* catalogue, const Key& idxKey, const Key& datumKey, std::promise>* p, std::shared_ptr fieldLocation) { p->set_value(fieldLocation); catalogue->archive(idxKey, datumKey, std::move(fieldLocation)); + delete(p); } bool ArchiveVisitor::selectDatum(const TypedKey& datumKey, const TypedKey& fullComputedKey) { @@ -33,12 +34,13 @@ bool ArchiveVisitor::selectDatum(const TypedKey& datumKey, const TypedKey& fullC checkMissingKeys(fullComputedKey); const Key idxKey = catalogue()->currentIndexKey(); - std::promise> p; + std::promise>* p = new std::promise>(); + auto c = std::async(std::launch::async, [&p, this] { - callback_(initialFieldKey_, data_, size_, p.get_future()); - }); + callback_(initialFieldKey_, data_, size_, p->get_future()); + }); store()->archive(idxKey, data_, size_, - std::bind(&ArchiveVisitor::callbacks, this, catalogue(), idxKey, datumKey.canonical(), &p, std::placeholders::_1)); + std::bind(&ArchiveVisitor::callbacks, this, catalogue(), idxKey, datumKey.canonical(), p, std::placeholders::_1)); c.wait(); return true; diff --git a/src/fdb5/message/MessageArchiver.cc b/src/fdb5/message/MessageArchiver.cc index 814b8d04c..02997630d 100644 --- a/src/fdb5/message/MessageArchiver.cc +++ b/src/fdb5/message/MessageArchiver.cc @@ -187,7 +187,7 @@ eckit::Length MessageArchiver::archive(eckit::DataHandle& source) { messageToKey(msg, key); LOG_DEBUG_LIB(LibFdb5) << "Archiving message " - << " key: " << key_ << " data: " << msg.data() << " length:" << msg.length() + << " key: " << key << " length:" << msg.length() << std::endl; ASSERT(key.match(key_)); diff --git a/src/fdb5/remote/Connection.cc b/src/fdb5/remote/Connection.cc index f3df2dffc..2b92d891a 100644 --- a/src/fdb5/remote/Connection.cc +++ b/src/fdb5/remote/Connection.cc @@ -108,7 +108,7 @@ void Connection::write(remote::Message msg, bool control, uint32_t clientID, uin // std::cout << "WRITE [" << "endpoint=" << ((control || single_) ? controlSocket() : dataSocket()).remotePort() << ",message=" << message.message << ",clientID=" << message.clientID() << ",requestID=" << message.requestID << ",payload=" << message.payloadSize << "]" << std::endl; - LOG_DEBUG_LIB(LibFdb5) << "Connection::write [message=" << msg << ",clientID=" << message.clientID() << ",requestID=" << requestID << ",data=" << data.size() << ",payload=" << payloadLength << "]" << std::endl; + LOG_DEBUG_LIB(LibFdb5) << "Connection::write [message=" << msg << ",clientID=" << message.clientID() << ",control=" << control << ",requestID=" << requestID << ",data=" << data.size() << ",payload=" << payloadLength << "]" << std::endl; std::lock_guard lock((control || single_) ? controlMutex_ : dataMutex_); writeUnsafe(control, &message, sizeof(message)); diff --git a/src/fdb5/remote/Connection.h b/src/fdb5/remote/Connection.h index c82347789..8f6ba872c 100644 --- a/src/fdb5/remote/Connection.h +++ b/src/fdb5/remote/Connection.h @@ -57,6 +57,7 @@ class Connection : eckit::NonCopyable { std::mutex controlMutex_; std::mutex dataMutex_; + }; //---------------------------------------------------------------------------------------------------------------------- diff --git a/src/fdb5/remote/client/Client.cc b/src/fdb5/remote/client/Client.cc index 44386a750..e57aec747 100644 --- a/src/fdb5/remote/client/Client.cc +++ b/src/fdb5/remote/client/Client.cc @@ -55,8 +55,9 @@ void Client::controlWriteCheckResponse(Message msg, uint32_t requestID, bool dat data.push_back(std::make_pair(payload, payloadLength)); } - eckit::Buffer buf = connection_.controlWrite(*this, msg, requestID, dataListener, data).get(); - ASSERT(buf.size() == 0); + std::future f = connection_.controlWrite(*this, msg, requestID, dataListener, data); + f.wait(); + ASSERT(f.get().size() == 0); } eckit::Buffer Client::controlWriteReadResponse(Message msg, uint32_t requestID, const void* payload, uint32_t payloadLength) { @@ -69,7 +70,10 @@ eckit::Buffer Client::controlWriteReadResponse(Message msg, uint32_t requestID, if (payloadLength) { data.push_back(std::make_pair(payload, payloadLength)); } - return eckit::Buffer{std::move(connection_.controlWrite(*this, msg, requestID, false, data).get())}; + + std::future f = connection_.controlWrite(*this, msg, requestID, false, data); + f.wait(); + return eckit::Buffer{f.get()}; } void Client::dataWrite(remote::Message msg, uint32_t requestID, std::vector> data) { diff --git a/src/fdb5/remote/client/ClientConnection.cc b/src/fdb5/remote/client/ClientConnection.cc index dd82f63e4..c710f1a40 100644 --- a/src/fdb5/remote/client/ClientConnection.cc +++ b/src/fdb5/remote/client/ClientConnection.cc @@ -225,7 +225,7 @@ void ClientConnection::dataWrite(DataWriteRequest& r) { void ClientConnection::dataWrite(Client& client, remote::Message msg, uint32_t requestID, std::vector> data) { - static size_t maxQueueLength = eckit::Resource("fdbDataWriteQueueLength;$FDB_DATA_WRITE_QUEUE_LENGTH", 32); + static size_t maxQueueLength = 320; // eckit::Resource("fdbDataWriteQueueLength;$FDB_DATA_WRITE_QUEUE_LENGTH", 320); auto it = clients_.find(client.clientId()); ASSERT(it != clients_.end()); @@ -402,7 +402,6 @@ void ClientConnection::listeningControlThreadLoop() { promises_.erase(pp); handled = true; } else { - Client* client = nullptr; { std::lock_guard lock(clientsMutex_); diff --git a/src/fdb5/remote/client/RemoteStore.cc b/src/fdb5/remote/client/RemoteStore.cc index d56e0c444..ab4c882cb 100644 --- a/src/fdb5/remote/client/RemoteStore.cc +++ b/src/fdb5/remote/client/RemoteStore.cc @@ -460,7 +460,7 @@ eckit::DataHandle* RemoteStore::dataHandle(const FieldLocation& fieldLocation, c uint32_t id = generateRequestID(); - static size_t queueSize = eckit::Resource("fdbRemoteRetrieveQueueLength;$FDB_REMOTE_RETRIEVE_QUEUE_LENGTH", 200); + static size_t queueSize = 320; // eckit::Resource("fdbRemoteRetrieveQueueLength;$FDB_REMOTE_RETRIEVE_QUEUE_LENGTH", 200); // auto id = retrieveMessageQueues_.find(requestID); // ASSERT (it != retrieveMessageQueues_.end()); // it->second->emplace(std::make_pair(message, std::move(payload))); diff --git a/tests/fdb/type/test_toKey.cc b/tests/fdb/type/test_toKey.cc index 122b8a11b..b6b013f91 100644 --- a/tests/fdb/type/test_toKey.cc +++ b/tests/fdb/type/test_toKey.cc @@ -272,8 +272,8 @@ CASE( "Date - string ctor - expansion" ) { eckit::Translator t; EXPECT(key.canonicalValue("date") == t(now.yyyymmdd())); - std::cout << key.valuesToString() << std::endl; - std::cout << ("od:0001:oper:ofb:"+t(now.yyyymmdd())+":0000:mhs:3001") << std::endl; + // std::cout << key.valuesToString() << std::endl; + // std::cout << ("od:0001:oper:ofb:"+t(now.yyyymmdd())+":0000:mhs:3001") << std::endl; EXPECT(key.valuesToString() == "od:0001:oper:ofb:"+t(now.yyyymmdd())+":0000:mhs:3001");