diff --git a/ecal/core/src/readwrite/ecal_writer.cpp b/ecal/core/src/readwrite/ecal_writer.cpp index 608d9ebf22..abb265ed28 100644 --- a/ecal/core/src/readwrite/ecal_writer.cpp +++ b/ecal/core/src/readwrite/ecal_writer.cpp @@ -99,10 +99,8 @@ namespace eCAL // register Register(false); - // enable transport layers - EnableUdpLayer(config_.udp.enable); - EnableShmLayer(config_.shm.enable); - EnableTcpLayer(config_.tcp.enable); + // start udp, shm, tcp layer + StartTransportLayer(); } CDataWriter::~CDataWriter() @@ -123,20 +121,8 @@ namespace eCAL Logging::Log(log_level_debug1, m_topic_name + "::CDataWriter::Stop"); #endif - // destroy udp multicast writer -#if ECAL_CORE_TRANSPORT_UDP - m_writer_udp.reset(); -#endif - - // destroy memory file writer -#if ECAL_CORE_TRANSPORT_SHM - m_writer_shm.reset(); -#endif - - // destroy tcp writer -#if ECAL_CORE_TRANSPORT_TCP - m_writer_tcp.reset(); -#endif + // stop udp, shm, tcp layer + StopTransportLayer(); // clear subscriber maps { @@ -259,10 +245,10 @@ namespace eCAL // can we do a zero copy write ? const bool allow_zero_copy = - m_config.shm.zero_copy_mode // zero copy mode activated by user - && m_config.shm.enable // shm layer active - && !m_config.udp.enable // udp layer inactive - && !m_config.tcp.enable; // tcp layer inactive + m_config.shm.zero_copy_mode // zero copy mode activated by user + && m_writer_shm // shm layer active + && !m_writer_udp // udp layer inactive + && !m_writer_tcp; // tcp layer inactive // create a payload copy for all layer if (!allow_zero_copy) @@ -281,7 +267,7 @@ namespace eCAL // SHM //////////////////////////////////////////////////////////////////////////// #if ECAL_CORE_TRANSPORT_SHM - if (m_writer_shm && m_config.shm.enable) + if (m_writer_shm) { #ifndef NDEBUG // log it @@ -346,7 +332,7 @@ namespace eCAL // UDP (MC) //////////////////////////////////////////////////////////////////////////// #if ECAL_CORE_TRANSPORT_UDP - if (m_writer_udp && m_config.udp.enable) + if (m_writer_udp) { #ifndef NDEBUG // log it @@ -397,7 +383,7 @@ namespace eCAL // TCP //////////////////////////////////////////////////////////////////////////// #if ECAL_CORE_TRANSPORT_TCP - if (m_writer_tcp && m_config.tcp.enable) + if (m_writer_tcp) { #ifndef NDEBUG // log it @@ -736,79 +722,88 @@ namespace eCAL } } - void CDataWriter::EnableUdpLayer(bool state_) + void CDataWriter::StartTransportLayer() { #if ECAL_CORE_TRANSPORT_UDP - m_config.udp.enable = state_; - if (!m_created) return; - - // log state - LogLayerState(state_, "CDataWriter::ActivateUdpLayer::UDP_SENDMODE"); - - if (state_) + if (m_config.udp.enable) { - m_writer_udp = std::make_unique(m_host_name, m_topic_name, m_topic_id, m_config.udp); -#ifndef NDEBUG - Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::ActivateUdpLayer::UDP_WRITER"); + ActivateUdpLayer(); + } #endif +#if ECAL_CORE_TRANSPORT_SHM + if (m_config.shm.enable) + { + ActivateShmLayer(); } - else +#endif +#if ECAL_CORE_TRANSPORT_TCP + if (m_config.tcp.enable) { - m_writer_udp.reset(); + ActivateTcpLayer(); } -#else // ECAL_CORE_TRANSPORT_UDP - (void)state_; -#endif // ECAL_CORE_TRANSPORT_UDP +#endif } - void CDataWriter::EnableShmLayer(bool state_) + void CDataWriter::StopTransportLayer() { + // destroy udp writer +#if ECAL_CORE_TRANSPORT_UDP + m_writer_udp.reset(); +#endif + + // destroy shm writer #if ECAL_CORE_TRANSPORT_SHM - m_config.shm.enable = state_; - if (!m_created) return; + m_writer_shm.reset(); +#endif + // destroy tcp writer +#if ECAL_CORE_TRANSPORT_TCP + m_writer_tcp.reset(); +#endif + } + + void CDataWriter::ActivateUdpLayer() + { +#if ECAL_CORE_TRANSPORT_UDP // log state - LogLayerState(state_, "CDataWriter::ActivateShmLayer::SHM_SENDMODE"); + Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::ActivateUdpLayer::ACTIVATED"); + + // create writer + m_writer_udp = std::make_unique(m_host_name, m_topic_name, m_topic_id, m_config.udp); - if (state_) - { - m_writer_shm = std::make_unique(m_host_name, m_topic_name, m_topic_id, m_config.shm); - m_writer_shm->SetBufferCount(m_config.shm.memfile_buffer_count); #ifndef NDEBUG - Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::ActivateShmLayer::SHM_WRITER"); + Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::ActivateUdpLayer::WRITER_CREATED"); +#endif +#endif // ECAL_CORE_TRANSPORT_UDP + } + + void CDataWriter::ActivateShmLayer() + { +#if ECAL_CORE_TRANSPORT_SHM + // log state + Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::ActivateShmLayer::ACTIVATED"); + + // create writer + m_writer_shm = std::make_unique(m_host_name, m_topic_name, m_topic_id, m_config.shm); + +#ifndef NDEBUG + Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::ActivateShmLayer::WRITER_CREATED"); #endif - } - else - { - m_writer_shm.reset(); - } -#else // ECAL_CORE_TRANSPORT_SHM - (void)state_; #endif // ECAL_CORE_TRANSPORT_SHM } - void CDataWriter::EnableTcpLayer(bool state_) + void CDataWriter::ActivateTcpLayer() { #if ECAL_CORE_TRANSPORT_TCP - m_config.tcp.enable = state_; - if (!m_created) return; - // log state - LogLayerState(state_, "CDataWriter::ActivateTcpLayer::TCP_SENDMODE"); + Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::ActivateTcpLayer::ACTIVATED"); + + // create writer + m_writer_tcp = std::make_unique(m_host_name, m_topic_name, m_topic_id, m_config.tcp); - if (state_) - { - m_writer_tcp = std::make_unique(m_host_name, m_topic_name, m_topic_id, m_config.tcp); #ifndef NDEBUG - Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::ActivateTcpLayer::TCP_WRITER - SUCCESS"); + Logging::Log(log_level_debug4, m_topic_name + "::CDataWriter::ActivateTcpLayer::WRITER_CREATED"); #endif - } - else - { - m_writer_tcp.reset(); - } -#else // ECAL_CORE_TRANSPORT_TCP - (void)state_; #endif // ECAL_CORE_TRANSPORT_TCP } @@ -847,23 +842,6 @@ namespace eCAL return is_internal_only; } - void CDataWriter::LogLayerState(bool state_, const std::string& base_msg_) - { -#ifndef NDEBUG - if (state_) - { - Logging::Log(log_level_debug4, m_topic_name + "::" + base_msg_ + "::ON"); - } - else - { - Logging::Log(log_level_debug4, m_topic_name + "::" + base_msg_ + "::OFF"); - } -#else - (void)state_; - (void)base_msg_; -#endif - } - int32_t CDataWriter::GetFrequency() { const auto frequency_time = std::chrono::steady_clock::now(); diff --git a/ecal/core/src/readwrite/ecal_writer.h b/ecal/core/src/readwrite/ecal_writer.h index 6356f322e5..019c06fd26 100644 --- a/ecal/core/src/readwrite/ecal_writer.h +++ b/ecal/core/src/readwrite/ecal_writer.h @@ -125,13 +125,15 @@ namespace eCAL void Connect(const std::string& tid_, const SDataTypeInformation& tinfo_); void Disconnect(); - void EnableUdpLayer(bool state_); - void EnableShmLayer(bool state_); - void EnableTcpLayer(bool state_); + void StartTransportLayer(); + void StopTransportLayer(); + + void ActivateUdpLayer(); + void ActivateShmLayer(); + void ActivateTcpLayer(); size_t PrepareWrite(long long id_, size_t len_); bool IsInternalSubscribedOnly(); - void LogLayerState(bool state_, const std::string& base_msg_); int32_t GetFrequency(); diff --git a/ecal/core/src/readwrite/shm/ecal_writer_shm.cpp b/ecal/core/src/readwrite/shm/ecal_writer_shm.cpp index a866e323e5..43d1b56fe7 100644 --- a/ecal/core/src/readwrite/shm/ecal_writer_shm.cpp +++ b/ecal/core/src/readwrite/shm/ecal_writer_shm.cpp @@ -56,56 +56,6 @@ namespace eCAL return info_; } - - bool CDataWriterSHM::SetBufferCount(size_t buffer_count_) - { - // no need to adapt anything - if (m_memory_file_vec.size() == buffer_count_) return true; - - // buffer count zero not allowed - if (buffer_count_ < 1) - { - Logging::Log(log_level_error, m_topic_name + "::CDataWriterSHM::SetBufferCount minimal number of memory files is 1 !"); - return false; - } - - // prepare memfile attributes - SSyncMemoryFileAttr memory_file_attr = {}; - memory_file_attr.min_size = m_config.memfile_min_size_bytes; - memory_file_attr.reserve = m_config.memfile_reserve_percent; - memory_file_attr.timeout_open_ms = PUB_MEMFILE_OPEN_TO; - memory_file_attr.timeout_ack_ms = m_config.acknowledge_timeout_ms; - - // retrieve the memory file size of existing files - size_t memory_file_size(0); - if (!m_memory_file_vec.empty()) - { - memory_file_size = m_memory_file_vec[0]->GetSize(); - } - else - { - memory_file_size = memory_file_attr.min_size; - } - - // create memory file vector - m_memory_file_vec.clear(); - while (m_memory_file_vec.size() < buffer_count_) - { - auto sync_memfile = std::make_shared(m_memfile_base_name, memory_file_size, memory_file_attr); - if (sync_memfile->IsCreated()) - { - m_memory_file_vec.push_back(sync_memfile); - } - else - { - m_memory_file_vec.clear(); - Logging::Log(log_level_error, "CDataWriterSHM::SetBufferCount - FAILED"); - return false; - } - } - - return true; - } bool CDataWriterSHM::PrepareWrite(const SWriterAttr& attr_) { @@ -158,4 +108,54 @@ namespace eCAL } return connection_par; } + + bool CDataWriterSHM::SetBufferCount(size_t buffer_count_) + { + // no need to adapt anything + if (m_memory_file_vec.size() == buffer_count_) return true; + + // buffer count zero not allowed + if (buffer_count_ < 1) + { + Logging::Log(log_level_error, m_topic_name + "::CDataWriterSHM::SetBufferCount minimal number of memory files is 1 !"); + return false; + } + + // prepare memfile attributes + SSyncMemoryFileAttr memory_file_attr = {}; + memory_file_attr.min_size = m_config.memfile_min_size_bytes; + memory_file_attr.reserve = m_config.memfile_reserve_percent; + memory_file_attr.timeout_open_ms = PUB_MEMFILE_OPEN_TO; + memory_file_attr.timeout_ack_ms = m_config.acknowledge_timeout_ms; + + // retrieve the memory file size of existing files + size_t memory_file_size(0); + if (!m_memory_file_vec.empty()) + { + memory_file_size = m_memory_file_vec[0]->GetSize(); + } + else + { + memory_file_size = memory_file_attr.min_size; + } + + // create memory file vector + m_memory_file_vec.clear(); + while (m_memory_file_vec.size() < buffer_count_) + { + auto sync_memfile = std::make_shared(m_memfile_base_name, memory_file_size, memory_file_attr); + if (sync_memfile->IsCreated()) + { + m_memory_file_vec.push_back(sync_memfile); + } + else + { + m_memory_file_vec.clear(); + Logging::Log(log_level_error, "CDataWriterSHM::SetBufferCount - FAILED"); + return false; + } + } + + return true; + } } diff --git a/ecal/core/src/readwrite/shm/ecal_writer_shm.h b/ecal/core/src/readwrite/shm/ecal_writer_shm.h index d49799a2f1..01517db79a 100644 --- a/ecal/core/src/readwrite/shm/ecal_writer_shm.h +++ b/ecal/core/src/readwrite/shm/ecal_writer_shm.h @@ -42,8 +42,6 @@ namespace eCAL SWriterInfo GetInfo() override; - bool SetBufferCount(size_t buffer_count_); - bool PrepareWrite(const SWriterAttr& attr_) override; bool Write(CPayloadWriter& payload_, const SWriterAttr& attr_) override; @@ -53,6 +51,8 @@ namespace eCAL Registration::ConnectionPar GetConnectionParameter() override; protected: + bool SetBufferCount(size_t buffer_count_); + Publisher::SHM::Configuration m_config; size_t m_write_idx = 0;