diff --git a/app/play/play_gui/src/main.cpp b/app/play/play_gui/src/main.cpp index 906771dd7a..91fcb1618b 100644 --- a/app/play/play_gui/src/main.cpp +++ b/app/play/play_gui/src/main.cpp @@ -159,7 +159,7 @@ int main(int argc, char *argv[]) ////////////////////////////////////////////////////////////////////////////// // Just make sure that eCAL is initialized - eCAL::Initialize(0, nullptr, "eCALPlayGUI", eCAL::Init::Default | eCAL::Init::ProcessReg | eCAL::Init::Publisher | eCAL::Init::Service | eCAL::Init::Monitoring); + eCAL::Initialize(0, nullptr, "eCALPlayGUI", eCAL::Init::Default | eCAL::Init::Publisher | eCAL::Init::Service | eCAL::Init::Monitoring); // For linux big measurements require more file descriptors than the default value #ifdef ECAL_OS_LINUX diff --git a/contrib/mma/src/mma_application.cpp b/contrib/mma/src/mma_application.cpp index eac9841fce..0bd4ec8312 100644 --- a/contrib/mma/src/mma_application.cpp +++ b/contrib/mma/src/mma_application.cpp @@ -135,7 +135,7 @@ int main(int argc, char** argv) std::cout << app_version_header << std::endl << ecal_version_header << std::endl << std::endl; // initialize eCAL API - if (eCAL::Initialize(0, nullptr, MMA_APPLICATION_NAME, eCAL::Init::Publisher | eCAL::Init::ProcessReg) < 0) + if (eCAL::Initialize(0, nullptr, MMA_APPLICATION_NAME, eCAL::Init::Publisher) < 0) { std::cout << "eCAL initialization failed !"; return 1; diff --git a/ecal/core/include/ecal/cimpl/ecal_init_cimpl.h b/ecal/core/include/ecal/cimpl/ecal_init_cimpl.h index 490fccb9bf..42384c6c8a 100644 --- a/ecal/core/include/ecal/cimpl/ecal_init_cimpl.h +++ b/ecal/core/include/ecal/cimpl/ecal_init_cimpl.h @@ -31,21 +31,18 @@ #define eCAL_Init_Monitoring 0x08 /*!< Initialize Monitoring API */ #define eCAL_Init_Logging 0x10 /*!< Initialize Logging API */ #define eCAL_Init_TimeSync 0x20 /*!< Initialize Time API */ -#define eCAL_Init_ProcessReg 0x80 /*!< Initialize Process Registration API */ #define eCAL_Init_All (eCAL_Init_Publisher \ | eCAL_Init_Subscriber \ | eCAL_Init_Service \ | eCAL_Init_Monitoring \ | eCAL_Init_Logging \ - | eCAL_Init_TimeSync \ - | eCAL_Init_ProcessReg) /*!< Initialize complete eCAL API */ + | eCAL_Init_TimeSync) /*!< Initialize complete eCAL API */ #define eCAL_Init_Default (eCAL_Init_Publisher \ | eCAL_Init_Subscriber \ | eCAL_Init_Service \ | eCAL_Init_Logging \ - | eCAL_Init_TimeSync \ - | eCAL_Init_ProcessReg) /*!< Initialize default eCAL API */ + | eCAL_Init_TimeSync) /*!< Initialize default eCAL API */ #endif /*ecal_init_cimpl_h_included*/ diff --git a/ecal/core/include/ecal/ecal_init.h b/ecal/core/include/ecal/ecal_init.h index 87f7108406..d3bb6c8b5b 100644 --- a/ecal/core/include/ecal/ecal_init.h +++ b/ecal/core/include/ecal/ecal_init.h @@ -34,22 +34,19 @@ namespace eCAL static const unsigned int Monitoring = 0x008; static const unsigned int Logging = 0x010; static const unsigned int TimeSync = 0x020; - static const unsigned int ProcessReg = 0x080; static const unsigned int All = Publisher | Subscriber | Service | Monitoring | Logging - | TimeSync - | ProcessReg; + | TimeSync; static const unsigned int Default = Publisher | Subscriber | Service | Logging - | TimeSync - | ProcessReg; + | TimeSync; static const unsigned int None = 0x000; } diff --git a/ecal/core/src/ecal_descgate.cpp b/ecal/core/src/ecal_descgate.cpp index ee38a078ea..a10d996197 100644 --- a/ecal/core/src/ecal_descgate.cpp +++ b/ecal/core/src/ecal_descgate.cpp @@ -24,19 +24,55 @@ #include #include +#include "ecal_globals.h" #include "ecal_descgate.h" + #include -#include -#include -#include -#include -#include -#include + +namespace +{ + // TODO: remove me with new CDescGate + eCAL::CDescGate::QualityFlags GetServiceMethodQuality(const eCAL::SDataTypeInformation& request_info_, const eCAL::SDataTypeInformation& response_info_) + { + eCAL::CDescGate::QualityFlags quality = eCAL::CDescGate::QualityFlags::NO_QUALITY; + if (!(request_info_.name.empty() && response_info_.name.empty())) + quality |= eCAL::CDescGate::QualityFlags::TYPE_AVAILABLE; + if (!(request_info_.descriptor.empty() && response_info_.descriptor.empty())) + quality |= eCAL::CDescGate::QualityFlags::DESCRIPTION_AVAILABLE; + + return quality; + } + + eCAL::CDescGate::QualityFlags GetPublisherQuality(const eCAL::SDataTypeInformation& topic_info_) + { + eCAL::CDescGate::QualityFlags quality = eCAL::CDescGate::QualityFlags::NO_QUALITY; + if (!topic_info_.name.empty() || !topic_info_.encoding.empty()) + quality |= eCAL::CDescGate::QualityFlags::TYPE_AVAILABLE; + if (!topic_info_.descriptor.empty()) + quality |= eCAL::CDescGate::QualityFlags::DESCRIPTION_AVAILABLE; + quality |= eCAL::CDescGate::QualityFlags::INFO_COMES_FROM_CORRECT_ENTITY; + quality |= eCAL::CDescGate::QualityFlags::INFO_COMES_FROM_PRODUCER; + + return quality; + } + + eCAL::CDescGate::QualityFlags GetSubscriberQuality(const eCAL::SDataTypeInformation& topic_info_) + { + eCAL::CDescGate::QualityFlags quality = eCAL::CDescGate::QualityFlags::NO_QUALITY; + if (!topic_info_.name.empty() || !topic_info_.encoding.empty()) + quality |= eCAL::CDescGate::QualityFlags::TYPE_AVAILABLE; + if (!topic_info_.descriptor.empty()) + quality |= eCAL::CDescGate::QualityFlags::DESCRIPTION_AVAILABLE; + quality |= eCAL::CDescGate::QualityFlags::INFO_COMES_FROM_CORRECT_ENTITY; + + return quality; + } +} namespace eCAL { CDescGate::CDescGate() : - m_topic_info_map (std::chrono::milliseconds(Config::GetMonitoringTimeoutMs())), + m_topic_info_map(std::chrono::milliseconds(Config::GetMonitoringTimeoutMs())), m_service_info_map(std::chrono::milliseconds(Config::GetMonitoringTimeoutMs())) { } @@ -44,10 +80,186 @@ namespace eCAL void CDescGate::Create() { +#if ECAL_CORE_REGISTRATION + // utilize registration provider and receiver to get descriptions + g_registration_provider()->SetCustomApplySampleCallback("descgate", [this](const auto& sample_) {this->ApplySample(sample_, tl_none); }); + g_registration_receiver()->SetCustomApplySampleCallback("descgate", [this](const auto& sample_) {this->ApplySample(sample_, tl_none); }); +#endif } void CDescGate::Destroy() { +#if ECAL_CORE_REGISTRATION + // stop registration provider and receiver utilization to get descriptions + g_registration_provider()->RemCustomApplySampleCallback("descgate"); + g_registration_receiver()->RemCustomApplySampleCallback("descgate"); +#endif + } + + void CDescGate::GetTopics(std::unordered_map& topic_info_map_) + { + std::unordered_map map; + + const std::lock_guard lock(m_topic_info_map.sync); + m_topic_info_map.map->remove_deprecated(); + map.reserve(m_topic_info_map.map->size()); + + for (const auto& topic_info : (*m_topic_info_map.map)) + { + map.emplace(topic_info.first, topic_info.second.info); + } + topic_info_map_.swap(map); + } + + void CDescGate::GetTopicNames(std::vector& topic_names_) + { + topic_names_.clear(); + + const std::lock_guard lock(m_topic_info_map.sync); + m_topic_info_map.map->remove_deprecated(); + topic_names_.reserve(m_topic_info_map.map->size()); + + for (const auto& topic_info : (*m_topic_info_map.map)) + { + topic_names_.emplace_back(topic_info.first); + } + } + + bool CDescGate::GetDataTypeInformation(const std::string& topic_name_, SDataTypeInformation& topic_info_) + { + if (topic_name_.empty()) return(false); + + const std::lock_guard lock(m_topic_info_map.sync); + m_topic_info_map.map->remove_deprecated(); + const auto topic_info_it = m_topic_info_map.map->find(topic_name_); + + if (topic_info_it == m_topic_info_map.map->end()) return(false); + topic_info_ = (*topic_info_it).second.info; + return(true); + } + + void CDescGate::GetServices(std::map, SServiceMethodInformation>& service_info_map_) + { + std::map, SServiceMethodInformation> map; + + const std::lock_guard lock(m_service_info_map.sync); + m_service_info_map.map->remove_deprecated(); + + for (const auto& service_info : (*m_service_info_map.map)) + { + map.emplace(service_info.first, service_info.second.info); + } + service_info_map_.swap(map); + } + + void CDescGate::GetServiceNames(std::vector>& service_method_names_) + { + service_method_names_.clear(); + + const std::lock_guard lock(m_service_info_map.sync); + m_service_info_map.map->remove_deprecated(); + service_method_names_.reserve(m_service_info_map.map->size()); + + for (const auto& service_info : (*m_service_info_map.map)) + { + service_method_names_.emplace_back(service_info.first); + } + } + + bool CDescGate::GetServiceTypeNames(const std::string& service_name_, const std::string& method_name_, std::string& req_type_name_, std::string& resp_type_name_) + { + std::tuple service_method_tuple = std::make_tuple(service_name_, method_name_); + + const std::lock_guard lock(m_service_info_map.sync); + m_service_info_map.map->remove_deprecated(); + auto service_info_map_it = m_service_info_map.map->find(service_method_tuple); + + if (service_info_map_it == m_service_info_map.map->end()) return false; + req_type_name_ = (*service_info_map_it).second.info.request_type.name; + resp_type_name_ = (*service_info_map_it).second.info.response_type.name; + return true; + } + + bool CDescGate::GetServiceDescription(const std::string& service_name_, const std::string& method_name_, std::string& req_type_desc_, std::string& resp_type_desc_) + { + std::tuple service_method_tuple = std::make_tuple(service_name_, method_name_); + + const std::lock_guard lock(m_service_info_map.sync); + m_service_info_map.map->remove_deprecated(); + auto service_info_map_it = m_service_info_map.map->find(service_method_tuple); + + if (service_info_map_it == m_service_info_map.map->end()) return false; + req_type_desc_ = (*service_info_map_it).second.info.request_type.descriptor; + resp_type_desc_ = (*service_info_map_it).second.info.response_type.descriptor; + return true; + } + + bool CDescGate::ApplySample(const Registration::Sample& sample_, eTLayerType /*layer_*/) + { + switch (sample_.cmd_type) + { + case bct_none: + case bct_set_sample: + case bct_reg_process: + case bct_unreg_process: + break; + case bct_reg_service: + { + for (const auto& method : sample_.service.methods) + { + SDataTypeInformation request_type{}; + request_type.name = method.req_type; + request_type.descriptor = method.req_desc; + + SDataTypeInformation response_type{}; + response_type.name = method.resp_type; + response_type.descriptor = method.resp_desc; + + ApplyServiceDescription(sample_.service.sname, method.mname, request_type, response_type, GetServiceMethodQuality(request_type, response_type)); + } + } + break; + case bct_unreg_service: + // TODO: Implement fast unregistration + break; + case bct_reg_client: + // TODO: Implement this after client methods are available + //for (const auto& method : sample_.client.methods) + //{ + // SDataTypeInformation request_type; + // request_type.name = method.req_type; + // request_type.descriptor = method.req_desc; + + // SDataTypeInformation response_type{}; + // response_type.name = method.resp_type; + // response_type.descriptor = method.resp_desc; + + // ApplyClientDescription(sample_.service.sname, method.mname, request_type, response_type, GetQuality(sample_)); + //} + break; + case bct_unreg_client: + // TODO: Implement fast unregistration + break; + case bct_reg_publisher: + ApplyTopicDescription(sample_.topic.tname, sample_.topic.tdatatype, GetPublisherQuality(sample_.topic.tdatatype)); + break; + case bct_unreg_publisher: + // TODO: Implement fast unregistration + break; + case bct_reg_subscriber: + ApplyTopicDescription(sample_.topic.tname, sample_.topic.tdatatype, GetSubscriberQuality(sample_.topic.tdatatype)); + break; + case bct_unreg_subscriber: + // TODO: Implement fast unregistration + break; + default: + { + Logging::Log(log_level_debug1, "CDescGate::ApplySample : unknown sample type"); + } + break; + } + + return true; } bool CDescGate::ApplyTopicDescription(const std::string& topic_name_, const SDataTypeInformation& topic_info_, const QualityFlags description_quality_) @@ -56,14 +268,14 @@ namespace eCAL m_topic_info_map.map->remove_deprecated(); const auto topic_info_it = m_topic_info_map.map->find(topic_name_); - + // new element (no need to check anything, just add it) - if(topic_info_it == m_topic_info_map.map->end()) + if (topic_info_it == m_topic_info_map.map->end()) { // create a new topic entry STopicInfoQuality& topic_info = (*m_topic_info_map.map)[topic_name_]; - topic_info.info = topic_info_; - topic_info.quality = description_quality_; + topic_info.info = topic_info_; + topic_info.quality = description_quality_; return true; } @@ -82,7 +294,7 @@ namespace eCAL if (description_quality_ > topic_info.quality) { // overwrite attributes - topic_info.info = topic_info_; + topic_info.info = topic_info_; topic_info.quality = description_quality_; // update attributes and return @@ -166,7 +378,7 @@ namespace eCAL // topic type description differs // we log the error and update the entry one time - if ( !topic_info_.descriptor.empty() + if (!topic_info_.descriptor.empty() && !topic_info.info.descriptor.empty() && (topic_info.info.descriptor != topic_info_.descriptor) ) @@ -190,53 +402,11 @@ namespace eCAL return false; } - void CDescGate::GetTopics(std::unordered_map& topic_info_map_) - { - std::unordered_map map; - - const std::lock_guard lock(m_topic_info_map.sync); - m_topic_info_map.map->remove_deprecated(); - map.reserve(m_topic_info_map.map->size()); - - for (const auto& topic_info : (*m_topic_info_map.map)) - { - map.emplace(topic_info.first, topic_info.second.info); - } - topic_info_map_.swap(map); - } - - void CDescGate::GetTopicNames(std::vector& topic_names_) - { - topic_names_.clear(); - - const std::lock_guard lock(m_topic_info_map.sync); - m_topic_info_map.map->remove_deprecated(); - topic_names_.reserve(m_topic_info_map.map->size()); - - for (const auto& topic_info : (*m_topic_info_map.map)) - { - topic_names_.emplace_back(topic_info.first); - } - } - - bool CDescGate::GetDataTypeInformation(const std::string& topic_name_, SDataTypeInformation& topic_info_) - { - if (topic_name_.empty()) return(false); - - const std::lock_guard lock(m_topic_info_map.sync); - m_topic_info_map.map->remove_deprecated(); - const auto topic_info_it = m_topic_info_map.map->find(topic_name_); - - if (topic_info_it == m_topic_info_map.map->end()) return(false); - topic_info_ = (*topic_info_it).second.info; - return(true); - } - bool CDescGate::ApplyServiceDescription(const std::string& service_name_ - , const std::string& method_name_ - , const SDataTypeInformation& request_type_information_ - , const SDataTypeInformation& response_type_information_ - , const QualityFlags description_quality_) + , const std::string& method_name_ + , const SDataTypeInformation& request_type_information_ + , const SDataTypeInformation& response_type_information_ + , const QualityFlags description_quality_) { std::tuple service_method_tuple = std::make_tuple(service_name_, method_name_); @@ -248,9 +418,9 @@ namespace eCAL { // create a new service entry SServiceMethodInfoQuality& service_info = (*m_service_info_map.map)[service_method_tuple]; - service_info.info.request_type = request_type_information_; - service_info.info.response_type = response_type_information_; - service_info.quality = description_quality_; + service_info.info.request_type = request_type_information_; + service_info.info.response_type = response_type_information_; + service_info.quality = description_quality_; return true; } @@ -260,9 +430,9 @@ namespace eCAL SServiceMethodInfoQuality service_info = (*service_info_map_it).second; if (description_quality_ > service_info.quality) { - service_info.info.request_type = request_type_information_; - service_info.info.response_type = response_type_information_; - service_info.quality = description_quality_; + service_info.info.request_type = request_type_information_; + service_info.info.response_type = response_type_information_; + service_info.quality = description_quality_; ret_value = true; } @@ -271,60 +441,4 @@ namespace eCAL return ret_value; } - - void CDescGate::GetServices(std::map, SServiceMethodInformation>& service_info_map_) - { - std::map, SServiceMethodInformation> map; - - const std::lock_guard lock(m_service_info_map.sync); - m_service_info_map.map->remove_deprecated(); - - for (const auto& service_info : (*m_service_info_map.map)) - { - map.emplace(service_info.first, service_info.second.info); - } - service_info_map_.swap(map); - } - - void CDescGate::GetServiceNames(std::vector>& service_method_names_) - { - service_method_names_.clear(); - - const std::lock_guard lock(m_service_info_map.sync); - m_service_info_map.map->remove_deprecated(); - service_method_names_.reserve(m_service_info_map.map->size()); - - for (const auto& service_info : (*m_service_info_map.map)) - { - service_method_names_.emplace_back(service_info.first); - } - } - - bool CDescGate::GetServiceTypeNames(const std::string& service_name_, const std::string& method_name_, std::string& req_type_name_, std::string& resp_type_name_) - { - std::tuple service_method_tuple = std::make_tuple(service_name_, method_name_); - - const std::lock_guard lock(m_service_info_map.sync); - m_service_info_map.map->remove_deprecated(); - auto service_info_map_it = m_service_info_map.map->find(service_method_tuple); - - if (service_info_map_it == m_service_info_map.map->end()) return false; - req_type_name_ = (*service_info_map_it).second.info.request_type.name; - resp_type_name_ = (*service_info_map_it).second.info.response_type.name; - return true; - } - - bool CDescGate::GetServiceDescription(const std::string& service_name_, const std::string& method_name_, std::string& req_type_desc_, std::string& resp_type_desc_) - { - std::tuple service_method_tuple = std::make_tuple(service_name_, method_name_); - - const std::lock_guard lock(m_service_info_map.sync); - m_service_info_map.map->remove_deprecated(); - auto service_info_map_it = m_service_info_map.map->find(service_method_tuple); - - if (service_info_map_it == m_service_info_map.map->end()) return false; - req_type_desc_ = (*service_info_map_it).second.info.request_type.descriptor; - resp_type_desc_ = (*service_info_map_it).second.info.response_type.descriptor; - return true; - } } diff --git a/ecal/core/src/ecal_descgate.h b/ecal/core/src/ecal_descgate.h index a6a35bcd32..6a95b398bb 100644 --- a/ecal/core/src/ecal_descgate.h +++ b/ecal/core/src/ecal_descgate.h @@ -23,14 +23,15 @@ #pragma once -#include #include #include #include "ecal_global_accessors.h" #include "ecal_def.h" +#include "serialization/ecal_serialize_sample_registration.h" #include "util/ecal_expmap.h" +#include #include #include #include @@ -66,49 +67,51 @@ namespace eCAL void Create(); void Destroy(); - bool ApplyTopicDescription(const std::string& topic_name_, - const SDataTypeInformation& topic_info_, - QualityFlags description_quality_); - void GetTopics(std::unordered_map& topic_info_map_); void GetTopicNames(std::vector& topic_names_); bool GetDataTypeInformation(const std::string& topic_name_, SDataTypeInformation& topic_info_); - bool ApplyServiceDescription(const std::string& service_name_, - const std::string& method_name_, - const SDataTypeInformation& request_type_information_, - const SDataTypeInformation& response_type_information_, - QualityFlags description_quality_); - void GetServices(std::map, SServiceMethodInformation>& service_info_map_); void GetServiceNames(std::vector>& service_method_names_); bool GetServiceTypeNames(const std::string& service_name_, const std::string& method_name_, std::string& req_type_name_, std::string& resp_type_name_); bool GetServiceDescription(const std::string& service_name_, const std::string& method_name_, std::string& req_type_desc_, std::string& resp_type_desc_); protected: + bool ApplySample(const Registration::Sample& sample_, eTLayerType layer_); + + bool ApplyTopicDescription(const std::string& topic_name_, + const SDataTypeInformation& topic_info_, + QualityFlags description_quality_); + + bool ApplyServiceDescription(const std::string& service_name_, + const std::string& method_name_, + const SDataTypeInformation& request_type_information_, + const SDataTypeInformation& response_type_information_, + QualityFlags description_quality_); + struct STopicInfoQuality { - SDataTypeInformation info; //!< Topic info struct with type encoding, name and descriptor. - QualityFlags quality = QualityFlags::NO_QUALITY; //!< QualityFlags to determine whether we may overwrite the current data with better one. E.g. we prefer the description sent by a publisher over one sent by a subscriber. - bool type_missmatch_logged = false; //!< Whether we have already logged a type-missmatch + SDataTypeInformation info; //!< Topic info struct with type encoding, name and descriptor. + QualityFlags quality = QualityFlags::NO_QUALITY; //!< QualityFlags to determine whether we may overwrite the current data with better one. E.g. we prefer the description sent by a publisher over one sent by a subscriber. + bool type_missmatch_logged = false; //!< Whether we have already logged a type-missmatch }; struct SServiceMethodInfoQuality { SServiceMethodInformation info; //!< Service info struct with type names and descriptors for request and response. - QualityFlags quality = QualityFlags::NO_QUALITY; //!< The Quality of the Info + QualityFlags quality = QualityFlags::NO_QUALITY; //!< The Quality of the Info }; // key: topic name | value: topic (type/desc), quality - using TopicInfoMap = eCAL::Util::CExpMap; //!< Map containing { TopicName -> (Type, Description, Quality) } mapping of all topics that are currently known + using TopicInfoMap = eCAL::Util::CExpMap; //!< Map containing { TopicName -> (Type, Description, Quality) } mapping of all topics that are currently known struct STopicInfoMap { explicit STopicInfoMap(const std::chrono::milliseconds& timeout_) : map(std::make_unique(timeout_)) { }; - mutable std::mutex sync; //!< Mutex protecting the map - std::unique_ptr map; //!< Map containing information about each known topic + mutable std::mutex sync; //!< Mutex protecting the map + std::unique_ptr map; //!< Map containing information about each known topic }; STopicInfoMap m_topic_info_map; diff --git a/ecal/core/src/ecal_globals.cpp b/ecal/core/src/ecal_globals.cpp index 1effb47c26..634fdfc3b2 100644 --- a/ecal/core/src/ecal_globals.cpp +++ b/ecal/core/src/ecal_globals.cpp @@ -229,7 +229,7 @@ namespace eCAL //if (config_instance) config_instance->Create(); if (log_instance && ((components_ & Init::Logging) != 0u)) log_instance->Create(); #if ECAL_CORE_REGISTRATION - if (registration_provider_instance) registration_provider_instance->Create(true, true, (components_ & Init::ProcessReg) != 0x0); + if (registration_provider_instance) registration_provider_instance->Create(); if (registration_receiver_instance) registration_receiver_instance->Create(); #endif if (descgate_instance) descgate_instance->Create(); diff --git a/ecal/core/src/monitoring/ecal_monitoring_impl.cpp b/ecal/core/src/monitoring/ecal_monitoring_impl.cpp index 976def495a..fd5561c38f 100644 --- a/ecal/core/src/monitoring/ecal_monitoring_impl.cpp +++ b/ecal/core/src/monitoring/ecal_monitoring_impl.cpp @@ -30,7 +30,9 @@ #include +#include "registration/ecal_registration_provider.h" #include "registration/ecal_registration_receiver.h" + #include "serialization/ecal_serialize_monitoring.h" @@ -56,8 +58,9 @@ namespace eCAL // get name of this host m_host_name = Process::GetHostName(); - // utilize registration receiver to enrich monitor information - g_registration_receiver()->SetCustomApplySampleCallback([this](const auto& sample_){this->ApplySample(sample_, tl_none);}); + // utilize registration provider and receiver to enrich monitor information + g_registration_provider()->SetCustomApplySampleCallback("monitoring", [this](const auto& sample_) {this->ApplySample(sample_, tl_none); }); + g_registration_receiver()->SetCustomApplySampleCallback("monitoring", [this](const auto& sample_){this->ApplySample(sample_, tl_none);}); // setup blacklist and whitelist filter strings# m_topic_filter_excl_s = Config::GetMonitoringFilterExcludeList(); @@ -71,7 +74,9 @@ namespace eCAL void CMonitoringImpl::Destroy() { - g_registration_receiver()->RemCustomApplySampleCallback(); + // stop registration provider and receiver utilization to enrich monitor information + g_registration_provider()->RemCustomApplySampleCallback("monitoring"); + g_registration_receiver()->RemCustomApplySampleCallback("monitoring"); m_init = false; } diff --git a/ecal/core/src/pubsub/ecal_pubgate.cpp b/ecal/core/src/pubsub/ecal_pubgate.cpp index 96e1c12db2..e285cef2c6 100644 --- a/ecal/core/src/pubsub/ecal_pubgate.cpp +++ b/ecal/core/src/pubsub/ecal_pubgate.cpp @@ -31,27 +31,6 @@ #include #include -namespace -{ - // TODO: remove me with new CDescGate - bool ApplyTopicDescription(const std::string& topic_name_, const eCAL::SDataTypeInformation& topic_info_) - { - if (eCAL::g_descgate() != nullptr) - { - // Calculate the quality of the current info - eCAL::CDescGate::QualityFlags quality = eCAL::CDescGate::QualityFlags::NO_QUALITY; - if (!topic_info_.name.empty() || !topic_info_.encoding.empty()) - quality |= eCAL::CDescGate::QualityFlags::TYPE_AVAILABLE; - if (!topic_info_.descriptor.empty()) - quality |= eCAL::CDescGate::QualityFlags::DESCRIPTION_AVAILABLE; - quality |= eCAL::CDescGate::QualityFlags::INFO_COMES_FROM_CORRECT_ENTITY; - - return eCAL::g_descgate()->ApplyTopicDescription(topic_name_, topic_info_, quality); - } - return false; - } -} - namespace eCAL { ////////////////////////////////////////////////////////////////// @@ -153,9 +132,6 @@ namespace eCAL } #endif - // store description - ApplyTopicDescription(topic_name, topic_information); - // register local subscriber const std::shared_lock lock(m_topic_name_datawriter_sync); auto res = m_topic_name_datawriter_map.equal_range(topic_name); @@ -208,9 +184,6 @@ namespace eCAL } #endif - // store description - ApplyTopicDescription(topic_name, topic_information); - // register external subscriber const std::shared_lock lock(m_topic_name_datawriter_sync); auto res = m_topic_name_datawriter_map.equal_range(topic_name); @@ -248,6 +221,7 @@ namespace eCAL const std::shared_lock lock(m_topic_name_datawriter_sync); for (const auto& iter : m_topic_name_datawriter_map) { + // force data writer to (re)register itself on registration provider iter.second->RefreshRegistration(); } } diff --git a/ecal/core/src/pubsub/ecal_publisher.cpp b/ecal/core/src/pubsub/ecal_publisher.cpp index 6e3d456d34..9a917e8983 100644 --- a/ecal/core/src/pubsub/ecal_publisher.cpp +++ b/ecal/core/src/pubsub/ecal_publisher.cpp @@ -33,29 +33,6 @@ #include #include -namespace -{ - // TODO: remove me with new CDescGate - bool ApplyTopicDescription(const std::string& topic_name_, const eCAL::SDataTypeInformation& data_type_info_) - { - if (eCAL::g_descgate() != nullptr) - { - // Calculate the quality of the current info - eCAL::CDescGate::QualityFlags quality = eCAL::CDescGate::QualityFlags::NO_QUALITY; - if (!data_type_info_.name.empty() || !data_type_info_.encoding.empty()) - quality |= eCAL::CDescGate::QualityFlags::TYPE_AVAILABLE; - if (!data_type_info_.descriptor.empty()) - quality |= eCAL::CDescGate::QualityFlags::DESCRIPTION_AVAILABLE; - quality |= eCAL::CDescGate::QualityFlags::INFO_COMES_FROM_THIS_PROCESS; - quality |= eCAL::CDescGate::QualityFlags::INFO_COMES_FROM_CORRECT_ENTITY; - quality |= eCAL::CDescGate::QualityFlags::INFO_COMES_FROM_PRODUCER; - - return eCAL::g_descgate()->ApplyTopicDescription(topic_name_, data_type_info_, quality); - } - return false; - } -} - namespace eCAL { CPublisher::CPublisher() : @@ -143,9 +120,6 @@ namespace eCAL // register publisher gateway (for publisher memory file and event name) g_pubgate()->Register(topic_name_, m_datawriter); - // register to description gateway for type / description checking - ApplyTopicDescription(topic_name_, data_type_info_); - // we made it :-) m_created = true; @@ -192,7 +166,6 @@ namespace eCAL bool CPublisher::SetDataTypeInformation(const SDataTypeInformation& data_type_info_) { if (m_datawriter == nullptr) return false; - ApplyTopicDescription(m_datawriter->GetTopicName(), data_type_info_); return m_datawriter->SetDataTypeInformation(data_type_info_); } diff --git a/ecal/core/src/pubsub/ecal_subgate.cpp b/ecal/core/src/pubsub/ecal_subgate.cpp index bee815d8a9..5c13504954 100644 --- a/ecal/core/src/pubsub/ecal_subgate.cpp +++ b/ecal/core/src/pubsub/ecal_subgate.cpp @@ -35,28 +35,6 @@ #include #include -namespace -{ - // TODO: remove me with new CDescGate - bool ApplyTopicDescription(const std::string& topic_name_, const eCAL::SDataTypeInformation& topic_info_) - { - if (eCAL::g_descgate() != nullptr) - { - // Calculate the quality of the current info - eCAL::CDescGate::QualityFlags quality = eCAL::CDescGate::QualityFlags::NO_QUALITY; - if (!topic_info_.name.empty() || !topic_info_.encoding.empty()) - quality |= eCAL::CDescGate::QualityFlags::TYPE_AVAILABLE; - if (!topic_info_.descriptor.empty()) - quality |= eCAL::CDescGate::QualityFlags::DESCRIPTION_AVAILABLE; - quality |= eCAL::CDescGate::QualityFlags::INFO_COMES_FROM_CORRECT_ENTITY; - quality |= eCAL::CDescGate::QualityFlags::INFO_COMES_FROM_PRODUCER; - - return eCAL::g_descgate()->ApplyTopicDescription(topic_name_, topic_info_, quality); - } - return false; - } -} - namespace eCAL { ////////////////////////////////////////////////////////////////// @@ -243,9 +221,8 @@ namespace eCAL const std::string& topic_name = ecal_topic.tname; if (topic_name.empty()) return; - // store description + // get topic id const std::string& topic_id = ecal_topic.tid; - ApplyTopicDescription(topic_name, ecal_topic.tdatatype); // get process id const std::string process_id = std::to_string(ecal_sample_.topic.pid); @@ -275,9 +252,6 @@ namespace eCAL const std::string& topic_id = ecal_topic.tid; const std::string process_id = std::to_string(ecal_sample_.topic.pid); - // store description - ApplyTopicDescription(topic_name, ecal_topic.tdatatype); - // unregister local publisher const std::shared_lock lock(m_topic_name_datareader_sync); auto res = m_topic_name_datareader_map.equal_range(topic_name); @@ -340,6 +314,7 @@ namespace eCAL const std::shared_lock lock(m_topic_name_datareader_sync); for (const auto& iter : m_topic_name_datareader_map) { + // force data reader to (re)register itself on registration provider iter.second->RefreshRegistration(); } } diff --git a/ecal/core/src/pubsub/ecal_subscriber.cpp b/ecal/core/src/pubsub/ecal_subscriber.cpp index aa06516dc5..898af14e3b 100644 --- a/ecal/core/src/pubsub/ecal_subscriber.cpp +++ b/ecal/core/src/pubsub/ecal_subscriber.cpp @@ -32,28 +32,6 @@ #include #include -namespace -{ - // TODO: remove me with new CDescGate - bool ApplyTopicDescription(const std::string& topic_name_, const eCAL::SDataTypeInformation& topic_info_) - { - if (eCAL::g_descgate() != nullptr) - { - // Calculate the quality of the current info - eCAL::CDescGate::QualityFlags quality = eCAL::CDescGate::QualityFlags::NO_QUALITY; - if (!topic_info_.name.empty() || !topic_info_.encoding.empty()) - quality |= eCAL::CDescGate::QualityFlags::TYPE_AVAILABLE; - if (!topic_info_.descriptor.empty()) - quality |= eCAL::CDescGate::QualityFlags::DESCRIPTION_AVAILABLE; - quality |= eCAL::CDescGate::QualityFlags::INFO_COMES_FROM_THIS_PROCESS; - quality |= eCAL::CDescGate::QualityFlags::INFO_COMES_FROM_CORRECT_ENTITY; - - return eCAL::g_descgate()->ApplyTopicDescription(topic_name_, topic_info_, quality); - } - return false; - } -} - namespace eCAL { CSubscriber::CSubscriber() : @@ -138,9 +116,6 @@ namespace eCAL // register to subscriber gateway for publisher memory file receive thread g_subgate()->Register(topic_name_, m_datareader); - // register to description gateway for type / description checking - ApplyTopicDescription(topic_name_, topic_info_); - // we made it :-) m_created = true; diff --git a/ecal/core/src/readwrite/ecal_reader.cpp b/ecal/core/src/readwrite/ecal_reader.cpp index 5d1985f7bb..71ad234d0e 100644 --- a/ecal/core/src/readwrite/ecal_reader.cpp +++ b/ecal/core/src/readwrite/ecal_reader.cpp @@ -122,12 +122,12 @@ namespace eCAL // start transport layers SubscribeToLayers(); - // register - Register(false); - // mark as created m_created = true; + // register + Register(false); + return(true); } @@ -155,11 +155,13 @@ namespace eCAL m_event_callback_map.clear(); } - // unregister + // mark as no more created + m_created = false; + + // and unregister Unregister(); // reset defaults - m_created = false; m_clock = 0; m_message_drops = 0; @@ -230,6 +232,7 @@ namespace eCAL bool CDataReader::Register(const bool force_) { #if ECAL_CORE_REGISTRATION + if (!m_created) return(false); if(m_topic_name.empty()) return(false); // create command parameter @@ -301,7 +304,7 @@ namespace eCAL ecal_reg_sample_topic.connections_ext = 0; // register subscriber - if(g_registration_provider() != nullptr) g_registration_provider()->RegisterTopic(m_topic_name, m_topic_id, ecal_reg_sample, force_); + if(g_registration_provider() != nullptr) g_registration_provider()->ApplySample(ecal_reg_sample, force_); #ifndef NDEBUG // log it Logging::Log(log_level_debug4, m_topic_name + "::CDataReader::DoRegister"); @@ -329,7 +332,7 @@ namespace eCAL ecal_reg_sample_topic.uname = Process::GetUnitName(); // unregister subscriber - if (g_registration_provider() != nullptr) g_registration_provider()->UnregisterTopic(m_topic_name, m_topic_id, ecal_unreg_sample, true); + if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(ecal_unreg_sample, false); #ifndef NDEBUG // log it Logging::Log(log_level_debug4, m_topic_name + "::CDataReader::Unregister"); diff --git a/ecal/core/src/readwrite/ecal_reader.h b/ecal/core/src/readwrite/ecal_reader.h index d812ffb940..2aa3d10d08 100644 --- a/ecal/core/src/readwrite/ecal_reader.h +++ b/ecal/core/src/readwrite/ecal_reader.h @@ -23,6 +23,7 @@ #pragma once +#include #include #include #include diff --git a/ecal/core/src/readwrite/ecal_writer.cpp b/ecal/core/src/readwrite/ecal_writer.cpp index d47529e6c3..806dedab4a 100644 --- a/ecal/core/src/readwrite/ecal_writer.cpp +++ b/ecal/core/src/readwrite/ecal_writer.cpp @@ -139,12 +139,12 @@ namespace eCAL // allow to share topic description m_use_tdesc = Config::IsTopicDescriptionSharingEnabled(); - // register - Register(false); - // mark as created m_created = true; + // register + Register(false); + // create udp multicast layer SetUseUdpMC(m_writer.udp_mc_mode.requested); @@ -210,11 +210,12 @@ namespace eCAL m_event_callback_map.clear(); } - // unregister - Unregister(); - + // mark as no more created m_created = false; + // and unregister + Unregister(); + return(true); } @@ -771,6 +772,7 @@ namespace eCAL bool CDataWriter::Register(bool force_) { #if ECAL_CORE_REGISTRATION + if (!m_created) return(false); if (m_topic_name.empty()) return(false); //@Rex: why is the logic different in CDataReader??? @@ -865,7 +867,7 @@ namespace eCAL ecal_reg_sample_topic.connections_ext = static_cast(ext_connections); // register publisher - if (g_registration_provider() != nullptr) g_registration_provider()->RegisterTopic(m_topic_name, m_topic_id, ecal_reg_sample, force_); + if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(ecal_reg_sample, force_); #ifndef NDEBUG // log it @@ -895,7 +897,7 @@ namespace eCAL ecal_reg_sample_topic.uname = Process::GetUnitName(); // unregister publisher - if (g_registration_provider() != nullptr) g_registration_provider()->UnregisterTopic(m_topic_name, m_topic_id, ecal_unreg_sample, true); + if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(ecal_unreg_sample, false); #ifndef NDEBUG // log it diff --git a/ecal/core/src/readwrite/ecal_writer.h b/ecal/core/src/readwrite/ecal_writer.h index e0fd19afee..51b60dbc0b 100644 --- a/ecal/core/src/readwrite/ecal_writer.h +++ b/ecal/core/src/readwrite/ecal_writer.h @@ -23,6 +23,7 @@ #pragma once +#include #include #include #include @@ -217,6 +218,6 @@ namespace eCAL bool m_use_tdesc; int m_share_ttype; int m_share_tdesc; - bool m_created; + std::atomic m_created; }; } diff --git a/ecal/core/src/registration/ecal_registration_provider.cpp b/ecal/core/src/registration/ecal_registration_provider.cpp index a8d2ccf904..a30237ce0a 100644 --- a/ecal/core/src/registration/ecal_registration_provider.cpp +++ b/ecal/core/src/registration/ecal_registration_provider.cpp @@ -43,58 +43,11 @@ #include #include -namespace -{ - // TODO: remove me with new CDescGate - bool ApplyTopicDescription(const std::string& topic_name_, const eCAL::SDataTypeInformation& topic_info_, bool topic_is_a_publisher_) - { - if (eCAL::g_descgate() != nullptr) - { - // calculate the quality of the current info - eCAL::CDescGate::QualityFlags quality = eCAL::CDescGate::QualityFlags::NO_QUALITY; - if (!topic_info_.encoding.empty() || !topic_info_.name.empty()) - quality |= eCAL::CDescGate::QualityFlags::TYPE_AVAILABLE; - if (!topic_info_.descriptor.empty()) - quality |= eCAL::CDescGate::QualityFlags::DESCRIPTION_AVAILABLE; - if (topic_is_a_publisher_) - quality |= eCAL::CDescGate::QualityFlags::INFO_COMES_FROM_PRODUCER; - quality |= eCAL::CDescGate::QualityFlags::INFO_COMES_FROM_THIS_PROCESS; - quality |= eCAL::CDescGate::QualityFlags::INFO_COMES_FROM_CORRECT_ENTITY; - // update description - return eCAL::g_descgate()->ApplyTopicDescription(topic_name_, topic_info_, quality); - } - return false; - } - - // TODO: remove me with new CDescGate - bool ApplyServiceDescription(const std::string& service_name_, const std::string& method_name_, - const eCAL::SDataTypeInformation& request_type_information_, - const eCAL::SDataTypeInformation& response_type_information_) - { - if (eCAL::g_descgate() != nullptr) - { - // Calculate the quality of the current info - eCAL::CDescGate::QualityFlags quality = eCAL::CDescGate::QualityFlags::NO_QUALITY; - if (!(request_type_information_.name.empty() && response_type_information_.name.empty())) - quality |= eCAL::CDescGate::QualityFlags::TYPE_AVAILABLE; - if (!(request_type_information_.descriptor.empty() && response_type_information_.descriptor.empty())) - quality |= eCAL::CDescGate::QualityFlags::DESCRIPTION_AVAILABLE; - quality |= eCAL::CDescGate::QualityFlags::INFO_COMES_FROM_THIS_PROCESS; - - return eCAL::g_descgate()->ApplyServiceDescription(service_name_, method_name_, request_type_information_, response_type_information_, quality); - } - return false; - } -} - namespace eCAL { std::atomic CRegistrationProvider::m_created; CRegistrationProvider::CRegistrationProvider() : - m_reg_topics(false), - m_reg_services(false), - m_reg_process(false), m_use_registration_udp(false), m_use_registration_shm(false) { @@ -105,14 +58,10 @@ namespace eCAL Destroy(); } - void CRegistrationProvider::Create(bool topics_, bool services_, bool process_) + void CRegistrationProvider::Create() { if(m_created) return; - m_reg_topics = topics_; - m_reg_services = services_; - m_reg_process = process_; - // send registration to shared memory and to udp m_use_registration_udp = !Config::Experimental::IsNetworkMonitoringDisabled(); m_use_registration_shm = Config::Experimental::IsShmMonitoringEnabled(); @@ -155,16 +104,25 @@ namespace eCAL // stop cyclic registration thread m_reg_sample_snd_thread->stop(); - // send one last (un)registration message to the world - // thank you and goodbye :-) - UnregisterProcess(); + // add process unregistration sample + AddSample2SampleList(GetProcessUnregisterSample()); + + if (m_use_registration_udp) + { + // send process unregistration sample over udp + SendSampleList2UDP(); - // destroy registration sample sender - m_reg_sample_snd.reset(); + // destroy udp registration sample sender + m_reg_sample_snd.reset(); + } #if ECAL_CORE_REGISTRATION_SHM if (m_use_registration_shm) { + // broadcast process unregistration sample over shm + SendSampleList2SHM(); + + // destroy shm registration sample writer m_memfile_broadcast_writer.Unbind(); m_memfile_broadcast.Destroy(); } @@ -173,143 +131,171 @@ namespace eCAL m_created = false; } - bool CRegistrationProvider::RegisterTopic(const std::string& topic_name_, const std::string& topic_id_, const Registration::Sample& ecal_sample_, const bool force_) + bool CRegistrationProvider::ApplySample(const Registration::Sample& sample_, const bool force_) { - if (!m_created) return(false); - if (!m_reg_topics) return(false); + if (!m_created) return(false); - const std::lock_guard lock(m_topics_map_sync); - m_topics_map[topic_name_ + topic_id_] = ecal_sample_; - if(force_) + // forward all registration samples to outside "customer" (e.g. monitoring, descgate) { - RegisterProcess(); - // apply registration sample - ApplySample(topic_name_, ecal_sample_); - // apply registration sample to shm registration - SendSampleList(false); + const std::lock_guard lock(m_callback_custom_apply_sample_map_mtx); + for (const auto& iter : m_callback_custom_apply_sample_map) + { + iter.second(sample_); + } } - return(true); - } - - bool CRegistrationProvider::UnregisterTopic(const std::string& topic_name_, const std::string& topic_id_, const Registration::Sample& ecal_sample_, const bool force_) - { - if(!m_created) return(false); + // update sample list + AddSample2SampleList(sample_); + // if registration is forced if (force_) { - // apply unregistration sample - ApplySample(topic_name_, ecal_sample_); - // apply registration sample to shm registration - SendSampleList(false); - } + // send single registration sample over udp + SendSample2UDP(sample_); - SampleMapT::iterator iter; - const std::lock_guard lock(m_topics_map_sync); - iter = m_topics_map.find(topic_name_ + topic_id_); - if(iter != m_topics_map.end()) - { - m_topics_map.erase(iter); - return(true); + // broadcast (updated) sample list over shm + SendSampleList2SHM(); } - return(false); + return(true); } - bool CRegistrationProvider::RegisterServer(const std::string& service_name_, const std::string& service_id_, const Registration::Sample& ecal_sample_, bool force_) + void CRegistrationProvider::SetCustomApplySampleCallback(const std::string& customer_, const ApplySampleCallbackT& callback_) { - if (!m_created) return(false); - if (!m_reg_services) return(false); + const std::lock_guard lock(m_callback_custom_apply_sample_map_mtx); + m_callback_custom_apply_sample_map[customer_] = callback_; + } - const std::lock_guard lock(m_server_map_sync); - m_server_map[service_name_ + service_id_] = ecal_sample_; - if (force_) + void CRegistrationProvider::RemCustomApplySampleCallback(const std::string& customer_) + { + const std::lock_guard lock(m_callback_custom_apply_sample_map_mtx); + auto iter = m_callback_custom_apply_sample_map.find(customer_); + if (iter != m_callback_custom_apply_sample_map.end()) { - RegisterProcess(); - // apply registration sample - ApplySample(service_name_, ecal_sample_); - // apply registration sample to shm registration - SendSampleList(false); + m_callback_custom_apply_sample_map.erase(iter); } + } - return(true); + void CRegistrationProvider::AddSample2SampleList(const Registration::Sample& sample_) + { + const std::lock_guard lock(m_sample_list_mtx); + m_sample_list.samples.push_back(sample_); } - bool CRegistrationProvider::UnregisterServer(const std::string& service_name_, const std::string& service_id_, const Registration::Sample& ecal_sample_, bool force_) + bool CRegistrationProvider::SendSample2UDP(const Registration::Sample& sample_) { if (!m_created) return(false); - if (force_) + if (m_use_registration_udp && m_reg_sample_snd) { - // apply unregistration sample - ApplySample(service_name_, ecal_sample_); - // apply registration sample to shm registration - SendSampleList(false); - } + // lock sample buffer + const std::lock_guard lock(m_sample_buffer_mtx); - SampleMapT::iterator iter; - const std::lock_guard lock(m_server_map_sync); - iter = m_server_map.find(service_name_ + service_id_); - if (iter != m_server_map.end()) - { - m_server_map.erase(iter); - return(true); + // serialize single sample + if (SerializeToBuffer(sample_, m_sample_buffer)) + { + // send single sample over udp + return m_reg_sample_snd->Send("reg_sample", m_sample_buffer) != 0; + } } - return(false); } - bool CRegistrationProvider::RegisterClient(const std::string& client_name_, const std::string& client_id_, const Registration::Sample& ecal_sample_, bool force_) + bool CRegistrationProvider::SendSampleList2UDP() { - if (!m_created) return(false); - if (!m_reg_services) return(false); + if (!m_created) return(false); + bool return_value{ true }; - const std::lock_guard lock(m_client_map_sync); - m_client_map[client_name_ + client_id_] = ecal_sample_; - if (force_) + // lock sample list + const std::lock_guard lock(m_sample_list_mtx); + + // send all (single) samples over udp + if (m_use_registration_udp && m_reg_sample_snd) { - RegisterProcess(); - // apply registration sample - ApplySample(client_name_, ecal_sample_); - // apply registration sample to shm registration - SendSampleList(false); + for (const auto& sample : m_sample_list.samples) + { + return_value &= SendSample2UDP(sample); + } } - return(true); + return return_value; } - bool CRegistrationProvider::UnregisterClient(const std::string& client_name_, const std::string& client_id_, const Registration::Sample& ecal_sample_, bool force_) + bool CRegistrationProvider::SendSampleList2SHM() { if (!m_created) return(false); - if (force_) - { - // apply unregistration sample - ApplySample(client_name_, ecal_sample_); - // apply registration sample to shm registration - SendSampleList(false); - } +#if ECAL_CORE_REGISTRATION_SHM + bool return_value{ true }; - SampleMapT::iterator iter; - const std::lock_guard lock(m_client_map_sync); - iter = m_client_map.find(client_name_ + client_id_); - if (iter != m_client_map.end()) + // send sample list over shm + if (m_use_registration_shm) { - m_client_map.erase(iter); - return(true); + // lock sample list + const std::lock_guard lock(m_sample_list_mtx); + + // serialize whole sample list + if (SerializeToBuffer(m_sample_list, m_sample_list_buffer)) + { + if (!m_sample_list_buffer.empty()) + { + // broadcast sample list over shm + return_value &= m_memfile_broadcast_writer.Write(m_sample_list_buffer.data(), m_sample_list_buffer.size()); + } + } } + return return_value; +#else + return false; +#endif + } - return(false); + void CRegistrationProvider::ClearSampleList() + { + // lock sample list + const std::lock_guard lock(m_sample_list_mtx); + // clear sample list + m_sample_list.samples.clear(); } - bool CRegistrationProvider::RegisterProcess() + void CRegistrationProvider::RegisterSendThread() { - if (!m_created) return(false); - if (!m_reg_process) return(false); +#if ECAL_CORE_SUBSCRIBER + // refresh subscriber registration + if (g_subgate() != nullptr) g_subgate()->RefreshRegistrations(); +#endif + +#if ECAL_CORE_PUBLISHER + // refresh publisher registration + if (g_pubgate() != nullptr) g_pubgate()->RefreshRegistrations(); +#endif + +#if ECAL_CORE_SERVICE + // refresh server registration + if (g_servicegate() != nullptr) g_servicegate()->RefreshRegistrations(); + + // refresh client registration + if (g_clientgate() != nullptr) g_clientgate()->RefreshRegistrations(); +#endif + + // send out sample list over udp + SendSampleList2UDP(); + + // broadcast sample list over shm + SendSampleList2SHM(); + + // clear registration sample list + ClearSampleList(); + + // add process registration sample to internal sample list as first sample (for next registration loop) + AddSample2SampleList(GetProcessRegisterSample()); + } + Registration::Sample CRegistrationProvider::GetProcessRegisterSample() + { Registration::Sample process_sample; - process_sample.cmd_type = bct_reg_process; - auto& process_sample_process = process_sample.process; + process_sample.cmd_type = bct_reg_process; + auto& process_sample_process = process_sample.process; process_sample_process.hname = Process::GetHostName(); process_sample_process.hgname = Process::GetHostGroupName(); process_sample_process.pid = Process::GetProcessID(); @@ -362,205 +348,19 @@ namespace eCAL process_sample_process.ecal_runtime_version = GetVersionString(); - // apply registration sample - const bool return_value = ApplySample(Process::GetHostName(), process_sample); - - return return_value; - } - - bool CRegistrationProvider::UnregisterProcess() - { - if (!m_created) return(false); - if (!m_reg_process) return(false); - - Registration::Sample process_sample; - process_sample.cmd_type = bct_unreg_process; - auto& process_sample_process = process_sample.process; - process_sample_process.hname = Process::GetHostName(); - process_sample_process.pid = Process::GetProcessID(); - process_sample_process.pname = Process::GetProcessName(); - process_sample_process.uname = Process::GetUnitName(); - - // apply unregistration sample - const bool return_value = ApplySample(Process::GetHostName(), process_sample); - - return return_value; - } - - bool CRegistrationProvider::RegisterTopics() - { - if (!m_created) return(false); - if (!m_reg_topics) return(false); - - bool return_value {true}; - const std::lock_guard lock(m_topics_map_sync); - for(SampleMapT::const_iterator iter = m_topics_map.begin(); iter != m_topics_map.end(); ++iter) - { - ////////////////////////////////////////////// - // update description - ////////////////////////////////////////////// - // read attributes - const std::string topic_name(iter->second.topic.tname); - const bool topic_is_a_publisher(iter->second.cmd_type == eCAL::bct_reg_publisher); - - SDataTypeInformation topic_info; - const auto& topic_datatype = iter->second.topic.tdatatype; - topic_info.encoding = topic_datatype.encoding; - topic_info.name = topic_datatype.name; - topic_info.descriptor = topic_datatype.descriptor; - - ApplyTopicDescription(topic_name, topic_info, topic_is_a_publisher); - - ////////////////////////////////////////////// - // send sample to registration layer - ////////////////////////////////////////////// - return_value &= ApplySample(iter->second.topic.tname, iter->second); - } - - return return_value; - } - - bool CRegistrationProvider::RegisterServer() - { - if (!m_created) return(false); - if (!m_reg_services) return(false); - - bool return_value {true}; - const std::lock_guard lock(m_server_map_sync); - for (SampleMapT::const_iterator iter = m_server_map.begin(); iter != m_server_map.end(); ++iter) - { - ////////////////////////////////////////////// - // update description - ////////////////////////////////////////////// - const auto& ecal_sample_service = iter->second.service; - for (const auto& method : ecal_sample_service.methods) - { - SDataTypeInformation request_type; - request_type.name = method.req_type; - request_type.descriptor = method.req_desc; - - SDataTypeInformation response_type; - response_type.name = method.resp_type; - response_type.descriptor = method.resp_desc; - - ApplyServiceDescription(ecal_sample_service.sname, method.mname, request_type, response_type); - } - - ////////////////////////////////////////////// - // send sample to registration layer - ////////////////////////////////////////////// - return_value &= ApplySample(iter->second.service.sname, iter->second); - } - - return return_value; + return process_sample; } - bool CRegistrationProvider::RegisterClient() + Registration::Sample CRegistrationProvider::GetProcessUnregisterSample() { - if (!m_created) return(false); - if (!m_reg_services) return(false); - - bool return_value {true}; - const std::lock_guard lock(m_client_map_sync); - for (SampleMapT::const_iterator iter = m_client_map.begin(); iter != m_client_map.end(); ++iter) - { - // apply registration sample - return_value &= ApplySample(iter->second.client.sname, iter->second); - } - - return return_value; - } - - bool CRegistrationProvider::ApplySample(const std::string& sample_name_, const Registration::Sample& sample_) - { - if(!m_created) return(false); - - bool return_value {true}; - - if (m_use_registration_udp && m_reg_sample_snd) - { - const std::lock_guard lock(m_sample_buffer_sync); - if (SerializeToBuffer(sample_, m_sample_buffer)) - { - return_value &= (m_reg_sample_snd->Send(sample_name_, m_sample_buffer) != 0); - } - } - -#if ECAL_CORE_REGISTRATION_SHM - if (m_use_registration_shm) - { - const std::lock_guard lock(m_sample_list_sync); - m_sample_list.samples.push_back(sample_); - } -#endif - - return return_value; - } - - bool CRegistrationProvider::SendSampleList(bool reset_sample_list_) - { - if (!m_created) return(false); - bool return_value{ true }; - -#if ECAL_CORE_REGISTRATION_SHM - if (m_use_registration_shm) - { - { - const std::lock_guard lock(m_sample_list_sync); - if (SerializeToBuffer(m_sample_list, m_sample_list_buffer)) - { - if (reset_sample_list_) - { - m_sample_list.samples.clear(); - } - } - } - - if (!m_sample_list_buffer.empty()) - { - return_value &= m_memfile_broadcast_writer.Write(m_sample_list_buffer.data(), m_sample_list_buffer.size()); - } - } -#endif - - return return_value; - } - - void CRegistrationProvider::RegisterSendThread() - { -#if ECAL_CORE_SUBSCRIBER - // refresh subscriber registration - if (g_subgate() != nullptr) g_subgate()->RefreshRegistrations(); -#endif - -#if ECAL_CORE_PUBLISHER - // refresh publisher registration - if (g_pubgate() != nullptr) g_pubgate()->RefreshRegistrations(); -#endif - -#if ECAL_CORE_SERVICE - // refresh server registration - if (g_servicegate() != nullptr) g_servicegate()->RefreshRegistrations(); - - // refresh client registration - if (g_clientgate() != nullptr) g_clientgate()->RefreshRegistrations(); -#endif - - // register process - RegisterProcess(); - -#if ECAL_CORE_SERVICE - // register server - RegisterServer(); - - // register clients - RegisterClient(); -#endif - - // register topics - RegisterTopics(); + Registration::Sample process_sample; + process_sample.cmd_type = bct_unreg_process; + auto& process_sample_process = process_sample.process; + process_sample_process.hname = Process::GetHostName(); + process_sample_process.pid = Process::GetProcessID(); + process_sample_process.pname = Process::GetProcessName(); + process_sample_process.uname = Process::GetUnitName(); - // write sample list to shared memory - SendSampleList(); + return process_sample; } } diff --git a/ecal/core/src/registration/ecal_registration_provider.h b/ecal/core/src/registration/ecal_registration_provider.h index a630151420..e073874348 100644 --- a/ecal/core/src/registration/ecal_registration_provider.h +++ b/ecal/core/src/registration/ecal_registration_provider.h @@ -43,7 +43,6 @@ #include #include #include -#include #include namespace eCAL @@ -54,64 +53,49 @@ namespace eCAL CRegistrationProvider(); ~CRegistrationProvider(); - void Create(bool topics_, bool services_, bool process_); + void Create(); void Destroy(); - bool RegisterTopic(const std::string& topic_name_, const std::string& topic_id_, const Registration::Sample& ecal_sample_, bool force_); - bool UnregisterTopic(const std::string& topic_name_, const std::string& topic_id_, const Registration::Sample& ecal_sample_, bool force_); + bool ApplySample(const Registration::Sample& sample_, bool force_); - bool RegisterServer(const std::string& service_name_, const std::string& service_id_, const Registration::Sample& ecal_sample_, bool force_); - bool UnregisterServer(const std::string& service_name_, const std::string& service_id_, const Registration::Sample& ecal_sample_, bool force_); - - bool RegisterClient(const std::string& client_name_, const std::string& client_id_, const Registration::Sample& ecal_sample_, bool force_); - bool UnregisterClient(const std::string& client_name_, const std::string& client_id_, const Registration::Sample& ecal_sample_, bool force_); + using ApplySampleCallbackT = std::function; + void SetCustomApplySampleCallback(const std::string& customer_, const ApplySampleCallbackT& callback_); + void RemCustomApplySampleCallback(const std::string& customer_); protected: - bool RegisterProcess(); - bool UnregisterProcess(); - - bool RegisterTopics(); + void AddSample2SampleList(const Registration::Sample& sample_); + bool SendSample2UDP(const Registration::Sample& sample_); - bool RegisterServer(); - bool RegisterClient(); + bool SendSampleList2UDP(); + bool SendSampleList2SHM(); + void ClearSampleList(); - bool ApplySample(const std::string& sample_name_, const eCAL::Registration::Sample& sample_); - void RegisterSendThread(); - bool SendSampleList(bool reset_sample_list_ = true); + Registration::Sample GetProcessRegisterSample(); + Registration::Sample GetProcessUnregisterSample(); static std::atomic m_created; - bool m_reg_topics; - bool m_reg_services; - bool m_reg_process; std::shared_ptr m_reg_sample_snd; std::shared_ptr m_reg_sample_snd_thread; - std::mutex m_sample_buffer_sync; + std::mutex m_sample_buffer_mtx; std::vector m_sample_buffer; - using SampleMapT = std::unordered_map; - std::mutex m_topics_map_sync; - SampleMapT m_topics_map; - - std::mutex m_server_map_sync; - SampleMapT m_server_map; - - std::mutex m_client_map_sync; - SampleMapT m_client_map; + std::mutex m_sample_list_mtx; + Registration::SampleList m_sample_list; #if ECAL_CORE_REGISTRATION_SHM - std::mutex m_sample_list_sync; - Registration::SampleList m_sample_list; std::vector m_sample_list_buffer; - CMemoryFileBroadcast m_memfile_broadcast; CMemoryFileBroadcastWriter m_memfile_broadcast_writer; #endif bool m_use_registration_udp; bool m_use_registration_shm; + + std::mutex m_callback_custom_apply_sample_map_mtx; + std::map m_callback_custom_apply_sample_map; }; } diff --git a/ecal/core/src/registration/ecal_registration_receiver.cpp b/ecal/core/src/registration/ecal_registration_receiver.cpp index f69643861d..2bade0bea8 100644 --- a/ecal/core/src/registration/ecal_registration_receiver.cpp +++ b/ecal/core/src/registration/ecal_registration_receiver.cpp @@ -56,7 +56,6 @@ namespace eCAL m_callback_process(nullptr), m_use_registration_udp(false), m_use_registration_shm(false), - m_callback_custom_apply_sample([](const auto&) {}), m_host_group_name(Process::GetHostGroupName()) { } @@ -147,20 +146,23 @@ namespace eCAL { if(!m_created) return false; - Registration::Sample ecal_sample; - if (!DeserializeFromBuffer(serialized_sample_data_, serialized_sample_size_, ecal_sample)) return false; + Registration::Sample sample; + if (!DeserializeFromBuffer(serialized_sample_data_, serialized_sample_size_, sample)) return false; - return ApplySample(ecal_sample); + return ApplySample(sample); } - bool CRegistrationReceiver::ApplySample(const Registration::Sample& ecal_sample_) + bool CRegistrationReceiver::ApplySample(const Registration::Sample& sample_) { if (!m_created) return false; - // forward all registration samples to outside "customer" (e.g. Monitoring) + // forward all registration samples to outside "customer" (e.g. monitoring, descgate) { - const std::lock_guard lock(m_callback_custom_apply_sample_mtx); - m_callback_custom_apply_sample(ecal_sample_); + const std::lock_guard lock(m_callback_custom_apply_sample_map_mtx); + for (const auto& iter : m_callback_custom_apply_sample_map) + { + iter.second(sample_); + } } std::string reg_sample; @@ -171,10 +173,10 @@ namespace eCAL || m_callback_process ) { - SerializeToBuffer(ecal_sample_, reg_sample); + SerializeToBuffer(sample_, reg_sample); } - switch (ecal_sample_.cmd_type) + switch (sample_.cmd_type) { case bct_none: case bct_set_sample: @@ -186,7 +188,7 @@ namespace eCAL break; #if ECAL_CORE_SERVICE case bct_reg_service: - if (g_clientgate() != nullptr) g_clientgate()->ApplyServiceRegistration(ecal_sample_); + if (g_clientgate() != nullptr) g_clientgate()->ApplyServiceRegistration(sample_); if (m_callback_service) m_callback_service(reg_sample.c_str(), static_cast(reg_sample.size())); break; case bct_unreg_service: @@ -201,12 +203,12 @@ namespace eCAL break; case bct_reg_subscriber: case bct_unreg_subscriber: - ApplySubscriberRegistration(ecal_sample_); + ApplySubscriberRegistration(sample_); if (m_callback_sub) m_callback_sub(reg_sample.c_str(), static_cast(reg_sample.size())); break; case bct_reg_publisher: case bct_unreg_publisher: - ApplyPublisherRegistration(ecal_sample_); + ApplyPublisherRegistration(sample_); if (m_callback_pub) m_callback_pub(reg_sample.c_str(), static_cast(reg_sample.size())); break; default: @@ -267,24 +269,24 @@ namespace eCAL } } - void CRegistrationReceiver::ApplySubscriberRegistration(const Registration::Sample& ecal_sample_) + void CRegistrationReceiver::ApplySubscriberRegistration(const Registration::Sample& sample_) { #if ECAL_CORE_PUBLISHER // process registrations from same host group - if (IsHostGroupMember(ecal_sample_)) + if (IsHostGroupMember(sample_)) { // do not register local entities, only if loop back flag is set true - if (m_loopback || (ecal_sample_.topic.pid != Process::GetProcessID())) + if (m_loopback || (sample_.topic.pid != Process::GetProcessID())) { if (g_pubgate() != nullptr) { - switch (ecal_sample_.cmd_type) + switch (sample_.cmd_type) { case bct_reg_subscriber: - g_pubgate()->ApplyLocSubRegistration(ecal_sample_); + g_pubgate()->ApplyLocSubRegistration(sample_); break; case bct_unreg_subscriber: - g_pubgate()->ApplyLocSubUnregistration(ecal_sample_); + g_pubgate()->ApplyLocSubUnregistration(sample_); break; default: break; @@ -299,13 +301,13 @@ namespace eCAL { if (g_pubgate() != nullptr) { - switch (ecal_sample_.cmd_type) + switch (sample_.cmd_type) { case bct_reg_subscriber: - g_pubgate()->ApplyExtSubRegistration(ecal_sample_); + g_pubgate()->ApplyExtSubRegistration(sample_); break; case bct_unreg_subscriber: - g_pubgate()->ApplyExtSubUnregistration(ecal_sample_); + g_pubgate()->ApplyExtSubUnregistration(sample_); break; default: break; @@ -316,24 +318,24 @@ namespace eCAL #endif } - void CRegistrationReceiver::ApplyPublisherRegistration(const Registration::Sample& ecal_sample_) + void CRegistrationReceiver::ApplyPublisherRegistration(const Registration::Sample& sample_) { #if ECAL_CORE_SUBSCRIBER // process registrations from same host group - if (IsHostGroupMember(ecal_sample_)) + if (IsHostGroupMember(sample_)) { // do not register local entities, only if loop back flag is set true - if (m_loopback || (ecal_sample_.topic.pid != Process::GetProcessID())) + if (m_loopback || (sample_.topic.pid != Process::GetProcessID())) { if (g_subgate() != nullptr) { - switch (ecal_sample_.cmd_type) + switch (sample_.cmd_type) { case bct_reg_publisher: - g_subgate()->ApplyLocPubRegistration(ecal_sample_); + g_subgate()->ApplyLocPubRegistration(sample_); break; case bct_unreg_publisher: - g_subgate()->ApplyLocPubUnregistration(ecal_sample_); + g_subgate()->ApplyLocPubUnregistration(sample_); break; default: break; @@ -348,13 +350,13 @@ namespace eCAL { if (g_subgate() != nullptr) { - switch (ecal_sample_.cmd_type) + switch (sample_.cmd_type) { case bct_reg_publisher: - g_subgate()->ApplyExtPubRegistration(ecal_sample_); + g_subgate()->ApplyExtPubRegistration(sample_); break; case bct_unreg_publisher: - g_subgate()->ApplyExtPubUnregistration(ecal_sample_); + g_subgate()->ApplyExtPubUnregistration(sample_); break; default: break; @@ -365,9 +367,9 @@ namespace eCAL #endif } - bool CRegistrationReceiver::IsHostGroupMember(const Registration::Sample& ecal_sample_) + bool CRegistrationReceiver::IsHostGroupMember(const Registration::Sample& sample_) { - const std::string& sample_host_group_name = ecal_sample_.topic.hgname.empty() ? ecal_sample_.topic.hname : ecal_sample_.topic.hgname; + const std::string& sample_host_group_name = sample_.topic.hgname.empty() ? sample_.topic.hname : sample_.topic.hgname; if (sample_host_group_name.empty() || m_host_group_name.empty()) return false; @@ -377,15 +379,19 @@ namespace eCAL return true; } - void CRegistrationReceiver::SetCustomApplySampleCallback(const ApplySampleCallbackT& callback_) + void CRegistrationReceiver::SetCustomApplySampleCallback(const std::string& customer_, const ApplySampleCallbackT& callback_) { - const std::lock_guard lock(m_callback_custom_apply_sample_mtx); - m_callback_custom_apply_sample = callback_; + const std::lock_guard lock(m_callback_custom_apply_sample_map_mtx); + m_callback_custom_apply_sample_map[customer_] = callback_; } - void CRegistrationReceiver::RemCustomApplySampleCallback() + void CRegistrationReceiver::RemCustomApplySampleCallback(const std::string& customer_) { - const std::lock_guard lock(m_callback_custom_apply_sample_mtx); - m_callback_custom_apply_sample = [](const auto&) {}; + const std::lock_guard lock(m_callback_custom_apply_sample_map_mtx); + auto iter = m_callback_custom_apply_sample_map.find(customer_); + if(iter != m_callback_custom_apply_sample_map.end()) + { + m_callback_custom_apply_sample_map.erase(iter); + } } } diff --git a/ecal/core/src/registration/ecal_registration_receiver.h b/ecal/core/src/registration/ecal_registration_receiver.h index a3e4e2d4e2..1d64032da3 100644 --- a/ecal/core/src/registration/ecal_registration_receiver.h +++ b/ecal/core/src/registration/ecal_registration_receiver.h @@ -41,6 +41,7 @@ #include #include +#include #include #include #include @@ -60,22 +61,22 @@ namespace eCAL void EnableLoopback(bool state_); bool HasSample(const std::string& /*sample_name_*/) { return(true); }; - bool ApplySerializedSample(const char* serialized_sample_data_, size_t serialized_sample_size_); - - bool ApplySample(const Registration::Sample& ecal_sample_); + bool ApplySample(const Registration::Sample& sample_); bool AddRegistrationCallback(enum eCAL_Registration_Event event_, const RegistrationCallbackT& callback_); bool RemRegistrationCallback(enum eCAL_Registration_Event event_); using ApplySampleCallbackT = std::function; - void SetCustomApplySampleCallback(const ApplySampleCallbackT& callback_); - void RemCustomApplySampleCallback(); + void SetCustomApplySampleCallback(const std::string& customer_, const ApplySampleCallbackT& callback_); + void RemCustomApplySampleCallback(const std::string& customer_); protected: - void ApplySubscriberRegistration(const eCAL::Registration::Sample& ecal_sample_); - void ApplyPublisherRegistration(const eCAL::Registration::Sample& ecal_sample_); + bool ApplySerializedSample(const char* serialized_sample_data_, size_t serialized_sample_size_); + + void ApplySubscriberRegistration(const eCAL::Registration::Sample& sample_); + void ApplyPublisherRegistration(const eCAL::Registration::Sample& sample_); - bool IsHostGroupMember(const eCAL::Registration::Sample& ecal_sample_); + bool IsHostGroupMember(const eCAL::Registration::Sample& sample_); static std::atomic m_created; bool m_network; @@ -99,8 +100,8 @@ namespace eCAL bool m_use_registration_udp; bool m_use_registration_shm; - std::mutex m_callback_custom_apply_sample_mtx; - ApplySampleCallbackT m_callback_custom_apply_sample; + std::mutex m_callback_custom_apply_sample_map_mtx; + std::map m_callback_custom_apply_sample_map; std::string m_host_group_name; }; diff --git a/ecal/core/src/service/ecal_clientgate.cpp b/ecal/core/src/service/ecal_clientgate.cpp index fa6f2d41cd..9bdc535404 100644 --- a/ecal/core/src/service/ecal_clientgate.cpp +++ b/ecal/core/src/service/ecal_clientgate.cpp @@ -22,36 +22,14 @@ **/ #include "ecal_clientgate.h" -#include "ecal_descgate.h" #include "service/ecal_service_client_impl.h" + #include #include #include #include #include -namespace -{ - // TODO: remove me with new CDescGate - bool ApplyServiceDescription(const std::string& service_name_, const std::string& method_name_, - const eCAL::SDataTypeInformation& request_type_information_, - const eCAL::SDataTypeInformation& response_type_information_) - { - if (eCAL::g_descgate() != nullptr) - { - // calculate the quality of the current info - eCAL::CDescGate::QualityFlags quality = eCAL::CDescGate::QualityFlags::NO_QUALITY; - if (!(request_type_information_.name.empty() && response_type_information_.name.empty())) - quality |= eCAL::CDescGate::QualityFlags::TYPE_AVAILABLE; - if (!(request_type_information_.descriptor.empty() && response_type_information_.descriptor.empty())) - quality |= eCAL::CDescGate::QualityFlags::DESCRIPTION_AVAILABLE; - - return eCAL::g_descgate()->ApplyServiceDescription(service_name_, method_name_, request_type_information_, response_type_information_, quality); - } - return false; - } -} - namespace eCAL { ////////////////////////////////////////////////////////////////// @@ -135,20 +113,6 @@ namespace eCAL service.tcp_port_v0 = static_cast(ecal_sample_service.tcp_port_v0); service.tcp_port_v1 = static_cast(ecal_sample_service.tcp_port_v1); - // store description - for (const auto& method : ecal_sample_service.methods) - { - SDataTypeInformation request_type; - request_type.name = method.req_type; - request_type.descriptor = method.req_desc; - - SDataTypeInformation response_type{}; - response_type.name = method.resp_type; - response_type.descriptor = method.resp_desc; - - ApplyServiceDescription(ecal_sample_service.sname, method.mname, request_type, response_type); - } - // create service key service.key = service.sname + ":" + service.sid + "@" + std::to_string(service.pid) + "@" + service.hname; @@ -196,10 +160,11 @@ namespace eCAL { if (!m_created) return; - // refresh service registrations + // refresh client registrations const std::shared_lock lock(m_client_set_sync); for (auto *iter : m_client_set) { + // force client to (re)register itself on registration provider iter->RefreshRegistration(); } } diff --git a/ecal/core/src/service/ecal_service_client_impl.cpp b/ecal/core/src/service/ecal_service_client_impl.cpp index ee7076c20b..9bb8d2734f 100644 --- a/ecal/core/src/service/ecal_service_client_impl.cpp +++ b/ecal/core/src/service/ecal_service_client_impl.cpp @@ -29,6 +29,7 @@ #include "registration/ecal_registration_provider.h" #include "serialization/ecal_serialize_service.h" +#include #include #include #include @@ -77,12 +78,12 @@ namespace eCAL counter << std::chrono::steady_clock::now().time_since_epoch().count(); m_service_id = counter.str(); - // register this client - Register(false); - // mark as created m_created = true; + // register this client + Register(false); + return(true); } @@ -108,7 +109,10 @@ namespace eCAL m_event_callback_map.clear(); } - // unregister this client + // mark as no more created + m_created = false; + + // and unregister this client Unregister(); // reset internals @@ -116,9 +120,6 @@ namespace eCAL m_service_id.clear(); m_host_name.clear(); - // mark as not created - m_created = false; - return(true); } @@ -616,6 +617,7 @@ namespace eCAL void CServiceClientImpl::Register(const bool force_) { + if (!m_created) return; if (m_service_name.empty()) return; Registration::Sample sample; @@ -630,7 +632,7 @@ namespace eCAL service_client.sid = m_service_id; // register entity - if (g_registration_provider() != nullptr) g_registration_provider()->RegisterClient(m_service_name, m_service_id, sample, force_); + if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(sample, force_); // refresh connected services map CheckForNewServices(); @@ -683,7 +685,7 @@ namespace eCAL service_client.version = m_client_version; // unregister entity - if (g_registration_provider() != nullptr) g_registration_provider()->UnregisterClient(m_service_name, m_service_id, sample, true); + if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(sample, false); } void CServiceClientImpl::CheckForNewServices() diff --git a/ecal/core/src/service/ecal_service_client_impl.h b/ecal/core/src/service/ecal_service_client_impl.h index 4211c48ccb..d2bb9c60c1 100644 --- a/ecal/core/src/service/ecal_service_client_impl.h +++ b/ecal/core/src/service/ecal_service_client_impl.h @@ -123,6 +123,6 @@ namespace eCAL std::string m_service_id; std::string m_host_name; - bool m_created; + std::atomic m_created; }; } diff --git a/ecal/core/src/service/ecal_service_server.cpp b/ecal/core/src/service/ecal_service_server.cpp index b76e88a899..1257c1088c 100644 --- a/ecal/core/src/service/ecal_service_server.cpp +++ b/ecal/core/src/service/ecal_service_server.cpp @@ -116,12 +116,15 @@ namespace eCAL bool CServiceServer::AddDescription(const std::string& method_, const std::string& req_type_, const std::string& req_desc_, const std::string& resp_type_, const std::string& resp_desc_) { if (!m_created) return false; + SDataTypeInformation request_type_information; - request_type_information.name = req_type_; + request_type_information.name = req_type_; request_type_information.descriptor = req_desc_; + SDataTypeInformation response_type_information; - response_type_information.name = resp_type_; + response_type_information.name = resp_type_; response_type_information.descriptor = resp_desc_; + return m_service_server_impl->AddDescription(method_, request_type_information, response_type_information); } diff --git a/ecal/core/src/service/ecal_service_server_impl.cpp b/ecal/core/src/service/ecal_service_server_impl.cpp index d91e54bef5..bb61c979a3 100644 --- a/ecal/core/src/service/ecal_service_server_impl.cpp +++ b/ecal/core/src/service/ecal_service_server_impl.cpp @@ -24,7 +24,6 @@ #include #include "registration/ecal_registration_provider.h" -#include "ecal_descgate.h" #include "ecal_global_accessors.h" #include "ecal_service_server_impl.h" #include "ecal_service_singleton_manager.h" @@ -37,28 +36,6 @@ #include #include -namespace -{ - // TODO: remove me with new CDescGate - bool ApplyServiceDescription(const std::string& service_name_, const std::string& method_name_, - const eCAL::SDataTypeInformation& request_type_information_, - const eCAL::SDataTypeInformation& response_type_information_) - { - if (eCAL::g_descgate() != nullptr) - { - // calculate the quality of the current info - eCAL::CDescGate::QualityFlags quality = eCAL::CDescGate::QualityFlags::NO_QUALITY; - if (!(request_type_information_.name.empty() && response_type_information_.name.empty())) - quality |= eCAL::CDescGate::QualityFlags::TYPE_AVAILABLE; - if (!(request_type_information_.descriptor.empty() && response_type_information_.descriptor.empty())) - quality |= eCAL::CDescGate::QualityFlags::DESCRIPTION_AVAILABLE; - - return eCAL::g_descgate()->ApplyServiceDescription(service_name_, method_name_, request_type_information_, response_type_information_, quality); - } - return false; - } -} - namespace eCAL { std::shared_ptr CServiceServerImpl::CreateInstance() @@ -73,7 +50,10 @@ namespace eCAL return instance; } - CServiceServerImpl::CServiceServerImpl() = default; + CServiceServerImpl::CServiceServerImpl() : + m_created(false) + { + } CServiceServerImpl::~CServiceServerImpl() { @@ -145,12 +125,12 @@ namespace eCAL m_tcp_server_v1 = server_manager->create_server(1, 0, service_callback, true, event_callback); } - // register this service - Register(false); - // mark as created m_created = true; + // register this service + Register(false); + return(true); } @@ -176,7 +156,10 @@ namespace eCAL m_event_callback_map.clear(); } - // unregister this service + // mark as no more created + m_created = false; + + // and unregister this service Unregister(); // reset internals @@ -189,8 +172,6 @@ namespace eCAL m_connected_v1 = false; } - m_created = false; - return(true); } @@ -219,8 +200,10 @@ namespace eCAL } } - // update descgate infos - return ApplyServiceDescription(m_service_name, method_, request_type_information_, response_type_information_); + // register this service + Register(false); + + return true; } // add callback function for server method calls @@ -255,16 +238,8 @@ namespace eCAL } } - SDataTypeInformation request_type_information; - request_type_information.name = req_type_; - request_type_information.descriptor = req_desc; - - SDataTypeInformation response_type_information; - response_type_information.name = resp_type_; - response_type_information.descriptor = resp_desc; - - // update descgate infos - ApplyServiceDescription(m_service_name, method_, request_type_information, response_type_information); + // register this service + Register(false); return true; } @@ -342,6 +317,7 @@ namespace eCAL void CServiceServerImpl::Register(const bool force_) { + if (!m_created) return; if (m_service_name.empty()) return; // might be zero in contruction phase @@ -382,7 +358,7 @@ namespace eCAL } // register entity - if (g_registration_provider() != nullptr) g_registration_provider()->RegisterServer(m_service_name, m_service_id, sample, force_); + if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(sample, force_); } void CServiceServerImpl::Unregister() @@ -402,7 +378,7 @@ namespace eCAL service.sid = m_service_id; // unregister entity - if (g_registration_provider() != nullptr) g_registration_provider()->UnregisterServer(m_service_name, m_service_id, sample, true); + if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(sample, false); } int CServiceServerImpl::RequestCallback(const std::string& request_pb_, std::string& response_pb_) diff --git a/ecal/core/src/service/ecal_service_server_impl.h b/ecal/core/src/service/ecal_service_server_impl.h index 5b35fcbef6..6aee86de4f 100644 --- a/ecal/core/src/service/ecal_service_server_impl.h +++ b/ecal/core/src/service/ecal_service_server_impl.h @@ -27,6 +27,7 @@ #include #include +#include #include #include #include @@ -121,10 +122,10 @@ namespace eCAL std::mutex m_event_callback_map_sync; EventCallbackMapT m_event_callback_map; - bool m_created = false; - mutable std::mutex m_connected_mutex; //!< mutex protecting the m_connected_v0 and m_connected_v1 variable, as those are modified by the event callbacks in another thread. bool m_connected_v0 = false; bool m_connected_v1 = false; + + std::atomic m_created; }; } diff --git a/ecal/tests/CMakeLists.txt b/ecal/tests/CMakeLists.txt index 6baf791a34..36a8a7f771 100644 --- a/ecal/tests/CMakeLists.txt +++ b/ecal/tests/CMakeLists.txt @@ -35,7 +35,10 @@ add_subdirectory(cpp/event_test) add_subdirectory(cpp/expmap_test) add_subdirectory(cpp/serialization_test) add_subdirectory(cpp/topic2mcast_test) -add_subdirectory(cpp/util_test) + +if(ECAL_CORE_REGISTRATION) + add_subdirectory(cpp/util_test) +endif() if(ECAL_CORE_REGISTRATION_SHM OR ECAL_CORE_TRANSPORT_SHM) add_subdirectory(cpp/io_memfile_test) diff --git a/ecal/tests/cpp/clientserver_test/src/clientserver_getservices.cpp b/ecal/tests/cpp/clientserver_test/src/clientserver_getservices.cpp index 1cad55c72f..a4c9f445c2 100644 --- a/ecal/tests/cpp/clientserver_test/src/clientserver_getservices.cpp +++ b/ecal/tests/cpp/clientserver_test/src/clientserver_getservices.cpp @@ -95,8 +95,7 @@ TEST(core_cpp_clientserver, GetServices) EXPECT_EQ(resp_desc, ""); // change attributes - bool ret1 = server.AddDescription("foo::method1", "foo::req_type1-1", "foo::req_desc1-1", "foo::resp_type1-1", "foo::resp_desc1-1"); - EXPECT_EQ(ret1, true); + server.AddDescription("foo::method1", "foo::req_type1-1", "foo::req_desc1-1", "foo::resp_type1-1", "foo::resp_desc1-1"); // check attributes eCAL::Util::GetServiceTypeNames("foo::service", "foo::method1", req_type, resp_type); @@ -107,8 +106,7 @@ TEST(core_cpp_clientserver, GetServices) EXPECT_EQ(resp_desc, "foo::resp_desc1-1"); // change attributes again (this will not overwrite the attributes anymore) - bool ret2 = server.AddDescription("foo::method1", "foo::req_type1-2", "foo::req_desc1-2", "foo::resp_type1-2", "foo::resp_desc1-2"); - EXPECT_EQ(ret2, false); + server.AddDescription("foo::method1", "foo::req_type1-2", "foo::req_desc1-2", "foo::resp_type1-2", "foo::resp_desc1-2"); // check attributes eCAL::Util::GetServiceTypeNames("foo::service", "foo::method1", req_type, resp_type); diff --git a/ecal/tests/cpp/util_test/src/util_test.cpp b/ecal/tests/cpp/util_test/src/util_test.cpp index 9d5b96512b..a4abb1664e 100644 --- a/ecal/tests/cpp/util_test/src/util_test.cpp +++ b/ecal/tests/cpp/util_test/src/util_test.cpp @@ -17,11 +17,13 @@ * ========================= eCAL LICENSE ================================= */ -#include #include -#include #include +#include +#include + +#include namespace { void TestCombinedTopicEncodingAndType(const std::string& encoding, const std::string& type, const std::string& expected_result) @@ -116,7 +118,6 @@ TEST(core_cpp_util, Freq_ResettableFrequencyCalculator) { const auto check_delta_t = std::chrono::milliseconds(999); - for (const auto& pair : frequency_pairs) { { @@ -193,3 +194,72 @@ TEST(core_cpp_util, Freq_ResettableFrequencyCalculator) } } } + + +#if ECAL_CORE_PUBLISHER +TEST(core_cpp_util, ParallelGetTopics) +{ + constexpr const int max_publisher_count(2000); + constexpr const int waiting_time_thread(1000); + constexpr const int parallel_threads(1); + + eCAL::Initialize(); + + auto create_publishers = [&]() { + std::string topic_name = "Test.ParallelUtilFunctions"; + std::atomic call_back_count{ 0 }; + + std::vector> publishers; + for (int pub_count = 0; pub_count < max_publisher_count; pub_count++) { + std::unique_ptr publisher = std::make_unique(topic_name + std::to_string(pub_count)); + publishers.push_back(std::move(publisher)); + } + std::this_thread::sleep_for(std::chrono::milliseconds(waiting_time_thread)); + }; + + auto get_topics_from_ecal = [&]() { + size_t found_topics = 0; + std::vector tmp_topic_names; + std::unordered_map topics; + do { + eCAL::Util::GetTopicNames(tmp_topic_names); + eCAL::Util::GetTopics(topics); + + found_topics = tmp_topic_names.size(); + std::cout << "Number of topics found by ecal: " << found_topics << "\n"; + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } while (found_topics < max_publisher_count); + + // do it again until all publishers are deleted + do { + eCAL::Util::GetTopicNames(tmp_topic_names); + eCAL::Util::GetTopics(topics); + + found_topics = tmp_topic_names.size(); + std::cout << "Number of topics found by ecal: " << found_topics << "\n"; + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } while (found_topics != 0); + }; + + std::vector threads_container; + threads_container.push_back(std::thread(create_publishers)); + + for (size_t i = 0; i < parallel_threads; i++) { + threads_container.push_back(std::thread(get_topics_from_ecal)); + } + + for (auto& th : threads_container) { + th.join(); + } + + std::vector final_topic_names; + std::unordered_map final_topics; + eCAL::Util::GetTopicNames(final_topic_names); + eCAL::Util::GetTopics(final_topics); + + EXPECT_EQ(final_topic_names.size(), 0); + EXPECT_EQ(final_topics.size(), 0); + + eCAL::Finalize(); +} +#endif // ECAL_CORE_PUBLISHER