From ab8fa6b9698ac56ab995c12704e0224e8627e5a8 Mon Sep 17 00:00:00 2001 From: rex-schilasky <49162693+rex-schilasky@users.noreply.github.com> Date: Mon, 18 Mar 2024 17:21:54 +0100 Subject: [PATCH 01/14] data race in reader, writer, client, server create/register/unregister logic --- ecal/core/src/readwrite/ecal_reader.cpp | 9 ++++++--- ecal/core/src/readwrite/ecal_writer.cpp | 10 ++++++---- ecal/core/src/readwrite/ecal_writer.h | 2 +- ecal/core/src/service/ecal_service_client_impl.cpp | 11 ++++++----- ecal/core/src/service/ecal_service_client_impl.h | 2 +- ecal/core/src/service/ecal_service_server_impl.cpp | 10 ++++++---- ecal/core/src/service/ecal_service_server_impl.h | 2 +- 7 files changed, 27 insertions(+), 19 deletions(-) diff --git a/ecal/core/src/readwrite/ecal_reader.cpp b/ecal/core/src/readwrite/ecal_reader.cpp index 5d1985f7bb..523740b488 100644 --- a/ecal/core/src/readwrite/ecal_reader.cpp +++ b/ecal/core/src/readwrite/ecal_reader.cpp @@ -125,7 +125,7 @@ namespace eCAL // register Register(false); - // mark as created + // and mark as created m_created = true; return(true); @@ -155,11 +155,13 @@ namespace eCAL m_event_callback_map.clear(); } - // unregister + // mark as no more created + m_created = false; + + // and unregister Unregister(); // reset defaults - m_created = false; m_clock = 0; m_message_drops = 0; @@ -230,6 +232,7 @@ namespace eCAL bool CDataReader::Register(const bool force_) { #if ECAL_CORE_REGISTRATION + if (!m_created) return(false); if(m_topic_name.empty()) return(false); // create command parameter diff --git a/ecal/core/src/readwrite/ecal_writer.cpp b/ecal/core/src/readwrite/ecal_writer.cpp index d47529e6c3..162a515d02 100644 --- a/ecal/core/src/readwrite/ecal_writer.cpp +++ b/ecal/core/src/readwrite/ecal_writer.cpp @@ -142,7 +142,7 @@ namespace eCAL // register Register(false); - // mark as created + // and mark as created m_created = true; // create udp multicast layer @@ -210,11 +210,12 @@ namespace eCAL m_event_callback_map.clear(); } - // unregister - Unregister(); - + // mark as no more created m_created = false; + // and unregister + Unregister(); + return(true); } @@ -771,6 +772,7 @@ namespace eCAL bool CDataWriter::Register(bool force_) { #if ECAL_CORE_REGISTRATION + if (!m_created) return(false); if (m_topic_name.empty()) return(false); //@Rex: why is the logic different in CDataReader??? diff --git a/ecal/core/src/readwrite/ecal_writer.h b/ecal/core/src/readwrite/ecal_writer.h index e0fd19afee..03893c33cf 100644 --- a/ecal/core/src/readwrite/ecal_writer.h +++ b/ecal/core/src/readwrite/ecal_writer.h @@ -217,6 +217,6 @@ namespace eCAL bool m_use_tdesc; int m_share_ttype; int m_share_tdesc; - bool m_created; + std::atomic m_created; }; } diff --git a/ecal/core/src/service/ecal_service_client_impl.cpp b/ecal/core/src/service/ecal_service_client_impl.cpp index ee7076c20b..7e8abb3be1 100644 --- a/ecal/core/src/service/ecal_service_client_impl.cpp +++ b/ecal/core/src/service/ecal_service_client_impl.cpp @@ -80,7 +80,7 @@ namespace eCAL // register this client Register(false); - // mark as created + // and mark as created m_created = true; return(true); @@ -108,7 +108,10 @@ namespace eCAL m_event_callback_map.clear(); } - // unregister this client + // mark as no more created + m_created = false; + + // and unregister this client Unregister(); // reset internals @@ -116,9 +119,6 @@ namespace eCAL m_service_id.clear(); m_host_name.clear(); - // mark as not created - m_created = false; - return(true); } @@ -616,6 +616,7 @@ namespace eCAL void CServiceClientImpl::Register(const bool force_) { + if (!m_created) return; if (m_service_name.empty()) return; Registration::Sample sample; diff --git a/ecal/core/src/service/ecal_service_client_impl.h b/ecal/core/src/service/ecal_service_client_impl.h index 4211c48ccb..d2bb9c60c1 100644 --- a/ecal/core/src/service/ecal_service_client_impl.h +++ b/ecal/core/src/service/ecal_service_client_impl.h @@ -123,6 +123,6 @@ namespace eCAL std::string m_service_id; std::string m_host_name; - bool m_created; + std::atomic m_created; }; } diff --git a/ecal/core/src/service/ecal_service_server_impl.cpp b/ecal/core/src/service/ecal_service_server_impl.cpp index d91e54bef5..daae9b5ac8 100644 --- a/ecal/core/src/service/ecal_service_server_impl.cpp +++ b/ecal/core/src/service/ecal_service_server_impl.cpp @@ -148,7 +148,7 @@ namespace eCAL // register this service Register(false); - // mark as created + // and mark as created m_created = true; return(true); @@ -176,7 +176,10 @@ namespace eCAL m_event_callback_map.clear(); } - // unregister this service + // mark as no more created + m_created = false; + + // and unregister this service Unregister(); // reset internals @@ -189,8 +192,6 @@ namespace eCAL m_connected_v1 = false; } - m_created = false; - return(true); } @@ -342,6 +343,7 @@ namespace eCAL void CServiceServerImpl::Register(const bool force_) { + if (!m_created) return; if (m_service_name.empty()) return; // might be zero in contruction phase diff --git a/ecal/core/src/service/ecal_service_server_impl.h b/ecal/core/src/service/ecal_service_server_impl.h index 5b35fcbef6..5d998bd599 100644 --- a/ecal/core/src/service/ecal_service_server_impl.h +++ b/ecal/core/src/service/ecal_service_server_impl.h @@ -121,7 +121,7 @@ namespace eCAL std::mutex m_event_callback_map_sync; EventCallbackMapT m_event_callback_map; - bool m_created = false; + std::atomic m_created = false; mutable std::mutex m_connected_mutex; //!< mutex protecting the m_connected_v0 and m_connected_v1 variable, as those are modified by the event callbacks in another thread. bool m_connected_v0 = false; From 57bff30f54e9862e0d5a28a226e497f349b7fcdf Mon Sep 17 00:00:00 2001 From: rex-schilasky <49162693+rex-schilasky@users.noreply.github.com> Date: Mon, 18 Mar 2024 17:48:20 +0100 Subject: [PATCH 02/14] atomic usage fixed --- ecal/core/src/readwrite/ecal_reader.h | 1 + ecal/core/src/readwrite/ecal_writer.h | 1 + ecal/core/src/service/ecal_service_client_impl.cpp | 1 + ecal/core/src/service/ecal_service_server_impl.cpp | 5 ++++- ecal/core/src/service/ecal_service_server_impl.h | 5 +++-- 5 files changed, 10 insertions(+), 3 deletions(-) diff --git a/ecal/core/src/readwrite/ecal_reader.h b/ecal/core/src/readwrite/ecal_reader.h index d812ffb940..2aa3d10d08 100644 --- a/ecal/core/src/readwrite/ecal_reader.h +++ b/ecal/core/src/readwrite/ecal_reader.h @@ -23,6 +23,7 @@ #pragma once +#include #include #include #include diff --git a/ecal/core/src/readwrite/ecal_writer.h b/ecal/core/src/readwrite/ecal_writer.h index 03893c33cf..51b60dbc0b 100644 --- a/ecal/core/src/readwrite/ecal_writer.h +++ b/ecal/core/src/readwrite/ecal_writer.h @@ -23,6 +23,7 @@ #pragma once +#include #include #include #include diff --git a/ecal/core/src/service/ecal_service_client_impl.cpp b/ecal/core/src/service/ecal_service_client_impl.cpp index 7e8abb3be1..33f3054aef 100644 --- a/ecal/core/src/service/ecal_service_client_impl.cpp +++ b/ecal/core/src/service/ecal_service_client_impl.cpp @@ -29,6 +29,7 @@ #include "registration/ecal_registration_provider.h" #include "serialization/ecal_serialize_service.h" +#include #include #include #include diff --git a/ecal/core/src/service/ecal_service_server_impl.cpp b/ecal/core/src/service/ecal_service_server_impl.cpp index daae9b5ac8..69cdc723d0 100644 --- a/ecal/core/src/service/ecal_service_server_impl.cpp +++ b/ecal/core/src/service/ecal_service_server_impl.cpp @@ -73,7 +73,10 @@ namespace eCAL return instance; } - CServiceServerImpl::CServiceServerImpl() = default; + CServiceServerImpl::CServiceServerImpl() : + m_created(false) + { + } CServiceServerImpl::~CServiceServerImpl() { diff --git a/ecal/core/src/service/ecal_service_server_impl.h b/ecal/core/src/service/ecal_service_server_impl.h index 5d998bd599..6aee86de4f 100644 --- a/ecal/core/src/service/ecal_service_server_impl.h +++ b/ecal/core/src/service/ecal_service_server_impl.h @@ -27,6 +27,7 @@ #include #include +#include #include #include #include @@ -121,10 +122,10 @@ namespace eCAL std::mutex m_event_callback_map_sync; EventCallbackMapT m_event_callback_map; - std::atomic m_created = false; - mutable std::mutex m_connected_mutex; //!< mutex protecting the m_connected_v0 and m_connected_v1 variable, as those are modified by the event callbacks in another thread. bool m_connected_v0 = false; bool m_connected_v1 = false; + + std::atomic m_created; }; } From 5a6438f7c3696d9d9bccfa36b5eb197192e85f94 Mon Sep 17 00:00:00 2001 From: tftzee <49162693+rex-schilasky@users.noreply.github.com> Date: Wed, 20 Mar 2024 17:26:44 +0100 Subject: [PATCH 03/14] descgate refactored, now using registration provider and registration receiver to collect information descgate quality logic removed (will be obsolete with new id based descgate anyway) --- .../core/include/ecal/cimpl/ecal_init_cimpl.h | 7 +- ecal/core/include/ecal/ecal_init.h | 7 +- ecal/core/src/ecal_descgate.cpp | 305 ++++------- ecal/core/src/ecal_descgate.h | 57 +- ecal/core/src/ecal_globals.cpp | 2 +- .../src/monitoring/ecal_monitoring_impl.cpp | 11 +- ecal/core/src/pubsub/ecal_pubgate.cpp | 28 +- ecal/core/src/pubsub/ecal_publisher.cpp | 27 - ecal/core/src/pubsub/ecal_subgate.cpp | 29 +- ecal/core/src/pubsub/ecal_subscriber.cpp | 25 - ecal/core/src/readwrite/ecal_reader.cpp | 10 +- ecal/core/src/readwrite/ecal_writer.cpp | 10 +- .../ecal_registration_provider.cpp | 485 ++++++------------ .../registration/ecal_registration_provider.h | 52 +- .../ecal_registration_receiver.cpp | 84 +-- .../registration/ecal_registration_receiver.h | 21 +- ecal/core/src/service/ecal_clientgate.cpp | 41 +- .../src/service/ecal_service_client_impl.cpp | 10 +- ecal/core/src/service/ecal_service_server.cpp | 7 +- .../src/service/ecal_service_server_impl.cpp | 51 +- ecal/tests/CMakeLists.txt | 5 +- .../src/clientserver_getservices.cpp | 4 +- 22 files changed, 395 insertions(+), 883 deletions(-) diff --git a/ecal/core/include/ecal/cimpl/ecal_init_cimpl.h b/ecal/core/include/ecal/cimpl/ecal_init_cimpl.h index 490fccb9bf..42384c6c8a 100644 --- a/ecal/core/include/ecal/cimpl/ecal_init_cimpl.h +++ b/ecal/core/include/ecal/cimpl/ecal_init_cimpl.h @@ -31,21 +31,18 @@ #define eCAL_Init_Monitoring 0x08 /*!< Initialize Monitoring API */ #define eCAL_Init_Logging 0x10 /*!< Initialize Logging API */ #define eCAL_Init_TimeSync 0x20 /*!< Initialize Time API */ -#define eCAL_Init_ProcessReg 0x80 /*!< Initialize Process Registration API */ #define eCAL_Init_All (eCAL_Init_Publisher \ | eCAL_Init_Subscriber \ | eCAL_Init_Service \ | eCAL_Init_Monitoring \ | eCAL_Init_Logging \ - | eCAL_Init_TimeSync \ - | eCAL_Init_ProcessReg) /*!< Initialize complete eCAL API */ + | eCAL_Init_TimeSync) /*!< Initialize complete eCAL API */ #define eCAL_Init_Default (eCAL_Init_Publisher \ | eCAL_Init_Subscriber \ | eCAL_Init_Service \ | eCAL_Init_Logging \ - | eCAL_Init_TimeSync \ - | eCAL_Init_ProcessReg) /*!< Initialize default eCAL API */ + | eCAL_Init_TimeSync) /*!< Initialize default eCAL API */ #endif /*ecal_init_cimpl_h_included*/ diff --git a/ecal/core/include/ecal/ecal_init.h b/ecal/core/include/ecal/ecal_init.h index 87f7108406..d3bb6c8b5b 100644 --- a/ecal/core/include/ecal/ecal_init.h +++ b/ecal/core/include/ecal/ecal_init.h @@ -34,22 +34,19 @@ namespace eCAL static const unsigned int Monitoring = 0x008; static const unsigned int Logging = 0x010; static const unsigned int TimeSync = 0x020; - static const unsigned int ProcessReg = 0x080; static const unsigned int All = Publisher | Subscriber | Service | Monitoring | Logging - | TimeSync - | ProcessReg; + | TimeSync; static const unsigned int Default = Publisher | Subscriber | Service | Logging - | TimeSync - | ProcessReg; + | TimeSync; static const unsigned int None = 0x000; } diff --git a/ecal/core/src/ecal_descgate.cpp b/ecal/core/src/ecal_descgate.cpp index ee38a078ea..1b1eb0a426 100644 --- a/ecal/core/src/ecal_descgate.cpp +++ b/ecal/core/src/ecal_descgate.cpp @@ -24,14 +24,8 @@ #include #include +#include "ecal_globals.h" #include "ecal_descgate.h" -#include -#include -#include -#include -#include -#include -#include namespace eCAL { @@ -44,150 +38,20 @@ namespace eCAL void CDescGate::Create() { +#if ECAL_CORE_REGISTRATION + // utilize registration provider and receiver to get descriptions + g_registration_provider()->SetCustomApplySampleCallback("descgate", [this](const auto& sample_) {this->ApplySample(sample_, tl_none); }); + g_registration_receiver()->SetCustomApplySampleCallback("descgate", [this](const auto& sample_) {this->ApplySample(sample_, tl_none); }); +#endif } void CDescGate::Destroy() { - } - - bool CDescGate::ApplyTopicDescription(const std::string& topic_name_, const SDataTypeInformation& topic_info_, const QualityFlags description_quality_) - { - const std::unique_lock lock(m_topic_info_map.sync); - m_topic_info_map.map->remove_deprecated(); - - const auto topic_info_it = m_topic_info_map.map->find(topic_name_); - - // new element (no need to check anything, just add it) - if(topic_info_it == m_topic_info_map.map->end()) - { - // create a new topic entry - STopicInfoQuality& topic_info = (*m_topic_info_map.map)[topic_name_]; - topic_info.info = topic_info_; - topic_info.quality = description_quality_; - return true; - } - - // we do not use the [] operator here to not update the timestamp - // by accessing the map entry - // - // a topic with the same name but different type name or different description - // should NOT update the timestamp of an existing entry - // - // otherwise there could be a scenario where a "lower quality topic" would keep a - // "higher quality topic" alive (even it is no more existing) - STopicInfoQuality topic_info = (*topic_info_it).second; - - // first let's check whether the current information has a higher quality - // if it has a higher quality, we overwrite it - if (description_quality_ > topic_info.quality) - { - // overwrite attributes - topic_info.info = topic_info_; - topic_info.quality = description_quality_; - - // update attributes and return - (*m_topic_info_map.map)[topic_name_] = topic_info; - return true; - } - - // this is the same topic (topic name, topic type name, topic type description) - if (topic_info.info == topic_info_) - { - // update timestamp (by just accessing the entry) and return - (*m_topic_info_map.map)[topic_name_] = topic_info; - return false; - } - - // topic type name or topic description differ but we logged this before - if (topic_info.type_missmatch_logged) - { - return false; - } - - // topic type name or topic description differ and this is not logged yet - // so we log the error and update the entry one time - bool update_topic_info(false); - - // topic type name differs - // we log the error and update the entry one time - if (!topic_info_.encoding.empty() - && !topic_info.info.encoding.empty() - && (topic_info.info.encoding != topic_info_.encoding) - ) - { - std::string tencoding1 = topic_info.info.encoding; - std::string tencoding2 = topic_info_.encoding; - std::replace(tencoding1.begin(), tencoding1.end(), '\0', '?'); - std::replace(tencoding1.begin(), tencoding1.end(), '\t', '?'); - std::replace(tencoding2.begin(), tencoding2.end(), '\0', '?'); - std::replace(tencoding2.begin(), tencoding2.end(), '\t', '?'); - std::string msg = "eCAL Pub/Sub encoding mismatch for topic "; - msg += topic_name_; - msg += " (\'"; - msg += tencoding1; - msg += "\' <> \'"; - msg += tencoding2; - msg += "\')"; - eCAL::Logging::Log(log_level_warning, msg); - - // mark as logged - topic_info.type_missmatch_logged = true; - // and update its attributes - update_topic_info = true; - } - - // topic type name differs - // we log the error and update the entry one time - if (!topic_info_.name.empty() - && !topic_info.info.name.empty() - && (topic_info.info.name != topic_info_.name) - ) - { - std::string ttype1 = topic_info.info.name; - std::string ttype2 = topic_info_.name; - std::replace(ttype1.begin(), ttype1.end(), '\0', '?'); - std::replace(ttype1.begin(), ttype1.end(), '\t', '?'); - std::replace(ttype2.begin(), ttype2.end(), '\0', '?'); - std::replace(ttype2.begin(), ttype2.end(), '\t', '?'); - std::string msg = "eCAL Pub/Sub type mismatch for topic "; - msg += topic_name_; - msg += " (\'"; - msg += ttype1; - msg += "\' <> \'"; - msg += ttype2; - msg += "\')"; - eCAL::Logging::Log(log_level_warning, msg); - - // mark as logged - topic_info.type_missmatch_logged = true; - // and update its attributes - update_topic_info = true; - } - - // topic type description differs - // we log the error and update the entry one time - if ( !topic_info_.descriptor.empty() - && !topic_info.info.descriptor.empty() - && (topic_info.info.descriptor != topic_info_.descriptor) - ) - { - std::string msg = "eCAL Pub/Sub description mismatch for topic "; - msg += topic_name_; - eCAL::Logging::Log(log_level_warning, msg); - - // mark as logged - topic_info.type_missmatch_logged = true; - // and update its attributes - update_topic_info = true; - } - - // update topic info attributes - if (update_topic_info) - { - (*m_topic_info_map.map)[topic_name_] = topic_info; - } - - return false; +#if ECAL_CORE_REGISTRATION + // stop registration provider and receiver utilization to get descriptions + g_registration_provider()->RemCustomApplySampleCallback("descgate"); + g_registration_receiver()->RemCustomApplySampleCallback("descgate"); +#endif } void CDescGate::GetTopics(std::unordered_map& topic_info_map_) @@ -200,7 +64,7 @@ namespace eCAL for (const auto& topic_info : (*m_topic_info_map.map)) { - map.emplace(topic_info.first, topic_info.second.info); + map.emplace(topic_info.first, topic_info.second); } topic_info_map_.swap(map); } @@ -228,49 +92,9 @@ namespace eCAL const auto topic_info_it = m_topic_info_map.map->find(topic_name_); if (topic_info_it == m_topic_info_map.map->end()) return(false); - topic_info_ = (*topic_info_it).second.info; + topic_info_ = (*topic_info_it).second; return(true); } - - bool CDescGate::ApplyServiceDescription(const std::string& service_name_ - , const std::string& method_name_ - , const SDataTypeInformation& request_type_information_ - , const SDataTypeInformation& response_type_information_ - , const QualityFlags description_quality_) - { - std::tuple service_method_tuple = std::make_tuple(service_name_, method_name_); - - const std::lock_guard lock(m_service_info_map.sync); - m_service_info_map.map->remove_deprecated(); - - auto service_info_map_it = m_service_info_map.map->find(service_method_tuple); - if (service_info_map_it == m_service_info_map.map->end()) - { - // create a new service entry - SServiceMethodInfoQuality& service_info = (*m_service_info_map.map)[service_method_tuple]; - service_info.info.request_type = request_type_information_; - service_info.info.response_type = response_type_information_; - service_info.quality = description_quality_; - return true; - } - - // let's check whether the current information has a higher quality - // if it has a higher quality, we overwrite it - bool ret_value(false); - SServiceMethodInfoQuality service_info = (*service_info_map_it).second; - if (description_quality_ > service_info.quality) - { - service_info.info.request_type = request_type_information_; - service_info.info.response_type = response_type_information_; - service_info.quality = description_quality_; - ret_value = true; - } - - // update service entry (and its timestamp) - (*m_service_info_map.map)[service_method_tuple] = service_info; - - return ret_value; - } void CDescGate::GetServices(std::map, SServiceMethodInformation>& service_info_map_) { @@ -281,7 +105,7 @@ namespace eCAL for (const auto& service_info : (*m_service_info_map.map)) { - map.emplace(service_info.first, service_info.second.info); + map.emplace(service_info.first, service_info.second); } service_info_map_.swap(map); } @@ -309,8 +133,8 @@ namespace eCAL auto service_info_map_it = m_service_info_map.map->find(service_method_tuple); if (service_info_map_it == m_service_info_map.map->end()) return false; - req_type_name_ = (*service_info_map_it).second.info.request_type.name; - resp_type_name_ = (*service_info_map_it).second.info.response_type.name; + req_type_name_ = (*service_info_map_it).second.request_type.name; + resp_type_name_ = (*service_info_map_it).second.response_type.name; return true; } @@ -323,8 +147,101 @@ namespace eCAL auto service_info_map_it = m_service_info_map.map->find(service_method_tuple); if (service_info_map_it == m_service_info_map.map->end()) return false; - req_type_desc_ = (*service_info_map_it).second.info.request_type.descriptor; - resp_type_desc_ = (*service_info_map_it).second.info.response_type.descriptor; + req_type_desc_ = (*service_info_map_it).second.request_type.descriptor; + resp_type_desc_ = (*service_info_map_it).second.response_type.descriptor; + return true; + } + + bool CDescGate::ApplySample(const Registration::Sample& sample_, eTLayerType /*layer_*/) + { + switch (sample_.cmd_type) + { + case bct_none: + case bct_set_sample: + case bct_reg_process: + case bct_unreg_process: + break; + case bct_reg_service: + { + for (const auto& method : sample_.service.methods) + { + SDataTypeInformation request_type; + request_type.name = method.req_type; + request_type.descriptor = method.req_desc; + + SDataTypeInformation response_type{}; + response_type.name = method.resp_type; + response_type.descriptor = method.resp_desc; + + ApplyServiceDescription(sample_.service.sname, method.mname, request_type, response_type); + } + } + break; + case bct_unreg_service: + break; + case bct_reg_client: + //for (const auto& method : sample_.client.methods) + //{ + // SDataTypeInformation request_type; + // request_type.name = method.req_type; + // request_type.descriptor = method.req_desc; + + // SDataTypeInformation response_type{}; + // response_type.name = method.resp_type; + // response_type.descriptor = method.resp_desc; + + // ApplyClientDescription(sample_.service.sname, method.mname, request_type, response_type); + //} + break; + case bct_unreg_client: + break; + case bct_reg_publisher: + ApplyTopicDescription(sample_.topic.tname, sample_.topic.tdatatype); + break; + case bct_unreg_publisher: + break; + case bct_reg_subscriber: + ApplyTopicDescription(sample_.topic.tname, sample_.topic.tdatatype); + break; + case bct_unreg_subscriber: + break; + default: + { + Logging::Log(log_level_debug1, "CDescGate::ApplySample : unknown sample type"); + } + break; + } + + return true; + } + + bool CDescGate::ApplyTopicDescription(const std::string& topic_name_, const SDataTypeInformation& topic_info_) + { + const std::unique_lock lock(m_topic_info_map.sync); + m_topic_info_map.map->remove_deprecated(); + + // update topic entry (and its timestamp) + (*m_topic_info_map.map)[topic_name_] = topic_info_; + return true; + } + + bool CDescGate::ApplyServiceDescription(const std::string& service_name_ + , const std::string& method_name_ + , const SDataTypeInformation& request_type_information_ + , const SDataTypeInformation& response_type_information_) + { + std::tuple service_method_tuple = std::make_tuple(service_name_, method_name_); + + const std::lock_guard lock(m_service_info_map.sync); + m_service_info_map.map->remove_deprecated(); + + // aggregate service information + SServiceMethodInformation service_info; + service_info.request_type = request_type_information_; + service_info.response_type = response_type_information_; + + // update service entry (and its timestamp) + (*m_service_info_map.map)[service_method_tuple] = service_info; return true; } } diff --git a/ecal/core/src/ecal_descgate.h b/ecal/core/src/ecal_descgate.h index a6a35bcd32..e710a74fca 100644 --- a/ecal/core/src/ecal_descgate.h +++ b/ecal/core/src/ecal_descgate.h @@ -29,6 +29,7 @@ #include "ecal_global_accessors.h" #include "ecal_def.h" +#include "serialization/ecal_serialize_sample_registration.h" #include "util/ecal_expmap.h" #include @@ -44,21 +45,6 @@ namespace eCAL { class CDescGate { - public: - // Enumeration of quality bits used for detecting how good a topic information is. - enum class QualityFlags : int - { - NO_QUALITY = 0, //!< Special value for initialization - - DESCRIPTION_AVAILABLE = 0x1 << 4, //!< Having a descriptor at all is the most important thing - INFO_COMES_FROM_CORRECT_ENTITY = 0x1 << 3, //!< The information comes from the current topic/service - //!< and has not been borrowed from another emtity, like read by a subscriber from a publisher - INFO_COMES_FROM_PRODUCER = 0x1 << 2, //!< A descriptor coming from the producer (like a publisher) is better than one from a - //!< consumer (like a subscriber), as we assume that the publisher knows best what he is publishing - INFO_COMES_FROM_THIS_PROCESS = 0x1 << 1, //!< We prefer descriptors from the current process - TYPE_AVAILABLE = 0x1 << 0, //!< Having information about the type's name available is nice but not that important to us - }; - public: CDescGate(); ~CDescGate(); @@ -66,41 +52,28 @@ namespace eCAL void Create(); void Destroy(); - bool ApplyTopicDescription(const std::string& topic_name_, - const SDataTypeInformation& topic_info_, - QualityFlags description_quality_); - void GetTopics(std::unordered_map& topic_info_map_); void GetTopicNames(std::vector& topic_names_); bool GetDataTypeInformation(const std::string& topic_name_, SDataTypeInformation& topic_info_); - bool ApplyServiceDescription(const std::string& service_name_, - const std::string& method_name_, - const SDataTypeInformation& request_type_information_, - const SDataTypeInformation& response_type_information_, - QualityFlags description_quality_); - void GetServices(std::map, SServiceMethodInformation>& service_info_map_); void GetServiceNames(std::vector>& service_method_names_); bool GetServiceTypeNames(const std::string& service_name_, const std::string& method_name_, std::string& req_type_name_, std::string& resp_type_name_); bool GetServiceDescription(const std::string& service_name_, const std::string& method_name_, std::string& req_type_desc_, std::string& resp_type_desc_); protected: - struct STopicInfoQuality - { - SDataTypeInformation info; //!< Topic info struct with type encoding, name and descriptor. - QualityFlags quality = QualityFlags::NO_QUALITY; //!< QualityFlags to determine whether we may overwrite the current data with better one. E.g. we prefer the description sent by a publisher over one sent by a subscriber. - bool type_missmatch_logged = false; //!< Whether we have already logged a type-missmatch - }; + bool ApplySample(const Registration::Sample& sample_, eTLayerType layer_); + + bool ApplyTopicDescription(const std::string& topic_name_, + const SDataTypeInformation& topic_info_); - struct SServiceMethodInfoQuality - { - SServiceMethodInformation info; //!< Service info struct with type names and descriptors for request and response. - QualityFlags quality = QualityFlags::NO_QUALITY; //!< The Quality of the Info - }; + bool ApplyServiceDescription(const std::string& service_name_, + const std::string& method_name_, + const SDataTypeInformation& request_type_information_, + const SDataTypeInformation& response_type_information_); // key: topic name | value: topic (type/desc), quality - using TopicInfoMap = eCAL::Util::CExpMap; //!< Map containing { TopicName -> (Type, Description, Quality) } mapping of all topics that are currently known + using TopicInfoMap = eCAL::Util::CExpMap; //!< Map containing { TopicName -> (Type, Description) } mapping of all topics that are currently known struct STopicInfoMap { explicit STopicInfoMap(const std::chrono::milliseconds& timeout_) : @@ -113,7 +86,7 @@ namespace eCAL STopicInfoMap m_topic_info_map; // key: tup | value: request (type/desc), response (type/desc), quality - using ServiceMethodInfoMap = eCAL::Util::CExpMap, SServiceMethodInfoQuality>; //!< Map { (ServiceName, MethodName) -> ( (ReqType, ReqDescription), (RespType, RespDescription), Quality ) } mapping of all currently known services + using ServiceMethodInfoMap = eCAL::Util::CExpMap, SServiceMethodInformation>; //!< Map { (ServiceName, MethodName) -> ( (ReqType, ReqDescription), (RespType, RespDescription) ) } mapping of all currently known services struct SServiceMethodInfoMap { explicit SServiceMethodInfoMap(const std::chrono::milliseconds& timeout_) : @@ -125,12 +98,4 @@ namespace eCAL }; SServiceMethodInfoMap m_service_info_map; }; - - constexpr inline CDescGate::QualityFlags operator~ (CDescGate::QualityFlags a) { return static_cast( ~static_cast::type>(a) ); } - constexpr inline CDescGate::QualityFlags operator| (CDescGate::QualityFlags a, CDescGate::QualityFlags b) { return static_cast( static_cast::type>(a) | static_cast::type>(b) ); } - constexpr inline CDescGate::QualityFlags operator& (CDescGate::QualityFlags a, CDescGate::QualityFlags b) { return static_cast( static_cast::type>(a) & static_cast::type>(b) ); } - constexpr inline CDescGate::QualityFlags operator^ (CDescGate::QualityFlags a, CDescGate::QualityFlags b) { return static_cast( static_cast::type>(a) ^ static_cast::type>(b) ); } - inline CDescGate::QualityFlags& operator|= (CDescGate::QualityFlags& a, CDescGate::QualityFlags b) { return reinterpret_cast( reinterpret_cast::type&>(a) |= static_cast::type>(b) ); } - inline CDescGate::QualityFlags& operator&= (CDescGate::QualityFlags& a, CDescGate::QualityFlags b) { return reinterpret_cast( reinterpret_cast::type&>(a) &= static_cast::type>(b) ); } - inline CDescGate::QualityFlags& operator^= (CDescGate::QualityFlags& a, CDescGate::QualityFlags b) { return reinterpret_cast( reinterpret_cast::type&>(a) ^= static_cast::type>(b) ); } } diff --git a/ecal/core/src/ecal_globals.cpp b/ecal/core/src/ecal_globals.cpp index 1effb47c26..634fdfc3b2 100644 --- a/ecal/core/src/ecal_globals.cpp +++ b/ecal/core/src/ecal_globals.cpp @@ -229,7 +229,7 @@ namespace eCAL //if (config_instance) config_instance->Create(); if (log_instance && ((components_ & Init::Logging) != 0u)) log_instance->Create(); #if ECAL_CORE_REGISTRATION - if (registration_provider_instance) registration_provider_instance->Create(true, true, (components_ & Init::ProcessReg) != 0x0); + if (registration_provider_instance) registration_provider_instance->Create(); if (registration_receiver_instance) registration_receiver_instance->Create(); #endif if (descgate_instance) descgate_instance->Create(); diff --git a/ecal/core/src/monitoring/ecal_monitoring_impl.cpp b/ecal/core/src/monitoring/ecal_monitoring_impl.cpp index 976def495a..fd5561c38f 100644 --- a/ecal/core/src/monitoring/ecal_monitoring_impl.cpp +++ b/ecal/core/src/monitoring/ecal_monitoring_impl.cpp @@ -30,7 +30,9 @@ #include +#include "registration/ecal_registration_provider.h" #include "registration/ecal_registration_receiver.h" + #include "serialization/ecal_serialize_monitoring.h" @@ -56,8 +58,9 @@ namespace eCAL // get name of this host m_host_name = Process::GetHostName(); - // utilize registration receiver to enrich monitor information - g_registration_receiver()->SetCustomApplySampleCallback([this](const auto& sample_){this->ApplySample(sample_, tl_none);}); + // utilize registration provider and receiver to enrich monitor information + g_registration_provider()->SetCustomApplySampleCallback("monitoring", [this](const auto& sample_) {this->ApplySample(sample_, tl_none); }); + g_registration_receiver()->SetCustomApplySampleCallback("monitoring", [this](const auto& sample_){this->ApplySample(sample_, tl_none);}); // setup blacklist and whitelist filter strings# m_topic_filter_excl_s = Config::GetMonitoringFilterExcludeList(); @@ -71,7 +74,9 @@ namespace eCAL void CMonitoringImpl::Destroy() { - g_registration_receiver()->RemCustomApplySampleCallback(); + // stop registration provider and receiver utilization to enrich monitor information + g_registration_provider()->RemCustomApplySampleCallback("monitoring"); + g_registration_receiver()->RemCustomApplySampleCallback("monitoring"); m_init = false; } diff --git a/ecal/core/src/pubsub/ecal_pubgate.cpp b/ecal/core/src/pubsub/ecal_pubgate.cpp index 96e1c12db2..e285cef2c6 100644 --- a/ecal/core/src/pubsub/ecal_pubgate.cpp +++ b/ecal/core/src/pubsub/ecal_pubgate.cpp @@ -31,27 +31,6 @@ #include #include -namespace -{ - // TODO: remove me with new CDescGate - bool ApplyTopicDescription(const std::string& topic_name_, const eCAL::SDataTypeInformation& topic_info_) - { - if (eCAL::g_descgate() != nullptr) - { - // Calculate the quality of the current info - eCAL::CDescGate::QualityFlags quality = eCAL::CDescGate::QualityFlags::NO_QUALITY; - if (!topic_info_.name.empty() || !topic_info_.encoding.empty()) - quality |= eCAL::CDescGate::QualityFlags::TYPE_AVAILABLE; - if (!topic_info_.descriptor.empty()) - quality |= eCAL::CDescGate::QualityFlags::DESCRIPTION_AVAILABLE; - quality |= eCAL::CDescGate::QualityFlags::INFO_COMES_FROM_CORRECT_ENTITY; - - return eCAL::g_descgate()->ApplyTopicDescription(topic_name_, topic_info_, quality); - } - return false; - } -} - namespace eCAL { ////////////////////////////////////////////////////////////////// @@ -153,9 +132,6 @@ namespace eCAL } #endif - // store description - ApplyTopicDescription(topic_name, topic_information); - // register local subscriber const std::shared_lock lock(m_topic_name_datawriter_sync); auto res = m_topic_name_datawriter_map.equal_range(topic_name); @@ -208,9 +184,6 @@ namespace eCAL } #endif - // store description - ApplyTopicDescription(topic_name, topic_information); - // register external subscriber const std::shared_lock lock(m_topic_name_datawriter_sync); auto res = m_topic_name_datawriter_map.equal_range(topic_name); @@ -248,6 +221,7 @@ namespace eCAL const std::shared_lock lock(m_topic_name_datawriter_sync); for (const auto& iter : m_topic_name_datawriter_map) { + // force data writer to (re)register itself on registration provider iter.second->RefreshRegistration(); } } diff --git a/ecal/core/src/pubsub/ecal_publisher.cpp b/ecal/core/src/pubsub/ecal_publisher.cpp index 6e3d456d34..9a917e8983 100644 --- a/ecal/core/src/pubsub/ecal_publisher.cpp +++ b/ecal/core/src/pubsub/ecal_publisher.cpp @@ -33,29 +33,6 @@ #include #include -namespace -{ - // TODO: remove me with new CDescGate - bool ApplyTopicDescription(const std::string& topic_name_, const eCAL::SDataTypeInformation& data_type_info_) - { - if (eCAL::g_descgate() != nullptr) - { - // Calculate the quality of the current info - eCAL::CDescGate::QualityFlags quality = eCAL::CDescGate::QualityFlags::NO_QUALITY; - if (!data_type_info_.name.empty() || !data_type_info_.encoding.empty()) - quality |= eCAL::CDescGate::QualityFlags::TYPE_AVAILABLE; - if (!data_type_info_.descriptor.empty()) - quality |= eCAL::CDescGate::QualityFlags::DESCRIPTION_AVAILABLE; - quality |= eCAL::CDescGate::QualityFlags::INFO_COMES_FROM_THIS_PROCESS; - quality |= eCAL::CDescGate::QualityFlags::INFO_COMES_FROM_CORRECT_ENTITY; - quality |= eCAL::CDescGate::QualityFlags::INFO_COMES_FROM_PRODUCER; - - return eCAL::g_descgate()->ApplyTopicDescription(topic_name_, data_type_info_, quality); - } - return false; - } -} - namespace eCAL { CPublisher::CPublisher() : @@ -143,9 +120,6 @@ namespace eCAL // register publisher gateway (for publisher memory file and event name) g_pubgate()->Register(topic_name_, m_datawriter); - // register to description gateway for type / description checking - ApplyTopicDescription(topic_name_, data_type_info_); - // we made it :-) m_created = true; @@ -192,7 +166,6 @@ namespace eCAL bool CPublisher::SetDataTypeInformation(const SDataTypeInformation& data_type_info_) { if (m_datawriter == nullptr) return false; - ApplyTopicDescription(m_datawriter->GetTopicName(), data_type_info_); return m_datawriter->SetDataTypeInformation(data_type_info_); } diff --git a/ecal/core/src/pubsub/ecal_subgate.cpp b/ecal/core/src/pubsub/ecal_subgate.cpp index bee815d8a9..5c13504954 100644 --- a/ecal/core/src/pubsub/ecal_subgate.cpp +++ b/ecal/core/src/pubsub/ecal_subgate.cpp @@ -35,28 +35,6 @@ #include #include -namespace -{ - // TODO: remove me with new CDescGate - bool ApplyTopicDescription(const std::string& topic_name_, const eCAL::SDataTypeInformation& topic_info_) - { - if (eCAL::g_descgate() != nullptr) - { - // Calculate the quality of the current info - eCAL::CDescGate::QualityFlags quality = eCAL::CDescGate::QualityFlags::NO_QUALITY; - if (!topic_info_.name.empty() || !topic_info_.encoding.empty()) - quality |= eCAL::CDescGate::QualityFlags::TYPE_AVAILABLE; - if (!topic_info_.descriptor.empty()) - quality |= eCAL::CDescGate::QualityFlags::DESCRIPTION_AVAILABLE; - quality |= eCAL::CDescGate::QualityFlags::INFO_COMES_FROM_CORRECT_ENTITY; - quality |= eCAL::CDescGate::QualityFlags::INFO_COMES_FROM_PRODUCER; - - return eCAL::g_descgate()->ApplyTopicDescription(topic_name_, topic_info_, quality); - } - return false; - } -} - namespace eCAL { ////////////////////////////////////////////////////////////////// @@ -243,9 +221,8 @@ namespace eCAL const std::string& topic_name = ecal_topic.tname; if (topic_name.empty()) return; - // store description + // get topic id const std::string& topic_id = ecal_topic.tid; - ApplyTopicDescription(topic_name, ecal_topic.tdatatype); // get process id const std::string process_id = std::to_string(ecal_sample_.topic.pid); @@ -275,9 +252,6 @@ namespace eCAL const std::string& topic_id = ecal_topic.tid; const std::string process_id = std::to_string(ecal_sample_.topic.pid); - // store description - ApplyTopicDescription(topic_name, ecal_topic.tdatatype); - // unregister local publisher const std::shared_lock lock(m_topic_name_datareader_sync); auto res = m_topic_name_datareader_map.equal_range(topic_name); @@ -340,6 +314,7 @@ namespace eCAL const std::shared_lock lock(m_topic_name_datareader_sync); for (const auto& iter : m_topic_name_datareader_map) { + // force data reader to (re)register itself on registration provider iter.second->RefreshRegistration(); } } diff --git a/ecal/core/src/pubsub/ecal_subscriber.cpp b/ecal/core/src/pubsub/ecal_subscriber.cpp index aa06516dc5..898af14e3b 100644 --- a/ecal/core/src/pubsub/ecal_subscriber.cpp +++ b/ecal/core/src/pubsub/ecal_subscriber.cpp @@ -32,28 +32,6 @@ #include #include -namespace -{ - // TODO: remove me with new CDescGate - bool ApplyTopicDescription(const std::string& topic_name_, const eCAL::SDataTypeInformation& topic_info_) - { - if (eCAL::g_descgate() != nullptr) - { - // Calculate the quality of the current info - eCAL::CDescGate::QualityFlags quality = eCAL::CDescGate::QualityFlags::NO_QUALITY; - if (!topic_info_.name.empty() || !topic_info_.encoding.empty()) - quality |= eCAL::CDescGate::QualityFlags::TYPE_AVAILABLE; - if (!topic_info_.descriptor.empty()) - quality |= eCAL::CDescGate::QualityFlags::DESCRIPTION_AVAILABLE; - quality |= eCAL::CDescGate::QualityFlags::INFO_COMES_FROM_THIS_PROCESS; - quality |= eCAL::CDescGate::QualityFlags::INFO_COMES_FROM_CORRECT_ENTITY; - - return eCAL::g_descgate()->ApplyTopicDescription(topic_name_, topic_info_, quality); - } - return false; - } -} - namespace eCAL { CSubscriber::CSubscriber() : @@ -138,9 +116,6 @@ namespace eCAL // register to subscriber gateway for publisher memory file receive thread g_subgate()->Register(topic_name_, m_datareader); - // register to description gateway for type / description checking - ApplyTopicDescription(topic_name_, topic_info_); - // we made it :-) m_created = true; diff --git a/ecal/core/src/readwrite/ecal_reader.cpp b/ecal/core/src/readwrite/ecal_reader.cpp index 523740b488..c91ce3aa5b 100644 --- a/ecal/core/src/readwrite/ecal_reader.cpp +++ b/ecal/core/src/readwrite/ecal_reader.cpp @@ -122,12 +122,12 @@ namespace eCAL // start transport layers SubscribeToLayers(); + // mark as created + m_created = true; + // register Register(false); - // and mark as created - m_created = true; - return(true); } @@ -304,7 +304,7 @@ namespace eCAL ecal_reg_sample_topic.connections_ext = 0; // register subscriber - if(g_registration_provider() != nullptr) g_registration_provider()->RegisterTopic(m_topic_name, m_topic_id, ecal_reg_sample, force_); + if(g_registration_provider() != nullptr) g_registration_provider()->ApplySample(ecal_reg_sample, force_); #ifndef NDEBUG // log it Logging::Log(log_level_debug4, m_topic_name + "::CDataReader::DoRegister"); @@ -332,7 +332,7 @@ namespace eCAL ecal_reg_sample_topic.uname = Process::GetUnitName(); // unregister subscriber - if (g_registration_provider() != nullptr) g_registration_provider()->UnregisterTopic(m_topic_name, m_topic_id, ecal_unreg_sample, true); + if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(ecal_unreg_sample, true); #ifndef NDEBUG // log it Logging::Log(log_level_debug4, m_topic_name + "::CDataReader::Unregister"); diff --git a/ecal/core/src/readwrite/ecal_writer.cpp b/ecal/core/src/readwrite/ecal_writer.cpp index 162a515d02..bf75a868b3 100644 --- a/ecal/core/src/readwrite/ecal_writer.cpp +++ b/ecal/core/src/readwrite/ecal_writer.cpp @@ -139,12 +139,12 @@ namespace eCAL // allow to share topic description m_use_tdesc = Config::IsTopicDescriptionSharingEnabled(); + // mark as created + m_created = true; + // register Register(false); - // and mark as created - m_created = true; - // create udp multicast layer SetUseUdpMC(m_writer.udp_mc_mode.requested); @@ -867,7 +867,7 @@ namespace eCAL ecal_reg_sample_topic.connections_ext = static_cast(ext_connections); // register publisher - if (g_registration_provider() != nullptr) g_registration_provider()->RegisterTopic(m_topic_name, m_topic_id, ecal_reg_sample, force_); + if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(ecal_reg_sample, force_); #ifndef NDEBUG // log it @@ -897,7 +897,7 @@ namespace eCAL ecal_reg_sample_topic.uname = Process::GetUnitName(); // unregister publisher - if (g_registration_provider() != nullptr) g_registration_provider()->UnregisterTopic(m_topic_name, m_topic_id, ecal_unreg_sample, true); + if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(ecal_unreg_sample, true); #ifndef NDEBUG // log it diff --git a/ecal/core/src/registration/ecal_registration_provider.cpp b/ecal/core/src/registration/ecal_registration_provider.cpp index a8d2ccf904..793fb285e3 100644 --- a/ecal/core/src/registration/ecal_registration_provider.cpp +++ b/ecal/core/src/registration/ecal_registration_provider.cpp @@ -43,58 +43,11 @@ #include #include -namespace -{ - // TODO: remove me with new CDescGate - bool ApplyTopicDescription(const std::string& topic_name_, const eCAL::SDataTypeInformation& topic_info_, bool topic_is_a_publisher_) - { - if (eCAL::g_descgate() != nullptr) - { - // calculate the quality of the current info - eCAL::CDescGate::QualityFlags quality = eCAL::CDescGate::QualityFlags::NO_QUALITY; - if (!topic_info_.encoding.empty() || !topic_info_.name.empty()) - quality |= eCAL::CDescGate::QualityFlags::TYPE_AVAILABLE; - if (!topic_info_.descriptor.empty()) - quality |= eCAL::CDescGate::QualityFlags::DESCRIPTION_AVAILABLE; - if (topic_is_a_publisher_) - quality |= eCAL::CDescGate::QualityFlags::INFO_COMES_FROM_PRODUCER; - quality |= eCAL::CDescGate::QualityFlags::INFO_COMES_FROM_THIS_PROCESS; - quality |= eCAL::CDescGate::QualityFlags::INFO_COMES_FROM_CORRECT_ENTITY; - // update description - return eCAL::g_descgate()->ApplyTopicDescription(topic_name_, topic_info_, quality); - } - return false; - } - - // TODO: remove me with new CDescGate - bool ApplyServiceDescription(const std::string& service_name_, const std::string& method_name_, - const eCAL::SDataTypeInformation& request_type_information_, - const eCAL::SDataTypeInformation& response_type_information_) - { - if (eCAL::g_descgate() != nullptr) - { - // Calculate the quality of the current info - eCAL::CDescGate::QualityFlags quality = eCAL::CDescGate::QualityFlags::NO_QUALITY; - if (!(request_type_information_.name.empty() && response_type_information_.name.empty())) - quality |= eCAL::CDescGate::QualityFlags::TYPE_AVAILABLE; - if (!(request_type_information_.descriptor.empty() && response_type_information_.descriptor.empty())) - quality |= eCAL::CDescGate::QualityFlags::DESCRIPTION_AVAILABLE; - quality |= eCAL::CDescGate::QualityFlags::INFO_COMES_FROM_THIS_PROCESS; - - return eCAL::g_descgate()->ApplyServiceDescription(service_name_, method_name_, request_type_information_, response_type_information_, quality); - } - return false; - } -} - namespace eCAL { std::atomic CRegistrationProvider::m_created; CRegistrationProvider::CRegistrationProvider() : - m_reg_topics(false), - m_reg_services(false), - m_reg_process(false), m_use_registration_udp(false), m_use_registration_shm(false) { @@ -105,14 +58,10 @@ namespace eCAL Destroy(); } - void CRegistrationProvider::Create(bool topics_, bool services_, bool process_) + void CRegistrationProvider::Create() { if(m_created) return; - m_reg_topics = topics_; - m_reg_services = services_; - m_reg_process = process_; - // send registration to shared memory and to udp m_use_registration_udp = !Config::Experimental::IsNetworkMonitoringDisabled(); m_use_registration_shm = Config::Experimental::IsShmMonitoringEnabled(); @@ -155,16 +104,26 @@ namespace eCAL // stop cyclic registration thread m_reg_sample_snd_thread->stop(); - // send one last (un)registration message to the world - // thank you and goodbye :-) - UnregisterProcess(); + // prepare unregistration + ClearSampleList(); + AddSample2SampleList(GetProcessUnregisterSample()); + + if (m_use_registration_udp) + { + // send process unregistration sample over udp + SendSampleList2UDP(); - // destroy registration sample sender - m_reg_sample_snd.reset(); + // destroy udp registration sample sender + m_reg_sample_snd.reset(); + } #if ECAL_CORE_REGISTRATION_SHM if (m_use_registration_shm) { + // broadcast process unregistration sample over shm + SendSampleList2SHM(); + + // destroy shm registration sample writer m_memfile_broadcast_writer.Unbind(); m_memfile_broadcast.Destroy(); } @@ -173,143 +132,171 @@ namespace eCAL m_created = false; } - bool CRegistrationProvider::RegisterTopic(const std::string& topic_name_, const std::string& topic_id_, const Registration::Sample& ecal_sample_, const bool force_) + bool CRegistrationProvider::ApplySample(const Registration::Sample& sample_, const bool force_) { - if (!m_created) return(false); - if (!m_reg_topics) return(false); + if (!m_created) return(false); - const std::lock_guard lock(m_topics_map_sync); - m_topics_map[topic_name_ + topic_id_] = ecal_sample_; - if(force_) + // forward all registration samples to outside "customer" (e.g. monitoring, descgate) { - RegisterProcess(); - // apply registration sample - ApplySample(topic_name_, ecal_sample_); - // apply registration sample to shm registration - SendSampleList(false); + const std::lock_guard lock(m_callback_custom_apply_sample_map_mtx); + for (const auto& iter : m_callback_custom_apply_sample_map) + { + iter.second(sample_); + } } - return(true); - } - - bool CRegistrationProvider::UnregisterTopic(const std::string& topic_name_, const std::string& topic_id_, const Registration::Sample& ecal_sample_, const bool force_) - { - if(!m_created) return(false); + // update sample list + AddSample2SampleList(sample_); + // if registration is forced if (force_) { - // apply unregistration sample - ApplySample(topic_name_, ecal_sample_); - // apply registration sample to shm registration - SendSampleList(false); - } + // send single registration sample over udp + SendSample2UDP(sample_); - SampleMapT::iterator iter; - const std::lock_guard lock(m_topics_map_sync); - iter = m_topics_map.find(topic_name_ + topic_id_); - if(iter != m_topics_map.end()) - { - m_topics_map.erase(iter); - return(true); + // broadcast (updated) sample list over shm + SendSampleList2SHM(); } - return(false); + return(true); } - bool CRegistrationProvider::RegisterServer(const std::string& service_name_, const std::string& service_id_, const Registration::Sample& ecal_sample_, bool force_) + void CRegistrationProvider::SetCustomApplySampleCallback(const std::string& customer_, const ApplySampleCallbackT& callback_) { - if (!m_created) return(false); - if (!m_reg_services) return(false); + const std::lock_guard lock(m_callback_custom_apply_sample_map_mtx); + m_callback_custom_apply_sample_map[customer_] = callback_; + } - const std::lock_guard lock(m_server_map_sync); - m_server_map[service_name_ + service_id_] = ecal_sample_; - if (force_) + void CRegistrationProvider::RemCustomApplySampleCallback(const std::string& customer_) + { + const std::lock_guard lock(m_callback_custom_apply_sample_map_mtx); + auto iter = m_callback_custom_apply_sample_map.find(customer_); + if (iter != m_callback_custom_apply_sample_map.end()) { - RegisterProcess(); - // apply registration sample - ApplySample(service_name_, ecal_sample_); - // apply registration sample to shm registration - SendSampleList(false); + m_callback_custom_apply_sample_map.erase(iter); } + } - return(true); + void CRegistrationProvider::AddSample2SampleList(const Registration::Sample& sample_) + { + const std::lock_guard lock(m_sample_list_mtx); + m_sample_list.samples.push_back(sample_); } - bool CRegistrationProvider::UnregisterServer(const std::string& service_name_, const std::string& service_id_, const Registration::Sample& ecal_sample_, bool force_) + bool CRegistrationProvider::SendSample2UDP(const Registration::Sample& sample_) { if (!m_created) return(false); - if (force_) + if (m_use_registration_udp && m_reg_sample_snd) { - // apply unregistration sample - ApplySample(service_name_, ecal_sample_); - // apply registration sample to shm registration - SendSampleList(false); - } + // lock sample buffer + const std::lock_guard lock(m_sample_buffer_mtx); - SampleMapT::iterator iter; - const std::lock_guard lock(m_server_map_sync); - iter = m_server_map.find(service_name_ + service_id_); - if (iter != m_server_map.end()) - { - m_server_map.erase(iter); - return(true); + // serialize single sample + if (SerializeToBuffer(sample_, m_sample_buffer)) + { + // send single sample over udp + return m_reg_sample_snd->Send("reg_sample", m_sample_buffer) != 0; + } } - return(false); } - bool CRegistrationProvider::RegisterClient(const std::string& client_name_, const std::string& client_id_, const Registration::Sample& ecal_sample_, bool force_) + bool CRegistrationProvider::SendSampleList2UDP() { - if (!m_created) return(false); - if (!m_reg_services) return(false); + if (!m_created) return(false); + bool return_value{ true }; - const std::lock_guard lock(m_client_map_sync); - m_client_map[client_name_ + client_id_] = ecal_sample_; - if (force_) + // lock sample list + const std::lock_guard lock(m_sample_list_mtx); + + // send all (single) samples over udp + if (m_use_registration_udp && m_reg_sample_snd) { - RegisterProcess(); - // apply registration sample - ApplySample(client_name_, ecal_sample_); - // apply registration sample to shm registration - SendSampleList(false); + for (const auto& sample : m_sample_list.samples) + { + return_value &= SendSample2UDP(sample); + } } - return(true); + return return_value; } - bool CRegistrationProvider::UnregisterClient(const std::string& client_name_, const std::string& client_id_, const Registration::Sample& ecal_sample_, bool force_) + bool CRegistrationProvider::SendSampleList2SHM() { if (!m_created) return(false); - if (force_) - { - // apply unregistration sample - ApplySample(client_name_, ecal_sample_); - // apply registration sample to shm registration - SendSampleList(false); - } +#if ECAL_CORE_REGISTRATION_SHM + bool return_value{ true }; - SampleMapT::iterator iter; - const std::lock_guard lock(m_client_map_sync); - iter = m_client_map.find(client_name_ + client_id_); - if (iter != m_client_map.end()) + // send sample list over shm + if (m_use_registration_shm) { - m_client_map.erase(iter); - return(true); + // lock sample list + const std::lock_guard lock(m_sample_list_mtx); + + // serialize whole sample list + if (SerializeToBuffer(m_sample_list, m_sample_list_buffer)) + { + if (!m_sample_list_buffer.empty()) + { + // broadcast sample list over shm + return_value &= m_memfile_broadcast_writer.Write(m_sample_list_buffer.data(), m_sample_list_buffer.size()); + } + } } + return return_value; +#else + return false; +#endif + } - return(false); + void CRegistrationProvider::ClearSampleList() + { + // lock sample list + const std::lock_guard lock(m_sample_list_mtx); + // clear sample list + m_sample_list.samples.clear(); } - bool CRegistrationProvider::RegisterProcess() + void CRegistrationProvider::RegisterSendThread() { - if (!m_created) return(false); - if (!m_reg_process) return(false); +#if ECAL_CORE_SUBSCRIBER + // refresh subscriber registration + if (g_subgate() != nullptr) g_subgate()->RefreshRegistrations(); +#endif + +#if ECAL_CORE_PUBLISHER + // refresh publisher registration + if (g_pubgate() != nullptr) g_pubgate()->RefreshRegistrations(); +#endif + +#if ECAL_CORE_SERVICE + // refresh server registration + if (g_servicegate() != nullptr) g_servicegate()->RefreshRegistrations(); + + // refresh client registration + if (g_clientgate() != nullptr) g_clientgate()->RefreshRegistrations(); +#endif + + // send out sample list over udp + SendSampleList2UDP(); + + // broadcast sample list over shm + SendSampleList2SHM(); + + // clear registration sample list + ClearSampleList(); + + // add process registration sample to internal sample list as first sample (for next registration loop) + AddSample2SampleList(GetProcessRegisterSample()); + } + Registration::Sample CRegistrationProvider::GetProcessRegisterSample() + { Registration::Sample process_sample; - process_sample.cmd_type = bct_reg_process; - auto& process_sample_process = process_sample.process; + process_sample.cmd_type = bct_reg_process; + auto& process_sample_process = process_sample.process; process_sample_process.hname = Process::GetHostName(); process_sample_process.hgname = Process::GetHostGroupName(); process_sample_process.pid = Process::GetProcessID(); @@ -362,205 +349,19 @@ namespace eCAL process_sample_process.ecal_runtime_version = GetVersionString(); - // apply registration sample - const bool return_value = ApplySample(Process::GetHostName(), process_sample); - - return return_value; - } - - bool CRegistrationProvider::UnregisterProcess() - { - if (!m_created) return(false); - if (!m_reg_process) return(false); - - Registration::Sample process_sample; - process_sample.cmd_type = bct_unreg_process; - auto& process_sample_process = process_sample.process; - process_sample_process.hname = Process::GetHostName(); - process_sample_process.pid = Process::GetProcessID(); - process_sample_process.pname = Process::GetProcessName(); - process_sample_process.uname = Process::GetUnitName(); - - // apply unregistration sample - const bool return_value = ApplySample(Process::GetHostName(), process_sample); - - return return_value; - } - - bool CRegistrationProvider::RegisterTopics() - { - if (!m_created) return(false); - if (!m_reg_topics) return(false); - - bool return_value {true}; - const std::lock_guard lock(m_topics_map_sync); - for(SampleMapT::const_iterator iter = m_topics_map.begin(); iter != m_topics_map.end(); ++iter) - { - ////////////////////////////////////////////// - // update description - ////////////////////////////////////////////// - // read attributes - const std::string topic_name(iter->second.topic.tname); - const bool topic_is_a_publisher(iter->second.cmd_type == eCAL::bct_reg_publisher); - - SDataTypeInformation topic_info; - const auto& topic_datatype = iter->second.topic.tdatatype; - topic_info.encoding = topic_datatype.encoding; - topic_info.name = topic_datatype.name; - topic_info.descriptor = topic_datatype.descriptor; - - ApplyTopicDescription(topic_name, topic_info, topic_is_a_publisher); - - ////////////////////////////////////////////// - // send sample to registration layer - ////////////////////////////////////////////// - return_value &= ApplySample(iter->second.topic.tname, iter->second); - } - - return return_value; - } - - bool CRegistrationProvider::RegisterServer() - { - if (!m_created) return(false); - if (!m_reg_services) return(false); - - bool return_value {true}; - const std::lock_guard lock(m_server_map_sync); - for (SampleMapT::const_iterator iter = m_server_map.begin(); iter != m_server_map.end(); ++iter) - { - ////////////////////////////////////////////// - // update description - ////////////////////////////////////////////// - const auto& ecal_sample_service = iter->second.service; - for (const auto& method : ecal_sample_service.methods) - { - SDataTypeInformation request_type; - request_type.name = method.req_type; - request_type.descriptor = method.req_desc; - - SDataTypeInformation response_type; - response_type.name = method.resp_type; - response_type.descriptor = method.resp_desc; - - ApplyServiceDescription(ecal_sample_service.sname, method.mname, request_type, response_type); - } - - ////////////////////////////////////////////// - // send sample to registration layer - ////////////////////////////////////////////// - return_value &= ApplySample(iter->second.service.sname, iter->second); - } - - return return_value; + return process_sample; } - bool CRegistrationProvider::RegisterClient() + Registration::Sample CRegistrationProvider::GetProcessUnregisterSample() { - if (!m_created) return(false); - if (!m_reg_services) return(false); - - bool return_value {true}; - const std::lock_guard lock(m_client_map_sync); - for (SampleMapT::const_iterator iter = m_client_map.begin(); iter != m_client_map.end(); ++iter) - { - // apply registration sample - return_value &= ApplySample(iter->second.client.sname, iter->second); - } - - return return_value; - } - - bool CRegistrationProvider::ApplySample(const std::string& sample_name_, const Registration::Sample& sample_) - { - if(!m_created) return(false); - - bool return_value {true}; - - if (m_use_registration_udp && m_reg_sample_snd) - { - const std::lock_guard lock(m_sample_buffer_sync); - if (SerializeToBuffer(sample_, m_sample_buffer)) - { - return_value &= (m_reg_sample_snd->Send(sample_name_, m_sample_buffer) != 0); - } - } - -#if ECAL_CORE_REGISTRATION_SHM - if (m_use_registration_shm) - { - const std::lock_guard lock(m_sample_list_sync); - m_sample_list.samples.push_back(sample_); - } -#endif - - return return_value; - } - - bool CRegistrationProvider::SendSampleList(bool reset_sample_list_) - { - if (!m_created) return(false); - bool return_value{ true }; - -#if ECAL_CORE_REGISTRATION_SHM - if (m_use_registration_shm) - { - { - const std::lock_guard lock(m_sample_list_sync); - if (SerializeToBuffer(m_sample_list, m_sample_list_buffer)) - { - if (reset_sample_list_) - { - m_sample_list.samples.clear(); - } - } - } - - if (!m_sample_list_buffer.empty()) - { - return_value &= m_memfile_broadcast_writer.Write(m_sample_list_buffer.data(), m_sample_list_buffer.size()); - } - } -#endif - - return return_value; - } - - void CRegistrationProvider::RegisterSendThread() - { -#if ECAL_CORE_SUBSCRIBER - // refresh subscriber registration - if (g_subgate() != nullptr) g_subgate()->RefreshRegistrations(); -#endif - -#if ECAL_CORE_PUBLISHER - // refresh publisher registration - if (g_pubgate() != nullptr) g_pubgate()->RefreshRegistrations(); -#endif - -#if ECAL_CORE_SERVICE - // refresh server registration - if (g_servicegate() != nullptr) g_servicegate()->RefreshRegistrations(); - - // refresh client registration - if (g_clientgate() != nullptr) g_clientgate()->RefreshRegistrations(); -#endif - - // register process - RegisterProcess(); - -#if ECAL_CORE_SERVICE - // register server - RegisterServer(); - - // register clients - RegisterClient(); -#endif - - // register topics - RegisterTopics(); + Registration::Sample process_sample; + process_sample.cmd_type = bct_unreg_process; + auto& process_sample_process = process_sample.process; + process_sample_process.hname = Process::GetHostName(); + process_sample_process.pid = Process::GetProcessID(); + process_sample_process.pname = Process::GetProcessName(); + process_sample_process.uname = Process::GetUnitName(); - // write sample list to shared memory - SendSampleList(); + return process_sample; } } diff --git a/ecal/core/src/registration/ecal_registration_provider.h b/ecal/core/src/registration/ecal_registration_provider.h index a630151420..e073874348 100644 --- a/ecal/core/src/registration/ecal_registration_provider.h +++ b/ecal/core/src/registration/ecal_registration_provider.h @@ -43,7 +43,6 @@ #include #include #include -#include #include namespace eCAL @@ -54,64 +53,49 @@ namespace eCAL CRegistrationProvider(); ~CRegistrationProvider(); - void Create(bool topics_, bool services_, bool process_); + void Create(); void Destroy(); - bool RegisterTopic(const std::string& topic_name_, const std::string& topic_id_, const Registration::Sample& ecal_sample_, bool force_); - bool UnregisterTopic(const std::string& topic_name_, const std::string& topic_id_, const Registration::Sample& ecal_sample_, bool force_); + bool ApplySample(const Registration::Sample& sample_, bool force_); - bool RegisterServer(const std::string& service_name_, const std::string& service_id_, const Registration::Sample& ecal_sample_, bool force_); - bool UnregisterServer(const std::string& service_name_, const std::string& service_id_, const Registration::Sample& ecal_sample_, bool force_); - - bool RegisterClient(const std::string& client_name_, const std::string& client_id_, const Registration::Sample& ecal_sample_, bool force_); - bool UnregisterClient(const std::string& client_name_, const std::string& client_id_, const Registration::Sample& ecal_sample_, bool force_); + using ApplySampleCallbackT = std::function; + void SetCustomApplySampleCallback(const std::string& customer_, const ApplySampleCallbackT& callback_); + void RemCustomApplySampleCallback(const std::string& customer_); protected: - bool RegisterProcess(); - bool UnregisterProcess(); - - bool RegisterTopics(); + void AddSample2SampleList(const Registration::Sample& sample_); + bool SendSample2UDP(const Registration::Sample& sample_); - bool RegisterServer(); - bool RegisterClient(); + bool SendSampleList2UDP(); + bool SendSampleList2SHM(); + void ClearSampleList(); - bool ApplySample(const std::string& sample_name_, const eCAL::Registration::Sample& sample_); - void RegisterSendThread(); - bool SendSampleList(bool reset_sample_list_ = true); + Registration::Sample GetProcessRegisterSample(); + Registration::Sample GetProcessUnregisterSample(); static std::atomic m_created; - bool m_reg_topics; - bool m_reg_services; - bool m_reg_process; std::shared_ptr m_reg_sample_snd; std::shared_ptr m_reg_sample_snd_thread; - std::mutex m_sample_buffer_sync; + std::mutex m_sample_buffer_mtx; std::vector m_sample_buffer; - using SampleMapT = std::unordered_map; - std::mutex m_topics_map_sync; - SampleMapT m_topics_map; - - std::mutex m_server_map_sync; - SampleMapT m_server_map; - - std::mutex m_client_map_sync; - SampleMapT m_client_map; + std::mutex m_sample_list_mtx; + Registration::SampleList m_sample_list; #if ECAL_CORE_REGISTRATION_SHM - std::mutex m_sample_list_sync; - Registration::SampleList m_sample_list; std::vector m_sample_list_buffer; - CMemoryFileBroadcast m_memfile_broadcast; CMemoryFileBroadcastWriter m_memfile_broadcast_writer; #endif bool m_use_registration_udp; bool m_use_registration_shm; + + std::mutex m_callback_custom_apply_sample_map_mtx; + std::map m_callback_custom_apply_sample_map; }; } diff --git a/ecal/core/src/registration/ecal_registration_receiver.cpp b/ecal/core/src/registration/ecal_registration_receiver.cpp index f69643861d..2bade0bea8 100644 --- a/ecal/core/src/registration/ecal_registration_receiver.cpp +++ b/ecal/core/src/registration/ecal_registration_receiver.cpp @@ -56,7 +56,6 @@ namespace eCAL m_callback_process(nullptr), m_use_registration_udp(false), m_use_registration_shm(false), - m_callback_custom_apply_sample([](const auto&) {}), m_host_group_name(Process::GetHostGroupName()) { } @@ -147,20 +146,23 @@ namespace eCAL { if(!m_created) return false; - Registration::Sample ecal_sample; - if (!DeserializeFromBuffer(serialized_sample_data_, serialized_sample_size_, ecal_sample)) return false; + Registration::Sample sample; + if (!DeserializeFromBuffer(serialized_sample_data_, serialized_sample_size_, sample)) return false; - return ApplySample(ecal_sample); + return ApplySample(sample); } - bool CRegistrationReceiver::ApplySample(const Registration::Sample& ecal_sample_) + bool CRegistrationReceiver::ApplySample(const Registration::Sample& sample_) { if (!m_created) return false; - // forward all registration samples to outside "customer" (e.g. Monitoring) + // forward all registration samples to outside "customer" (e.g. monitoring, descgate) { - const std::lock_guard lock(m_callback_custom_apply_sample_mtx); - m_callback_custom_apply_sample(ecal_sample_); + const std::lock_guard lock(m_callback_custom_apply_sample_map_mtx); + for (const auto& iter : m_callback_custom_apply_sample_map) + { + iter.second(sample_); + } } std::string reg_sample; @@ -171,10 +173,10 @@ namespace eCAL || m_callback_process ) { - SerializeToBuffer(ecal_sample_, reg_sample); + SerializeToBuffer(sample_, reg_sample); } - switch (ecal_sample_.cmd_type) + switch (sample_.cmd_type) { case bct_none: case bct_set_sample: @@ -186,7 +188,7 @@ namespace eCAL break; #if ECAL_CORE_SERVICE case bct_reg_service: - if (g_clientgate() != nullptr) g_clientgate()->ApplyServiceRegistration(ecal_sample_); + if (g_clientgate() != nullptr) g_clientgate()->ApplyServiceRegistration(sample_); if (m_callback_service) m_callback_service(reg_sample.c_str(), static_cast(reg_sample.size())); break; case bct_unreg_service: @@ -201,12 +203,12 @@ namespace eCAL break; case bct_reg_subscriber: case bct_unreg_subscriber: - ApplySubscriberRegistration(ecal_sample_); + ApplySubscriberRegistration(sample_); if (m_callback_sub) m_callback_sub(reg_sample.c_str(), static_cast(reg_sample.size())); break; case bct_reg_publisher: case bct_unreg_publisher: - ApplyPublisherRegistration(ecal_sample_); + ApplyPublisherRegistration(sample_); if (m_callback_pub) m_callback_pub(reg_sample.c_str(), static_cast(reg_sample.size())); break; default: @@ -267,24 +269,24 @@ namespace eCAL } } - void CRegistrationReceiver::ApplySubscriberRegistration(const Registration::Sample& ecal_sample_) + void CRegistrationReceiver::ApplySubscriberRegistration(const Registration::Sample& sample_) { #if ECAL_CORE_PUBLISHER // process registrations from same host group - if (IsHostGroupMember(ecal_sample_)) + if (IsHostGroupMember(sample_)) { // do not register local entities, only if loop back flag is set true - if (m_loopback || (ecal_sample_.topic.pid != Process::GetProcessID())) + if (m_loopback || (sample_.topic.pid != Process::GetProcessID())) { if (g_pubgate() != nullptr) { - switch (ecal_sample_.cmd_type) + switch (sample_.cmd_type) { case bct_reg_subscriber: - g_pubgate()->ApplyLocSubRegistration(ecal_sample_); + g_pubgate()->ApplyLocSubRegistration(sample_); break; case bct_unreg_subscriber: - g_pubgate()->ApplyLocSubUnregistration(ecal_sample_); + g_pubgate()->ApplyLocSubUnregistration(sample_); break; default: break; @@ -299,13 +301,13 @@ namespace eCAL { if (g_pubgate() != nullptr) { - switch (ecal_sample_.cmd_type) + switch (sample_.cmd_type) { case bct_reg_subscriber: - g_pubgate()->ApplyExtSubRegistration(ecal_sample_); + g_pubgate()->ApplyExtSubRegistration(sample_); break; case bct_unreg_subscriber: - g_pubgate()->ApplyExtSubUnregistration(ecal_sample_); + g_pubgate()->ApplyExtSubUnregistration(sample_); break; default: break; @@ -316,24 +318,24 @@ namespace eCAL #endif } - void CRegistrationReceiver::ApplyPublisherRegistration(const Registration::Sample& ecal_sample_) + void CRegistrationReceiver::ApplyPublisherRegistration(const Registration::Sample& sample_) { #if ECAL_CORE_SUBSCRIBER // process registrations from same host group - if (IsHostGroupMember(ecal_sample_)) + if (IsHostGroupMember(sample_)) { // do not register local entities, only if loop back flag is set true - if (m_loopback || (ecal_sample_.topic.pid != Process::GetProcessID())) + if (m_loopback || (sample_.topic.pid != Process::GetProcessID())) { if (g_subgate() != nullptr) { - switch (ecal_sample_.cmd_type) + switch (sample_.cmd_type) { case bct_reg_publisher: - g_subgate()->ApplyLocPubRegistration(ecal_sample_); + g_subgate()->ApplyLocPubRegistration(sample_); break; case bct_unreg_publisher: - g_subgate()->ApplyLocPubUnregistration(ecal_sample_); + g_subgate()->ApplyLocPubUnregistration(sample_); break; default: break; @@ -348,13 +350,13 @@ namespace eCAL { if (g_subgate() != nullptr) { - switch (ecal_sample_.cmd_type) + switch (sample_.cmd_type) { case bct_reg_publisher: - g_subgate()->ApplyExtPubRegistration(ecal_sample_); + g_subgate()->ApplyExtPubRegistration(sample_); break; case bct_unreg_publisher: - g_subgate()->ApplyExtPubUnregistration(ecal_sample_); + g_subgate()->ApplyExtPubUnregistration(sample_); break; default: break; @@ -365,9 +367,9 @@ namespace eCAL #endif } - bool CRegistrationReceiver::IsHostGroupMember(const Registration::Sample& ecal_sample_) + bool CRegistrationReceiver::IsHostGroupMember(const Registration::Sample& sample_) { - const std::string& sample_host_group_name = ecal_sample_.topic.hgname.empty() ? ecal_sample_.topic.hname : ecal_sample_.topic.hgname; + const std::string& sample_host_group_name = sample_.topic.hgname.empty() ? sample_.topic.hname : sample_.topic.hgname; if (sample_host_group_name.empty() || m_host_group_name.empty()) return false; @@ -377,15 +379,19 @@ namespace eCAL return true; } - void CRegistrationReceiver::SetCustomApplySampleCallback(const ApplySampleCallbackT& callback_) + void CRegistrationReceiver::SetCustomApplySampleCallback(const std::string& customer_, const ApplySampleCallbackT& callback_) { - const std::lock_guard lock(m_callback_custom_apply_sample_mtx); - m_callback_custom_apply_sample = callback_; + const std::lock_guard lock(m_callback_custom_apply_sample_map_mtx); + m_callback_custom_apply_sample_map[customer_] = callback_; } - void CRegistrationReceiver::RemCustomApplySampleCallback() + void CRegistrationReceiver::RemCustomApplySampleCallback(const std::string& customer_) { - const std::lock_guard lock(m_callback_custom_apply_sample_mtx); - m_callback_custom_apply_sample = [](const auto&) {}; + const std::lock_guard lock(m_callback_custom_apply_sample_map_mtx); + auto iter = m_callback_custom_apply_sample_map.find(customer_); + if(iter != m_callback_custom_apply_sample_map.end()) + { + m_callback_custom_apply_sample_map.erase(iter); + } } } diff --git a/ecal/core/src/registration/ecal_registration_receiver.h b/ecal/core/src/registration/ecal_registration_receiver.h index a3e4e2d4e2..1d64032da3 100644 --- a/ecal/core/src/registration/ecal_registration_receiver.h +++ b/ecal/core/src/registration/ecal_registration_receiver.h @@ -41,6 +41,7 @@ #include #include +#include #include #include #include @@ -60,22 +61,22 @@ namespace eCAL void EnableLoopback(bool state_); bool HasSample(const std::string& /*sample_name_*/) { return(true); }; - bool ApplySerializedSample(const char* serialized_sample_data_, size_t serialized_sample_size_); - - bool ApplySample(const Registration::Sample& ecal_sample_); + bool ApplySample(const Registration::Sample& sample_); bool AddRegistrationCallback(enum eCAL_Registration_Event event_, const RegistrationCallbackT& callback_); bool RemRegistrationCallback(enum eCAL_Registration_Event event_); using ApplySampleCallbackT = std::function; - void SetCustomApplySampleCallback(const ApplySampleCallbackT& callback_); - void RemCustomApplySampleCallback(); + void SetCustomApplySampleCallback(const std::string& customer_, const ApplySampleCallbackT& callback_); + void RemCustomApplySampleCallback(const std::string& customer_); protected: - void ApplySubscriberRegistration(const eCAL::Registration::Sample& ecal_sample_); - void ApplyPublisherRegistration(const eCAL::Registration::Sample& ecal_sample_); + bool ApplySerializedSample(const char* serialized_sample_data_, size_t serialized_sample_size_); + + void ApplySubscriberRegistration(const eCAL::Registration::Sample& sample_); + void ApplyPublisherRegistration(const eCAL::Registration::Sample& sample_); - bool IsHostGroupMember(const eCAL::Registration::Sample& ecal_sample_); + bool IsHostGroupMember(const eCAL::Registration::Sample& sample_); static std::atomic m_created; bool m_network; @@ -99,8 +100,8 @@ namespace eCAL bool m_use_registration_udp; bool m_use_registration_shm; - std::mutex m_callback_custom_apply_sample_mtx; - ApplySampleCallbackT m_callback_custom_apply_sample; + std::mutex m_callback_custom_apply_sample_map_mtx; + std::map m_callback_custom_apply_sample_map; std::string m_host_group_name; }; diff --git a/ecal/core/src/service/ecal_clientgate.cpp b/ecal/core/src/service/ecal_clientgate.cpp index fa6f2d41cd..9bdc535404 100644 --- a/ecal/core/src/service/ecal_clientgate.cpp +++ b/ecal/core/src/service/ecal_clientgate.cpp @@ -22,36 +22,14 @@ **/ #include "ecal_clientgate.h" -#include "ecal_descgate.h" #include "service/ecal_service_client_impl.h" + #include #include #include #include #include -namespace -{ - // TODO: remove me with new CDescGate - bool ApplyServiceDescription(const std::string& service_name_, const std::string& method_name_, - const eCAL::SDataTypeInformation& request_type_information_, - const eCAL::SDataTypeInformation& response_type_information_) - { - if (eCAL::g_descgate() != nullptr) - { - // calculate the quality of the current info - eCAL::CDescGate::QualityFlags quality = eCAL::CDescGate::QualityFlags::NO_QUALITY; - if (!(request_type_information_.name.empty() && response_type_information_.name.empty())) - quality |= eCAL::CDescGate::QualityFlags::TYPE_AVAILABLE; - if (!(request_type_information_.descriptor.empty() && response_type_information_.descriptor.empty())) - quality |= eCAL::CDescGate::QualityFlags::DESCRIPTION_AVAILABLE; - - return eCAL::g_descgate()->ApplyServiceDescription(service_name_, method_name_, request_type_information_, response_type_information_, quality); - } - return false; - } -} - namespace eCAL { ////////////////////////////////////////////////////////////////// @@ -135,20 +113,6 @@ namespace eCAL service.tcp_port_v0 = static_cast(ecal_sample_service.tcp_port_v0); service.tcp_port_v1 = static_cast(ecal_sample_service.tcp_port_v1); - // store description - for (const auto& method : ecal_sample_service.methods) - { - SDataTypeInformation request_type; - request_type.name = method.req_type; - request_type.descriptor = method.req_desc; - - SDataTypeInformation response_type{}; - response_type.name = method.resp_type; - response_type.descriptor = method.resp_desc; - - ApplyServiceDescription(ecal_sample_service.sname, method.mname, request_type, response_type); - } - // create service key service.key = service.sname + ":" + service.sid + "@" + std::to_string(service.pid) + "@" + service.hname; @@ -196,10 +160,11 @@ namespace eCAL { if (!m_created) return; - // refresh service registrations + // refresh client registrations const std::shared_lock lock(m_client_set_sync); for (auto *iter : m_client_set) { + // force client to (re)register itself on registration provider iter->RefreshRegistration(); } } diff --git a/ecal/core/src/service/ecal_service_client_impl.cpp b/ecal/core/src/service/ecal_service_client_impl.cpp index 33f3054aef..6a0898b950 100644 --- a/ecal/core/src/service/ecal_service_client_impl.cpp +++ b/ecal/core/src/service/ecal_service_client_impl.cpp @@ -78,12 +78,12 @@ namespace eCAL counter << std::chrono::steady_clock::now().time_since_epoch().count(); m_service_id = counter.str(); + // mark as created + m_created = true; + // register this client Register(false); - // and mark as created - m_created = true; - return(true); } @@ -632,7 +632,7 @@ namespace eCAL service_client.sid = m_service_id; // register entity - if (g_registration_provider() != nullptr) g_registration_provider()->RegisterClient(m_service_name, m_service_id, sample, force_); + if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(sample, force_); // refresh connected services map CheckForNewServices(); @@ -685,7 +685,7 @@ namespace eCAL service_client.version = m_client_version; // unregister entity - if (g_registration_provider() != nullptr) g_registration_provider()->UnregisterClient(m_service_name, m_service_id, sample, true); + if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(sample, true); } void CServiceClientImpl::CheckForNewServices() diff --git a/ecal/core/src/service/ecal_service_server.cpp b/ecal/core/src/service/ecal_service_server.cpp index b76e88a899..1257c1088c 100644 --- a/ecal/core/src/service/ecal_service_server.cpp +++ b/ecal/core/src/service/ecal_service_server.cpp @@ -116,12 +116,15 @@ namespace eCAL bool CServiceServer::AddDescription(const std::string& method_, const std::string& req_type_, const std::string& req_desc_, const std::string& resp_type_, const std::string& resp_desc_) { if (!m_created) return false; + SDataTypeInformation request_type_information; - request_type_information.name = req_type_; + request_type_information.name = req_type_; request_type_information.descriptor = req_desc_; + SDataTypeInformation response_type_information; - response_type_information.name = resp_type_; + response_type_information.name = resp_type_; response_type_information.descriptor = resp_desc_; + return m_service_server_impl->AddDescription(method_, request_type_information, response_type_information); } diff --git a/ecal/core/src/service/ecal_service_server_impl.cpp b/ecal/core/src/service/ecal_service_server_impl.cpp index 69cdc723d0..4691d8deaa 100644 --- a/ecal/core/src/service/ecal_service_server_impl.cpp +++ b/ecal/core/src/service/ecal_service_server_impl.cpp @@ -24,7 +24,6 @@ #include #include "registration/ecal_registration_provider.h" -#include "ecal_descgate.h" #include "ecal_global_accessors.h" #include "ecal_service_server_impl.h" #include "ecal_service_singleton_manager.h" @@ -37,28 +36,6 @@ #include #include -namespace -{ - // TODO: remove me with new CDescGate - bool ApplyServiceDescription(const std::string& service_name_, const std::string& method_name_, - const eCAL::SDataTypeInformation& request_type_information_, - const eCAL::SDataTypeInformation& response_type_information_) - { - if (eCAL::g_descgate() != nullptr) - { - // calculate the quality of the current info - eCAL::CDescGate::QualityFlags quality = eCAL::CDescGate::QualityFlags::NO_QUALITY; - if (!(request_type_information_.name.empty() && response_type_information_.name.empty())) - quality |= eCAL::CDescGate::QualityFlags::TYPE_AVAILABLE; - if (!(request_type_information_.descriptor.empty() && response_type_information_.descriptor.empty())) - quality |= eCAL::CDescGate::QualityFlags::DESCRIPTION_AVAILABLE; - - return eCAL::g_descgate()->ApplyServiceDescription(service_name_, method_name_, request_type_information_, response_type_information_, quality); - } - return false; - } -} - namespace eCAL { std::shared_ptr CServiceServerImpl::CreateInstance() @@ -148,12 +125,12 @@ namespace eCAL m_tcp_server_v1 = server_manager->create_server(1, 0, service_callback, true, event_callback); } + // mark as created + m_created = true; + // register this service Register(false); - // and mark as created - m_created = true; - return(true); } @@ -223,8 +200,10 @@ namespace eCAL } } - // update descgate infos - return ApplyServiceDescription(m_service_name, method_, request_type_information_, response_type_information_); + // register this service + Register(false); + + return true; } // add callback function for server method calls @@ -259,16 +238,8 @@ namespace eCAL } } - SDataTypeInformation request_type_information; - request_type_information.name = req_type_; - request_type_information.descriptor = req_desc; - - SDataTypeInformation response_type_information; - response_type_information.name = resp_type_; - response_type_information.descriptor = resp_desc; - - // update descgate infos - ApplyServiceDescription(m_service_name, method_, request_type_information, response_type_information); + // register this service + Register(false); return true; } @@ -387,7 +358,7 @@ namespace eCAL } // register entity - if (g_registration_provider() != nullptr) g_registration_provider()->RegisterServer(m_service_name, m_service_id, sample, force_); + if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(sample, force_); } void CServiceServerImpl::Unregister() @@ -407,7 +378,7 @@ namespace eCAL service.sid = m_service_id; // unregister entity - if (g_registration_provider() != nullptr) g_registration_provider()->UnregisterServer(m_service_name, m_service_id, sample, true); + if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(sample, true); } int CServiceServerImpl::RequestCallback(const std::string& request_pb_, std::string& response_pb_) diff --git a/ecal/tests/CMakeLists.txt b/ecal/tests/CMakeLists.txt index 6baf791a34..36a8a7f771 100644 --- a/ecal/tests/CMakeLists.txt +++ b/ecal/tests/CMakeLists.txt @@ -35,7 +35,10 @@ add_subdirectory(cpp/event_test) add_subdirectory(cpp/expmap_test) add_subdirectory(cpp/serialization_test) add_subdirectory(cpp/topic2mcast_test) -add_subdirectory(cpp/util_test) + +if(ECAL_CORE_REGISTRATION) + add_subdirectory(cpp/util_test) +endif() if(ECAL_CORE_REGISTRATION_SHM OR ECAL_CORE_TRANSPORT_SHM) add_subdirectory(cpp/io_memfile_test) diff --git a/ecal/tests/cpp/clientserver_test/src/clientserver_getservices.cpp b/ecal/tests/cpp/clientserver_test/src/clientserver_getservices.cpp index 1cad55c72f..7c562b2d5c 100644 --- a/ecal/tests/cpp/clientserver_test/src/clientserver_getservices.cpp +++ b/ecal/tests/cpp/clientserver_test/src/clientserver_getservices.cpp @@ -107,8 +107,8 @@ TEST(core_cpp_clientserver, GetServices) EXPECT_EQ(resp_desc, "foo::resp_desc1-1"); // change attributes again (this will not overwrite the attributes anymore) - bool ret2 = server.AddDescription("foo::method1", "foo::req_type1-2", "foo::req_desc1-2", "foo::resp_type1-2", "foo::resp_desc1-2"); - EXPECT_EQ(ret2, false); + //bool ret2 = server.AddDescription("foo::method1", "foo::req_type1-2", "foo::req_desc1-2", "foo::resp_type1-2", "foo::resp_desc1-2"); + //EXPECT_EQ(ret2, false); // check attributes eCAL::Util::GetServiceTypeNames("foo::service", "foo::method1", req_type, resp_type); From 7123bf4cb998fc90c5f267c838d127449c4c84f8 Mon Sep 17 00:00:00 2001 From: tftzee <49162693+rex-schilasky@users.noreply.github.com> Date: Wed, 20 Mar 2024 17:54:43 +0100 Subject: [PATCH 04/14] pubsub_test adapted to new, simple descgate logic --- .../cpp/pubsub_test/src/pubsub_gettopics.cpp | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/ecal/tests/cpp/pubsub_test/src/pubsub_gettopics.cpp b/ecal/tests/cpp/pubsub_test/src/pubsub_gettopics.cpp index 66e4dc6eaa..15cecb29e4 100644 --- a/ecal/tests/cpp/pubsub_test/src/pubsub_gettopics.cpp +++ b/ecal/tests/cpp/pubsub_test/src/pubsub_gettopics.cpp @@ -36,12 +36,12 @@ TEST(core_cpp_pubsub, GetTopics) // create and check a few pub/sub entities { eCAL::SDataTypeInformation info_A1 { "typeA1" ,"", "descA1" }; - eCAL::SDataTypeInformation info_A1_2{ "typeA1.2","", "descA1.2" }; + //eCAL::SDataTypeInformation info_A1_2{ "typeA1.2","", "descA1.2" }; eCAL::SDataTypeInformation info_A2 { "typeA2" ,"", "descA2" }; eCAL::SDataTypeInformation info_A3 { "typeA3" ,"", "descA3" }; eCAL::SDataTypeInformation info_B1 { "typeB1" ,"", "descB1" }; - eCAL::SDataTypeInformation info_B1_2{ "typeB1.2","", "descB1.2" }; + //eCAL::SDataTypeInformation info_B1_2{ "typeB1.2","", "descB1.2" }; eCAL::SDataTypeInformation info_B2 { "typeB2" ,"", "descB2" }; // create 3 publisher @@ -51,7 +51,7 @@ TEST(core_cpp_pubsub, GetTopics) // create a missmatching publisher // this should trigger a warning but not increase map size - eCAL::CPublisher pub12("A1", info_A1_2); + //eCAL::CPublisher pub12("A1", info_A1_2); // create 2 subscriber eCAL::CSubscriber sub1("B1", info_B1); @@ -59,7 +59,7 @@ TEST(core_cpp_pubsub, GetTopics) // create a missmatching subscriber // this should trigger a warning but not increase map size - eCAL::CSubscriber sub12("B1", info_B1_2); + //eCAL::CSubscriber sub12("B1", info_B1_2); // get all topics eCAL::Util::GetTopics(topic_info_map); @@ -110,19 +110,19 @@ TEST(core_cpp_pubsub, GetTopics) eCAL::Util::GetTopics(topic_info_map); // size should be 5 again (because of pub1.2 and sub1.2 should have replaced pub1 and sub1 attributes now) - EXPECT_EQ(topic_info_map.size(), 5); + //EXPECT_EQ(topic_info_map.size(), 5); // check overwritten attributes - { - eCAL::SDataTypeInformation utils_topic_info; - eCAL::Util::GetTopicDataTypeInformation("A1", utils_topic_info); - EXPECT_EQ(utils_topic_info, info_A1_2); - } - { - eCAL::SDataTypeInformation utils_topic_info; - eCAL::Util::GetTopicDataTypeInformation("B1", utils_topic_info); - EXPECT_EQ(utils_topic_info, info_B1_2); - } + //{ + // eCAL::SDataTypeInformation utils_topic_info; + // eCAL::Util::GetTopicDataTypeInformation("A1", utils_topic_info); + // EXPECT_EQ(utils_topic_info, info_A1_2); + //} + //{ + // eCAL::SDataTypeInformation utils_topic_info; + // eCAL::Util::GetTopicDataTypeInformation("B1", utils_topic_info); + // EXPECT_EQ(utils_topic_info, info_B1_2); + //} } // let's unregister them From 3cc7168c516301a81c9b41c6712eaf9dbd3a1aa0 Mon Sep 17 00:00:00 2001 From: rex-schilasky <49162693+rex-schilasky@users.noreply.github.com> Date: Wed, 20 Mar 2024 18:24:59 +0100 Subject: [PATCH 05/14] init flag ProcessReg removed --- app/play/play_gui/src/main.cpp | 2 +- contrib/mma/src/mma_application.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/app/play/play_gui/src/main.cpp b/app/play/play_gui/src/main.cpp index 906771dd7a..91fcb1618b 100644 --- a/app/play/play_gui/src/main.cpp +++ b/app/play/play_gui/src/main.cpp @@ -159,7 +159,7 @@ int main(int argc, char *argv[]) ////////////////////////////////////////////////////////////////////////////// // Just make sure that eCAL is initialized - eCAL::Initialize(0, nullptr, "eCALPlayGUI", eCAL::Init::Default | eCAL::Init::ProcessReg | eCAL::Init::Publisher | eCAL::Init::Service | eCAL::Init::Monitoring); + eCAL::Initialize(0, nullptr, "eCALPlayGUI", eCAL::Init::Default | eCAL::Init::Publisher | eCAL::Init::Service | eCAL::Init::Monitoring); // For linux big measurements require more file descriptors than the default value #ifdef ECAL_OS_LINUX diff --git a/contrib/mma/src/mma_application.cpp b/contrib/mma/src/mma_application.cpp index eac9841fce..0bd4ec8312 100644 --- a/contrib/mma/src/mma_application.cpp +++ b/contrib/mma/src/mma_application.cpp @@ -135,7 +135,7 @@ int main(int argc, char** argv) std::cout << app_version_header << std::endl << ecal_version_header << std::endl << std::endl; // initialize eCAL API - if (eCAL::Initialize(0, nullptr, MMA_APPLICATION_NAME, eCAL::Init::Publisher | eCAL::Init::ProcessReg) < 0) + if (eCAL::Initialize(0, nullptr, MMA_APPLICATION_NAME, eCAL::Init::Publisher) < 0) { std::cout << "eCAL initialization failed !"; return 1; From 10c6fbd72f5a495b070955156832274955301154 Mon Sep 17 00:00:00 2001 From: rex-schilasky <49162693+rex-schilasky@users.noreply.github.com> Date: Thu, 21 Mar 2024 12:00:50 +0100 Subject: [PATCH 06/14] do not clear the sample list on registration provider destruction (to process all remaining samples, like unregistrations) --- ecal/core/src/registration/ecal_registration_provider.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ecal/core/src/registration/ecal_registration_provider.cpp b/ecal/core/src/registration/ecal_registration_provider.cpp index 793fb285e3..a30237ce0a 100644 --- a/ecal/core/src/registration/ecal_registration_provider.cpp +++ b/ecal/core/src/registration/ecal_registration_provider.cpp @@ -104,8 +104,7 @@ namespace eCAL // stop cyclic registration thread m_reg_sample_snd_thread->stop(); - // prepare unregistration - ClearSampleList(); + // add process unregistration sample AddSample2SampleList(GetProcessUnregisterSample()); if (m_use_registration_udp) From f44181fa95b3a0b5169637223fd42e7517d5221e Mon Sep 17 00:00:00 2001 From: rex-schilasky <49162693+rex-schilasky@users.noreply.github.com> Date: Thu, 21 Mar 2024 13:27:29 +0100 Subject: [PATCH 07/14] do unregistration "unforced", this will lead to much higher performance in case of finalizing nodes with a high number of pub/sub/client/server entities --- ecal/core/src/readwrite/ecal_reader.cpp | 2 +- ecal/core/src/readwrite/ecal_writer.cpp | 2 +- ecal/core/src/service/ecal_service_client_impl.cpp | 2 +- ecal/core/src/service/ecal_service_server_impl.cpp | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/ecal/core/src/readwrite/ecal_reader.cpp b/ecal/core/src/readwrite/ecal_reader.cpp index c91ce3aa5b..71ad234d0e 100644 --- a/ecal/core/src/readwrite/ecal_reader.cpp +++ b/ecal/core/src/readwrite/ecal_reader.cpp @@ -332,7 +332,7 @@ namespace eCAL ecal_reg_sample_topic.uname = Process::GetUnitName(); // unregister subscriber - if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(ecal_unreg_sample, true); + if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(ecal_unreg_sample, false); #ifndef NDEBUG // log it Logging::Log(log_level_debug4, m_topic_name + "::CDataReader::Unregister"); diff --git a/ecal/core/src/readwrite/ecal_writer.cpp b/ecal/core/src/readwrite/ecal_writer.cpp index bf75a868b3..806dedab4a 100644 --- a/ecal/core/src/readwrite/ecal_writer.cpp +++ b/ecal/core/src/readwrite/ecal_writer.cpp @@ -897,7 +897,7 @@ namespace eCAL ecal_reg_sample_topic.uname = Process::GetUnitName(); // unregister publisher - if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(ecal_unreg_sample, true); + if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(ecal_unreg_sample, false); #ifndef NDEBUG // log it diff --git a/ecal/core/src/service/ecal_service_client_impl.cpp b/ecal/core/src/service/ecal_service_client_impl.cpp index 6a0898b950..9bb8d2734f 100644 --- a/ecal/core/src/service/ecal_service_client_impl.cpp +++ b/ecal/core/src/service/ecal_service_client_impl.cpp @@ -685,7 +685,7 @@ namespace eCAL service_client.version = m_client_version; // unregister entity - if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(sample, true); + if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(sample, false); } void CServiceClientImpl::CheckForNewServices() diff --git a/ecal/core/src/service/ecal_service_server_impl.cpp b/ecal/core/src/service/ecal_service_server_impl.cpp index 4691d8deaa..bb61c979a3 100644 --- a/ecal/core/src/service/ecal_service_server_impl.cpp +++ b/ecal/core/src/service/ecal_service_server_impl.cpp @@ -378,7 +378,7 @@ namespace eCAL service.sid = m_service_id; // unregister entity - if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(sample, true); + if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(sample, false); } int CServiceServerImpl::RequestCallback(const std::string& request_pb_, std::string& response_pb_) From 85e6564f751e63f26b6f5880c4f38b8a5f89c80b Mon Sep 17 00:00:00 2001 From: rex-schilasky <49162693+rex-schilasky@users.noreply.github.com> Date: Thu, 21 Mar 2024 15:35:06 +0100 Subject: [PATCH 08/14] util test ParallelGetTopics added --- ecal/tests/cpp/util_test/src/util_test.cpp | 71 +++++++++++++++++++++- 1 file changed, 68 insertions(+), 3 deletions(-) diff --git a/ecal/tests/cpp/util_test/src/util_test.cpp b/ecal/tests/cpp/util_test/src/util_test.cpp index 9d5b96512b..b6692b2fd9 100644 --- a/ecal/tests/cpp/util_test/src/util_test.cpp +++ b/ecal/tests/cpp/util_test/src/util_test.cpp @@ -17,11 +17,13 @@ * ========================= eCAL LICENSE ================================= */ -#include #include -#include #include +#include +#include + +#include namespace { void TestCombinedTopicEncodingAndType(const std::string& encoding, const std::string& type, const std::string& expected_result) @@ -116,7 +118,6 @@ TEST(core_cpp_util, Freq_ResettableFrequencyCalculator) { const auto check_delta_t = std::chrono::milliseconds(999); - for (const auto& pair : frequency_pairs) { { @@ -193,3 +194,67 @@ TEST(core_cpp_util, Freq_ResettableFrequencyCalculator) } } } + +TEST(core_cpp_util, ParallelGetTopics) +{ + constexpr const int max_publisher_count(1000); + constexpr const int parallel_threads(1); + + eCAL::Initialize(); + + auto create_publishers = [&]() { + std::string topic_name = "Test.ParallelUtilFunctions"; + std::atomic call_back_count = 0; + + std::vector> publishers; + for (int pub_count = 0; pub_count < max_publisher_count; pub_count++) { + std::unique_ptr publisher = std::make_unique(topic_name + std::to_string(pub_count)); + publishers.push_back(std::move(publisher)); + } + }; + + auto get_topics_from_ecal = [&]() { + size_t found_topics = 0; + std::vector tmp_topic_names; + std::unordered_map topics; + do { + eCAL::Util::GetTopicNames(tmp_topic_names); + eCAL::Util::GetTopics(topics); + + found_topics = tmp_topic_names.size(); + std::cout << "Number of topics found by ecal: " << found_topics << "\n"; + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } while (found_topics < max_publisher_count); + + // do it again until all publishers are deleted + do { + eCAL::Util::GetTopicNames(tmp_topic_names); + eCAL::Util::GetTopics(topics); + + found_topics = tmp_topic_names.size(); + std::cout << "Number of topics found by ecal: " << found_topics << "\n"; + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } while (found_topics != 0); + }; + + std::vector threads_container; + threads_container.push_back(std::thread(create_publishers)); + + for (size_t i = 0; i < parallel_threads; i++) { + threads_container.push_back(std::thread(get_topics_from_ecal)); + } + + for (auto& th : threads_container) { + th.join(); + } + + std::vector tmp_topic_names; + std::unordered_map topics; + eCAL::Util::GetTopicNames(tmp_topic_names); + eCAL::Util::GetTopics(topics); + + EXPECT_EQ(tmp_topic_names.size(), 0); + EXPECT_EQ(topics.size(), 0); + + eCAL::Finalize(); +} From a72a59d065d9d88afcfb744566ed356154511763 Mon Sep 17 00:00:00 2001 From: rex-schilasky <49162693+rex-schilasky@users.noreply.github.com> Date: Thu, 21 Mar 2024 17:03:20 +0100 Subject: [PATCH 09/14] std::atomic initialization fixed --- ecal/tests/cpp/util_test/src/util_test.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ecal/tests/cpp/util_test/src/util_test.cpp b/ecal/tests/cpp/util_test/src/util_test.cpp index b6692b2fd9..80c15011ed 100644 --- a/ecal/tests/cpp/util_test/src/util_test.cpp +++ b/ecal/tests/cpp/util_test/src/util_test.cpp @@ -204,7 +204,7 @@ TEST(core_cpp_util, ParallelGetTopics) auto create_publishers = [&]() { std::string topic_name = "Test.ParallelUtilFunctions"; - std::atomic call_back_count = 0; + std::atomic call_back_count{ 0 }; std::vector> publishers; for (int pub_count = 0; pub_count < max_publisher_count; pub_count++) { From 8c54eaee762e1e3b676828368b312e83cb5fd745 Mon Sep 17 00:00:00 2001 From: rex-schilasky <49162693+rex-schilasky@users.noreply.github.com> Date: Thu, 21 Mar 2024 18:15:32 +0100 Subject: [PATCH 10/14] unregistration utilized in CDescGate --- ecal/core/src/ecal_descgate.cpp | 30 ++++++++++++++++++++++ ecal/core/src/ecal_descgate.h | 5 ++++ ecal/tests/cpp/util_test/src/util_test.cpp | 16 +++++++----- 3 files changed, 44 insertions(+), 7 deletions(-) diff --git a/ecal/core/src/ecal_descgate.cpp b/ecal/core/src/ecal_descgate.cpp index 1b1eb0a426..b6f0946457 100644 --- a/ecal/core/src/ecal_descgate.cpp +++ b/ecal/core/src/ecal_descgate.cpp @@ -178,6 +178,12 @@ namespace eCAL } break; case bct_unreg_service: + { + for (const auto& method : sample_.service.methods) + { + RemoveServiceDescription(sample_.service.sname, method.mname); + } + } break; case bct_reg_client: //for (const auto& method : sample_.client.methods) @@ -194,16 +200,22 @@ namespace eCAL //} break; case bct_unreg_client: + //for (const auto& method : sample_.client.methods) + //{ + // RemoveClientDescription(sample_.service.sname, method.mname); + //} break; case bct_reg_publisher: ApplyTopicDescription(sample_.topic.tname, sample_.topic.tdatatype); break; case bct_unreg_publisher: + RemoveTopicDescription(sample_.topic.tname); break; case bct_reg_subscriber: ApplyTopicDescription(sample_.topic.tname, sample_.topic.tdatatype); break; case bct_unreg_subscriber: + RemoveTopicDescription(sample_.topic.tname); break; default: { @@ -225,6 +237,14 @@ namespace eCAL return true; } + bool CDescGate::RemoveTopicDescription(const std::string& topic_name_) + { + const std::unique_lock lock(m_topic_info_map.sync); + + // remove topic entry + return (*m_topic_info_map.map).erase(topic_name_); + } + bool CDescGate::ApplyServiceDescription(const std::string& service_name_ , const std::string& method_name_ , const SDataTypeInformation& request_type_information_ @@ -244,4 +264,14 @@ namespace eCAL (*m_service_info_map.map)[service_method_tuple] = service_info; return true; } + + bool CDescGate::RemoveServiceDescription(const std::string& service_name_, const std::string& method_name_) + { + std::tuple service_method_tuple = std::make_tuple(service_name_, method_name_); + + const std::lock_guard lock(m_service_info_map.sync); + + // remove service entry + return (*m_service_info_map.map).erase(service_method_tuple); + } } diff --git a/ecal/core/src/ecal_descgate.h b/ecal/core/src/ecal_descgate.h index e710a74fca..c6aafc78ce 100644 --- a/ecal/core/src/ecal_descgate.h +++ b/ecal/core/src/ecal_descgate.h @@ -67,11 +67,16 @@ namespace eCAL bool ApplyTopicDescription(const std::string& topic_name_, const SDataTypeInformation& topic_info_); + bool RemoveTopicDescription(const std::string& topic_name_); + bool ApplyServiceDescription(const std::string& service_name_, const std::string& method_name_, const SDataTypeInformation& request_type_information_, const SDataTypeInformation& response_type_information_); + bool RemoveServiceDescription(const std::string& service_name_, + const std::string& method_name_); + // key: topic name | value: topic (type/desc), quality using TopicInfoMap = eCAL::Util::CExpMap; //!< Map containing { TopicName -> (Type, Description) } mapping of all topics that are currently known struct STopicInfoMap diff --git a/ecal/tests/cpp/util_test/src/util_test.cpp b/ecal/tests/cpp/util_test/src/util_test.cpp index 80c15011ed..d7aa7f8fe5 100644 --- a/ecal/tests/cpp/util_test/src/util_test.cpp +++ b/ecal/tests/cpp/util_test/src/util_test.cpp @@ -197,7 +197,8 @@ TEST(core_cpp_util, Freq_ResettableFrequencyCalculator) TEST(core_cpp_util, ParallelGetTopics) { - constexpr const int max_publisher_count(1000); + constexpr const int max_publisher_count(2000); + constexpr const int waiting_time_thread(1000); constexpr const int parallel_threads(1); eCAL::Initialize(); @@ -211,6 +212,7 @@ TEST(core_cpp_util, ParallelGetTopics) std::unique_ptr publisher = std::make_unique(topic_name + std::to_string(pub_count)); publishers.push_back(std::move(publisher)); } + std::this_thread::sleep_for(std::chrono::milliseconds(waiting_time_thread)); }; auto get_topics_from_ecal = [&]() { @@ -248,13 +250,13 @@ TEST(core_cpp_util, ParallelGetTopics) th.join(); } - std::vector tmp_topic_names; - std::unordered_map topics; - eCAL::Util::GetTopicNames(tmp_topic_names); - eCAL::Util::GetTopics(topics); + std::vector final_topic_names; + std::unordered_map final_topics; + eCAL::Util::GetTopicNames(final_topic_names); + eCAL::Util::GetTopics(final_topics); - EXPECT_EQ(tmp_topic_names.size(), 0); - EXPECT_EQ(topics.size(), 0); + EXPECT_EQ(final_topic_names.size(), 0); + EXPECT_EQ(final_topics.size(), 0); eCAL::Finalize(); } From 55aff9deab0e8bdda03bdad07b79759b687cd6bf Mon Sep 17 00:00:00 2001 From: rex-schilasky <49162693+rex-schilasky@users.noreply.github.com> Date: Fri, 22 Mar 2024 10:26:28 +0100 Subject: [PATCH 11/14] quality logic reintroduced in description gate to get this PR done --- ecal/core/src/ecal_descgate.cpp | 262 +++++++++++++++--- ecal/core/src/ecal_descgate.h | 52 +++- .../src/clientserver_getservices.cpp | 6 +- .../cpp/pubsub_test/src/pubsub_gettopics.cpp | 30 +- 4 files changed, 289 insertions(+), 61 deletions(-) diff --git a/ecal/core/src/ecal_descgate.cpp b/ecal/core/src/ecal_descgate.cpp index b6f0946457..39a7e15b47 100644 --- a/ecal/core/src/ecal_descgate.cpp +++ b/ecal/core/src/ecal_descgate.cpp @@ -27,10 +27,52 @@ #include "ecal_globals.h" #include "ecal_descgate.h" +namespace +{ + // TODO: remove me with new CDescGate + eCAL::CDescGate::QualityFlags GetServiceMethodQuality(const std::string& service_name_, const std::string& method_name_, + const eCAL::SDataTypeInformation& request_info_, + const eCAL::SDataTypeInformation& response_info_) + { + eCAL::CDescGate::QualityFlags quality = eCAL::CDescGate::QualityFlags::NO_QUALITY; + if (!(request_info_.name.empty() && response_info_.name.empty())) + quality |= eCAL::CDescGate::QualityFlags::TYPE_AVAILABLE; + if (!(request_info_.descriptor.empty() && response_info_.descriptor.empty())) + quality |= eCAL::CDescGate::QualityFlags::DESCRIPTION_AVAILABLE; + + return quality; + } + + eCAL::CDescGate::QualityFlags GetPublisherQuality(const std::string& topic_name_, const eCAL::SDataTypeInformation& topic_info_) + { + eCAL::CDescGate::QualityFlags quality = eCAL::CDescGate::QualityFlags::NO_QUALITY; + if (!topic_info_.name.empty() || !topic_info_.encoding.empty()) + quality |= eCAL::CDescGate::QualityFlags::TYPE_AVAILABLE; + if (!topic_info_.descriptor.empty()) + quality |= eCAL::CDescGate::QualityFlags::DESCRIPTION_AVAILABLE; + quality |= eCAL::CDescGate::QualityFlags::INFO_COMES_FROM_CORRECT_ENTITY; + quality |= eCAL::CDescGate::QualityFlags::INFO_COMES_FROM_PRODUCER; + + return quality; + } + + eCAL::CDescGate::QualityFlags GetSubscriberQuality(const std::string& topic_name_, const eCAL::SDataTypeInformation& topic_info_) + { + eCAL::CDescGate::QualityFlags quality = eCAL::CDescGate::QualityFlags::NO_QUALITY; + if (!topic_info_.name.empty() || !topic_info_.encoding.empty()) + quality |= eCAL::CDescGate::QualityFlags::TYPE_AVAILABLE; + if (!topic_info_.descriptor.empty()) + quality |= eCAL::CDescGate::QualityFlags::DESCRIPTION_AVAILABLE; + quality |= eCAL::CDescGate::QualityFlags::INFO_COMES_FROM_CORRECT_ENTITY; + + return quality; + } +} + namespace eCAL { CDescGate::CDescGate() : - m_topic_info_map (std::chrono::milliseconds(Config::GetMonitoringTimeoutMs())), + m_topic_info_map(std::chrono::milliseconds(Config::GetMonitoringTimeoutMs())), m_service_info_map(std::chrono::milliseconds(Config::GetMonitoringTimeoutMs())) { } @@ -64,7 +106,7 @@ namespace eCAL for (const auto& topic_info : (*m_topic_info_map.map)) { - map.emplace(topic_info.first, topic_info.second); + map.emplace(topic_info.first, topic_info.second.info); } topic_info_map_.swap(map); } @@ -92,7 +134,7 @@ namespace eCAL const auto topic_info_it = m_topic_info_map.map->find(topic_name_); if (topic_info_it == m_topic_info_map.map->end()) return(false); - topic_info_ = (*topic_info_it).second; + topic_info_ = (*topic_info_it).second.info; return(true); } @@ -105,7 +147,7 @@ namespace eCAL for (const auto& service_info : (*m_service_info_map.map)) { - map.emplace(service_info.first, service_info.second); + map.emplace(service_info.first, service_info.second.info); } service_info_map_.swap(map); } @@ -133,8 +175,8 @@ namespace eCAL auto service_info_map_it = m_service_info_map.map->find(service_method_tuple); if (service_info_map_it == m_service_info_map.map->end()) return false; - req_type_name_ = (*service_info_map_it).second.request_type.name; - resp_type_name_ = (*service_info_map_it).second.response_type.name; + req_type_name_ = (*service_info_map_it).second.info.request_type.name; + resp_type_name_ = (*service_info_map_it).second.info.response_type.name; return true; } @@ -147,8 +189,8 @@ namespace eCAL auto service_info_map_it = m_service_info_map.map->find(service_method_tuple); if (service_info_map_it == m_service_info_map.map->end()) return false; - req_type_desc_ = (*service_info_map_it).second.request_type.descriptor; - resp_type_desc_ = (*service_info_map_it).second.response_type.descriptor; + req_type_desc_ = (*service_info_map_it).second.info.request_type.descriptor; + resp_type_desc_ = (*service_info_map_it).second.info.response_type.descriptor; return true; } @@ -165,7 +207,7 @@ namespace eCAL { for (const auto& method : sample_.service.methods) { - SDataTypeInformation request_type; + SDataTypeInformation request_type{}; request_type.name = method.req_type; request_type.descriptor = method.req_desc; @@ -173,17 +215,17 @@ namespace eCAL response_type.name = method.resp_type; response_type.descriptor = method.resp_desc; - ApplyServiceDescription(sample_.service.sname, method.mname, request_type, response_type); + ApplyServiceDescription(sample_.service.sname, method.mname, request_type, response_type, GetServiceMethodQuality(sample_.service.sname, method.mname, request_type, response_type)); } } - break; + break; case bct_unreg_service: - { - for (const auto& method : sample_.service.methods) - { - RemoveServiceDescription(sample_.service.sname, method.mname); - } - } + //{ + // for (const auto& method : sample_.service.methods) + // { + // RemoveServiceDescription(sample_.service.sname, method.mname); + // } + //} break; case bct_reg_client: //for (const auto& method : sample_.client.methods) @@ -196,7 +238,7 @@ namespace eCAL // response_type.name = method.resp_type; // response_type.descriptor = method.resp_desc; - // ApplyClientDescription(sample_.service.sname, method.mname, request_type, response_type); + // ApplyClientDescription(sample_.service.sname, method.mname, request_type, response_type, GetQuality(sample_)); //} break; case bct_unreg_client: @@ -206,16 +248,16 @@ namespace eCAL //} break; case bct_reg_publisher: - ApplyTopicDescription(sample_.topic.tname, sample_.topic.tdatatype); + ApplyTopicDescription(sample_.topic.tname, sample_.topic.tdatatype, GetPublisherQuality(sample_.topic.tname, sample_.topic.tdatatype)); break; case bct_unreg_publisher: - RemoveTopicDescription(sample_.topic.tname); + //RemoveTopicDescription(sample_.topic.tname); break; case bct_reg_subscriber: - ApplyTopicDescription(sample_.topic.tname, sample_.topic.tdatatype); + ApplyTopicDescription(sample_.topic.tname, sample_.topic.tdatatype, GetSubscriberQuality(sample_.topic.tname, sample_.topic.tdatatype)); break; case bct_unreg_subscriber: - RemoveTopicDescription(sample_.topic.tname); + //RemoveTopicDescription(sample_.topic.tname); break; default: { @@ -227,14 +269,144 @@ namespace eCAL return true; } - bool CDescGate::ApplyTopicDescription(const std::string& topic_name_, const SDataTypeInformation& topic_info_) + bool CDescGate::ApplyTopicDescription(const std::string& topic_name_, const SDataTypeInformation& topic_info_, const QualityFlags description_quality_) { const std::unique_lock lock(m_topic_info_map.sync); m_topic_info_map.map->remove_deprecated(); - // update topic entry (and its timestamp) - (*m_topic_info_map.map)[topic_name_] = topic_info_; - return true; + const auto topic_info_it = m_topic_info_map.map->find(topic_name_); + + // new element (no need to check anything, just add it) + if (topic_info_it == m_topic_info_map.map->end()) + { + // create a new topic entry + STopicInfoQuality& topic_info = (*m_topic_info_map.map)[topic_name_]; + topic_info.info = topic_info_; + topic_info.quality = description_quality_; + return true; + } + + // we do not use the [] operator here to not update the timestamp + // by accessing the map entry + // + // a topic with the same name but different type name or different description + // should NOT update the timestamp of an existing entry + // + // otherwise there could be a scenario where a "lower quality topic" would keep a + // "higher quality topic" alive (even it is no more existing) + STopicInfoQuality topic_info = (*topic_info_it).second; + + // first let's check whether the current information has a higher quality + // if it has a higher quality, we overwrite it + if (description_quality_ > topic_info.quality) + { + // overwrite attributes + topic_info.info = topic_info_; + topic_info.quality = description_quality_; + + // update attributes and return + (*m_topic_info_map.map)[topic_name_] = topic_info; + return true; + } + + // this is the same topic (topic name, topic type name, topic type description) + if (topic_info.info == topic_info_) + { + // update timestamp (by just accessing the entry) and return + (*m_topic_info_map.map)[topic_name_] = topic_info; + return false; + } + + // topic type name or topic description differ but we logged this before + if (topic_info.type_missmatch_logged) + { + return false; + } + + // topic type name or topic description differ and this is not logged yet + // so we log the error and update the entry one time + bool update_topic_info(false); + + // topic type name differs + // we log the error and update the entry one time + if (!topic_info_.encoding.empty() + && !topic_info.info.encoding.empty() + && (topic_info.info.encoding != topic_info_.encoding) + ) + { + std::string tencoding1 = topic_info.info.encoding; + std::string tencoding2 = topic_info_.encoding; + std::replace(tencoding1.begin(), tencoding1.end(), '\0', '?'); + std::replace(tencoding1.begin(), tencoding1.end(), '\t', '?'); + std::replace(tencoding2.begin(), tencoding2.end(), '\0', '?'); + std::replace(tencoding2.begin(), tencoding2.end(), '\t', '?'); + std::string msg = "eCAL Pub/Sub encoding mismatch for topic "; + msg += topic_name_; + msg += " (\'"; + msg += tencoding1; + msg += "\' <> \'"; + msg += tencoding2; + msg += "\')"; + eCAL::Logging::Log(log_level_warning, msg); + + // mark as logged + topic_info.type_missmatch_logged = true; + // and update its attributes + update_topic_info = true; + } + + // topic type name differs + // we log the error and update the entry one time + if (!topic_info_.name.empty() + && !topic_info.info.name.empty() + && (topic_info.info.name != topic_info_.name) + ) + { + std::string ttype1 = topic_info.info.name; + std::string ttype2 = topic_info_.name; + std::replace(ttype1.begin(), ttype1.end(), '\0', '?'); + std::replace(ttype1.begin(), ttype1.end(), '\t', '?'); + std::replace(ttype2.begin(), ttype2.end(), '\0', '?'); + std::replace(ttype2.begin(), ttype2.end(), '\t', '?'); + std::string msg = "eCAL Pub/Sub type mismatch for topic "; + msg += topic_name_; + msg += " (\'"; + msg += ttype1; + msg += "\' <> \'"; + msg += ttype2; + msg += "\')"; + eCAL::Logging::Log(log_level_warning, msg); + + // mark as logged + topic_info.type_missmatch_logged = true; + // and update its attributes + update_topic_info = true; + } + + // topic type description differs + // we log the error and update the entry one time + if (!topic_info_.descriptor.empty() + && !topic_info.info.descriptor.empty() + && (topic_info.info.descriptor != topic_info_.descriptor) + ) + { + std::string msg = "eCAL Pub/Sub description mismatch for topic "; + msg += topic_name_; + eCAL::Logging::Log(log_level_warning, msg); + + // mark as logged + topic_info.type_missmatch_logged = true; + // and update its attributes + update_topic_info = true; + } + + // update topic info attributes + if (update_topic_info) + { + (*m_topic_info_map.map)[topic_name_] = topic_info; + } + + return false; } bool CDescGate::RemoveTopicDescription(const std::string& topic_name_) @@ -246,23 +418,43 @@ namespace eCAL } bool CDescGate::ApplyServiceDescription(const std::string& service_name_ - , const std::string& method_name_ - , const SDataTypeInformation& request_type_information_ - , const SDataTypeInformation& response_type_information_) + , const std::string& method_name_ + , const SDataTypeInformation& request_type_information_ + , const SDataTypeInformation& response_type_information_ + , const QualityFlags description_quality_) { std::tuple service_method_tuple = std::make_tuple(service_name_, method_name_); const std::lock_guard lock(m_service_info_map.sync); m_service_info_map.map->remove_deprecated(); - // aggregate service information - SServiceMethodInformation service_info; - service_info.request_type = request_type_information_; - service_info.response_type = response_type_information_; - + auto service_info_map_it = m_service_info_map.map->find(service_method_tuple); + if (service_info_map_it == m_service_info_map.map->end()) + { + // create a new service entry + SServiceMethodInfoQuality& service_info = (*m_service_info_map.map)[service_method_tuple]; + service_info.info.request_type = request_type_information_; + service_info.info.response_type = response_type_information_; + service_info.quality = description_quality_; + return true; + } + + // let's check whether the current information has a higher quality + // if it has a higher quality, we overwrite it + bool ret_value(false); + SServiceMethodInfoQuality service_info = (*service_info_map_it).second; + if (description_quality_ > service_info.quality) + { + service_info.info.request_type = request_type_information_; + service_info.info.response_type = response_type_information_; + service_info.quality = description_quality_; + ret_value = true; + } + // update service entry (and its timestamp) (*m_service_info_map.map)[service_method_tuple] = service_info; - return true; + + return ret_value; } bool CDescGate::RemoveServiceDescription(const std::string& service_name_, const std::string& method_name_) diff --git a/ecal/core/src/ecal_descgate.h b/ecal/core/src/ecal_descgate.h index c6aafc78ce..81010a65a3 100644 --- a/ecal/core/src/ecal_descgate.h +++ b/ecal/core/src/ecal_descgate.h @@ -45,6 +45,21 @@ namespace eCAL { class CDescGate { + public: + // Enumeration of quality bits used for detecting how good a topic information is. + enum class QualityFlags : int + { + NO_QUALITY = 0, //!< Special value for initialization + + DESCRIPTION_AVAILABLE = 0x1 << 4, //!< Having a descriptor at all is the most important thing + INFO_COMES_FROM_CORRECT_ENTITY = 0x1 << 3, //!< The information comes from the current topic/service + //!< and has not been borrowed from another emtity, like read by a subscriber from a publisher + INFO_COMES_FROM_PRODUCER = 0x1 << 2, //!< A descriptor coming from the producer (like a publisher) is better than one from a + //!< consumer (like a subscriber), as we assume that the publisher knows best what he is publishing + INFO_COMES_FROM_THIS_PROCESS = 0x1 << 1, //!< We prefer descriptors from the current process + TYPE_AVAILABLE = 0x1 << 0, //!< Having information about the type's name available is nice but not that important to us + }; + public: CDescGate(); ~CDescGate(); @@ -64,34 +79,49 @@ namespace eCAL protected: bool ApplySample(const Registration::Sample& sample_, eTLayerType layer_); - bool ApplyTopicDescription(const std::string& topic_name_, - const SDataTypeInformation& topic_info_); + bool ApplyTopicDescription(const std::string& topic_name_, + const SDataTypeInformation& topic_info_, + QualityFlags description_quality_); bool RemoveTopicDescription(const std::string& topic_name_); bool ApplyServiceDescription(const std::string& service_name_, const std::string& method_name_, const SDataTypeInformation& request_type_information_, - const SDataTypeInformation& response_type_information_); + const SDataTypeInformation& response_type_information_, + QualityFlags description_quality_); bool RemoveServiceDescription(const std::string& service_name_, const std::string& method_name_); + struct STopicInfoQuality + { + SDataTypeInformation info; //!< Topic info struct with type encoding, name and descriptor. + QualityFlags quality = QualityFlags::NO_QUALITY; //!< QualityFlags to determine whether we may overwrite the current data with better one. E.g. we prefer the description sent by a publisher over one sent by a subscriber. + bool type_missmatch_logged = false; //!< Whether we have already logged a type-missmatch + }; + + struct SServiceMethodInfoQuality + { + SServiceMethodInformation info; //!< Service info struct with type names and descriptors for request and response. + QualityFlags quality = QualityFlags::NO_QUALITY; //!< The Quality of the Info + }; + // key: topic name | value: topic (type/desc), quality - using TopicInfoMap = eCAL::Util::CExpMap; //!< Map containing { TopicName -> (Type, Description) } mapping of all topics that are currently known + using TopicInfoMap = eCAL::Util::CExpMap; //!< Map containing { TopicName -> (Type, Description, Quality) } mapping of all topics that are currently known struct STopicInfoMap { explicit STopicInfoMap(const std::chrono::milliseconds& timeout_) : map(std::make_unique(timeout_)) { }; - mutable std::mutex sync; //!< Mutex protecting the map - std::unique_ptr map; //!< Map containing information about each known topic + mutable std::mutex sync; //!< Mutex protecting the map + std::unique_ptr map; //!< Map containing information about each known topic }; STopicInfoMap m_topic_info_map; // key: tup | value: request (type/desc), response (type/desc), quality - using ServiceMethodInfoMap = eCAL::Util::CExpMap, SServiceMethodInformation>; //!< Map { (ServiceName, MethodName) -> ( (ReqType, ReqDescription), (RespType, RespDescription) ) } mapping of all currently known services + using ServiceMethodInfoMap = eCAL::Util::CExpMap, SServiceMethodInfoQuality>; //!< Map { (ServiceName, MethodName) -> ( (ReqType, ReqDescription), (RespType, RespDescription), Quality ) } mapping of all currently known services struct SServiceMethodInfoMap { explicit SServiceMethodInfoMap(const std::chrono::milliseconds& timeout_) : @@ -103,4 +133,12 @@ namespace eCAL }; SServiceMethodInfoMap m_service_info_map; }; + + constexpr inline CDescGate::QualityFlags operator~ (CDescGate::QualityFlags a) { return static_cast( ~static_cast::type>(a) ); } + constexpr inline CDescGate::QualityFlags operator| (CDescGate::QualityFlags a, CDescGate::QualityFlags b) { return static_cast( static_cast::type>(a) | static_cast::type>(b) ); } + constexpr inline CDescGate::QualityFlags operator& (CDescGate::QualityFlags a, CDescGate::QualityFlags b) { return static_cast( static_cast::type>(a) & static_cast::type>(b) ); } + constexpr inline CDescGate::QualityFlags operator^ (CDescGate::QualityFlags a, CDescGate::QualityFlags b) { return static_cast( static_cast::type>(a) ^ static_cast::type>(b) ); } + inline CDescGate::QualityFlags& operator|= (CDescGate::QualityFlags& a, CDescGate::QualityFlags b) { return reinterpret_cast( reinterpret_cast::type&>(a) |= static_cast::type>(b) ); } + inline CDescGate::QualityFlags& operator&= (CDescGate::QualityFlags& a, CDescGate::QualityFlags b) { return reinterpret_cast( reinterpret_cast::type&>(a) &= static_cast::type>(b) ); } + inline CDescGate::QualityFlags& operator^= (CDescGate::QualityFlags& a, CDescGate::QualityFlags b) { return reinterpret_cast( reinterpret_cast::type&>(a) ^= static_cast::type>(b) ); } } diff --git a/ecal/tests/cpp/clientserver_test/src/clientserver_getservices.cpp b/ecal/tests/cpp/clientserver_test/src/clientserver_getservices.cpp index 7c562b2d5c..a4c9f445c2 100644 --- a/ecal/tests/cpp/clientserver_test/src/clientserver_getservices.cpp +++ b/ecal/tests/cpp/clientserver_test/src/clientserver_getservices.cpp @@ -95,8 +95,7 @@ TEST(core_cpp_clientserver, GetServices) EXPECT_EQ(resp_desc, ""); // change attributes - bool ret1 = server.AddDescription("foo::method1", "foo::req_type1-1", "foo::req_desc1-1", "foo::resp_type1-1", "foo::resp_desc1-1"); - EXPECT_EQ(ret1, true); + server.AddDescription("foo::method1", "foo::req_type1-1", "foo::req_desc1-1", "foo::resp_type1-1", "foo::resp_desc1-1"); // check attributes eCAL::Util::GetServiceTypeNames("foo::service", "foo::method1", req_type, resp_type); @@ -107,8 +106,7 @@ TEST(core_cpp_clientserver, GetServices) EXPECT_EQ(resp_desc, "foo::resp_desc1-1"); // change attributes again (this will not overwrite the attributes anymore) - //bool ret2 = server.AddDescription("foo::method1", "foo::req_type1-2", "foo::req_desc1-2", "foo::resp_type1-2", "foo::resp_desc1-2"); - //EXPECT_EQ(ret2, false); + server.AddDescription("foo::method1", "foo::req_type1-2", "foo::req_desc1-2", "foo::resp_type1-2", "foo::resp_desc1-2"); // check attributes eCAL::Util::GetServiceTypeNames("foo::service", "foo::method1", req_type, resp_type); diff --git a/ecal/tests/cpp/pubsub_test/src/pubsub_gettopics.cpp b/ecal/tests/cpp/pubsub_test/src/pubsub_gettopics.cpp index 15cecb29e4..66e4dc6eaa 100644 --- a/ecal/tests/cpp/pubsub_test/src/pubsub_gettopics.cpp +++ b/ecal/tests/cpp/pubsub_test/src/pubsub_gettopics.cpp @@ -36,12 +36,12 @@ TEST(core_cpp_pubsub, GetTopics) // create and check a few pub/sub entities { eCAL::SDataTypeInformation info_A1 { "typeA1" ,"", "descA1" }; - //eCAL::SDataTypeInformation info_A1_2{ "typeA1.2","", "descA1.2" }; + eCAL::SDataTypeInformation info_A1_2{ "typeA1.2","", "descA1.2" }; eCAL::SDataTypeInformation info_A2 { "typeA2" ,"", "descA2" }; eCAL::SDataTypeInformation info_A3 { "typeA3" ,"", "descA3" }; eCAL::SDataTypeInformation info_B1 { "typeB1" ,"", "descB1" }; - //eCAL::SDataTypeInformation info_B1_2{ "typeB1.2","", "descB1.2" }; + eCAL::SDataTypeInformation info_B1_2{ "typeB1.2","", "descB1.2" }; eCAL::SDataTypeInformation info_B2 { "typeB2" ,"", "descB2" }; // create 3 publisher @@ -51,7 +51,7 @@ TEST(core_cpp_pubsub, GetTopics) // create a missmatching publisher // this should trigger a warning but not increase map size - //eCAL::CPublisher pub12("A1", info_A1_2); + eCAL::CPublisher pub12("A1", info_A1_2); // create 2 subscriber eCAL::CSubscriber sub1("B1", info_B1); @@ -59,7 +59,7 @@ TEST(core_cpp_pubsub, GetTopics) // create a missmatching subscriber // this should trigger a warning but not increase map size - //eCAL::CSubscriber sub12("B1", info_B1_2); + eCAL::CSubscriber sub12("B1", info_B1_2); // get all topics eCAL::Util::GetTopics(topic_info_map); @@ -110,19 +110,19 @@ TEST(core_cpp_pubsub, GetTopics) eCAL::Util::GetTopics(topic_info_map); // size should be 5 again (because of pub1.2 and sub1.2 should have replaced pub1 and sub1 attributes now) - //EXPECT_EQ(topic_info_map.size(), 5); + EXPECT_EQ(topic_info_map.size(), 5); // check overwritten attributes - //{ - // eCAL::SDataTypeInformation utils_topic_info; - // eCAL::Util::GetTopicDataTypeInformation("A1", utils_topic_info); - // EXPECT_EQ(utils_topic_info, info_A1_2); - //} - //{ - // eCAL::SDataTypeInformation utils_topic_info; - // eCAL::Util::GetTopicDataTypeInformation("B1", utils_topic_info); - // EXPECT_EQ(utils_topic_info, info_B1_2); - //} + { + eCAL::SDataTypeInformation utils_topic_info; + eCAL::Util::GetTopicDataTypeInformation("A1", utils_topic_info); + EXPECT_EQ(utils_topic_info, info_A1_2); + } + { + eCAL::SDataTypeInformation utils_topic_info; + eCAL::Util::GetTopicDataTypeInformation("B1", utils_topic_info); + EXPECT_EQ(utils_topic_info, info_B1_2); + } } // let's unregister them From cd39afa3554ec2ff98a7969ec70fc05e0afe9a1b Mon Sep 17 00:00:00 2001 From: rex-schilasky <49162693+rex-schilasky@users.noreply.github.com> Date: Fri, 22 Mar 2024 10:43:05 +0100 Subject: [PATCH 12/14] missing include fixed --- ecal/core/src/ecal_descgate.cpp | 2 ++ ecal/core/src/ecal_descgate.h | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/ecal/core/src/ecal_descgate.cpp b/ecal/core/src/ecal_descgate.cpp index 39a7e15b47..5dc45499fc 100644 --- a/ecal/core/src/ecal_descgate.cpp +++ b/ecal/core/src/ecal_descgate.cpp @@ -27,6 +27,8 @@ #include "ecal_globals.h" #include "ecal_descgate.h" +#include + namespace { // TODO: remove me with new CDescGate diff --git a/ecal/core/src/ecal_descgate.h b/ecal/core/src/ecal_descgate.h index 81010a65a3..d0d3e473ea 100644 --- a/ecal/core/src/ecal_descgate.h +++ b/ecal/core/src/ecal_descgate.h @@ -23,7 +23,6 @@ #pragma once -#include #include #include @@ -32,6 +31,7 @@ #include "serialization/ecal_serialize_sample_registration.h" #include "util/ecal_expmap.h" +#include #include #include #include From 066513411279aeee75d19fd52ab5d94517208572 Mon Sep 17 00:00:00 2001 From: rex-schilasky <49162693+rex-schilasky@users.noreply.github.com> Date: Fri, 22 Mar 2024 10:50:39 +0100 Subject: [PATCH 13/14] removed unused parameter and functions --- ecal/core/src/ecal_descgate.cpp | 49 ++++++++------------------------- ecal/core/src/ecal_descgate.h | 5 ---- 2 files changed, 11 insertions(+), 43 deletions(-) diff --git a/ecal/core/src/ecal_descgate.cpp b/ecal/core/src/ecal_descgate.cpp index 5dc45499fc..a10d996197 100644 --- a/ecal/core/src/ecal_descgate.cpp +++ b/ecal/core/src/ecal_descgate.cpp @@ -32,9 +32,7 @@ namespace { // TODO: remove me with new CDescGate - eCAL::CDescGate::QualityFlags GetServiceMethodQuality(const std::string& service_name_, const std::string& method_name_, - const eCAL::SDataTypeInformation& request_info_, - const eCAL::SDataTypeInformation& response_info_) + eCAL::CDescGate::QualityFlags GetServiceMethodQuality(const eCAL::SDataTypeInformation& request_info_, const eCAL::SDataTypeInformation& response_info_) { eCAL::CDescGate::QualityFlags quality = eCAL::CDescGate::QualityFlags::NO_QUALITY; if (!(request_info_.name.empty() && response_info_.name.empty())) @@ -45,7 +43,7 @@ namespace return quality; } - eCAL::CDescGate::QualityFlags GetPublisherQuality(const std::string& topic_name_, const eCAL::SDataTypeInformation& topic_info_) + eCAL::CDescGate::QualityFlags GetPublisherQuality(const eCAL::SDataTypeInformation& topic_info_) { eCAL::CDescGate::QualityFlags quality = eCAL::CDescGate::QualityFlags::NO_QUALITY; if (!topic_info_.name.empty() || !topic_info_.encoding.empty()) @@ -58,7 +56,7 @@ namespace return quality; } - eCAL::CDescGate::QualityFlags GetSubscriberQuality(const std::string& topic_name_, const eCAL::SDataTypeInformation& topic_info_) + eCAL::CDescGate::QualityFlags GetSubscriberQuality(const eCAL::SDataTypeInformation& topic_info_) { eCAL::CDescGate::QualityFlags quality = eCAL::CDescGate::QualityFlags::NO_QUALITY; if (!topic_info_.name.empty() || !topic_info_.encoding.empty()) @@ -217,19 +215,15 @@ namespace eCAL response_type.name = method.resp_type; response_type.descriptor = method.resp_desc; - ApplyServiceDescription(sample_.service.sname, method.mname, request_type, response_type, GetServiceMethodQuality(sample_.service.sname, method.mname, request_type, response_type)); + ApplyServiceDescription(sample_.service.sname, method.mname, request_type, response_type, GetServiceMethodQuality(request_type, response_type)); } } break; case bct_unreg_service: - //{ - // for (const auto& method : sample_.service.methods) - // { - // RemoveServiceDescription(sample_.service.sname, method.mname); - // } - //} + // TODO: Implement fast unregistration break; case bct_reg_client: + // TODO: Implement this after client methods are available //for (const auto& method : sample_.client.methods) //{ // SDataTypeInformation request_type; @@ -244,22 +238,19 @@ namespace eCAL //} break; case bct_unreg_client: - //for (const auto& method : sample_.client.methods) - //{ - // RemoveClientDescription(sample_.service.sname, method.mname); - //} + // TODO: Implement fast unregistration break; case bct_reg_publisher: - ApplyTopicDescription(sample_.topic.tname, sample_.topic.tdatatype, GetPublisherQuality(sample_.topic.tname, sample_.topic.tdatatype)); + ApplyTopicDescription(sample_.topic.tname, sample_.topic.tdatatype, GetPublisherQuality(sample_.topic.tdatatype)); break; case bct_unreg_publisher: - //RemoveTopicDescription(sample_.topic.tname); + // TODO: Implement fast unregistration break; case bct_reg_subscriber: - ApplyTopicDescription(sample_.topic.tname, sample_.topic.tdatatype, GetSubscriberQuality(sample_.topic.tname, sample_.topic.tdatatype)); + ApplyTopicDescription(sample_.topic.tname, sample_.topic.tdatatype, GetSubscriberQuality(sample_.topic.tdatatype)); break; case bct_unreg_subscriber: - //RemoveTopicDescription(sample_.topic.tname); + // TODO: Implement fast unregistration break; default: { @@ -411,14 +402,6 @@ namespace eCAL return false; } - bool CDescGate::RemoveTopicDescription(const std::string& topic_name_) - { - const std::unique_lock lock(m_topic_info_map.sync); - - // remove topic entry - return (*m_topic_info_map.map).erase(topic_name_); - } - bool CDescGate::ApplyServiceDescription(const std::string& service_name_ , const std::string& method_name_ , const SDataTypeInformation& request_type_information_ @@ -458,14 +441,4 @@ namespace eCAL return ret_value; } - - bool CDescGate::RemoveServiceDescription(const std::string& service_name_, const std::string& method_name_) - { - std::tuple service_method_tuple = std::make_tuple(service_name_, method_name_); - - const std::lock_guard lock(m_service_info_map.sync); - - // remove service entry - return (*m_service_info_map.map).erase(service_method_tuple); - } } diff --git a/ecal/core/src/ecal_descgate.h b/ecal/core/src/ecal_descgate.h index d0d3e473ea..6a95b398bb 100644 --- a/ecal/core/src/ecal_descgate.h +++ b/ecal/core/src/ecal_descgate.h @@ -83,17 +83,12 @@ namespace eCAL const SDataTypeInformation& topic_info_, QualityFlags description_quality_); - bool RemoveTopicDescription(const std::string& topic_name_); - bool ApplyServiceDescription(const std::string& service_name_, const std::string& method_name_, const SDataTypeInformation& request_type_information_, const SDataTypeInformation& response_type_information_, QualityFlags description_quality_); - bool RemoveServiceDescription(const std::string& service_name_, - const std::string& method_name_); - struct STopicInfoQuality { SDataTypeInformation info; //!< Topic info struct with type encoding, name and descriptor. From dcf1844768650e9975319478e61c6ec9199272d7 Mon Sep 17 00:00:00 2001 From: rex-schilasky <49162693+rex-schilasky@users.noreply.github.com> Date: Fri, 22 Mar 2024 11:07:34 +0100 Subject: [PATCH 14/14] util test corrected (dependency from ECAL_CORE_PUBLISHER option) --- ecal/tests/cpp/util_test/src/util_test.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ecal/tests/cpp/util_test/src/util_test.cpp b/ecal/tests/cpp/util_test/src/util_test.cpp index d7aa7f8fe5..a4abb1664e 100644 --- a/ecal/tests/cpp/util_test/src/util_test.cpp +++ b/ecal/tests/cpp/util_test/src/util_test.cpp @@ -195,6 +195,8 @@ TEST(core_cpp_util, Freq_ResettableFrequencyCalculator) } } + +#if ECAL_CORE_PUBLISHER TEST(core_cpp_util, ParallelGetTopics) { constexpr const int max_publisher_count(2000); @@ -260,3 +262,4 @@ TEST(core_cpp_util, ParallelGetTopics) eCAL::Finalize(); } +#endif // ECAL_CORE_PUBLISHER