Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

disable zero copy partial write combined with multibuffering (#1163) #1168

Merged
merged 1 commit into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ecal/core/src/io/ecal_memfile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -314,15 +314,15 @@ namespace eCAL
}
}

size_t CMemoryFile::WritePayload(CPayloadWriter& payload_, const size_t len_, const size_t offset_)
size_t CMemoryFile::WritePayload(CPayloadWriter& payload_, const size_t len_, const size_t offset_, bool force_full_write_ /*= false*/)
{
if (!m_created) return(0);

void* wbuf(nullptr);
if (GetWriteAddress(wbuf, len_ + offset_) != 0u)
{
// (re)write complete buffer
if (!m_payload_initialized)
if (!m_payload_initialized || force_full_write_)
{
bool const success = payload_.Write(static_cast<char*>(wbuf) + offset_, len_);
if (!success)
Expand Down
11 changes: 6 additions & 5 deletions ecal/core/src/io/ecal_memfile.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,14 @@ namespace eCAL
/**
* @brief Apply payload on the memory file.
*
* @param payload_ The payload.
* @param len_ The number of bytes to write.
* @param offset_ The offset for writing the data.
* @param payload_ The payload.
* @param len_ The number of bytes to write.
* @param offset_ The offset for writing the data.
* @param force_full_write_ Force full write action.
*
* @return Number of bytes access (len if succeeded otherwise zero).
* @return Number of bytes accessed (len if succeeded otherwise zero).
**/
size_t WritePayload(CPayloadWriter& payload_, size_t len_, size_t offset_);
size_t WritePayload(CPayloadWriter& payload_, size_t len_, size_t offset_, bool force_full_write_ = false);

/**
* @brief Maximum data size of the whole memory file.
Expand Down
4 changes: 2 additions & 2 deletions ecal/core/src/io/ecal_memfile_sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ namespace eCAL
return false;
}

bool CSyncMemoryFile::Write(CPayloadWriter& payload_, const SWriterAttr& data_)
bool CSyncMemoryFile::Write(CPayloadWriter& payload_, const SWriterAttr& data_, bool force_full_write_/* = false*/)
{
if (!m_created)
{
Expand Down Expand Up @@ -197,7 +197,7 @@ namespace eCAL
// write the buffer
if (data_.len > 0)
{
written &= m_memfile.WritePayload(payload_, data_.len, wbytes) > 0;
written &= m_memfile.WritePayload(payload_, data_.len, wbytes, force_full_write_) > 0;
}
// release write access
m_memfile.ReleaseWriteAccess();
Expand Down
2 changes: 1 addition & 1 deletion ecal/core/src/io/ecal_memfile_sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ namespace eCAL
bool Disconnect(const std::string& process_id_);

bool CheckSize(size_t size_);
bool Write(CPayloadWriter& payload_, const SWriterAttr& data_);
bool Write(CPayloadWriter& payload_, const SWriterAttr& data_, bool force_full_write_ = false);

std::string GetName() const;
size_t GetSize() const;
Expand Down
35 changes: 9 additions & 26 deletions ecal/core/src/readwrite/ecal_writer_shm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,17 @@ namespace eCAL

bool CDataWriterSHM::SetBufferCount(size_t buffer_count_)
{
// no need to adapt anything
if (m_memory_file_vec.size() == buffer_count_) return true;

// buffer count zero not allowed
if (buffer_count_ < 1)
{
Logging::Log(log_level_error, m_topic_name + "::CDataWriterSHM::SetBufferCount minimal number of memory files is 1 !");
return false;
}

// retrieve the memory file size of existing files
size_t memory_file_size(0);
if (!m_memory_file_vec.empty())
{
Expand All @@ -119,37 +124,14 @@ namespace eCAL
memory_file_size = m_memory_file_attr.min_size;
}

// ----------------------------------------------------------------------
// REMOVE ME IN ECAL6
// ----------------------------------------------------------------------
// recreate memory buffer list to stay compatible to older versions
// for the case that we have ONE existing buffer
// and that single buffer is communicated with an older shm datareader
// in this case we need to invalidate (destroy) the existing buffer
// and the old datareader will get blind (fail safe)
// otherwise it would still receive every n-th write
// this state change will lead to some lost samples
if ((m_memory_file_vec.size() == 1) && (m_memory_file_vec.size() < buffer_count_))
{
m_memory_file_vec.clear();
}
// ----------------------------------------------------------------------
// REMOVE ME IN ECAL6
// ----------------------------------------------------------------------

// increase buffer count
// recreate memory file vector
m_memory_file_vec.clear();
while (m_memory_file_vec.size() < buffer_count_)
{
auto sync_memfile = std::make_shared<CSyncMemoryFile>(m_memfile_base_name, memory_file_size, m_memory_file_attr);
m_memory_file_vec.push_back(sync_memfile);
}

// decrease buffer count
while (m_memory_file_vec.size() > buffer_count_)
{
m_memory_file_vec.pop_back();
}

return true;
}

Expand Down Expand Up @@ -185,7 +167,8 @@ namespace eCAL
if (!m_created) return false;

// write content
const bool sent = m_memory_file_vec[m_write_idx]->Write(payload_, attr_);
const bool force_full_write(m_memory_file_vec.size() > 1);
const bool sent = m_memory_file_vec[m_write_idx]->Write(payload_, attr_, force_full_write);

// and increment file index
m_write_idx++;
Expand Down
1 change: 1 addition & 0 deletions testing/ecal/pubsub_test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ find_package(GTest REQUIRED)

set(pubsub_test_src
src/pubsub_gettopics.cpp
src/pubsub_multibuffer
src/pubsub_test.cpp
src/pubsub_receive_test.cpp
)
Expand Down
Loading
Loading