From fe2a8574af2657613e57067a1cf19c869f387967 Mon Sep 17 00:00:00 2001 From: Rex Schilasky <49162693+rex-schilasky@users.noreply.github.com> Date: Wed, 6 Nov 2024 09:36:33 +0100 Subject: [PATCH] [core] service client connection state handling (#1780) service client "IsConnected" will return true if matching service is connected (not only registered) --- ecal/core/src/service/ecal_clientgate.cpp | 16 - ecal/core/src/service/ecal_clientgate.h | 2 - .../src/service/ecal_service_client_impl.cpp | 341 +++++++++--------- .../src/service/ecal_service_client_impl.h | 18 +- .../minimal_client/src/minimal_client.cpp | 10 +- 5 files changed, 190 insertions(+), 197 deletions(-) diff --git a/ecal/core/src/service/ecal_clientgate.cpp b/ecal/core/src/service/ecal_clientgate.cpp index 9f76498d3a..c0f35517f8 100644 --- a/ecal/core/src/service/ecal_clientgate.cpp +++ b/ecal/core/src/service/ecal_clientgate.cpp @@ -141,22 +141,6 @@ namespace eCAL } } - std::vector CClientGate::GetServiceAttr(const std::string& service_name_) - { - std::vector ret_vec; - const std::shared_lock lock(m_service_register_map_sync); - - // look for requested services - for (auto service : m_service_register_map) - { - if (service.second.sname == service_name_) - { - ret_vec.push_back(service.second); - } - } - return(ret_vec); - } - void CClientGate::GetRegistrations(Registration::SampleList& reg_sample_list_) { if (!m_created) return; diff --git a/ecal/core/src/service/ecal_clientgate.h b/ecal/core/src/service/ecal_clientgate.h index 910c0c0b77..3fd23a9be9 100644 --- a/ecal/core/src/service/ecal_clientgate.h +++ b/ecal/core/src/service/ecal_clientgate.h @@ -53,8 +53,6 @@ namespace eCAL void ApplyServiceRegistration(const Registration::Sample& ecal_sample_); - std::vector GetServiceAttr(const std::string& service_name_); - void GetRegistrations(Registration::SampleList& reg_sample_list_); protected: diff --git a/ecal/core/src/service/ecal_service_client_impl.cpp b/ecal/core/src/service/ecal_service_client_impl.cpp index 7f6ca3f75d..bd601eba7a 100644 --- a/ecal/core/src/service/ecal_service_client_impl.cpp +++ b/ecal/core/src/service/ecal_service_client_impl.cpp @@ -23,7 +23,6 @@ #include "ecal_global_accessors.h" -#include "ecal_clientgate.h" #include "ecal_service_client_impl.h" #include "ecal_service_singleton_manager.h" #include "registration/ecal_registration_provider.h" @@ -301,12 +300,6 @@ namespace eCAL { // TODO: implement timeout - if (g_clientgate() == nullptr) - { - ErrorCallback(method_name_, "Clientgate error."); - return false; - } - if (!m_created) { ErrorCallback(method_name_, "Client hasn't been created yet."); @@ -320,9 +313,6 @@ namespace eCAL return false; } - // check for new server - CheckForNewServices(); - // Copy raw request in a protocol buffer // TODO: The next version of the service protocol should omit the double-serialization (i.e. copying the binary data in a protocol buffer and then serializing that again) Service::Request request; @@ -333,53 +323,52 @@ namespace eCAL auto request_shared_ptr = std::make_shared(); SerializeToBuffer(request, *request_shared_ptr); - bool at_least_one_service_was_called (false); + bool at_least_one_service_was_called(false); // Call all services - std::vector const service_vec = g_clientgate()->GetServiceAttr(m_service_name); - for (const auto& service : service_vec) { - if (m_host_name.empty() || (m_host_name == service.hname)) + std::lock_guard const client_map_lock(m_client_map_sync); + for (const auto& client : m_client_map) { - std::lock_guard const lock(m_client_map_sync); - auto client = m_client_map.find(service.key); - if (client != m_client_map.end()) + const auto& client_data = client.second; + + if (m_host_name.empty() || (m_host_name == client_data.service_attr.hname)) { const eCAL::service::ClientResponseCallbackT response_callback - = [weak_me = std::weak_ptr(shared_from_this()), hostname = service.hname, servicename = service.sname] - (const eCAL::service::Error& response_error, const std::shared_ptr& response_) - { - auto me = weak_me.lock(); - if (!me) - { - return; - } - - std::lock_guard const lock(me->m_response_callback_sync); - - if (me->m_response_callback) - { - eCAL::SServiceResponse service_response_struct; - - service_response_struct.host_name = hostname; - service_response_struct.service_name = servicename; - - if (response_error) - { - service_response_struct.error_msg = response_error.ToString(); - service_response_struct.call_state = eCallState::call_state_failed; - service_response_struct.ret_state = 0; - } - else - { - fromSerializedProtobuf(*response_, service_response_struct); - } - - me->m_response_callback(service_response_struct); - } - }; - - if (client->second->async_call_service(request_shared_ptr, response_callback)) + = [weak_me = std::weak_ptr(shared_from_this()), hostname = client_data.service_attr.hname, servicename = client_data.service_attr.sname] + (const eCAL::service::Error& response_error, const std::shared_ptr& response_) + { + auto me = weak_me.lock(); + if (!me) + { + return; + } + + std::lock_guard const lock(me->m_response_callback_sync); + + if (me->m_response_callback) + { + eCAL::SServiceResponse service_response_struct; + + service_response_struct.host_name = hostname; + service_response_struct.service_name = servicename; + + if (response_error) + { + service_response_struct.error_msg = response_error.ToString(); + service_response_struct.call_state = eCallState::call_state_failed; + service_response_struct.ret_state = 0; + } + else + { + fromSerializedProtobuf(*response_, service_response_struct); + } + + me->m_response_callback(service_response_struct); + } + }; + + if (client.second.client_session->async_call_service(request_shared_ptr, response_callback)) { IncrementMethodCallCount(method_name_); at_least_one_service_was_called = true; @@ -396,44 +385,69 @@ namespace eCAL { if (!m_created) return false; - // check for connected clients - std::lock_guard const lock(m_connected_services_map_sync); - return !m_connected_services_map.empty(); + // check the connection state of these clients + std::lock_guard const lock(m_client_map_sync); + for (const auto& client : m_client_map) + { + if (client.second.connected) return true; + } + + // no client found that got an registration and is connected + return false; } // called by the eCAL::CClientGate to register a service void CServiceClientImpl::RegisterService(const std::string& key_, const SServiceAttr& service_) { - // check connections - std::lock_guard const lock(m_connected_services_map_sync); + // lock client map + const std::lock_guard lock(m_client_map_sync); - // is this a new connection ? - if (m_connected_services_map.find(key_) == m_connected_services_map.end()) + // add service if it's a new registration + if (m_client_map.find(key_) == m_client_map.end()) { - // call connect event - std::lock_guard const lock_eb(m_event_callback_map_sync); - auto e_iter = m_event_callback_map.find(client_event_connected); - if (e_iter != m_event_callback_map.end()) + // new client + SClient client; + + // apply attributes + client.service_attr = service_; + + // create client session + auto client_manager = eCAL::service::ServiceManager::instance()->get_client_manager(); + if (client_manager == nullptr || client_manager->is_stopped()) return; + + // Event callback (unused) + const eCAL::service::ClientSession::EventCallbackT event_callback + = [/*this, service_ = iter*/] // Using the this pointer here is extremely unsafe, as it actually forces us to manage the lifetime of this object. UPDATE: this class now inherits from shared_from_this, so when implementing this function, we can store a weak_ptr to this class. + (eCAL::service::ClientEventType /*event*/, const std::string& /*message*/) -> void + { + // I have no idea why, but for some reason the event callbacks of the actual connetions are not even used. The connect / disconnect callbacks are executed whenever a new connection is found, and not when the client has actually connected or disconnected. I am preserving the previous behavior. + }; + + // Only connect via V0 protocol / V0 port, if V1 port is not available + const auto protocol_version = (service_.tcp_port_v1 != 0 ? service_.version : 0); + const auto port_to_use = (protocol_version == 0 ? service_.tcp_port_v0 : service_.tcp_port_v1); + + // Create the client and add it to the map + const std::vector> endpoint_list + { + {service_.hname, port_to_use}, + {service_.hname + ".local", port_to_use}, // TODO: Make this configurable from the ecal.yaml + }; + client.client_session = client_manager->create_client(static_cast(protocol_version), endpoint_list, event_callback); + + // insert new client if creation was successful + if (client.client_session) { - SClientEventCallbackData sdata; - sdata.type = client_event_connected; - sdata.time = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); - sdata.attr = service_; - (e_iter->second)(m_service_name.c_str(), &sdata); + m_client_map.insert({ key_, client }); } - // add service - m_connected_services_map[key_] = service_; } } // called by eCAL:CClientGate every second to update registration layer Registration::Sample CServiceClientImpl::GetRegistration() { - // refresh connected services map - CheckForNewServices(); - - // check for disconnected services - CheckForDisconnectedServices(); + // check service connection states + UpdateConnectionStates(); return GetRegistrationSample(); } @@ -449,9 +463,6 @@ namespace eCAL if (m_service_name.empty() || method_name_.empty()) return nullptr; - // check for new server - CheckForNewServices(); - // Copy raw request in a protocol buffer // TODO: The next version of the service protocol should omit the double-serialization (i.e. copying the binary data in a protocol buffer and then serializing that again) Service::Request request; @@ -461,8 +472,6 @@ namespace eCAL auto request_shared_ptr = std::make_shared(); SerializeToBuffer(request, *request_shared_ptr); - std::vector const service_vec = g_clientgate()->GetServiceAttr(m_service_name); - // Create a condition variable and a mutex to wait for the response // All variables are in shared pointers, as we need to pass them to the // callback function via the lambda capture. When the user uses the timeout, @@ -480,28 +489,27 @@ namespace eCAL // Each successfull call will increment the finished_service_call_count. // By comparing that with the expected_service_call_count we later know // Whether all service calls have been completed. - for (const auto& service : service_vec) { - // Only call service if host name matches - if (m_host_name.empty() || (m_host_name == service.hname)) + std::lock_guard const client_map_lock(m_client_map_sync); + for (const auto& client : m_client_map) { - // Lock mutex for iterating over client session map - std::lock_guard const client_map_lock(m_client_map_sync); + const auto& client_data = client.second; - // Find the actual client session in the map - auto client = m_client_map.find(service.key); - if (client != m_client_map.end()) + // Only call service if host name matches + if (m_host_name.empty() || (m_host_name == client_data.service_attr.hname)) { + // Find the actual client session in the map eCAL::service::ClientResponseCallbackT response_callback; { const std::lock_guard lock(*mutex); + (*expected_service_call_count)++; responses->emplace_back(); - responses->back().first = false; // If this stays false, we have a timout - responses->back().second.host_name = service.hname; - responses->back().second.service_name = service.sname; - responses->back().second.service_id = service.key; + responses->back().first = false; // If this stays false, we have a timeout + responses->back().second.host_name = client_data.service_attr.hname; + responses->back().second.service_name = client_data.service_attr.sname; + responses->back().second.service_id = client_data.service_attr.key; responses->back().second.method_name = method_name_; responses->back().second.error_msg = "Timeout"; responses->back().second.ret_state = 0; @@ -510,34 +518,34 @@ namespace eCAL // Create a response callback, that will set the response and notify the condition variable response_callback - = [mutex, condition_variable, responses, block_modifying_responses, finished_service_call_count, i = (responses->size() - 1)] - (const eCAL::service::Error& response_error, const std::shared_ptr& response_) - { - const std::lock_guard lock(*mutex); - - if (!(*block_modifying_responses)) - { - // This calback has not timeouted. This does not tell us anything about the success, though. - (*responses)[i].first = true; - - if (response_error) - { - (*responses)[i].second.error_msg = response_error.ToString(); - (*responses)[i].second.call_state = eCallState::call_state_failed; - (*responses)[i].second.ret_state = 0; - } - else - { - fromSerializedProtobuf(*response_, (*responses)[i].second); - } - } - - (*finished_service_call_count)++; - condition_variable->notify_all(); - }; + = [mutex, condition_variable, responses, block_modifying_responses, finished_service_call_count, i = (responses->size() - 1)] + (const eCAL::service::Error& response_error, const std::shared_ptr& response_) + { + const std::lock_guard lock(*mutex); + + if (!(*block_modifying_responses)) + { + // This calback has not timeouted. This does not tell us anything about the success, though. + (*responses)[i].first = true; + + if (response_error) + { + (*responses)[i].second.error_msg = response_error.ToString(); + (*responses)[i].second.call_state = eCallState::call_state_failed; + (*responses)[i].second.ret_state = 0; + } + else + { + fromSerializedProtobuf(*response_, (*responses)[i].second); + } + } + + (*finished_service_call_count)++; + condition_variable->notify_all(); + }; // Call service asynchronously - const bool call_success = client->second->async_call_service(request_shared_ptr, response_callback); + const bool call_success = client_data.client_session->async_call_service(request_shared_ptr, response_callback); if (!call_success) { @@ -546,9 +554,9 @@ namespace eCAL (*finished_service_call_count)++; // We also store an error in the response vector - responses->back().second.error_msg = "Stopped by user"; - responses->back().second.ret_state = 0; - responses->back().second.call_state = eCallState::call_state_failed; + responses->back().second.error_msg = "Stopped by user"; + responses->back().second.ret_state = 0; + responses->back().second.call_state = eCallState::call_state_failed; } else { @@ -558,8 +566,8 @@ namespace eCAL } // unlock mutex } - } + } // Lock mutex, call service asynchronously and wait for the condition variable to be notified @@ -651,7 +659,7 @@ namespace eCAL service_client.sname = m_service_name; { - const std::lock_guard lock(m_method_sync); + const std::lock_guard lock(m_method_information_map_sync); for (const auto& method_information_pair : m_method_information_map) { @@ -708,60 +716,46 @@ namespace eCAL if (g_registration_provider() != nullptr) g_registration_provider()->UnregisterSample(GetUnregistrationSample()); } - void CServiceClientImpl::CheckForNewServices() + void CServiceClientImpl::UpdateConnectionStates() { - if (g_clientgate() == nullptr) return; + std::lock_guard const lock(m_client_map_sync); - // check for new services - std::vector const service_vec = g_clientgate()->GetServiceAttr(m_service_name); - for (const auto& iter : service_vec) + for (auto it = m_client_map.begin(); it != m_client_map.end(); ) { - std::lock_guard const lock(m_client_map_sync); - auto client = m_client_map.find(iter.key); - if (client == m_client_map.end()) - { - auto client_manager = eCAL::service::ServiceManager::instance()->get_client_manager(); - if (client_manager == nullptr || client_manager->is_stopped()) return; + auto& client_data = it->second; - // Event callback (unused) - const eCAL::service::ClientSession::EventCallbackT event_callback - = [/*this, service_ = iter*/] // Using the this pointer here is extremely unsafe, as it actually forces us to manage the lifetime of this object. UPDATE: this class now inherits from shared_from_this, so when implementing this function, we can store a weak_ptr to this class. - (eCAL::service::ClientEventType /*event*/, const std::string& /*message*/) -> void - { - // I have no idea why, but for some reason the event callbacks of the actual connetions are not even used. The connect / disconnect callbacks are executed whenever a new connection is found, and not when the client has actually connected or disconnected. I am preserving the previous behavior. - }; + // get current state of the client session + auto state = client_data.client_session->get_state(); - // Only connect via V0 protocol / V0 port, if V1 port is not available - const auto protocol_version = (iter.tcp_port_v1 != 0 ? iter.version : 0); - const auto port_to_use = (protocol_version == 0 ? iter.tcp_port_v0 : iter.tcp_port_v1); + // Transition from not connected to connected + if (!client_data.connected && state == eCAL::service::State::CONNECTED) + { + client_data.connected = true; - // Create the client and add it to the map - const std::vector> endpoint_list - { - {iter.hname, port_to_use}, - {iter.hname + ".local", port_to_use}, // TODO: Make this configurable from the ecal.yaml - }; - const auto new_client_session = client_manager->create_client(static_cast(protocol_version), endpoint_list, event_callback); - if (new_client_session) - m_client_map[iter.key] = new_client_session; - } - } - } + // call connect event + { + std::lock_guard const lock_cb(m_event_callback_map_sync); + auto e_iter = m_event_callback_map.find(client_event_connected); + if (e_iter != m_event_callback_map.end()) + { + SClientEventCallbackData sdata; + sdata.type = client_event_connected; + sdata.time = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); + sdata.attr = client_data.service_attr; + (e_iter->second)(m_service_name.c_str(), &sdata); + } + } - void CServiceClientImpl::CheckForDisconnectedServices() - { - std::lock_guard const lock(m_client_map_sync); - for (auto& client : m_client_map) - { - if (client.second->get_state() == eCAL::service::State::FAILED) + // move to the next client + ++it; + } + // transition from connected to not connected + else if (client_data.connected && state == eCAL::service::State::FAILED) { - std::string const service_key = client.first; + client_data.connected = false; - // is the service still in the connecting map ? - auto iter = m_connected_services_map.find(service_key); - if (iter != m_connected_services_map.end()) + // call disconnect event { - // call disconnect event std::lock_guard const lock_cb(m_event_callback_map_sync); auto e_iter = m_event_callback_map.find(client_event_disconnected); if (e_iter != m_event_callback_map.end()) @@ -769,16 +763,23 @@ namespace eCAL SClientEventCallbackData sdata; sdata.type = client_event_disconnected; sdata.time = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); - sdata.attr = iter->second; + sdata.attr = client_data.service_attr; (e_iter->second)(m_service_name.c_str(), &sdata); } - // remove service - m_connected_services_map.erase(iter); } + + // remove client and advance iterator + it = m_client_map.erase(it); + } + else + { + // no state change; move to the next client + ++it; } } } + void CServiceClientImpl::ErrorCallback(const std::string& method_name_, const std::string& error_message_) { std::lock_guard const lock(m_response_callback_sync); @@ -796,7 +797,7 @@ namespace eCAL void CServiceClientImpl::IncrementMethodCallCount(const std::string& method_name_) { - const std::lock_guard lock(m_method_sync); + const std::lock_guard lock(m_method_information_map_sync); if (m_method_information_map.find(method_name_) == m_method_information_map.end()) { m_method_information_map[method_name_] = SServiceMethodInformation(); diff --git a/ecal/core/src/service/ecal_service_client_impl.h b/ecal/core/src/service/ecal_service_client_impl.h index cb9059d124..b7b5d1baa7 100644 --- a/ecal/core/src/service/ecal_service_client_impl.h +++ b/ecal/core/src/service/ecal_service_client_impl.h @@ -102,15 +102,21 @@ namespace eCAL void Register(); void Unregister(); - void CheckForNewServices(); - void CheckForDisconnectedServices(); + void UpdateConnectionStates(); void ErrorCallback(const std::string &method_name_, const std::string &error_message_); void IncrementMethodCallCount(const std::string& method_name_); - using ClientMapT = std::map>; + struct SClient + { + SServiceAttr service_attr; + std::shared_ptr client_session; + bool connected = false; + }; + std::mutex m_client_map_sync; + using ClientMapT = std::map; ClientMapT m_client_map; std::mutex m_response_callback_sync; @@ -120,17 +126,13 @@ namespace eCAL using EventCallbackMapT = std::map; EventCallbackMapT m_event_callback_map; - std::mutex m_connected_services_map_sync; - using ServiceAttrMapT = std::map; - ServiceAttrMapT m_connected_services_map; - static constexpr int m_client_version = 1; std::string m_service_name; std::string m_service_id; std::string m_host_name; - std::mutex m_method_sync; + std::mutex m_method_information_map_sync; ServiceMethodInformationMapT m_method_information_map; using MethodCallCountMapT = std::map; diff --git a/ecal/samples/cpp/services/minimal_client/src/minimal_client.cpp b/ecal/samples/cpp/services/minimal_client/src/minimal_client.cpp index 720fd53394..b5c7750078 100644 --- a/ecal/samples/cpp/services/minimal_client/src/minimal_client.cpp +++ b/ecal/samples/cpp/services/minimal_client/src/minimal_client.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -53,6 +53,14 @@ int main(int argc, char **argv) eCAL::CServiceClient minimal_client("service1", { {"echo", eCAL::SServiceMethodInformation()} }); minimal_client.AddResponseCallback(OnServiceResponse); + while (!minimal_client.IsConnected()) + { + std::cout << "Waiting for a service .." << std::endl; + + // sleep a second + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + } + while(eCAL::Ok()) { eCAL::SServiceResponse service_info;