diff --git a/ecal/core/src/io/ecal_memfile.cpp b/ecal/core/src/io/ecal_memfile.cpp index c74acab2c7..0b1c1d6701 100644 --- a/ecal/core/src/io/ecal_memfile.cpp +++ b/ecal/core/src/io/ecal_memfile.cpp @@ -314,7 +314,7 @@ namespace eCAL } } - size_t CMemoryFile::WritePayload(CPayloadWriter& payload_, const size_t len_, const size_t offset_) + size_t CMemoryFile::WritePayload(CPayloadWriter& payload_, const size_t len_, const size_t offset_, bool force_full_write_ /*= false*/) { if (!m_created) return(0); @@ -322,7 +322,7 @@ namespace eCAL if (GetWriteAddress(wbuf, len_ + offset_) != 0u) { // (re)write complete buffer - if (!m_payload_initialized) + if (!m_payload_initialized || force_full_write_) { bool const success = payload_.Write(static_cast(wbuf) + offset_, len_); if (!success) diff --git a/ecal/core/src/io/ecal_memfile.h b/ecal/core/src/io/ecal_memfile.h index 9084def8da..39514d3e2a 100644 --- a/ecal/core/src/io/ecal_memfile.h +++ b/ecal/core/src/io/ecal_memfile.h @@ -147,13 +147,14 @@ namespace eCAL /** * @brief Apply payload on the memory file. * - * @param payload_ The payload. - * @param len_ The number of bytes to write. - * @param offset_ The offset for writing the data. + * @param payload_ The payload. + * @param len_ The number of bytes to write. + * @param offset_ The offset for writing the data. + * @param force_full_write_ Force full write action. * - * @return Number of bytes access (len if succeeded otherwise zero). + * @return Number of bytes accessed (len if succeeded otherwise zero). **/ - size_t WritePayload(CPayloadWriter& payload_, size_t len_, size_t offset_); + size_t WritePayload(CPayloadWriter& payload_, size_t len_, size_t offset_, bool force_full_write_ = false); /** * @brief Maximum data size of the whole memory file. diff --git a/ecal/core/src/io/ecal_memfile_sync.cpp b/ecal/core/src/io/ecal_memfile_sync.cpp index 30d90d5afd..f9deaab2b6 100644 --- a/ecal/core/src/io/ecal_memfile_sync.cpp +++ b/ecal/core/src/io/ecal_memfile_sync.cpp @@ -129,7 +129,7 @@ namespace eCAL return false; } - bool CSyncMemoryFile::Write(CPayloadWriter& payload_, const SWriterAttr& data_) + bool CSyncMemoryFile::Write(CPayloadWriter& payload_, const SWriterAttr& data_, bool force_full_write_/* = false*/) { if (!m_created) { @@ -197,7 +197,7 @@ namespace eCAL // write the buffer if (data_.len > 0) { - written &= m_memfile.WritePayload(payload_, data_.len, wbytes) > 0; + written &= m_memfile.WritePayload(payload_, data_.len, wbytes, force_full_write_) > 0; } // release write access m_memfile.ReleaseWriteAccess(); diff --git a/ecal/core/src/io/ecal_memfile_sync.h b/ecal/core/src/io/ecal_memfile_sync.h index 05b610e8d4..05b60d553d 100644 --- a/ecal/core/src/io/ecal_memfile_sync.h +++ b/ecal/core/src/io/ecal_memfile_sync.h @@ -53,7 +53,7 @@ namespace eCAL bool Disconnect(const std::string& process_id_); bool CheckSize(size_t size_); - bool Write(CPayloadWriter& payload_, const SWriterAttr& data_); + bool Write(CPayloadWriter& payload_, const SWriterAttr& data_, bool force_full_write_ = false); std::string GetName() const; size_t GetSize() const; diff --git a/ecal/core/src/readwrite/ecal_writer_shm.cpp b/ecal/core/src/readwrite/ecal_writer_shm.cpp index 761993fe42..eefbb74227 100644 --- a/ecal/core/src/readwrite/ecal_writer_shm.cpp +++ b/ecal/core/src/readwrite/ecal_writer_shm.cpp @@ -103,12 +103,17 @@ namespace eCAL bool CDataWriterSHM::SetBufferCount(size_t buffer_count_) { + // no need to adapt anything + if (m_memory_file_vec.size() == buffer_count_) return true; + + // buffer count zero not allowed if (buffer_count_ < 1) { Logging::Log(log_level_error, m_topic_name + "::CDataWriterSHM::SetBufferCount minimal number of memory files is 1 !"); return false; } + // retrieve the memory file size of existing files size_t memory_file_size(0); if (!m_memory_file_vec.empty()) { @@ -119,37 +124,14 @@ namespace eCAL memory_file_size = m_memory_file_attr.min_size; } - // ---------------------------------------------------------------------- - // REMOVE ME IN ECAL6 - // ---------------------------------------------------------------------- - // recreate memory buffer list to stay compatible to older versions - // for the case that we have ONE existing buffer - // and that single buffer is communicated with an older shm datareader - // in this case we need to invalidate (destroy) the existing buffer - // and the old datareader will get blind (fail safe) - // otherwise it would still receive every n-th write - // this state change will lead to some lost samples - if ((m_memory_file_vec.size() == 1) && (m_memory_file_vec.size() < buffer_count_)) - { - m_memory_file_vec.clear(); - } - // ---------------------------------------------------------------------- - // REMOVE ME IN ECAL6 - // ---------------------------------------------------------------------- - - // increase buffer count + // recreate memory file vector + m_memory_file_vec.clear(); while (m_memory_file_vec.size() < buffer_count_) { auto sync_memfile = std::make_shared(m_memfile_base_name, memory_file_size, m_memory_file_attr); m_memory_file_vec.push_back(sync_memfile); } - // decrease buffer count - while (m_memory_file_vec.size() > buffer_count_) - { - m_memory_file_vec.pop_back(); - } - return true; } @@ -185,7 +167,8 @@ namespace eCAL if (!m_created) return false; // write content - const bool sent = m_memory_file_vec[m_write_idx]->Write(payload_, attr_); + const bool force_full_write(m_memory_file_vec.size() > 1); + const bool sent = m_memory_file_vec[m_write_idx]->Write(payload_, attr_, force_full_write); // and increment file index m_write_idx++; diff --git a/testing/ecal/pubsub_test/CMakeLists.txt b/testing/ecal/pubsub_test/CMakeLists.txt index d25728b63e..67523625d6 100644 --- a/testing/ecal/pubsub_test/CMakeLists.txt +++ b/testing/ecal/pubsub_test/CMakeLists.txt @@ -23,6 +23,7 @@ find_package(GTest REQUIRED) set(pubsub_test_src src/pubsub_gettopics.cpp + src/pubsub_multibuffer src/pubsub_test.cpp src/pubsub_receive_test.cpp ) diff --git a/testing/ecal/pubsub_test/src/pubsub_multibuffer.cpp b/testing/ecal/pubsub_test/src/pubsub_multibuffer.cpp new file mode 100644 index 0000000000..e6e0934c07 --- /dev/null +++ b/testing/ecal/pubsub_test/src/pubsub_multibuffer.cpp @@ -0,0 +1,266 @@ +/* ========================= eCAL LICENSE ================================= + * + * Copyright (C) 2016 - 2019 Continental Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ========================= eCAL LICENSE ================================= +*/ + +#include +#include + +#include +#include +#include + +#include + +#define CMN_REGISTRATION_REFRESH 1000 +#define DATA_FLOW_TIME 50 +#define PAYLOAD_SIZE 1024 + +// a binary payload object for testing +// full (Write) and partial (Update) writing +class CBinaryPayload : public eCAL::CPayloadWriter +{ +public: + CBinaryPayload(size_t size_) : size(size_) {} + + bool Write(void* buf_, size_t len_) override + { + // write complete content to the shared memory file + if (len_ < size) return false; + memset(buf_, 42, size); + return true; + }; + + bool Update(void* buf_, size_t len_) override + { + // update content of the shared memory file + if (len_ < size) return false; + const size_t write_idx(clock % len_); + const char write_chr(clock % 100); + static_cast(buf_)[write_idx] = write_chr; + clock++; + return true; + }; + + size_t GetSize() override { return size; }; + + void ResetClock() { clock = 0; }; + +private: + size_t size = 0; + int clock = 0; +}; + +TEST(IO, MultibufferPubSub) +{ + // create payload + CBinaryPayload binary_payload(PAYLOAD_SIZE); + + // initialize eCAL API + eCAL::Initialize(0, nullptr, "pubsub_test"); + + // publish / subscribe match in the same process + eCAL::Util::EnableLoopback(true); + + // create subscriber for topic "A" + eCAL::CSubscriber sub("A"); + + // create publisher for topic "A" + eCAL::CPublisher pub("A"); + pub.SetLayerMode(eCAL::TLayer::tlayer_all, eCAL::TLayer::smode_off); + pub.SetLayerMode(eCAL::TLayer::tlayer_shm, eCAL::TLayer::smode_on); + + std::atomic received_count{ 0 }; + std::atomic received_bytes{ 0 }; + std::vector received_content; + + // add callback + auto lambda = [&received_count, &received_bytes, &received_content](const char* /*topic_name_*/, const eCAL::SReceiveCallbackData* data_) { + received_bytes += data_->size; + ++received_count; + for (auto i = 0; i < 10; ++i) + { + const char rec_char(static_cast(data_->buf)[i]); + received_content.push_back(rec_char); + std::cout << std::setw(2) << std::setfill('0') << static_cast(rec_char) << " "; + } + std::cout << std::endl; + }; + EXPECT_EQ(true, sub.AddReceiveCallback(lambda)); + + const int iterations(11); + int rec_char_sum(0); + + ////////////////////////////////////////////////////////// + // one buffer, no zero copy / PARTIAL WRITING DISABLED + ////////////////////////////////////////////////////////// + // expected output: + // 42 42 42 42 42 42 42 42 42 42 < full initial write + // 42 42 42 42 42 42 42 42 42 42 < full write + // 42 42 42 42 42 42 42 42 42 42 < .. + // 42 42 42 42 42 42 42 42 42 42 + // 42 42 42 42 42 42 42 42 42 42 + // 42 42 42 42 42 42 42 42 42 42 + // 42 42 42 42 42 42 42 42 42 42 + // 42 42 42 42 42 42 42 42 42 42 + // 42 42 42 42 42 42 42 42 42 42 + // 42 42 42 42 42 42 42 42 42 42 + // 42 42 42 42 42 42 42 42 42 42 + + std::cout << std::endl << "Buffer = 1, Zero Copy Off -> partial writing disabled" << std::endl; + binary_payload.ResetClock(); + received_content.clear(); + pub.ShmSetBufferCount(1); + pub.ShmEnableZeroCopy(false); + + // let's match them + eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH); + + // run 11 iterations (11 full writes) + for (int i = 0; i < iterations; ++i) + { + EXPECT_EQ(PAYLOAD_SIZE, pub.Send(binary_payload)); + eCAL::Process::SleepMS(DATA_FLOW_TIME); + } + std::cout << std::endl; + + // check receive content + // one buffer, no zero copy + // we expect 11 full writes == 11 * 10 * 42 == 4620 + rec_char_sum = std::accumulate(received_content.begin(), received_content.end(), 0); + EXPECT_EQ(11 * 10 * 42, rec_char_sum); + + ////////////////////////////////////////////////////////// + // two buffer, no zero copy / PARTIAL WRITING DISABLED + ////////////////////////////////////////////////////////// + // expected output: + // 42 42 42 42 42 42 42 42 42 42 < full initial write + // 42 42 42 42 42 42 42 42 42 42 < full write + // 42 42 42 42 42 42 42 42 42 42 < .. + // 42 42 42 42 42 42 42 42 42 42 + // 42 42 42 42 42 42 42 42 42 42 + // 42 42 42 42 42 42 42 42 42 42 + // 42 42 42 42 42 42 42 42 42 42 + // 42 42 42 42 42 42 42 42 42 42 + // 42 42 42 42 42 42 42 42 42 42 + // 42 42 42 42 42 42 42 42 42 42 + // 42 42 42 42 42 42 42 42 42 42 + + std::cout << std::endl << "Buffer = 2, Zero Copy Off -> partial writing disabled" << std::endl; + binary_payload.ResetClock(); + received_content.clear(); + pub.ShmSetBufferCount(2); + pub.ShmEnableZeroCopy(false); + + // let's match them + eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH); + + // run 11 iterations (11 full writes) + for (int i = 0; i < iterations; ++i) + { + EXPECT_EQ(PAYLOAD_SIZE, pub.Send(binary_payload)); + eCAL::Process::SleepMS(DATA_FLOW_TIME); + } + std::cout << std::endl; + + // check receive content + // two buffer, no zero copy + // we expect 11 full writes == 11 * 10 * 42 == 4620 + rec_char_sum = std::accumulate(received_content.begin(), received_content.end(), 0); + EXPECT_EQ(11 * 10 * 42, rec_char_sum); + + ////////////////////////////////////////////////////////// + // two buffer, zero copy on / PARTIAL WRITING DISABLED + ////////////////////////////////////////////////////////// + // expected output: + // 42 42 42 42 42 42 42 42 42 42 < full initial write + // 42 42 42 42 42 42 42 42 42 42 < full write + // 42 42 42 42 42 42 42 42 42 42 < .. + // 42 42 42 42 42 42 42 42 42 42 + // 42 42 42 42 42 42 42 42 42 42 + // 42 42 42 42 42 42 42 42 42 42 + // 42 42 42 42 42 42 42 42 42 42 + // 42 42 42 42 42 42 42 42 42 42 + // 42 42 42 42 42 42 42 42 42 42 + // 42 42 42 42 42 42 42 42 42 42 + // 42 42 42 42 42 42 42 42 42 42 + + std::cout << std::endl << "Buffer = 2, Zero Copy On -> partial writing disabled" << std::endl; + binary_payload.ResetClock(); + received_content.clear(); + pub.ShmSetBufferCount(2); + pub.ShmEnableZeroCopy(true); + + // let's match them + eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH); + + // run 11 iterations (11 full writes) + for (int i = 0; i < iterations; ++i) + { + EXPECT_EQ(PAYLOAD_SIZE, pub.Send(binary_payload)); + eCAL::Process::SleepMS(DATA_FLOW_TIME); + } + std::cout << std::endl; + + // check receive content + // two buffer, zero copy on + // we expect 11 full writes == 11 * 10 * 42 == 4620 + rec_char_sum = std::accumulate(received_content.begin(), received_content.end(), 0); + EXPECT_EQ(11 * 10 * 42, rec_char_sum); + + ////////////////////////////////////////////////////////// + // one buffer, zero copy on / PARTIAL WRITING ENABLED + ////////////////////////////////////////////////////////// + // expected output: + // 42 42 42 42 42 42 42 42 42 42 < full initial write + // 00 42 42 42 42 42 42 42 42 42 < 1. partial write + // 00 01 42 42 42 42 42 42 42 42 < 2. partial write + // 00 01 02 42 42 42 42 42 42 42 < .. + // 00 01 02 03 42 42 42 42 42 42 + // 00 01 02 03 04 42 42 42 42 42 + // 00 01 02 03 04 05 42 42 42 42 + // 00 01 02 03 04 05 06 42 42 42 + // 00 01 02 03 04 05 06 07 42 42 + // 00 01 02 03 04 05 06 07 08 42 + // 00 01 02 03 04 05 06 07 08 09 + + std::cout << std::endl << "Buffer = 1, Zero Copy On -> partial writing enabled" << std::endl; + binary_payload.ResetClock(); + received_content.clear(); + pub.ShmSetBufferCount(1); + pub.ShmEnableZeroCopy(true); + + // let's match them + eCAL::Process::SleepMS(2 * CMN_REGISTRATION_REFRESH); + + // run 11 iterations (1 full write, 10 updates) + for (int i = 0; i < iterations; ++i) + { + EXPECT_EQ(PAYLOAD_SIZE, pub.Send(binary_payload)); + eCAL::Process::SleepMS(DATA_FLOW_TIME); + } + std::cout << std::endl; + + // check receive content + // one buffer, zero copy on + // we expect 1 full write + 10 updates == 2475 + rec_char_sum = std::accumulate(received_content.begin(), received_content.end(), 0); + EXPECT_EQ(2475, rec_char_sum); + + // finalize eCAL API + eCAL::Finalize(); +}