From 4fb2629ae42602d967973488abf7ef2e3f956357 Mon Sep 17 00:00:00 2001 From: Emanuele Danovaro Date: Tue, 12 Mar 2024 10:53:10 +0000 Subject: [PATCH] fixed handler termination (closed archivalQueue) --- src/fdb5/remote/FdbServer.cc | 19 ------------ src/fdb5/remote/server/ServerConnection.cc | 34 ++-------------------- 2 files changed, 2 insertions(+), 51 deletions(-) diff --git a/src/fdb5/remote/FdbServer.cc b/src/fdb5/remote/FdbServer.cc index 51f2d5e0d..d15ddc559 100644 --- a/src/fdb5/remote/FdbServer.cc +++ b/src/fdb5/remote/FdbServer.cc @@ -57,19 +57,12 @@ void FDBForker::run() { eckit::Log::info() << "FDB using Catalogue Handler" << std::endl; CatalogueHandler handler(socket_, config_); handler.handle(); - stop(); } else if (config_.getString("type", "local") == "store" || (::getenv("FDB_IS_STORE") && ::getenv("FDB_IS_STORE")[0] == '1')) { eckit::Log::info() << "FDB using Store Handler" << std::endl; StoreHandler handler(socket_, config_); handler.handle(); - stop(); } - // else { - // eckit::Log::info() << "FDB using Remote Handler" << std::endl; - // RemoteHandler handler(socket_, config_); - // handler.handle(); - // } } //---------------------------------------------------------------------------------------------------------------------- @@ -98,12 +91,8 @@ FDBServerThread::FDBServerThread(net::TCPSocket& socket, const Config& config) : config_(config) {} void FDBServerThread::run() { - std::cout << "FDBServerThread::run()" << std::endl; eckit::Log::info() << "FDB started handler thread" << std::endl; - // ServerConnection handler(socket_, config_); - // handler.handle(); - if (config_.getString("type", "local") == "catalogue" || (::getenv("FDB_IS_CAT") && ::getenv("FDB_IS_CAT")[0] == '1')) { eckit::Log::info() << "FDB using Catalogue Handler" << std::endl; CatalogueHandler handler(socket_, config_); @@ -114,14 +103,6 @@ void FDBServerThread::run() { StoreHandler handler(socket_, config_); handler.handle(); } - // else { - // eckit::Log::info() << "FDB using Remote Handler" << std::endl; - // RemoteHandler handler(socket_, config_); - // handler.handle(); - // } - - // // RemoteHandler handler(socket_, config_); - // // handler.handle(); } //---------------------------------------------------------------------------------------------------------------------- diff --git a/src/fdb5/remote/server/ServerConnection.cc b/src/fdb5/remote/server/ServerConnection.cc index 000c595a4..1a662f6d6 100644 --- a/src/fdb5/remote/server/ServerConnection.cc +++ b/src/fdb5/remote/server/ServerConnection.cc @@ -80,12 +80,6 @@ ServerConnection::~ServerConnection() { // We don't want to die before the worker threads are cleaned up waitForWorkers(); - std::this_thread::sleep_for(std::chrono::milliseconds(500)); - - // And notify the client that we are done. -// eckit::Log::info() << "Sending exit message to client" << std::endl; -// // write(Message::Exit, true, 0, 0); -// write(Message::Exit, false, 0, 0); eckit::Log::info() << "Done" << std::endl; } @@ -426,32 +420,14 @@ void ServerConnection::listeningThreadLoopData() { } } - // // Trigger cleanup of the workers - // auto q = archiveQueues_.find(archiverID); - // ASSERT(q != archiveQueues_.end()); - // q->second.close(); - - // auto w = archiveFuture_.find(archiverID); - // ASSERT(w != archiveFuture_.end()); - // // Ensure worker is done - // ASSERT(w->second.valid()); - // totalArchived = worker.get(); // n.b. use of async, get() propagates any exceptions. } catch (std::exception& e) { // n.b. more general than eckit::Exception error(e.what(), hdr.clientID(), hdr.requestID); - // auto q = archiveQueues_.find(archiverID); - // if(q != archiveQueues_.end()) { - // q->second.interrupt(std::current_exception()); - // } throw; } catch (...) { error("Caught unexpected, unknown exception in retrieve worker", hdr.clientID(), hdr.requestID); - // auto q = archiveQueues_.find(archiverID); - // if(q != archiveQueues_.end()) { - // q->second.interrupt(std::current_exception()); - // } throw; } } @@ -460,9 +436,6 @@ void ServerConnection::handle() { initialiseConnections(); std::thread listeningThreadData; - // if (!single_) { - // listeningThreadData = std::thread([this] { listeningThreadLoopData(); }); - // } MessageHeader hdr; @@ -556,6 +529,8 @@ void ServerConnection::handle() { if (listeningThreadData.joinable()) { listeningThreadData.join(); } + ASSERT(archiveQueue_.empty()); + archiveQueue_.close(); } void ServerConnection::handleException(std::exception_ptr e) { @@ -600,11 +575,6 @@ void ServerConnection::archiver() { // Start archive worker thread archiveFuture_ = std::async(std::launch::async, [this] { return archiveThreadLoop(); }); } - - // // Start data reader thread if double connection and we aren't already running it - // if (!single_ && !dataReader_.valid()) { - // dataReader_ = std::async(std::launch::async, [this] { return listeningThreadLoopData(); }); - // } } void ServerConnection::waitForWorkers() {