From b65b6ae87a7ef7ed908e369eff0a253d38fa77ae Mon Sep 17 00:00:00 2001 From: rex-schilasky <49162693+rex-schilasky@users.noreply.github.com> Date: Fri, 24 May 2024 15:10:58 +0200 Subject: [PATCH 1/5] layer states (enabling) exchanged between pub/sub layer activation based on layer priority lists for local and remote connections --- ecal/core/include/ecal/ecal_subscriber.h | 2 +- ecal/core/include/ecal/msg/subscriber.h | 8 +- ecal/core/src/pubsub/ecal_pubgate.cpp | 8 +- ecal/core/src/pubsub/ecal_subgate.cpp | 14 +- ecal/core/src/pubsub/ecal_subscriber.cpp | 4 +- ecal/core/src/readwrite/ecal_reader.cpp | 679 ++++++++++-------- ecal/core/src/readwrite/ecal_reader.h | 46 +- ecal/core/src/readwrite/ecal_writer.cpp | 438 +++++++---- ecal/core/src/readwrite/ecal_writer.h | 55 +- .../src/readwrite/shm/ecal_reader_shm.cpp | 15 +- .../src/readwrite/shm/ecal_writer_shm.cpp | 7 +- ecal/core/src/readwrite/shm/ecal_writer_shm.h | 4 +- .../src/readwrite/tcp/ecal_reader_tcp.cpp | 9 +- ecal/core/src/readwrite/tcp/ecal_reader_tcp.h | 5 +- .../src/readwrite/udp/ecal_reader_udp.cpp | 8 +- .../serialization/ecal_serialize_common.cpp | 4 +- .../ecal_struct_sample_registration.h | 5 +- ecal/core/src/serialization/nanopb/ecal.pb.h | 6 +- ecal/core/src/serialization/nanopb/layer.pb.h | 20 +- .../src/serialization/nanopb/process.pb.h | 3 +- .../src/serialization/nanopb/service.pb.h | 2 +- ecal/core/src/serialization/nanopb/topic.pb.h | 3 +- ecal/core_pb/src/ecal/core/pb/ecal.proto | 5 +- ecal/core_pb/src/ecal/core/pb/layer.proto | 11 +- ecal/core_pb/src/ecal/core/pb/process.proto | 5 +- ecal/core_pb/src/ecal/core/pb/topic.proto | 5 +- .../protobuf/person_rec/src/person_rec.cpp | 12 +- .../src/registration_compare.cpp | 3 +- .../src/registration_generate.cpp | 3 +- 29 files changed, 823 insertions(+), 566 deletions(-) diff --git a/ecal/core/include/ecal/ecal_subscriber.h b/ecal/core/include/ecal/ecal_subscriber.h index 0ed5ea9f80..d2c070a46f 100644 --- a/ecal/core/include/ecal/ecal_subscriber.h +++ b/ecal/core/include/ecal/ecal_subscriber.h @@ -103,7 +103,7 @@ namespace eCAL * @brief Constructor. * * @param topic_name_ Unique topic name. - * @param data_type_info_ Topic data type information (encoding, type, descriptor). + * @param config_ Optional configuration parameters. **/ ECAL_API explicit CSubscriber(const std::string& topic_name_, const Subscriber::Configuration& config_ = {}); diff --git a/ecal/core/include/ecal/msg/subscriber.h b/ecal/core/include/ecal/msg/subscriber.h index b59d926daa..b293f6762e 100644 --- a/ecal/core/include/ecal/msg/subscriber.h +++ b/ecal/core/include/ecal/msg/subscriber.h @@ -254,12 +254,13 @@ namespace eCAL * @brief Constructor. * * @param topic_name_ Unique topic name. + * @param config_ Optional configuration parameters. **/ - CMessageSubscriber(const std::string& topic_name_) : CSubscriber() + CMessageSubscriber(const std::string& topic_name_, const Subscriber::Configuration& config_ = {}) : CSubscriber() , m_deserializer() { SDataTypeInformation topic_info = m_deserializer.GetDataTypeInformation(); - CSubscriber::Create(topic_name_, topic_info); + CSubscriber::Create(topic_name_, topic_info, config_); } ~CMessageSubscriber() noexcept @@ -418,7 +419,4 @@ namespace eCAL MsgReceiveCallbackT m_cb_callback; Deserializer m_deserializer; }; - - - } diff --git a/ecal/core/src/pubsub/ecal_pubgate.cpp b/ecal/core/src/pubsub/ecal_pubgate.cpp index cd624626d6..0dc863b343 100644 --- a/ecal/core/src/pubsub/ecal_pubgate.cpp +++ b/ecal/core/src/pubsub/ecal_pubgate.cpp @@ -129,18 +129,18 @@ namespace eCAL CDataWriter::SLayerStates layer_states; for (const auto& layer : ecal_topic.tlayer) { - if (layer.confirmed) + if (layer.enabled) { switch (layer.type) { case TLayer::tlayer_udp_mc: - layer_states.udp = true; + layer_states.udp.read_enabled = true; break; case TLayer::tlayer_shm: - layer_states.shm = true; + layer_states.shm.read_enabled = true; break; case TLayer::tlayer_tcp: - layer_states.tcp = true; + layer_states.tcp.read_enabled = true; break; default: break; diff --git a/ecal/core/src/pubsub/ecal_subgate.cpp b/ecal/core/src/pubsub/ecal_subgate.cpp index 44261b5e63..41cf7375e0 100644 --- a/ecal/core/src/pubsub/ecal_subgate.cpp +++ b/ecal/core/src/pubsub/ecal_subgate.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -166,7 +166,7 @@ namespace eCAL const auto& ecal_sample_content = ecal_sample.content; for (const auto& reader : readers_to_apply) { - applied_size = reader->AddSample( + applied_size = reader->ApplySample( ecal_sample.topic.tid, payload_addr, payload_size, @@ -207,7 +207,7 @@ namespace eCAL for (const auto& reader : readers_to_apply) { - applied_size = reader->AddSample(topic_id_, buf_, len_, id_, clock_, time_, hash_, layer_); + applied_size = reader->ApplySample(topic_id_, buf_, len_, id_, clock_, time_, hash_, layer_); } return (applied_size > 0); @@ -232,18 +232,18 @@ namespace eCAL CDataReader::SLayerStates layer_states; for (const auto& layer : ecal_topic.tlayer) { - if (layer.confirmed) + if (layer.enabled) { switch (layer.type) { case TLayer::tlayer_udp_mc: - layer_states.udp = true; + layer_states.udp.write_enabled = true; break; case TLayer::tlayer_shm: - layer_states.shm = true; + layer_states.shm.write_enabled = true; break; case TLayer::tlayer_tcp: - layer_states.tcp = true; + layer_states.tcp.write_enabled = true; break; default: break; diff --git a/ecal/core/src/pubsub/ecal_subscriber.cpp b/ecal/core/src/pubsub/ecal_subscriber.cpp index 0086f32afa..3b785a33d1 100644 --- a/ecal/core/src/pubsub/ecal_subscriber.cpp +++ b/ecal/core/src/pubsub/ecal_subscriber.cpp @@ -81,7 +81,7 @@ namespace eCAL if (topic_name_.empty()) return(false); // create datareader - m_datareader = std::make_shared(topic_name_, data_type_info_); + m_datareader = std::make_shared(topic_name_, data_type_info_, config_); // register datareader g_subgate()->Register(topic_name_, m_datareader); @@ -143,7 +143,7 @@ namespace eCAL bool CSubscriber::ReceiveBuffer(std::string& buf_, long long* time_ /* = nullptr */, int rcv_timeout_ /* = 0 */) const { if (!m_created) return(false); - return(m_datareader->Receive(buf_, time_, rcv_timeout_)); + return(m_datareader->Read(buf_, time_, rcv_timeout_)); } bool CSubscriber::AddReceiveCallback(ReceiveCallbackT callback_) diff --git a/ecal/core/src/readwrite/ecal_reader.cpp b/ecal/core/src/readwrite/ecal_reader.cpp index 9fedd736f9..eab4d6f1e8 100644 --- a/ecal/core/src/readwrite/ecal_reader.cpp +++ b/ecal/core/src/readwrite/ecal_reader.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,17 +21,14 @@ * @brief common eCAL data reader **/ -#include -#include #include -#include -#include -#include -#include +#include +#include #if ECAL_CORE_REGISTRATION #include "registration/ecal_registration_provider.h" #endif + #include "ecal_reader.h" #include "ecal_global_accessors.h" #include "ecal_reader_layer.h" @@ -49,9 +46,14 @@ #endif #include +#include +#include #include +#include +#include +#include #include -#include +#include #include namespace eCAL @@ -59,7 +61,7 @@ namespace eCAL //////////////////////////////////////// // CDataReader //////////////////////////////////////// - CDataReader::CDataReader(const std::string& topic_name_, const SDataTypeInformation& topic_info_) : + CDataReader::CDataReader(const std::string& topic_name_, const SDataTypeInformation& topic_info_, const Subscriber::Configuration& config_) : m_host_name(Process::GetHostName()), m_host_group_name(Process::GetHostGroupName()), m_pid(Process::GetProcessID()), @@ -67,6 +69,7 @@ namespace eCAL m_topic_name(topic_name_), m_topic_info(topic_info_), m_topic_size(0), + m_config(config_), m_connected(false), m_receive_time(0), m_clock(0), @@ -90,7 +93,7 @@ namespace eCAL m_pub_map.set_expiration(registration_timeout); // start transport layers - SubscribeToLayers(); + StartTransportLayer(); // mark as created m_created = true; @@ -118,7 +121,7 @@ namespace eCAL #endif // stop transport layers - UnsubscribeFromLayers(); + StopTransportLayer(); // reset receive callback { @@ -141,173 +144,112 @@ namespace eCAL return true; } - void CDataReader::InitializeLayers() + bool CDataReader::Read(std::string& buf_, long long* time_ /* = nullptr */, int rcv_timeout_ms_ /* = 0 */) { - // initialize udp multicast layer -#if ECAL_CORE_TRANSPORT_UDP - if (Config::IsUdpMulticastRecEnabled()) - { - CUDPReaderLayer::Get()->Initialize(); - } -#endif + if (!m_created) return(false); - // initialize tcp layer -#if ECAL_CORE_TRANSPORT_TCP - if (Config::IsTcpRecEnabled()) - { - CTCPReaderLayer::Get()->Initialize(); - } -#endif - } + std::unique_lock read_buffer_lock(m_read_buf_mtx); - void CDataReader::SubscribeToLayers() - { - // subscribe topic to udp multicast layer -#if ECAL_CORE_TRANSPORT_UDP - if (Config::IsUdpMulticastRecEnabled()) + // No need to wait (for whatever time) if something has been received + if (!m_read_buf_received) { - CUDPReaderLayer::Get()->AddSubscription(m_host_name, m_topic_name, m_topic_id); + if (rcv_timeout_ms_ < 0) + { + m_read_buf_cv.wait(read_buffer_lock, [this]() { return this->m_read_buf_received; }); + } + else if (rcv_timeout_ms_ > 0) + { + m_read_buf_cv.wait_for(read_buffer_lock, std::chrono::milliseconds(rcv_timeout_ms_), [this]() { return this->m_read_buf_received; }); + } } -#endif - // subscribe topic to tcp layer -#if ECAL_CORE_TRANSPORT_TCP - if (Config::IsTcpRecEnabled()) - { - CTCPReaderLayer::Get()->AddSubscription(m_host_name, m_topic_name, m_topic_id); - } -#endif - } - - void CDataReader::UnsubscribeFromLayers() - { - // unsubscribe topic from udp multicast layer -#if ECAL_CORE_TRANSPORT_UDP - if (Config::IsUdpMulticastRecEnabled()) + // did we receive new samples ? + if (m_read_buf_received) { - CUDPReaderLayer::Get()->RemSubscription(m_host_name, m_topic_name, m_topic_id); - } +#ifndef NDEBUG + // log it + Logging::Log(log_level_debug3, m_topic_name + "::CDataReader::Receive"); #endif + // copy content to target string + buf_.clear(); + buf_.swap(m_read_buf); + m_read_buf_received = false; - // unsubscribe topic from tcp multicast layer -#if ECAL_CORE_TRANSPORT_TCP - if (Config::IsTcpRecEnabled()) - { - CTCPReaderLayer::Get()->RemSubscription(m_host_name, m_topic_name, m_topic_id); + // apply time + if (time_ != nullptr) *time_ = m_read_time; + + // return success + return(true); } -#endif + + return(false); } - bool CDataReader::Register(const bool force_) + bool CDataReader::AddReceiveCallback(ReceiveCallbackT callback_) { -#if ECAL_CORE_REGISTRATION - if (!m_created) return(false); - if(m_topic_name.empty()) return(false); + if (!m_created) return(false); - // create command parameter - Registration::Sample ecal_reg_sample; - ecal_reg_sample.cmd_type = bct_reg_subscriber; - auto& ecal_reg_sample_topic = ecal_reg_sample.topic; - ecal_reg_sample_topic.hname = m_host_name; - ecal_reg_sample_topic.hgname = m_host_group_name; - ecal_reg_sample_topic.tname = m_topic_name; - ecal_reg_sample_topic.tid = m_topic_id; - // topic_information + // store receive callback { - auto& ecal_reg_sample_tdatatype = ecal_reg_sample_topic.tdatatype; - if (m_share_ttype) - { - ecal_reg_sample_tdatatype.encoding = m_topic_info.encoding; - ecal_reg_sample_tdatatype.name = m_topic_info.name; - } - if (m_share_tdesc) - { - ecal_reg_sample_tdatatype.descriptor = m_topic_info.descriptor; - } + const std::lock_guard lock(m_receive_callback_mtx); +#ifndef NDEBUG + // log it + Logging::Log(log_level_debug2, m_topic_name + "::CDataReader::AddReceiveCallback"); +#endif + m_receive_callback = std::move(callback_); } - ecal_reg_sample_topic.attr = m_attr; - ecal_reg_sample_topic.tsize = static_cast(m_topic_size); -#if ECAL_CORE_TRANSPORT_UDP - // udp multicast layer - { - Registration::TLayer udp_tlayer; - udp_tlayer.type = tl_ecal_udp; - udp_tlayer.version = 1; - udp_tlayer.confirmed = m_confirmed_layers.udp; - ecal_reg_sample_topic.tlayer.push_back(udp_tlayer); - } -#endif + return(true); + } -#if ECAL_CORE_TRANSPORT_SHM - // shm layer - { - Registration::TLayer shm_tlayer; - shm_tlayer.type = tl_ecal_shm; - shm_tlayer.version = 1; - shm_tlayer.confirmed = m_confirmed_layers.shm; - ecal_reg_sample_topic.tlayer.push_back(shm_tlayer); - } -#endif + bool CDataReader::RemReceiveCallback() + { + if (!m_created) return(false); -#if ECAL_CORE_TRANSPORT_TCP - // tcp layer + // reset receive callback { - Registration::TLayer tcp_tlayer; - tcp_tlayer.type = tl_ecal_tcp; - tcp_tlayer.version = 1; - tcp_tlayer.confirmed = m_confirmed_layers.tcp; - ecal_reg_sample_topic.tlayer.push_back(tcp_tlayer); - } + const std::lock_guard lock(m_receive_callback_mtx); +#ifndef NDEBUG + // log it + Logging::Log(log_level_debug2, m_topic_name + "::CDataReader::RemReceiveCallback"); #endif + m_receive_callback = nullptr; + } - ecal_reg_sample_topic.pid = m_pid; - ecal_reg_sample_topic.pname = m_pname; - ecal_reg_sample_topic.uname = Process::GetUnitName(); - ecal_reg_sample_topic.dclock = m_clock; - ecal_reg_sample_topic.dfreq = GetFrequency(); - ecal_reg_sample_topic.message_drops = static_cast(m_message_drops); + return(true); + } - // we do not know the number of connections .. - ecal_reg_sample_topic.connections_loc = 0; - ecal_reg_sample_topic.connections_ext = 0; + bool CDataReader::AddEventCallback(eCAL_Subscriber_Event type_, SubEventCallbackT callback_) + { + if (!m_created) return(false); - // register subscriber - if(g_registration_provider() != nullptr) g_registration_provider()->ApplySample(ecal_reg_sample, force_); + // store event callback + { #ifndef NDEBUG - // log it - Logging::Log(log_level_debug4, m_topic_name + "::CDataReader::DoRegister"); + // log it + Logging::Log(log_level_debug2, m_topic_name + "::CDataReader::AddEventCallback"); #endif + const std::lock_guard lock(m_event_callback_map_mtx); + m_event_callback_map[type_] = std::move(callback_); + } -#endif // ECAL_CORE_REGISTRATION return(true); } - bool CDataReader::Unregister() + bool CDataReader::RemEventCallback(eCAL_Subscriber_Event type_) { -#if ECAL_CORE_REGISTRATION - if (m_topic_name.empty()) return(false); - - // create command parameter - Registration::Sample ecal_unreg_sample; - ecal_unreg_sample.cmd_type = bct_unreg_subscriber; - auto& ecal_reg_sample_topic = ecal_unreg_sample.topic; - ecal_reg_sample_topic.hname = m_host_name; - ecal_reg_sample_topic.hgname = m_host_group_name; - ecal_reg_sample_topic.pname = m_pname; - ecal_reg_sample_topic.pid = m_pid; - ecal_reg_sample_topic.tname = m_topic_name; - ecal_reg_sample_topic.tid = m_topic_id; - ecal_reg_sample_topic.uname = Process::GetUnitName(); + if (!m_created) return(false); - // unregister subscriber - if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(ecal_unreg_sample, false); + // reset event callback + { #ifndef NDEBUG - // log it - Logging::Log(log_level_debug4, m_topic_name + "::CDataReader::Unregister"); + // log it + Logging::Log(log_level_debug2, m_topic_name + "::CDataReader::RemEventCallback"); #endif + const std::lock_guard lock(m_event_callback_map_mtx); + m_event_callback_map[type_] = nullptr; + } -#endif // ECAL_CORE_REGISTRATION return(true); } @@ -346,57 +288,143 @@ namespace eCAL return(true); } - bool CDataReader::Receive(std::string& buf_, long long* time_ /* = nullptr */, int rcv_timeout_ms_ /* = 0 */) + void CDataReader::SetID(const std::set& id_set_) { - if (!m_created) return(false); + m_id_set = id_set_; + } - std::unique_lock read_buffer_lock(m_read_buf_mtx); + void CDataReader::ApplyPublication(const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_, const SLayerStates& layer_states_) + { + // flag write enabled from publisher side (information not used yet) +#if ECAL_CORE_TRANSPORT_UDP + m_layers.udp.write_enabled = layer_states_.udp.write_enabled; +#endif +#if ECAL_CORE_TRANSPORT_SHM + m_layers.shm.write_enabled = layer_states_.shm.write_enabled; +#endif +#if ECAL_CORE_TRANSPORT_TCP + m_layers.tcp.write_enabled = layer_states_.tcp.write_enabled; +#endif - // No need to wait (for whatever time) if something has been received - if (!m_read_buf_received) + FireConnectEvent(publication_info_.topic_id, data_type_info_); + + // add key to publisher map { - if (rcv_timeout_ms_ < 0) - { - m_read_buf_cv.wait(read_buffer_lock, [this]() { return this->m_read_buf_received; }); - } - else if (rcv_timeout_ms_ > 0) + const std::lock_guard lock(m_pub_map_mtx); + m_pub_map[publication_info_] = std::make_tuple(data_type_info_, layer_states_); + } + } + + void CDataReader::RemovePublication(const SPublicationInfo& publication_info_) + { + // remove key from publisher map + { + const std::lock_guard lock(m_pub_map_mtx); + m_pub_map.erase(publication_info_); + } + } + + void CDataReader::ApplyLayerParameter(const SPublicationInfo& publication_info_, eTLayerType type_, const Registration::ConnectionPar& parameter_) + { + SReaderLayerPar par; + par.host_name = publication_info_.host_name; + par.process_id = publication_info_.process_id; + par.topic_name = m_topic_name; + par.topic_id = publication_info_.topic_id; + par.parameter = parameter_; + + switch (type_) + { + case tl_ecal_shm: +#if ECAL_CORE_TRANSPORT_SHM + CSHMReaderLayer::Get()->SetConnectionParameter(par); +#endif + break; + case tl_ecal_tcp: +#if ECAL_CORE_TRANSPORT_TCP + CTCPReaderLayer::Get()->SetConnectionParameter(par); +#endif + break; + default: + break; + } + } + + void CDataReader::RefreshRegistration() + { + if (!m_created) return; + + // ensure that registration is not called within zero nanoseconds + // normally it will be called from registration logic every second + + // register without send + Register(false); + + // check connection timeouts + { + const std::lock_guard lock(m_pub_map_mtx); + m_pub_map.remove_deprecated(); + + if (m_pub_map.empty()) { - m_read_buf_cv.wait_for(read_buffer_lock, std::chrono::milliseconds(rcv_timeout_ms_), [this]() { return this->m_read_buf_received; }); + FireDisconnectEvent(); } } + } - // did we receive new samples ? - if (m_read_buf_received) + void CDataReader::InitializeLayers() + { + // initialize udp layer +#if ECAL_CORE_TRANSPORT_UDP + if (Config::IsUdpMulticastRecEnabled()) { -#ifndef NDEBUG - // log it - Logging::Log(log_level_debug3, m_topic_name + "::CDataReader::Receive"); + CUDPReaderLayer::Get()->Initialize(); + } #endif - // copy content to target string - buf_.clear(); - buf_.swap(m_read_buf); - m_read_buf_received = false; - // apply time - if(time_ != nullptr) *time_ = m_read_time; - - // return success - return(true); + // initialize shm layer +#if ECAL_CORE_TRANSPORT_SHM + if (Config::IsShmRecEnabled()) + { + CSHMReaderLayer::Get()->Initialize(); } +#endif - return(false); + // initialize tcp layer +#if ECAL_CORE_TRANSPORT_TCP + if (Config::IsTcpRecEnabled()) + { + CTCPReaderLayer::Get()->Initialize(); + } +#endif } - size_t CDataReader::AddSample(const std::string& tid_, const char* payload_, size_t size_, long long id_, long long clock_, long long time_, size_t hash_, eTLayerType layer_) + size_t CDataReader::ApplySample(const std::string& tid_, const char* payload_, size_t size_, long long id_, long long clock_, long long time_, size_t hash_, eTLayerType layer_) { // ensure thread safety const std::lock_guard lock(m_receive_callback_mtx); if (!m_created) return(0); + // check receive layer configuration + switch (layer_) + { + case tl_ecal_udp: + if (!m_config.udp.enable) return 0; + break; + case tl_ecal_shm: + if (!m_config.shm.enable) return 0; + break; + case tl_ecal_tcp: + if (!m_config.tcp.enable) return 0; + break; + default: + break; + } + // store receive layer - m_confirmed_layers.udp |= layer_ == tl_ecal_udp; - m_confirmed_layers.shm |= layer_ == tl_ecal_shm; - m_confirmed_layers.tcp |= layer_ == tl_ecal_tcp; + m_layers.udp.read_confirmed |= layer_ == tl_ecal_udp; + m_layers.shm.read_confirmed |= layer_ == tl_ecal_shm; + m_layers.tcp.read_confirmed |= layer_ == tl_ecal_tcp; // number of hash values to track for duplicates constexpr int hash_queue_size(64); @@ -480,7 +508,7 @@ namespace eCAL } // if not consumed by user receive call - if(!processed) + if (!processed) { // push sample into read buffer const std::lock_guard read_buffer_lock(m_read_buf_mtx); @@ -500,126 +528,226 @@ namespace eCAL return(size_); } - bool CDataReader::AddReceiveCallback(ReceiveCallbackT callback_) + std::string CDataReader::Dump(const std::string& indent_ /* = "" */) { - if (!m_created) return(false); + std::stringstream out; - // store receive callback - { - const std::lock_guard lock(m_receive_callback_mtx); -#ifndef NDEBUG - // log it - Logging::Log(log_level_debug2, m_topic_name + "::CDataReader::AddReceiveCallback"); -#endif - m_receive_callback = std::move(callback_); - } + out << '\n'; + out << indent_ << "------------------------------------" << '\n'; + out << indent_ << " class CDataReader " << '\n'; + out << indent_ << "------------------------------------" << '\n'; + out << indent_ << "m_host_name: " << m_host_name << '\n'; + out << indent_ << "m_host_group_name: " << m_host_group_name << '\n'; + out << indent_ << "m_topic_name: " << m_topic_name << '\n'; + out << indent_ << "m_topic_id: " << m_topic_id << '\n'; + out << indent_ << "m_topic_info.encoding: " << m_topic_info.encoding << '\n'; + out << indent_ << "m_topic_info.name: " << m_topic_info.name << '\n'; + out << indent_ << "m_topic_info.desc: " << m_topic_info.descriptor << '\n'; + out << indent_ << "m_topic_size: " << m_topic_size << '\n'; + out << indent_ << "m_read_buf.size(): " << m_read_buf.size() << '\n'; + out << indent_ << "m_read_time: " << m_read_time << '\n'; + out << indent_ << "m_clock: " << m_clock << '\n'; + out << indent_ << "frequency [mHz]: " << GetFrequency() << '\n'; + out << indent_ << "m_created: " << m_created << '\n'; + out << '\n'; - return(true); + return(out.str()); } - bool CDataReader::RemReceiveCallback() + bool CDataReader::Register(const bool force_) { - if (!m_created) return(false); +#if ECAL_CORE_REGISTRATION + if (!m_created) return(false); + if(m_topic_name.empty()) return(false); - // reset receive callback + // create command parameter + Registration::Sample ecal_reg_sample; + ecal_reg_sample.cmd_type = bct_reg_subscriber; + auto& ecal_reg_sample_topic = ecal_reg_sample.topic; + ecal_reg_sample_topic.hname = m_host_name; + ecal_reg_sample_topic.hgname = m_host_group_name; + ecal_reg_sample_topic.tname = m_topic_name; + ecal_reg_sample_topic.tid = m_topic_id; + // topic_information { - const std::lock_guard lock(m_receive_callback_mtx); -#ifndef NDEBUG - // log it - Logging::Log(log_level_debug2, m_topic_name + "::CDataReader::RemReceiveCallback"); -#endif - m_receive_callback = nullptr; + auto& ecal_reg_sample_tdatatype = ecal_reg_sample_topic.tdatatype; + if (m_share_ttype) + { + ecal_reg_sample_tdatatype.encoding = m_topic_info.encoding; + ecal_reg_sample_tdatatype.name = m_topic_info.name; + } + if (m_share_tdesc) + { + ecal_reg_sample_tdatatype.descriptor = m_topic_info.descriptor; + } } + ecal_reg_sample_topic.attr = m_attr; + ecal_reg_sample_topic.tsize = static_cast(m_topic_size); - return(true); - } +#if ECAL_CORE_TRANSPORT_UDP + // udp multicast layer + { + Registration::TLayer udp_tlayer; + udp_tlayer.type = tl_ecal_udp; + udp_tlayer.version = 1; + udp_tlayer.enabled = m_layers.udp.read_enabled; + udp_tlayer.confirmed = m_layers.udp.read_confirmed; + ecal_reg_sample_topic.tlayer.push_back(udp_tlayer); + } +#endif - bool CDataReader::AddEventCallback(eCAL_Subscriber_Event type_, SubEventCallbackT callback_) - { - if (!m_created) return(false); +#if ECAL_CORE_TRANSPORT_SHM + // shm layer + { + Registration::TLayer shm_tlayer; + shm_tlayer.type = tl_ecal_shm; + shm_tlayer.version = 1; + shm_tlayer.enabled = m_layers.shm.read_enabled; + shm_tlayer.confirmed = m_layers.shm.read_confirmed; + ecal_reg_sample_topic.tlayer.push_back(shm_tlayer); + } +#endif - // store event callback +#if ECAL_CORE_TRANSPORT_TCP + // tcp layer { + Registration::TLayer tcp_tlayer; + tcp_tlayer.type = tl_ecal_tcp; + tcp_tlayer.version = 1; + tcp_tlayer.enabled = m_layers.tcp.read_enabled; + tcp_tlayer.confirmed = m_layers.tcp.read_confirmed; + ecal_reg_sample_topic.tlayer.push_back(tcp_tlayer); + } +#endif + + ecal_reg_sample_topic.pid = m_pid; + ecal_reg_sample_topic.pname = m_pname; + ecal_reg_sample_topic.uname = Process::GetUnitName(); + ecal_reg_sample_topic.dclock = m_clock; + ecal_reg_sample_topic.dfreq = GetFrequency(); + ecal_reg_sample_topic.message_drops = static_cast(m_message_drops); + + // we do not know the number of connections .. + ecal_reg_sample_topic.connections_loc = 0; + ecal_reg_sample_topic.connections_ext = 0; + + // register subscriber + if(g_registration_provider() != nullptr) g_registration_provider()->ApplySample(ecal_reg_sample, force_); #ifndef NDEBUG - // log it - Logging::Log(log_level_debug2, m_topic_name + "::CDataReader::AddEventCallback"); + // log it + Logging::Log(log_level_debug4, m_topic_name + "::CDataReader::DoRegister"); #endif - const std::lock_guard lock(m_event_callback_map_mtx); - m_event_callback_map[type_] = std::move(callback_); - } return(true); +#else // ECAL_CORE_REGISTRATION + (void)force_; + return(false); +#endif // ECAL_CORE_REGISTRATION } - bool CDataReader::RemEventCallback(eCAL_Subscriber_Event type_) + bool CDataReader::Unregister() { - if (!m_created) return(false); +#if ECAL_CORE_REGISTRATION + if (m_topic_name.empty()) return(false); - // reset event callback - { + // create command parameter + Registration::Sample ecal_unreg_sample; + ecal_unreg_sample.cmd_type = bct_unreg_subscriber; + auto& ecal_reg_sample_topic = ecal_unreg_sample.topic; + ecal_reg_sample_topic.hname = m_host_name; + ecal_reg_sample_topic.hgname = m_host_group_name; + ecal_reg_sample_topic.pname = m_pname; + ecal_reg_sample_topic.pid = m_pid; + ecal_reg_sample_topic.tname = m_topic_name; + ecal_reg_sample_topic.tid = m_topic_id; + ecal_reg_sample_topic.uname = Process::GetUnitName(); + + // unregister subscriber + if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(ecal_unreg_sample, false); #ifndef NDEBUG - // log it - Logging::Log(log_level_debug2, m_topic_name + "::CDataReader::RemEventCallback"); + // log it + Logging::Log(log_level_debug4, m_topic_name + "::CDataReader::Unregister"); #endif - const std::lock_guard lock(m_event_callback_map_mtx); - m_event_callback_map[type_] = nullptr; - } return(true); +#else // ECAL_CORE_REGISTRATION + return(false); +#endif // ECAL_CORE_REGISTRATION } - void CDataReader::SetID(const std::set& id_set_) + void CDataReader::StartTransportLayer() { - m_id_set = id_set_; - } +#if ECAL_CORE_TRANSPORT_UDP + if (m_config.udp.enable) + { + // flag enabled + m_layers.udp.read_enabled = true; - void CDataReader::ApplyPublication(const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_, const SLayerStates& layer_states_) - { - Connect(publication_info_.topic_id, data_type_info_); + // subscribe to layer (if supported) + CUDPReaderLayer::Get()->AddSubscription(m_host_name, m_topic_name, m_topic_id); + } +#endif - // add key to publisher map +#if ECAL_CORE_TRANSPORT_SHM + if (m_config.shm.enable) { - const std::lock_guard lock(m_pub_map_mtx); - m_pub_map[publication_info_] = std::make_tuple(data_type_info_, layer_states_); + // flag enabled + m_layers.shm.read_enabled = true; + + // subscribe to layer (if supported) + CSHMReaderLayer::Get()->AddSubscription(m_host_name, m_topic_name, m_topic_id); } - } +#endif - void CDataReader::RemovePublication(const SPublicationInfo& publication_info_) - { - // remove key from publisher map +#if ECAL_CORE_TRANSPORT_TCP + if (m_config.tcp.enable) { - const std::lock_guard lock(m_pub_map_mtx); - m_pub_map.erase(publication_info_); + // flag enabled + m_layers.tcp.read_enabled = true; + + // subscribe to layer (if supported) + CTCPReaderLayer::Get()->AddSubscription(m_host_name, m_topic_name, m_topic_id); } +#endif } - - void CDataReader::ApplyLayerParameter(const SPublicationInfo& publication_info_, eTLayerType type_, const Registration::ConnectionPar& parameter_) + + void CDataReader::StopTransportLayer() { - SReaderLayerPar par; - par.host_name = publication_info_.host_name; - par.process_id = publication_info_.process_id; - par.topic_name = m_topic_name; - par.topic_id = publication_info_.topic_id; - par.parameter = parameter_; - - switch (type_) +#if ECAL_CORE_TRANSPORT_UDP + if (m_config.udp.enable) { - case tl_ecal_shm: + // flag disabled + m_layers.udp.read_enabled = false; + + // unsubscribe from layer (if supported) + CUDPReaderLayer::Get()->RemSubscription(m_host_name, m_topic_name, m_topic_id); + } +#endif + #if ECAL_CORE_TRANSPORT_SHM - CSHMReaderLayer::Get()->SetConnectionParameter(par); + if (m_config.shm.enable) + { + // flag disabled + m_layers.shm.read_enabled = false; + + // unsubscribe from layer (if supported) + CSHMReaderLayer::Get()->RemSubscription(m_host_name, m_topic_name, m_topic_id); + } #endif - break; - case tl_ecal_tcp: + #if ECAL_CORE_TRANSPORT_TCP - CTCPReaderLayer::Get()->SetConnectionParameter(par); -#endif - break; - default: - break; + if (m_config.tcp.enable) + { + // flag disabled + m_layers.tcp.read_enabled = false; + + // unsubscribe from layer (if supported) + CTCPReaderLayer::Get()->RemSubscription(m_host_name, m_topic_name, m_topic_id); } +#endif } - void CDataReader::Connect(const std::string& tid_, const SDataTypeInformation& tinfo_) + void CDataReader::FireConnectEvent(const std::string& tid_, const SDataTypeInformation& tinfo_) { SSubEventCallbackData data; data.time = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); @@ -657,7 +785,7 @@ namespace eCAL } } - void CDataReader::Disconnect() + void CDataReader::FireDisconnectEvent() { if (m_connected) { @@ -804,53 +932,4 @@ namespace eCAL const std::lock_guard lock(m_frequency_calculator_mtx); return static_cast(m_frequency_calculator.getFrequency(frequency_time) * 1000); } - - void CDataReader::RefreshRegistration() - { - if(!m_created) return; - - // ensure that registration is not called within zero nanoseconds - // normally it will be called from registration logic every second - - // register without send - Register(false); - - // check connection timeouts - { - const std::lock_guard lock(m_pub_map_mtx); - m_pub_map.remove_deprecated(); - - if (m_pub_map.empty()) - { - Disconnect(); - } - } - } - - std::string CDataReader::Dump(const std::string& indent_ /* = "" */) - { - std::stringstream out; - - - out << '\n'; - out << indent_ << "------------------------------------" << '\n'; - out << indent_ << " class CDataReader " << '\n'; - out << indent_ << "------------------------------------" << '\n'; - out << indent_ << "m_host_name: " << m_host_name << '\n'; - out << indent_ << "m_host_group_name: " << m_host_group_name << '\n'; - out << indent_ << "m_topic_name: " << m_topic_name << '\n'; - out << indent_ << "m_topic_id: " << m_topic_id << '\n'; - out << indent_ << "m_topic_info.encoding: " << m_topic_info.encoding << '\n'; - out << indent_ << "m_topic_info.name: " << m_topic_info.name << '\n'; - out << indent_ << "m_topic_info.desc: " << m_topic_info.descriptor << '\n'; - out << indent_ << "m_topic_size: " << m_topic_size << '\n'; - out << indent_ << "m_read_buf.size(): " << m_read_buf.size() << '\n'; - out << indent_ << "m_read_time: " << m_read_time << '\n'; - out << indent_ << "m_clock: " << m_clock << '\n'; - out << indent_ << "frequency [mHz]: " << GetFrequency() << '\n'; - out << indent_ << "m_created: " << m_created << '\n'; - out << '\n'; - - return(out.str()); - } } diff --git a/ecal/core/src/readwrite/ecal_reader.h b/ecal/core/src/readwrite/ecal_reader.h index 5d48a3c1f0..d1d5c07ffc 100644 --- a/ecal/core/src/readwrite/ecal_reader.h +++ b/ecal/core/src/readwrite/ecal_reader.h @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,8 +23,8 @@ #pragma once -#include #include +#include #include #include "serialization/ecal_serialize_sample_payload.h" @@ -39,10 +39,10 @@ #include #include #include +#include #include #include #include -#include #include namespace eCAL @@ -50,11 +50,18 @@ namespace eCAL class CDataReader { public: + struct SLayerState + { + bool write_enabled = false; // publisher side + bool read_enabled = false; // subscriber side + bool read_confirmed = false; // subscriber side + }; + struct SLayerStates { - bool udp = false; - bool shm = false; - bool tcp = false; + SLayerState udp; + SLayerState shm; + SLayerState tcp; }; struct SPublicationInfo @@ -70,14 +77,12 @@ namespace eCAL } }; - CDataReader(const std::string& topic_name_, const SDataTypeInformation& topic_info_); + CDataReader(const std::string& topic_name_, const SDataTypeInformation& topic_info_, const Subscriber::Configuration& config_ = {}); ~CDataReader(); bool Stop(); - static void InitializeLayers(); - - bool Receive(std::string& buf_, long long* time_ = nullptr, int rcv_timeout_ms_ = 0); + bool Read(std::string& buf_, long long* time_ = nullptr, int rcv_timeout_ms_ = 0); bool AddReceiveCallback(ReceiveCallbackT callback_); bool RemReceiveCallback(); @@ -95,7 +100,7 @@ namespace eCAL void ApplyLayerParameter(const SPublicationInfo& publication_info_, eTLayerType type_, const Registration::ConnectionPar& parameter_); - std::string Dump(const std::string& indent_ = ""); + void RefreshRegistration(); bool IsCreated() const { return(m_created); } @@ -115,19 +120,21 @@ namespace eCAL std::string GetTopicID() const { return(m_topic_id); } SDataTypeInformation GetDataTypeInformation() const { return(m_topic_info); } - void RefreshRegistration(); + static void InitializeLayers(); + size_t ApplySample(const std::string& tid_, const char* payload_, size_t size_, long long id_, long long clock_, long long time_, size_t hash_, eTLayerType layer_); - size_t AddSample(const std::string& tid_, const char* payload_, size_t size_, long long id_, long long clock_, long long time_, size_t hash_, eTLayerType layer_); + std::string Dump(const std::string& indent_ = ""); protected: - void SubscribeToLayers(); - void UnsubscribeFromLayers(); - bool Register(bool force_); bool Unregister(); - void Connect(const std::string& tid_, const SDataTypeInformation& tinfo_); - void Disconnect(); + void StartTransportLayer(); + void StopTransportLayer(); + + void FireConnectEvent(const std::string& tid_, const SDataTypeInformation& tinfo_); + void FireDisconnectEvent(); + bool CheckMessageClock(const std::string& tid_, long long current_clock_); int32_t GetFrequency(); @@ -141,6 +148,7 @@ namespace eCAL SDataTypeInformation m_topic_info; std::map m_attr; std::atomic m_topic_size; + Subscriber::Configuration m_config; std::atomic m_connected; using PublicationMapT = Util::CExpMap>; @@ -177,7 +185,7 @@ namespace eCAL bool m_share_ttype = false; bool m_share_tdesc = false; - SLayerStates m_confirmed_layers; + SLayerStates m_layers; std::atomic m_created; }; } diff --git a/ecal/core/src/readwrite/ecal_writer.cpp b/ecal/core/src/readwrite/ecal_writer.cpp index 6f4548e526..fe9fe0c547 100644 --- a/ecal/core/src/readwrite/ecal_writer.cpp +++ b/ecal/core/src/readwrite/ecal_writer.cpp @@ -21,13 +21,10 @@ * @brief common eCAL data writer **/ -#include #include +#include #include -#include -#include -#include -#include +#include #include "config/ecal_config_reader_hlp.h" @@ -41,8 +38,12 @@ #include "pubsub/ecal_pubgate.h" -#include #include +#include +#include +#include +#include +#include struct SSndHash { @@ -65,6 +66,32 @@ namespace std }; } +namespace +{ +#ifndef NDEBUG + // function to convert boolean to string + std::string boolToString(bool value) + { + return value ? "true" : "false"; + } + + // function to log the states of SLayerState + void logLayerState(const std::string& layerName, const eCAL::CDataWriter::SLayerState& state) { + std::cout << layerName << " - Read Enabled: " << boolToString(state.read_enabled) + << ", Write Enabled: " << boolToString(state.write_enabled) + << ", Write Confirmed: " << boolToString(state.write_confirmed) << std::endl; + } + + // function to log the states of SLayerStates + void logLayerStates(const eCAL::CDataWriter::SLayerStates& states) { + std::cout << "Logging Layer States:" << std::endl; + logLayerState("UDP", states.udp); + logLayerState("SHM", states.shm); + logLayerState("TCP", states.tcp); + } +#endif +} + namespace eCAL { CDataWriter::CDataWriter(const std::string& topic_name_, const SDataTypeInformation& topic_info_, const Publisher::Configuration& config_) : @@ -93,14 +120,15 @@ namespace eCAL const std::chrono::milliseconds registration_timeout(Config::GetRegistrationTimeoutMs()); m_sub_map.set_expiration(registration_timeout); + // set layer priority defaults + m_local_layer_priority = { tl_ecal_shm, tl_ecal_udp, tl_ecal_tcp }; + m_remote_layer_priority = { tl_ecal_udp, tl_ecal_tcp }; + // mark as created m_created = true; // register Register(false); - - // start udp, shm, tcp layer - StartTransportLayer(); } CDataWriter::~CDataWriter() @@ -121,8 +149,8 @@ namespace eCAL Logging::Log(log_level_debug1, m_topic_name + "::CDataWriter::Stop"); #endif - // stop udp, shm, tcp layer - StopTransportLayer(); + // stop all transport layer + StopAllLayer(); // clear subscriber maps { @@ -145,92 +173,6 @@ namespace eCAL return true; } - bool CDataWriter::SetDataTypeInformation(const SDataTypeInformation& topic_info_) - { - // Does it even make sense to register if the info is the same??? - const bool force = m_topic_info != topic_info_; - m_topic_info = topic_info_; - -#ifndef NDEBUG - // log it - Logging::Log(log_level_debug2, m_topic_name + "::CDataWriter::SetDescription"); -#endif - - // register it - Register(force); - - return(true); - } - - bool CDataWriter::SetAttribute(const std::string& attr_name_, const std::string& attr_value_) - { - auto current_val = m_attr.find(attr_name_); - - const bool force = current_val == m_attr.end() || current_val->second != attr_value_; - m_attr[attr_name_] = attr_value_; - -#ifndef NDEBUG - // log it - Logging::Log(log_level_debug2, m_topic_name + "::CDataWriter::SetAttribute"); -#endif - - // register it - Register(force); - - return(true); - } - - bool CDataWriter::ClearAttribute(const std::string& attr_name_) - { - auto force = m_attr.find(attr_name_) != m_attr.end(); - - m_attr.erase(attr_name_); - -#ifndef NDEBUG - // log it - Logging::Log(log_level_debug2, m_topic_name + "::CDataWriter::ClearAttribute"); -#endif - - // register it - Register(force); - - return(true); - } - - bool CDataWriter::AddEventCallback(eCAL_Publisher_Event type_, PubEventCallbackT callback_) - { - if (!m_created) return(false); - - // store event callback - { -#ifndef NDEBUG - // log it - Logging::Log(log_level_debug2, m_topic_name + "::CDataWriter::AddEventCallback"); -#endif - const std::lock_guard lock(m_event_callback_map_mtx); - m_event_callback_map[type_] = std::move(callback_); - } - - return(true); - } - - bool CDataWriter::RemEventCallback(eCAL_Publisher_Event type_) - { - if (!m_created) return(false); - - // reset event callback - { -#ifndef NDEBUG - // log it - Logging::Log(log_level_debug2, m_topic_name + "::CDataWriter::RemEventCallback"); -#endif - const std::lock_guard lock(m_event_callback_map_mtx); - m_event_callback_map[type_] = nullptr; - } - - return(true); - } - size_t CDataWriter::Write(CPayloadWriter& payload_, long long time_, long long id_) { // get payload buffer size (one time, to avoid multiple computations) @@ -310,7 +252,7 @@ namespace eCAL shm_sent = m_writer_shm->Write(payload_buf, wattr); } - m_confirmed_layers.shm = true; + m_layers.shm.write_confirmed = true; } written |= shm_sent; @@ -361,7 +303,7 @@ namespace eCAL // write to udp multicast layer udp_sent = m_writer_udp->Write(m_payload_buffer.data(), wattr); - m_confirmed_layers.udp = true; + m_layers.udp.write_confirmed = true; } written |= udp_sent; @@ -403,7 +345,7 @@ namespace eCAL // write to tcp layer tcp_sent = m_writer_tcp->Write(m_payload_buffer.data(), wattr); - m_confirmed_layers.tcp = true; + m_layers.tcp.write_confirmed = true; } written |= tcp_sent; @@ -426,14 +368,146 @@ namespace eCAL else return 0; } - void CDataWriter::ApplySubscription(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_, const SLayerStates& layer_states_, const std::string& reader_par_) + bool CDataWriter::SetDataTypeInformation(const SDataTypeInformation& topic_info_) + { + // Does it even make sense to register if the info is the same??? + const bool force = m_topic_info != topic_info_; + m_topic_info = topic_info_; + +#ifndef NDEBUG + // log it + Logging::Log(log_level_debug2, m_topic_name + "::CDataWriter::SetDescription"); +#endif + + // register it + Register(force); + + return(true); + } + + bool CDataWriter::SetAttribute(const std::string& attr_name_, const std::string& attr_value_) + { + auto current_val = m_attr.find(attr_name_); + + const bool force = current_val == m_attr.end() || current_val->second != attr_value_; + m_attr[attr_name_] = attr_value_; + +#ifndef NDEBUG + // log it + Logging::Log(log_level_debug2, m_topic_name + "::CDataWriter::SetAttribute"); +#endif + + // register it + Register(force); + + return(true); + } + + bool CDataWriter::ClearAttribute(const std::string& attr_name_) + { + auto force = m_attr.find(attr_name_) != m_attr.end(); + + m_attr.erase(attr_name_); + +#ifndef NDEBUG + // log it + Logging::Log(log_level_debug2, m_topic_name + "::CDataWriter::ClearAttribute"); +#endif + + // register it + Register(force); + + return(true); + } + + bool CDataWriter::AddEventCallback(eCAL_Publisher_Event type_, PubEventCallbackT callback_) + { + if (!m_created) return(false); + + // store event callback + { +#ifndef NDEBUG + // log it + Logging::Log(log_level_debug2, m_topic_name + "::CDataWriter::AddEventCallback"); +#endif + const std::lock_guard lock(m_event_callback_map_mtx); + m_event_callback_map[type_] = std::move(callback_); + } + + return(true); + } + + bool CDataWriter::RemEventCallback(eCAL_Publisher_Event type_) { - Connect(subscription_info_.topic_id, data_type_info_); + if (!m_created) return(false); + + // reset event callback + { +#ifndef NDEBUG + // log it + Logging::Log(log_level_debug2, m_topic_name + "::CDataWriter::RemEventCallback"); +#endif + const std::lock_guard lock(m_event_callback_map_mtx); + m_event_callback_map[type_] = nullptr; + } + + return(true); + } + + void CDataWriter::ApplySubscription(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_, const SLayerStates& sub_layer_states_, const std::string& reader_par_) + { + FireConnectEvent(subscription_info_.topic_id, data_type_info_); + + // collect layer states + std::vector pub_layers; + std::vector sub_layers; +#if ECAL_CORE_TRANSPORT_UDP + if (m_config.udp.enable) pub_layers.push_back(tl_ecal_udp); + if (sub_layer_states_.udp.read_enabled) sub_layers.push_back(tl_ecal_udp); + + m_layers.udp.read_enabled = sub_layer_states_.udp.read_enabled; // just for debugging/logging +#endif +#if ECAL_CORE_TRANSPORT_SHM + if (m_config.shm.enable) pub_layers.push_back(tl_ecal_shm); + if (sub_layer_states_.shm.read_enabled) sub_layers.push_back(tl_ecal_shm); + + m_layers.shm.read_enabled = sub_layer_states_.shm.read_enabled; // just for debugging/logging +#endif +#if ECAL_CORE_TRANSPORT_TCP + if (m_config.tcp.enable) pub_layers.push_back(tl_ecal_tcp); + if (sub_layer_states_.tcp.read_enabled) sub_layers.push_back(tl_ecal_tcp); + + m_layers.tcp.read_enabled = sub_layer_states_.tcp.read_enabled; // just for debugging/logging +#endif + + // determine if we need to start a transport layer + // if a new layer gets activated, we reregister for SHM and TCP to force the exchange of connection parameter + // without this forced registration we would need one additional registration loop for these two layers to establish the connection + const eTLayerType layer2activate = DetermineTransportLayer2Start(pub_layers, sub_layers, m_host_name == subscription_info_.host_name); + switch (layer2activate) + { + case tl_ecal_udp: + StartUdpLayer(); + break; + case tl_ecal_shm: + if (StartShmLayer()) Register(true); + break; + case tl_ecal_tcp: + if (StartTcpLayer()) Register(true); + break; + default: + break; + } + +#ifndef NDEBUG + // log it + //logLayerStates(m_layers); +#endif // add key to subscriber map { const std::lock_guard lock(m_sub_map_mtx); - m_sub_map[subscription_info_] = std::make_tuple(data_type_info_, layer_states_); + m_sub_map[subscription_info_] = std::make_tuple(data_type_info_, sub_layer_states_); } // add a new subscription @@ -478,6 +552,16 @@ namespace eCAL #endif } + void CDataWriter::SetLocalLayerPriority(const std::vector& layer_priority_) + { + m_local_layer_priority = layer_priority_; + } + + void CDataWriter::SetRemoteLayerPriority(const std::vector& layer_priority_) + { + m_remote_layer_priority = layer_priority_; + } + void CDataWriter::RefreshRegistration() { if (!m_created) return; @@ -492,7 +576,7 @@ namespace eCAL if (m_sub_map.empty()) { - Disconnect(); + FireDisconnectEvent(); } } } @@ -574,7 +658,8 @@ namespace eCAL eCAL::Registration::TLayer udp_tlayer; udp_tlayer.type = tl_ecal_udp; udp_tlayer.version = 1; - udp_tlayer.confirmed = m_confirmed_layers.udp; + udp_tlayer.enabled = m_layers.udp.write_enabled; + udp_tlayer.confirmed = m_layers.udp.write_confirmed; udp_tlayer.par_layer.layer_par_udpmc = m_writer_udp->GetConnectionParameter().layer_par_udpmc; ecal_reg_sample_topic.tlayer.push_back(udp_tlayer); } @@ -587,7 +672,8 @@ namespace eCAL eCAL::Registration::TLayer shm_tlayer; shm_tlayer.type = tl_ecal_shm; shm_tlayer.version = 1; - shm_tlayer.confirmed = m_confirmed_layers.shm; + shm_tlayer.enabled = m_layers.shm.write_enabled; + shm_tlayer.confirmed = m_layers.shm.write_confirmed; shm_tlayer.par_layer.layer_par_shm = m_writer_shm->GetConnectionParameter().layer_par_shm; ecal_reg_sample_topic.tlayer.push_back(shm_tlayer); } @@ -600,7 +686,8 @@ namespace eCAL eCAL::Registration::TLayer tcp_tlayer; tcp_tlayer.type = tl_ecal_tcp; tcp_tlayer.version = 1; - tcp_tlayer.confirmed = m_confirmed_layers.tcp; + tcp_tlayer.enabled = m_layers.tcp.write_enabled; + tcp_tlayer.confirmed = m_layers.tcp.write_confirmed; tcp_tlayer.par_layer.layer_par_tcp = m_writer_tcp->GetConnectionParameter().layer_par_tcp; ecal_reg_sample_topic.tlayer.push_back(tcp_tlayer); } @@ -637,8 +724,11 @@ namespace eCAL Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::Register"); #endif -#endif // ECAL_CORE_REGISTRATION return(true); +#else // ECAL_CORE_REGISTRATION +(void)force_; +return(false); +#endif // ECAL_CORE_REGISTRATION } bool CDataWriter::Unregister() @@ -667,11 +757,13 @@ namespace eCAL Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::UnRegister"); #endif -#endif // ECAL_CORE_REGISTRATION return(true); +#else // ECAL_CORE_REGISTRATION + return(false); +#endif // ECAL_CORE_REGISTRATION } - void CDataWriter::Connect(const std::string& tid_, const SDataTypeInformation& tinfo_) + void CDataWriter::FireConnectEvent(const std::string& tid_, const SDataTypeInformation& tinfo_) { SPubEventCallbackData data; data.time = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); @@ -709,7 +801,7 @@ namespace eCAL } } - void CDataWriter::Disconnect() + void CDataWriter::FireDisconnectEvent() { if (m_connected) { @@ -731,91 +823,102 @@ namespace eCAL } } - void CDataWriter::StartTransportLayer() + bool CDataWriter::StartUdpLayer() { #if ECAL_CORE_TRANSPORT_UDP - if (m_config.udp.enable) - { - ActivateUdpLayer(); - } -#endif -#if ECAL_CORE_TRANSPORT_SHM - if (m_config.shm.enable) - { - ActivateShmLayer(); - } -#endif -#if ECAL_CORE_TRANSPORT_TCP - if (m_config.tcp.enable) - { - ActivateTcpLayer(); - } -#endif - } + if (m_layers.udp.write_enabled) return false; - void CDataWriter::StopTransportLayer() - { - // destroy udp writer -#if ECAL_CORE_TRANSPORT_UDP - m_writer_udp.reset(); -#endif - - // destroy shm writer -#if ECAL_CORE_TRANSPORT_SHM - m_writer_shm.reset(); -#endif - - // destroy tcp writer -#if ECAL_CORE_TRANSPORT_TCP - m_writer_tcp.reset(); -#endif - } + // flag enabled + m_layers.udp.write_enabled = true; - void CDataWriter::ActivateUdpLayer() - { -#if ECAL_CORE_TRANSPORT_UDP // log state - Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::ActivateUdpLayer::ACTIVATED"); + Logging::Log(log_level_debug2, m_topic_name + "::CDataWriter::ActivateUdpLayer::ACTIVATED"); // create writer m_writer_udp = std::make_unique(m_host_name, m_topic_name, m_topic_id, m_config.udp); #ifndef NDEBUG - Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::ActivateUdpLayer::WRITER_CREATED"); + Logging::Log(log_level_debug2, m_topic_name + "::CDataWriter::ActivateUdpLayer::WRITER_CREATED"); #endif + return true; +#else // ECAL_CORE_TRANSPORT_UDP + return false; #endif // ECAL_CORE_TRANSPORT_UDP } - void CDataWriter::ActivateShmLayer() + bool CDataWriter::StartShmLayer() { #if ECAL_CORE_TRANSPORT_SHM + if (m_layers.shm.write_enabled) return false; + + // flag enabled + m_layers.shm.write_enabled = true; + // log state - Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::ActivateShmLayer::ACTIVATED"); + Logging::Log(log_level_debug2, m_topic_name + "::CDataWriter::ActivateShmLayer::ACTIVATED"); // create writer m_writer_shm = std::make_unique(m_host_name, m_topic_name, m_topic_id, m_config.shm); #ifndef NDEBUG - Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::ActivateShmLayer::WRITER_CREATED"); + Logging::Log(log_level_debug2, m_topic_name + "::CDataWriter::ActivateShmLayer::WRITER_CREATED"); #endif + return true; +#else // ECAL_CORE_TRANSPORT_SHM + return false; #endif // ECAL_CORE_TRANSPORT_SHM } - void CDataWriter::ActivateTcpLayer() + bool CDataWriter::StartTcpLayer() { #if ECAL_CORE_TRANSPORT_TCP + if (m_layers.tcp.write_enabled) return false; + + // flag enabled + m_layers.tcp.write_enabled = true; + // log state - Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::ActivateTcpLayer::ACTIVATED"); + Logging::Log(log_level_debug2, m_topic_name + "::CDataWriter::ActivateTcpLayer::ACTIVATED"); // create writer m_writer_tcp = std::make_unique(m_host_name, m_topic_name, m_topic_id, m_config.tcp); #ifndef NDEBUG - Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::ActivateTcpLayer::WRITER_CREATED"); + Logging::Log(log_level_debug2, m_topic_name + "::CDataWriter::ActivateTcpLayer::WRITER_CREATED"); #endif + return true; +#else // ECAL_CORE_TRANSPORT_TCP + return false; #endif // ECAL_CORE_TRANSPORT_TCP } + void CDataWriter::StopAllLayer() + { +#if ECAL_CORE_TRANSPORT_UDP + // flag disabled + m_layers.udp.write_enabled = false; + + // destroy writer + m_writer_udp.reset(); +#endif + +#if ECAL_CORE_TRANSPORT_SHM + // flag disabled + m_layers.shm.write_enabled = false; + + // destroy writer + m_writer_shm.reset(); +#endif + +#if ECAL_CORE_TRANSPORT_TCP + // flag disabled + m_layers.tcp.write_enabled = false; + + // destroy writer + m_writer_tcp.reset(); +#endif + } + size_t CDataWriter::PrepareWrite(long long id_, size_t len_) { // store id @@ -851,6 +954,25 @@ namespace eCAL return is_internal_only; } + eTLayerType CDataWriter::DetermineTransportLayer2Start(const std::vector& pub_layer_, const std::vector& sub_layer_, bool same_host_) + { + // determine the priority list to use + const std::vector& priorityList = same_host_ ? m_local_layer_priority : m_remote_layer_priority; + + // find the highest priority transport layer that is available in both publisher and subscriber options + for (const eTLayerType layer : priorityList) + { + if (std::find(pub_layer_.begin(), pub_layer_.end(), layer) != pub_layer_.end() + && std::find(sub_layer_.begin(), sub_layer_.end(), layer) != sub_layer_.end()) + { + return layer; + } + } + + // return tl_none if no common transport layer is found + return tl_none; + } + int32_t CDataWriter::GetFrequency() { const auto frequency_time = std::chrono::steady_clock::now(); diff --git a/ecal/core/src/readwrite/ecal_writer.h b/ecal/core/src/readwrite/ecal_writer.h index 019c06fd26..a9dafa7517 100644 --- a/ecal/core/src/readwrite/ecal_writer.h +++ b/ecal/core/src/readwrite/ecal_writer.h @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -58,11 +58,18 @@ namespace eCAL class CDataWriter { public: + struct SLayerState + { + bool read_enabled = false; // subscriber side + bool write_enabled = false; // publisher side + bool write_confirmed = false; // publisher side + }; + struct SLayerStates { - bool udp = false; - bool shm = false; - bool tcp = false; + SLayerState udp; + SLayerState shm; + SLayerState tcp; }; struct SSubscriptionInfo @@ -83,24 +90,25 @@ namespace eCAL bool Stop(); - bool SetDataTypeInformation(const SDataTypeInformation& topic_info_); + size_t Write(CPayloadWriter& payload_, long long time_, long long id_); - bool SetAttribute(const std::string& attr_name_, const std::string& attr_value_); - bool ClearAttribute(const std::string& attr_name_); + bool SetDataTypeInformation(const SDataTypeInformation& topic_info_); bool AddEventCallback(eCAL_Publisher_Event type_, PubEventCallbackT callback_); bool RemEventCallback(eCAL_Publisher_Event type_); - size_t Write(CPayloadWriter& payload_, long long time_, long long id_); + bool SetAttribute(const std::string& attr_name_, const std::string& attr_value_); + bool ClearAttribute(const std::string& attr_name_); - void ApplySubscription(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_, const SLayerStates& layer_states_, const std::string& reader_par_); + void ApplySubscription(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_, const SLayerStates& sub_layer_states_, const std::string& reader_par_); void RemoveSubscription(const SSubscriptionInfo& subscription_info_); + void SetLocalLayerPriority(const std::vector& layer_priority_); + void SetRemoteLayerPriority(const std::vector& layer_priority_); + void RefreshRegistration(); void RefreshSendCounter(); - std::string Dump(const std::string& indent_ = ""); - bool IsCreated() const { return(m_created); } bool IsSubscribed() const @@ -115,26 +123,29 @@ namespace eCAL return(m_sub_map.size()); } - const std::string& GetTopicName() const { return(m_topic_name); } + const std::string& GetTopicName() const { return(m_topic_name); } const SDataTypeInformation& GetDataTypeInformation() const { return m_topic_info; } + std::string Dump(const std::string& indent_ = ""); + protected: bool Register(bool force_); bool Unregister(); - void Connect(const std::string& tid_, const SDataTypeInformation& tinfo_); - void Disconnect(); + bool StartUdpLayer(); + bool StartShmLayer(); + bool StartTcpLayer(); - void StartTransportLayer(); - void StopTransportLayer(); + void StopAllLayer(); - void ActivateUdpLayer(); - void ActivateShmLayer(); - void ActivateTcpLayer(); + void FireConnectEvent(const std::string& tid_, const SDataTypeInformation& tinfo_); + void FireDisconnectEvent(); size_t PrepareWrite(long long id_, size_t len_); - bool IsInternalSubscribedOnly(); + bool IsInternalSubscribedOnly(); + eTLayerType DetermineTransportLayer2Start(const std::vector& pub_layer_, const std::vector& sub_layer_, bool same_host_); + int32_t GetFrequency(); std::string m_host_name; @@ -176,7 +187,9 @@ namespace eCAL std::unique_ptr m_writer_tcp; #endif - SLayerStates m_confirmed_layers; + std::vector m_local_layer_priority; + std::vector m_remote_layer_priority; + SLayerStates m_layers; std::atomic m_created; }; } diff --git a/ecal/core/src/readwrite/shm/ecal_reader_shm.cpp b/ecal/core/src/readwrite/shm/ecal_reader_shm.cpp index 244460f1eb..a20c0db7c5 100644 --- a/ecal/core/src/readwrite/shm/ecal_reader_shm.cpp +++ b/ecal/core/src/readwrite/shm/ecal_reader_shm.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,15 +21,18 @@ * @brief shared memory layer **/ -#include -#include -#include +#include +#include #include "ecal_global_accessors.h" -#include "pubsub/ecal_subgate.h" -#include "io/shm/ecal_memfile_pool.h" #include "ecal_reader_shm.h" +#include "io/shm/ecal_memfile_pool.h" +#include "pubsub/ecal_subgate.h" + +#include +#include + namespace eCAL { //////////////// diff --git a/ecal/core/src/readwrite/shm/ecal_writer_shm.cpp b/ecal/core/src/readwrite/shm/ecal_writer_shm.cpp index 43d1b56fe7..9194569bae 100644 --- a/ecal/core/src/readwrite/shm/ecal_writer_shm.cpp +++ b/ecal/core/src/readwrite/shm/ecal_writer_shm.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,13 +21,13 @@ * @brief memory file data writer **/ -#include #include -#include #include "ecal_def.h" #include "ecal_writer_shm.h" +#include + namespace eCAL { const std::string CDataWriterSHM::m_memfile_base_name = "ecal_"; @@ -39,6 +39,7 @@ namespace eCAL m_topic_name = topic_name_; // initialize memory file buffer + if (m_config.memfile_buffer_count < 1) m_config.memfile_buffer_count = 1; SetBufferCount(m_config.memfile_buffer_count); } diff --git a/ecal/core/src/readwrite/shm/ecal_writer_shm.h b/ecal/core/src/readwrite/shm/ecal_writer_shm.h index 01517db79a..11817d8418 100644 --- a/ecal/core/src/readwrite/shm/ecal_writer_shm.h +++ b/ecal/core/src/readwrite/shm/ecal_writer_shm.h @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,8 +25,8 @@ #include -#include "readwrite/ecal_writer_base.h" #include "io/shm/ecal_memfile_sync.h" +#include "readwrite/ecal_writer_base.h" #include #include diff --git a/ecal/core/src/readwrite/tcp/ecal_reader_tcp.cpp b/ecal/core/src/readwrite/tcp/ecal_reader_tcp.cpp index ffbfbae0af..ed82df0fd1 100644 --- a/ecal/core/src/readwrite/tcp/ecal_reader_tcp.cpp +++ b/ecal/core/src/readwrite/tcp/ecal_reader_tcp.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,15 +21,14 @@ * @brief tcp reader and layer **/ -#include "ecal_global_accessors.h" - #include -#include "pubsub/ecal_subgate.h" - +#include "ecal_global_accessors.h" #include "ecal_reader_tcp.h" #include "ecal_tcp_pubsub_logger.h" +#include "pubsub/ecal_subgate.h" + #include "ecal_utils/portable_endian.h" namespace eCAL diff --git a/ecal/core/src/readwrite/tcp/ecal_reader_tcp.h b/ecal/core/src/readwrite/tcp/ecal_reader_tcp.h index cc1a74c548..83fd64f557 100644 --- a/ecal/core/src/readwrite/tcp/ecal_reader_tcp.h +++ b/ecal/core/src/readwrite/tcp/ecal_reader_tcp.h @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,6 +30,9 @@ #include "serialization/ecal_struct_sample_payload.h" +#include +#include + namespace eCAL { //////////////// diff --git a/ecal/core/src/readwrite/udp/ecal_reader_udp.cpp b/ecal/core/src/readwrite/udp/ecal_reader_udp.cpp index 1bdcdf443e..1023bd8e39 100644 --- a/ecal/core/src/readwrite/udp/ecal_reader_udp.cpp +++ b/ecal/core/src/readwrite/udp/ecal_reader_udp.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,12 +21,14 @@ * @brief udp multicast reader and layer **/ -#include "ecal_reader_udp.h" +#include +#include "ecal_reader_udp.h" #include "ecal_global_accessors.h" -#include "pubsub/ecal_subgate.h" #include "io/udp/ecal_udp_configurations.h" +#include "pubsub/ecal_subgate.h" + #include #include #include diff --git a/ecal/core/src/serialization/ecal_serialize_common.cpp b/ecal/core/src/serialization/ecal_serialize_common.cpp index 7f8c49092d..5ffac31072 100644 --- a/ecal/core/src/serialization/ecal_serialize_common.cpp +++ b/ecal/core/src/serialization/ecal_serialize_common.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -263,6 +263,7 @@ namespace eCAL eCAL_pb_TLayer pb_layer = eCAL_pb_TLayer_init_default; pb_layer.type = static_cast(layer.type); pb_layer.version = layer.version; + pb_layer.enabled = layer.enabled; pb_layer.confirmed = layer.confirmed; // layer @@ -315,6 +316,7 @@ namespace eCAL // apply layer values layer.type = static_cast(pb_layer.type); layer.version = pb_layer.version; + layer.enabled = pb_layer.enabled; layer.confirmed = pb_layer.confirmed; // apply tcp layer parameter diff --git a/ecal/core/src/serialization/ecal_struct_sample_registration.h b/ecal/core/src/serialization/ecal_struct_sample_registration.h index 383b8712c1..9c4ffcb823 100644 --- a/ecal/core/src/serialization/ecal_struct_sample_registration.h +++ b/ecal/core/src/serialization/ecal_struct_sample_registration.h @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -116,7 +116,8 @@ namespace eCAL { eTLayerType type = tl_none; // transport layer type int32_t version = 0; // transport layer version - bool confirmed = false; // transport layer used? + bool enabled = false; // transport layer enabled ? + bool confirmed = false; // transport layer usage confirmed ? ConnectionPar par_layer; // transport layer parameter }; diff --git a/ecal/core/src/serialization/nanopb/ecal.pb.h b/ecal/core/src/serialization/nanopb/ecal.pb.h index 20c84373f2..8711ae4c83 100644 --- a/ecal/core/src/serialization/nanopb/ecal.pb.h +++ b/ecal/core/src/serialization/nanopb/ecal.pb.h @@ -14,7 +14,8 @@ #endif /* Enum definitions */ -typedef enum _eCAL_pb_eCmdType { +typedef enum _eCAL_pb_eCmdType { /* Reserved fields in enums are not supported in protobuf 3.0 + reserved 7 to 11; */ eCAL_pb_eCmdType_bct_none = 0, /* undefined command */ eCAL_pb_eCmdType_bct_set_sample = 1, /* set sample content */ eCAL_pb_eCmdType_bct_reg_publisher = 2, /* register publisher */ @@ -30,7 +31,8 @@ typedef enum _eCAL_pb_eCmdType { } eCAL_pb_eCmdType; /* Struct definitions */ -typedef struct _eCAL_pb_Content { +typedef struct _eCAL_pb_Content { /* Reserved fields in enums are not supported in protobuf 3.0 + reserved 5; */ int64_t id; /* sample id */ int64_t clock; /* internal used clock */ int64_t time; /* time the content was updated */ diff --git a/ecal/core/src/serialization/nanopb/layer.pb.h b/ecal/core/src/serialization/nanopb/layer.pb.h index 9abe83c506..f6a5fe2394 100644 --- a/ecal/core/src/serialization/nanopb/layer.pb.h +++ b/ecal/core/src/serialization/nanopb/layer.pb.h @@ -10,7 +10,8 @@ #endif /* Enum definitions */ -typedef enum _eCAL_pb_eTLayerType { +typedef enum _eCAL_pb_eTLayerType { /* Reserved fields in enums are not supported in protobuf 3.0 + reserved 2, 3, 42; */ eCAL_pb_eTLayerType_tl_none = 0, /* undefined */ eCAL_pb_eTLayerType_tl_ecal_udp_mc = 1, /* ecal udp multicast */ /* 2 = ecal udp unicast (not supported anymore) @@ -34,7 +35,8 @@ typedef struct _eCAL_pb_LayerParTcp { int32_t port; /* tcp writers port number */ } eCAL_pb_LayerParTcp; -typedef struct _eCAL_pb_ConnnectionPar { +typedef struct _eCAL_pb_ConnnectionPar { /* Reserved fields in enums are not supported in protobuf 3.0 + reserved 3; */ bool has_layer_par_udpmc; eCAL_pb_LayerParUdpMC layer_par_udpmc; /* parameter for ecal udp multicast */ bool has_layer_par_shm; @@ -44,12 +46,14 @@ typedef struct _eCAL_pb_ConnnectionPar { eCAL_pb_LayerParTcp layer_par_tcp; /* parameter for ecal tcp */ } eCAL_pb_ConnnectionPar; -typedef struct _eCAL_pb_TLayer { +typedef struct _eCAL_pb_TLayer { /* Reserved fields in enums are not supported in protobuf 3.0 + reserved 4; */ eCAL_pb_eTLayerType type; /* transport layer type */ int32_t version; /* transport layer version */ - bool confirmed; /* transport layer used ? */ + bool confirmed; /* transport layer usage confirmed ? */ bool has_par_layer; eCAL_pb_ConnnectionPar par_layer; /* transport layer parameter */ + bool enabled; /* transport enabled ? */ } eCAL_pb_TLayer; @@ -74,12 +78,12 @@ extern "C" { #define eCAL_pb_LayerParShm_init_default {{{NULL}, NULL}} #define eCAL_pb_LayerParTcp_init_default {0} #define eCAL_pb_ConnnectionPar_init_default {false, eCAL_pb_LayerParUdpMC_init_default, false, eCAL_pb_LayerParShm_init_default, false, eCAL_pb_LayerParTcp_init_default} -#define eCAL_pb_TLayer_init_default {_eCAL_pb_eTLayerType_MIN, 0, 0, false, eCAL_pb_ConnnectionPar_init_default} +#define eCAL_pb_TLayer_init_default {_eCAL_pb_eTLayerType_MIN, 0, 0, false, eCAL_pb_ConnnectionPar_init_default, 0} #define eCAL_pb_LayerParUdpMC_init_zero {0} #define eCAL_pb_LayerParShm_init_zero {{{NULL}, NULL}} #define eCAL_pb_LayerParTcp_init_zero {0} #define eCAL_pb_ConnnectionPar_init_zero {false, eCAL_pb_LayerParUdpMC_init_zero, false, eCAL_pb_LayerParShm_init_zero, false, eCAL_pb_LayerParTcp_init_zero} -#define eCAL_pb_TLayer_init_zero {_eCAL_pb_eTLayerType_MIN, 0, 0, false, eCAL_pb_ConnnectionPar_init_zero} +#define eCAL_pb_TLayer_init_zero {_eCAL_pb_eTLayerType_MIN, 0, 0, false, eCAL_pb_ConnnectionPar_init_zero, 0} /* Field tags (for use in manual encoding/decoding) */ #define eCAL_pb_LayerParShm_memory_file_list_tag 1 @@ -91,6 +95,7 @@ extern "C" { #define eCAL_pb_TLayer_version_tag 2 #define eCAL_pb_TLayer_confirmed_tag 3 #define eCAL_pb_TLayer_par_layer_tag 5 +#define eCAL_pb_TLayer_enabled_tag 6 /* Struct field encoding specification for nanopb */ #define eCAL_pb_LayerParUdpMC_FIELDLIST(X, a) \ @@ -122,7 +127,8 @@ X(a, STATIC, OPTIONAL, MESSAGE, layer_par_tcp, 4) X(a, STATIC, SINGULAR, UENUM, type, 1) \ X(a, STATIC, SINGULAR, INT32, version, 2) \ X(a, STATIC, SINGULAR, BOOL, confirmed, 3) \ -X(a, STATIC, OPTIONAL, MESSAGE, par_layer, 5) +X(a, STATIC, OPTIONAL, MESSAGE, par_layer, 5) \ +X(a, STATIC, SINGULAR, BOOL, enabled, 6) #define eCAL_pb_TLayer_CALLBACK NULL #define eCAL_pb_TLayer_DEFAULT NULL #define eCAL_pb_TLayer_par_layer_MSGTYPE eCAL_pb_ConnnectionPar diff --git a/ecal/core/src/serialization/nanopb/process.pb.h b/ecal/core/src/serialization/nanopb/process.pb.h index e3d3d4ae64..592f0b5f17 100644 --- a/ecal/core/src/serialization/nanopb/process.pb.h +++ b/ecal/core/src/serialization/nanopb/process.pb.h @@ -40,7 +40,8 @@ typedef struct _eCAL_pb_ProcessState { eCAL_pb_eProcessSeverityLevel severity_level; /* severity level */ } eCAL_pb_ProcessState; -typedef struct _eCAL_pb_Process { +typedef struct _eCAL_pb_Process { /* Reserved fields in enums are not supported in protobuf 3.0 + reserved 7 to 11; */ int32_t rclock; /* registration clock */ pb_callback_t hname; /* host name */ int32_t pid; /* process id */ diff --git a/ecal/core/src/serialization/nanopb/service.pb.h b/ecal/core/src/serialization/nanopb/service.pb.h index 753b7292f4..611f224e07 100644 --- a/ecal/core/src/serialization/nanopb/service.pb.h +++ b/ecal/core/src/serialization/nanopb/service.pb.h @@ -101,7 +101,7 @@ extern "C" { #define eCAL_pb_Response_init_default {false, eCAL_pb_ServiceHeader_init_default, {{NULL}, NULL}, 0} #define eCAL_pb_Method_init_default {{{NULL}, NULL}, {{NULL}, NULL}, {{NULL}, NULL}, 0, {{NULL}, NULL}, {{NULL}, NULL}} #define eCAL_pb_Service_init_default {0, {{NULL}, NULL}, {{NULL}, NULL}, {{NULL}, NULL}, 0, {{NULL}, NULL}, 0, {{NULL}, NULL}, {{NULL}, NULL}, 0, 0} -#define eCAL_pb_Client_init_default {0, {{NULL}, NULL}, {{NULL}, NULL}, {{NULL}, NULL}, 0, {{NULL}, NULL}, {{NULL}, NULL}, 0} +#define eCAL_pb_Client_init_default {0, {{NULL}, NULL}, {{NULL}, NULL}, {{NULL}, NULL}, 0, {{NULL}, NULL}, {{NULL}, NULL}, 0, {{NULL}, NULL}} #define eCAL_pb_ServiceHeader_init_zero {{{NULL}, NULL}, {{NULL}, NULL}, {{NULL}, NULL}, {{NULL}, NULL}, 0, _eCAL_pb_ServiceHeader_eCallState_MIN, {{NULL}, NULL}} #define eCAL_pb_Request_init_zero {false, eCAL_pb_ServiceHeader_init_zero, {{NULL}, NULL}} #define eCAL_pb_Response_init_zero {false, eCAL_pb_ServiceHeader_init_zero, {{NULL}, NULL}, 0} diff --git a/ecal/core/src/serialization/nanopb/topic.pb.h b/ecal/core/src/serialization/nanopb/topic.pb.h index 9bcc66ed79..c7eb1ffc22 100644 --- a/ecal/core/src/serialization/nanopb/topic.pb.h +++ b/ecal/core/src/serialization/nanopb/topic.pb.h @@ -17,7 +17,8 @@ typedef struct _eCAL_pb_DataTypeInformation { pb_callback_t desc; /* descriptor information of the datatype (necessary for reflection) */ } eCAL_pb_DataTypeInformation; -typedef struct _eCAL_pb_Topic { +typedef struct _eCAL_pb_Topic { /* Reserved fields in enums are not supported in protobuf 3.0 + reserved 9, 10, 11, 14, 15, 22 to 26, 29; */ int32_t rclock; /* registration clock (heart beat) */ pb_callback_t hname; /* host name */ int32_t pid; /* process id */ diff --git a/ecal/core_pb/src/ecal/core/pb/ecal.proto b/ecal/core_pb/src/ecal/core/pb/ecal.proto index 2b4605b192..b5c0044940 100644 --- a/ecal/core_pb/src/ecal/core/pb/ecal.proto +++ b/ecal/core_pb/src/ecal/core/pb/ecal.proto @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,7 +28,8 @@ package eCAL.pb; message Content // topic content { - reserved 5; + // Reserved fields in enums are not supported in protobuf 3.0 + // reserved 5; int64 id = 1; // sample id int64 clock = 2; // internal used clock diff --git a/ecal/core_pb/src/ecal/core/pb/layer.proto b/ecal/core_pb/src/ecal/core/pb/layer.proto index dc45cd1b1a..6f15c4445b 100644 --- a/ecal/core_pb/src/ecal/core/pb/layer.proto +++ b/ecal/core_pb/src/ecal/core/pb/layer.proto @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,7 +37,8 @@ message LayerParTcp message ConnnectionPar // connection parameter for reader / writer { - reserved 3; + // Reserved fields in enums are not supported in protobuf 3.0 + // reserved 3; LayerParUdpMC layer_par_udpmc = 1; // parameter for ecal udp multicast LayerParShm layer_par_shm = 2; // parameter for ecal shared memory @@ -62,10 +63,12 @@ enum eTLayerType // transport layer message TLayer { - reserved 4; + // Reserved fields in enums are not supported in protobuf 3.0 + // reserved 4; eTLayerType type = 1; // transport layer type int32 version = 2; // transport layer version - bool confirmed = 3; // transport layer used ? + bool enabled = 6; // transport layer enabled ? + bool confirmed = 3; // transport layer usage confirmed ? ConnnectionPar par_layer = 5; // transport layer parameter } diff --git a/ecal/core_pb/src/ecal/core/pb/process.proto b/ecal/core_pb/src/ecal/core/pb/process.proto index 014b51229f..e59d3dae77 100644 --- a/ecal/core_pb/src/ecal/core/pb/process.proto +++ b/ecal/core_pb/src/ecal/core/pb/process.proto @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -56,7 +56,8 @@ enum eTSyncState // time synchronisatio message Process // process { - reserved 7 to 11; + // Reserved fields in enums are not supported in protobuf 3.0 + // reserved 7 to 11; int32 rclock = 1; // registration clock string hname = 2; // host name diff --git a/ecal/core_pb/src/ecal/core/pb/topic.proto b/ecal/core_pb/src/ecal/core/pb/topic.proto index 84d657bf15..0da480cf68 100644 --- a/ecal/core_pb/src/ecal/core/pb/topic.proto +++ b/ecal/core_pb/src/ecal/core/pb/topic.proto @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,7 +32,8 @@ message DataTypeInformation message Topic // eCAL topic { - reserved 9, 10, 11, 14, 15, 22 to 26, 29; + // Reserved fields in enums are not supported in protobuf 3.0 + // reserved 9, 10, 11, 14, 15, 22 to 26, 29; int32 rclock = 1; // registration clock (heart beat) string hname = 2; // host name diff --git a/ecal/samples/cpp/pubsub/protobuf/person_rec/src/person_rec.cpp b/ecal/samples/cpp/pubsub/protobuf/person_rec/src/person_rec.cpp index 9bb405b705..1d42320541 100644 --- a/ecal/samples/cpp/pubsub/protobuf/person_rec/src/person_rec.cpp +++ b/ecal/samples/cpp/pubsub/protobuf/person_rec/src/person_rec.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -53,8 +53,16 @@ int main(int argc, char **argv) // set process state eCAL::Process::SetState(proc_sev_healthy, proc_sev_level1, "I feel good !"); + // create a subscriber config + eCAL::Subscriber::Configuration sub_config; + + // activate transport layer + sub_config.shm.enable = true; + sub_config.udp.enable = true; + sub_config.tcp.enable = true; + // create a subscriber (topic name "person") - eCAL::protobuf::CSubscriber sub("person"); + eCAL::protobuf::CSubscriber sub("person", sub_config); // add receive callback function (_1 = topic_name, _2 = msg, _3 = time, _4 = clock, _5 = id) auto callback = std::bind(OnPerson, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4); diff --git a/ecal/tests/cpp/serialization_test/src/registration_compare.cpp b/ecal/tests/cpp/serialization_test/src/registration_compare.cpp index f4eb4f8ecc..f0e7ac5512 100644 --- a/ecal/tests/cpp/serialization_test/src/registration_compare.cpp +++ b/ecal/tests/cpp/serialization_test/src/registration_compare.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -148,6 +148,7 @@ namespace eCAL // compare TLayer objects for equality return (layer1.type == layer2.type) && (layer1.version == layer2.version) && + (layer1.enabled == layer2.enabled) && (layer1.confirmed == layer2.confirmed) && CompareConnectionPar(layer1.par_layer, layer2.par_layer); }); diff --git a/ecal/tests/cpp/serialization_test/src/registration_generate.cpp b/ecal/tests/cpp/serialization_test/src/registration_generate.cpp index e29a95f6f2..c7510d0408 100644 --- a/ecal/tests/cpp/serialization_test/src/registration_generate.cpp +++ b/ecal/tests/cpp/serialization_test/src/registration_generate.cpp @@ -1,6 +1,6 @@ /* ========================= eCAL LICENSE ================================= * - * Copyright (C) 2016 - 2019 Continental Corporation + * Copyright (C) 2016 - 2024 Continental Corporation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -95,6 +95,7 @@ namespace eCAL TLayer layer; layer.type = static_cast(rand() % (tl_all + 1)); layer.version = rand() % 100; + layer.enabled = rand() % 2 == 1; layer.confirmed = rand() % 2 == 1; return layer; } From 18ac4e13654b8f8db4778adac3531188ab90babd Mon Sep 17 00:00:00 2001 From: rex-schilasky <49162693+rex-schilasky@users.noreply.github.com> Date: Sun, 16 Jun 2024 12:52:09 +0200 Subject: [PATCH 2/5] some minor include fixes --- ecal/core/include/ecal/ecal_publisher.h | 4 ++-- ecal/core/src/ecal_global_accessors.cpp | 4 +++- ecal/core/src/readwrite/ecal_reader.h | 2 +- ecal/core/src/readwrite/ecal_writer.h | 4 ++-- 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/ecal/core/include/ecal/ecal_publisher.h b/ecal/core/include/ecal/ecal_publisher.h index 08dcaa7bff..d564f2352e 100644 --- a/ecal/core/include/ecal/ecal_publisher.h +++ b/ecal/core/include/ecal/ecal_publisher.h @@ -24,16 +24,16 @@ #pragma once -#include #include #include #include #include -#include #include #include +#include #include +#include #include #include diff --git a/ecal/core/src/ecal_global_accessors.cpp b/ecal/core/src/ecal_global_accessors.cpp index eea548714e..e5353c7475 100644 --- a/ecal/core/src/ecal_global_accessors.cpp +++ b/ecal/core/src/ecal_global_accessors.cpp @@ -21,10 +21,12 @@ * @brief eCAL core functions **/ +#include "ecal/config/configuration.h" + #include "ecal_global_accessors.h" #include "ecal_def.h" #include "ecal_globals.h" -#include "ecal/config/configuration.h" + #include #include diff --git a/ecal/core/src/readwrite/ecal_reader.h b/ecal/core/src/readwrite/ecal_reader.h index d1d5c07ffc..70a9fbcc53 100644 --- a/ecal/core/src/readwrite/ecal_reader.h +++ b/ecal/core/src/readwrite/ecal_reader.h @@ -24,8 +24,8 @@ #pragma once #include -#include #include +#include #include "serialization/ecal_serialize_sample_payload.h" #include "serialization/ecal_serialize_sample_registration.h" diff --git a/ecal/core/src/readwrite/ecal_writer.h b/ecal/core/src/readwrite/ecal_writer.h index b5e357ee09..90f22451be 100644 --- a/ecal/core/src/readwrite/ecal_writer.h +++ b/ecal/core/src/readwrite/ecal_writer.h @@ -25,12 +25,12 @@ #include #include -#include #include #include +#include #include "util/ecal_expmap.h" -#include +#include "util/frequency_calculator.h" #if ECAL_CORE_TRANSPORT_UDP #include "udp/ecal_writer_udp.h" From 504e21e699f4b8b6813d94d4983ccf841c565742 Mon Sep 17 00:00:00 2001 From: rex-schilasky <49162693+rex-schilasky@users.noreply.github.com> Date: Thu, 27 Jun 2024 21:43:08 +0200 Subject: [PATCH 3/5] new sample massive_pub_sub.cpp added --- ecal/samples/CMakeLists.txt | 1 + .../benchmarks/massive_pub_sub/CMakeLists.txt | 41 ++++++++ .../massive_pub_sub/src/massive_pub_sub.cpp | 99 +++++++++++++++++++ 3 files changed, 141 insertions(+) create mode 100644 ecal/samples/cpp/benchmarks/massive_pub_sub/CMakeLists.txt create mode 100644 ecal/samples/cpp/benchmarks/massive_pub_sub/src/massive_pub_sub.cpp diff --git a/ecal/samples/CMakeLists.txt b/ecal/samples/CMakeLists.txt index 67f0754c91..eb751a33bd 100644 --- a/ecal/samples/CMakeLists.txt +++ b/ecal/samples/CMakeLists.txt @@ -66,6 +66,7 @@ endif() if(ECAL_CORE_PUBLISHER AND ECAL_CORE_SUBSCRIBER) add_subdirectory(cpp/benchmarks/perftool) + add_subdirectory(cpp/benchmarks/massive_pub_sub) endif() # misc diff --git a/ecal/samples/cpp/benchmarks/massive_pub_sub/CMakeLists.txt b/ecal/samples/cpp/benchmarks/massive_pub_sub/CMakeLists.txt new file mode 100644 index 0000000000..5c2f37e547 --- /dev/null +++ b/ecal/samples/cpp/benchmarks/massive_pub_sub/CMakeLists.txt @@ -0,0 +1,41 @@ +# ========================= eCAL LICENSE ================================= +# +# Copyright (C) 2016 - 2024 Continental Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ========================= eCAL LICENSE ================================= + +cmake_minimum_required(VERSION 3.10) + +set(CMAKE_FIND_PACKAGE_PREFER_CONFIG ON) + +project(massive_pub_sub) + +find_package(eCAL REQUIRED) + +set(massive_pub_sub_src + src/massive_pub_sub.cpp +) + +ecal_add_sample(${PROJECT_NAME} ${massive_pub_sub_src}) + +target_link_libraries(${PROJECT_NAME} + eCAL::core +) + +target_compile_features(${PROJECT_NAME} PRIVATE cxx_std_14) + +ecal_install_sample(${PROJECT_NAME}) + +set_property(TARGET ${PROJECT_NAME} PROPERTY FOLDER samples/cpp/benchmarks/massive_pub_sub) diff --git a/ecal/samples/cpp/benchmarks/massive_pub_sub/src/massive_pub_sub.cpp b/ecal/samples/cpp/benchmarks/massive_pub_sub/src/massive_pub_sub.cpp new file mode 100644 index 0000000000..f1c4ea3076 --- /dev/null +++ b/ecal/samples/cpp/benchmarks/massive_pub_sub/src/massive_pub_sub.cpp @@ -0,0 +1,99 @@ +/* ========================= eCAL LICENSE ================================= + * + * Copyright (C) 2016 - 2024 Continental Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ========================= eCAL LICENSE ================================= +*/ + +#include + +#include +#include +#include +#include +#include + +int main(int argc, char** argv) +{ + const int subscriber_number (5000); + const int publisher_number (5000); + const int in_between_sleep_sec (5); + const int final_sleep_sec (0); + + // initialize eCAL API + eCAL::Initialize(argc, argv, "massive_pub_sub"); + + eCAL::Util::EnableLoopback(true); + + // create subscriber + std::vector vector_of_subscriber; + std::cout << "Subscriber creation started. (" << subscriber_number << ")" << std::endl; + { + // start time measurement + auto start_time = std::chrono::high_resolution_clock::now(); + + for (int i = 0; i < subscriber_number; i++) + { + // publisher topic name + std::stringstream tname; + tname << "TOPIC_" << i; + + // create subscriber + vector_of_subscriber.emplace_back(tname.str()); + } + // stop time measurement + auto end_time = std::chrono::high_resolution_clock::now(); + + // calculate the duration + auto duration = std::chrono::duration_cast(end_time - start_time).count(); + std::cout << "Time taken for subscriber creation: " << duration << " milliseconds" << std::endl; + } + + // sleep for a few seconds + std::this_thread::sleep_for(std::chrono::seconds(in_between_sleep_sec)); + + // create publisher + std::vector vector_of_publisher; + std::cout << "Publisher creation started. (" << publisher_number << ")" << std::endl; + { + // start time measurement + auto start_time = std::chrono::high_resolution_clock::now(); + + for (int i = 0; i < publisher_number; i++) + { + // publisher topic name + std::stringstream tname; + tname << "TOPIC_" << i; + + // create publisher + vector_of_publisher.emplace_back(tname.str()); + } + // stop time measurement + auto end_time = std::chrono::high_resolution_clock::now(); + + // calculate the duration + auto duration = std::chrono::duration_cast(end_time - start_time).count(); + std::cout << "Time taken for publisher creation: " << duration << " milliseconds" << std::endl; + } + std::cout << std::endl; + + // sleep for a few seconds + std::this_thread::sleep_for(std::chrono::seconds(final_sleep_sec)); + + // finalize eCAL API + eCAL::Finalize(); + + return(0); +} From b5ca8c036bc740b03cef1135518e4bf7f88b00fd Mon Sep 17 00:00:00 2001 From: rex-schilasky <49162693+rex-schilasky@users.noreply.github.com> Date: Thu, 4 Jul 2024 17:21:07 +0200 Subject: [PATCH 4/5] sample massive_pub_sub aligned with master --- .../massive_pub_sub/src/massive_pub_sub.cpp | 41 ++++++++++++++++--- 1 file changed, 35 insertions(+), 6 deletions(-) diff --git a/ecal/samples/cpp/benchmarks/massive_pub_sub/src/massive_pub_sub.cpp b/ecal/samples/cpp/benchmarks/massive_pub_sub/src/massive_pub_sub.cpp index f1c4ea3076..7cf864a4e6 100644 --- a/ecal/samples/cpp/benchmarks/massive_pub_sub/src/massive_pub_sub.cpp +++ b/ecal/samples/cpp/benchmarks/massive_pub_sub/src/massive_pub_sub.cpp @@ -25,13 +25,37 @@ #include #include -int main(int argc, char** argv) +const int subscriber_number (5000); + +const int publisher_number (5000); +const int publisher_type_encoding_size_bytes (10*1024); +const int publisher_type_descriptor_size_bytes (10*1024); + +const int in_between_sleep_sec (5); +const int final_sleep_sec (120); + +std::string GenerateSizedString(const std::string& name, size_t totalSize) { - const int subscriber_number (5000); - const int publisher_number (5000); - const int in_between_sleep_sec (5); - const int final_sleep_sec (0); + if (name.empty() || totalSize == 0) { + return ""; + } + + std::string result; + result.reserve(totalSize); + + while (result.size() + name.size() <= totalSize) { + result += name; + } + if (result.size() < totalSize) { + result += name.substr(0, totalSize - result.size()); + } + + return result; +} + +int main(int argc, char** argv) +{ // initialize eCAL API eCAL::Initialize(argc, argv, "massive_pub_sub"); @@ -71,6 +95,11 @@ int main(int argc, char** argv) // start time measurement auto start_time = std::chrono::high_resolution_clock::now(); + eCAL::SDataTypeInformation data_type_info; + data_type_info.name = "TOPIC_TYPE_NAME"; + data_type_info.encoding = GenerateSizedString("TOPIC_TYPE_ENCODING", publisher_type_encoding_size_bytes); + data_type_info.descriptor = GenerateSizedString("TOPIC_TYPE_DESCRIPTOR", publisher_type_descriptor_size_bytes); + for (int i = 0; i < publisher_number; i++) { // publisher topic name @@ -78,7 +107,7 @@ int main(int argc, char** argv) tname << "TOPIC_" << i; // create publisher - vector_of_publisher.emplace_back(tname.str()); + vector_of_publisher.emplace_back(tname.str(), data_type_info); } // stop time measurement auto end_time = std::chrono::high_resolution_clock::now(); From c17e3a62ae7678dc6183d80659e46ec215afea3e Mon Sep 17 00:00:00 2001 From: rex-schilasky <49162693+rex-schilasky@users.noreply.github.com> Date: Wed, 10 Jul 2024 11:15:45 +0200 Subject: [PATCH 5/5] priority list (local, remote) moved into publisher configuration --- ecal/core/include/ecal/config/publisher.h | 16 +++++++++---- ecal/core/src/readwrite/ecal_writer.cpp | 29 +++++++---------------- ecal/core/src/readwrite/ecal_writer.h | 7 +----- 3 files changed, 20 insertions(+), 32 deletions(-) diff --git a/ecal/core/include/ecal/config/publisher.h b/ecal/core/include/ecal/config/publisher.h index 1af155c9b3..8ee38f228e 100644 --- a/ecal/core/include/ecal/config/publisher.h +++ b/ecal/core/include/ecal/config/publisher.h @@ -89,9 +89,11 @@ #pragma once #include +#include #include #include +#include namespace eCAL { @@ -133,12 +135,16 @@ namespace eCAL { ECAL_API Configuration(); - SHM::Configuration shm; - UDP::Configuration udp; - TCP::Configuration tcp; + SHM::Configuration shm; + UDP::Configuration udp; + TCP::Configuration tcp; - bool share_topic_type; //!< share topic type via registration - bool share_topic_description; //!< share topic description via registration + using LayerPriorityVector = std::vector; + LayerPriorityVector layer_priority_local = { TLayer::tlayer_shm, TLayer::tlayer_udp_mc, TLayer::tlayer_tcp }; + LayerPriorityVector layer_priority_remote = { TLayer::tlayer_udp_mc, TLayer::tlayer_tcp }; + + bool share_topic_type; //!< share topic type via registration + bool share_topic_description; //!< share topic description via registration }; } } diff --git a/ecal/core/src/readwrite/ecal_writer.cpp b/ecal/core/src/readwrite/ecal_writer.cpp index 7f9ff8c2cd..146d66243d 100644 --- a/ecal/core/src/readwrite/ecal_writer.cpp +++ b/ecal/core/src/readwrite/ecal_writer.cpp @@ -120,10 +120,6 @@ namespace eCAL const std::chrono::milliseconds registration_timeout(Config::GetRegistrationTimeoutMs()); m_sub_map.set_expiration(registration_timeout); - // set layer priority defaults - m_local_layer_priority = { tl_ecal_shm, tl_ecal_udp, tl_ecal_tcp }; - m_remote_layer_priority = { tl_ecal_udp, tl_ecal_tcp }; - // mark as created m_created = true; @@ -483,7 +479,7 @@ namespace eCAL // determine if we need to start a transport layer // if a new layer gets activated, we reregister for SHM and TCP to force the exchange of connection parameter // without this forced registration we would need one additional registration loop for these two layers to establish the connection - const eTLayerType layer2activate = DetermineTransportLayer2Start(pub_layers, sub_layers, m_host_name == subscription_info_.host_name); + const TLayer::eTransportLayer layer2activate = DetermineTransportLayer2Start(pub_layers, sub_layers, m_host_name == subscription_info_.host_name); switch (layer2activate) { case tl_ecal_udp: @@ -552,16 +548,6 @@ namespace eCAL #endif } - void CDataWriter::SetLocalLayerPriority(const std::vector& layer_priority_) - { - m_local_layer_priority = layer_priority_; - } - - void CDataWriter::SetRemoteLayerPriority(const std::vector& layer_priority_) - { - m_remote_layer_priority = layer_priority_; - } - void CDataWriter::RefreshRegistration() { if (!m_created) return; @@ -954,23 +940,24 @@ return(false); return is_internal_only; } - eTLayerType CDataWriter::DetermineTransportLayer2Start(const std::vector& pub_layer_, const std::vector& sub_layer_, bool same_host_) + TLayer::eTransportLayer CDataWriter::DetermineTransportLayer2Start(const std::vector& enabled_pub_layer_, const std::vector& enabled_sub_layer_, bool same_host_) { // determine the priority list to use - const std::vector& priorityList = same_host_ ? m_local_layer_priority : m_remote_layer_priority; + Publisher::Configuration::LayerPriorityVector& layer_priority_vector = same_host_ ? m_config.layer_priority_local : m_config.layer_priority_remote; // find the highest priority transport layer that is available in both publisher and subscriber options - for (const eTLayerType layer : priorityList) + // TODO: we need to fusion the two layer enum types (eTransportLayer) in ecal_tlayer.h and ecal_struct_sample_common.hf + for (const TLayer::eTransportLayer layer : layer_priority_vector) { - if (std::find(pub_layer_.begin(), pub_layer_.end(), layer) != pub_layer_.end() - && std::find(sub_layer_.begin(), sub_layer_.end(), layer) != sub_layer_.end()) + if (std::find(enabled_pub_layer_.begin(), enabled_pub_layer_.end(), layer) != enabled_pub_layer_.end() + && std::find(enabled_sub_layer_.begin(), enabled_sub_layer_.end(), layer) != enabled_sub_layer_.end()) { return layer; } } // return tl_none if no common transport layer is found - return tl_none; + return TLayer::eTransportLayer::tlayer_none; } int32_t CDataWriter::GetFrequency() diff --git a/ecal/core/src/readwrite/ecal_writer.h b/ecal/core/src/readwrite/ecal_writer.h index 4a8112f0fa..70611cf605 100644 --- a/ecal/core/src/readwrite/ecal_writer.h +++ b/ecal/core/src/readwrite/ecal_writer.h @@ -104,9 +104,6 @@ namespace eCAL void ApplySubscription(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_, const SLayerStates& sub_layer_states_, const std::string& reader_par_); void RemoveSubscription(const SSubscriptionInfo& subscription_info_); - void SetLocalLayerPriority(const std::vector& layer_priority_); - void SetRemoteLayerPriority(const std::vector& layer_priority_); - void RefreshRegistration(); void RefreshSendCounter(); @@ -145,7 +142,7 @@ namespace eCAL size_t PrepareWrite(long long id_, size_t len_); bool IsInternalSubscribedOnly(); - eTLayerType DetermineTransportLayer2Start(const std::vector& pub_layer_, const std::vector& sub_layer_, bool same_host_); + TLayer::eTransportLayer DetermineTransportLayer2Start(const std::vector& enabled_pub_layer_, const std::vector& enabled_sub_layer_, bool same_host_); int32_t GetFrequency(); @@ -188,8 +185,6 @@ namespace eCAL std::unique_ptr m_writer_tcp; #endif - std::vector m_local_layer_priority; - std::vector m_remote_layer_priority; SLayerStates m_layers; std::atomic m_created; };