Skip to content

Commit

Permalink
core: CPayloadWriter methods renamed Write -> WriteFull, Update -> Wr…
Browse files Browse the repository at this point in the history
…iteModified

doc: documentation of CPublisher::ShmEnableZeroCopy updated
  • Loading branch information
rex-schilasky committed Aug 1, 2023
1 parent 625f549 commit 6f8e730
Show file tree
Hide file tree
Showing 12 changed files with 61 additions and 47 deletions.
35 changes: 22 additions & 13 deletions ecal/core/include/ecal/ecal_payload_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,13 @@ namespace eCAL
* @brief Base payload writer class to allow zero copy memory operations.
*
* This class serves as the base class for payload writers, allowing zero-copy memory
* operations. The `Write` and `Update` calls may operate on the target memory file
* directly in zero-copy mode.
* operations. The `WriteFull` and `WriteModified` calls may operate on the target
* memory file directly in zero-copy mode.
*
* A partial writing / modification of the memory file is only possible when zero-copy mode
* is activated. If zero-copy is not enabled, the `WriteModified` method is ignored and the
* `WriteFull` method is always executed (see CPublisher::ShmEnableZeroCopy)
*
*/
class CPayloadWriter
{
Expand Down Expand Up @@ -69,32 +74,36 @@ namespace eCAL
CPayloadWriter& operator=(CPayloadWriter&&) = default;

/**
* @brief Perform a full write operation with uninitialized memory.
* @brief Perform a full write operation on uninitialized memory.
*
* This virtual function allows derived classes to perform a full write operation
* when the provisioned memory is uninitialized.
* when the provisioned memory is uninitialized. Typically, this is the case when a
* memory file had to be recreated or its size had to be changed.
*
* @param buffer_ Pointer to the buffer containing the data to be written.
* @param size_ Size of the data to be written.
*
* @return True if the write operation is successful, false otherwise.
*/
virtual bool Write(void* buffer_, size_t size_) = 0;
virtual bool WriteFull(void* buffer_, size_t size_) = 0;

/**
* @brief Perform a partial write operation or modify existing data.
* @brief Perform a partial write operation to modify existing data.
*
* This virtual function allows derived classes to perform a partial write operation
* or modify existing data when the provisioned memory is already initialized and
* contains the data from the last write operation. By default, this operation will
* just call the `Write` function.
* This virtual function allows derived classes to modify existing data when the provisioned
* memory is already initialized by a WriteFull call (i.e. contains the data from that full write operation).
*
* @param buffer_ Pointer to the buffer containing the data to be written or modified.
* @param size_ Size of the data to be written or modified.
* The memory can be partially modified and does not have to be completely rewritten, which leads to significantly
* higher performance (lower latency).
*
* If not implemented (by default), this operation will just call the `WriteFull` function.
*
* @param buffer_ Pointer to the buffer containing the data to be modified.
* @param size_ Size of the data to be modified.
*
* @return True if the write/update operation is successful, false otherwise.
*/
virtual bool Update(void* buffer_, size_t size_) { return Write(buffer_, size_); };
virtual bool WriteModified(void* buffer_, size_t size_) { return WriteFull(buffer_, size_); };

/**
* @brief Get the size of the required memory.
Expand Down
25 changes: 15 additions & 10 deletions ecal/core/include/ecal/ecal_publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -287,25 +287,30 @@ namespace eCAL
/**
* @brief Enable zero copy shared memory transport mode.
*
* By default, the built-in shared memory layer is configured to make one memory copy
* on the receiver side. That means the payload is copied by the internal eCAL memory pool manager
* out of the memory file and the file is closed immediately after this.
* The intention of this implementation is to free the file as fast as possible after reading
* its content to allow other subscribing processes to access the content with minimal latency.
* The different reading subscribers are fully decoupled and can access their memory copy
* independently.
* By default, the built-in shared memory layer is configured to make two memory copies
* one on the publisher and one on the subscriber side.
*
* If ShmEnableZeroCopy is switched on no memory will be copied at all. The user message callback is
* called right after opening the memory file. A direct pointer to the memory payload is forwarded
* The intention of this implementation is to free the file as fast as possible after writing and reading
* its content to allow other processes to access the content with minimal latency. The publisher and subscribers
* are fully decoupled and can access their internal memory copy independently.
*
* If ShmEnableZeroCopy is switched on no memory will be copied at all using the low level binary publish / subscribe API.
* On publisher side the memory copy is exectuted into the opened memory file. On the subscriber side the user message
* callback is called right after opening the memory file. A direct pointer to the memory payload is forwarded
* and can be processed with no latency. The memory file will be closed after the user callback function
* returned. The advantage of this configuration is a much higher performance for large payloads (> 1024 kB).
* returned.
*
* The advantage of this configuration is a much higher performance for large payloads (> 1024 kB).
* The disadvantage of this configuration is that in the time when the callback is executed the memory file
* is blocked for other subscribers and for writing publishers too. Maybe this can be eliminated
* by a better memory file read/write access implementation (lock free read) in future releases.
*
* Today, for specific scenarios (1:1 pub/sub connections with large payloads for example) this feature
* can increase the performance remarkable. But please keep in mind to return from the message callback function
* as fast as possible to not delay subsequent read/write access operations.
*
* By using the eCAL::CPayloadWriter API a full zero copy implementation is possible by providing separate methods
* for the initialization and the modification of the memory file content (see CPayloadWriter documentation).
*
* @param state_ Set type zero copy mode for shared memory transport layer (true == zero copy enabled).
*
Expand Down
2 changes: 1 addition & 1 deletion ecal/core/include/ecal/msg/capnproto/publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ namespace eCAL
CPayload& operator=(const CPayload&) = delete;
CPayload& operator=(CPayload&&) noexcept = delete;

bool Write(void* buf_, size_t len_) override
bool WriteFull(void* buf_, size_t len_) override
{
kj::Array<capnp::word> words = capnp::messageToFlatArray(const_cast<capnp::MallocMessageBuilder&>(message_builder));
kj::ArrayPtr<kj::byte> bytes = words.asBytes();
Expand Down
2 changes: 1 addition & 1 deletion ecal/core/include/ecal/msg/protobuf/publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ namespace eCAL
CPayload& operator=(const CPayload&) = delete;
CPayload& operator=(CPayload&&) noexcept = delete;

bool Write(void* buf_, size_t len_) override
bool WriteFull(void* buf_, size_t len_) override
{
return message.SerializeToArray(buf_, static_cast<int>(len_));
}
Expand Down
2 changes: 1 addition & 1 deletion ecal/core/src/ecal_buffer_payload_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ namespace eCAL
*
* @return True if the copy operation is successful, false otherwise.
*/
bool Write(void* buffer_, size_t size_) override
bool WriteFull(void* buffer_, size_t size_) override
{
if (buffer_ == nullptr) return false;
if (size_ < m_size) return false;
Expand Down
8 changes: 4 additions & 4 deletions ecal/core/src/io/ecal_memfile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -324,10 +324,10 @@ namespace eCAL
// (re)write complete buffer
if (!m_payload_initialized || force_full_write_)
{
bool const success = payload_.Write(static_cast<char*>(wbuf) + offset_, len_);
bool const success = payload_.WriteFull(static_cast<char *>(wbuf) + offset_, len_);
if (!success)
{
printf("Could not write payload content to the memory file (CPayload::Write returned false): %s.\n\n", m_name.c_str());
printf("Could not write payload content to the memory file (CPayload::WriteFull returned false): %s.\n\n", m_name.c_str());
}
else
{
Expand All @@ -337,10 +337,10 @@ namespace eCAL
else
{
// apply update to write buffer
bool const success = payload_.Update(static_cast<char*>(wbuf) + offset_, len_);
bool const success = payload_.WriteModified(static_cast<char *>(wbuf) + offset_, len_);
if (!success)
{
printf("Could not write payload content to the memory file (CPayload::Update returned false): %s.\n\n", m_name.c_str());
printf("Could not write payload content to the memory file (CPayload::WriteModified returned false): %s.\n\n", m_name.c_str());
}
}

Expand Down
2 changes: 1 addition & 1 deletion ecal/core/src/readwrite/ecal_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ namespace eCAL
if (!allow_zero_copy)
{
m_payload_buffer.resize(payload_buf_size);
payload_.Write(m_payload_buffer.data(), m_payload_buffer.size());
payload_.WriteFull(m_payload_buffer.data(), m_payload_buffer.size());
}

// prepare counter and internal states
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ class CBinaryPayload : public eCAL::CPayloadWriter
public:
CBinaryPayload(size_t size_) : size(size_) {}

bool Write(void* buf_, size_t len_) override
bool WriteFull(void* buf_, size_t len_) override
{
// write complete content to the shared memory file
if (len_ < size) return false;
memset(buf_, 42, size);
return true;
};

bool Update(void* buf_, size_t len_) override
bool WriteModified(void* buf_, size_t len_) override
{
// update content of the shared memory file
if (len_ < size) return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ class CBinaryPayload : public eCAL::CPayloadWriter
public:
CBinaryPayload(size_t size_) : size(size_) {}

bool Write(void* buf_, size_t len_) override
bool WriteFull(void* buf_, size_t len_) override
{
// write complete content to the shared memory file
if (len_ < size) return false;
memset(buf_, 42, size);
return true;
};

bool Update(void* buf_, size_t len_) override
bool WriteModified(void* buf_, size_t len_) override
{
// update content of the shared memory file
if (len_ < size) return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ class CBinaryPayload : public eCAL::CPayloadWriter
public:
CBinaryPayload(size_t size_) : size(size_) {}

bool Write(void* buf_, size_t len_) override
bool WriteFull(void* buf_, size_t len_) override
{
// write complete content to the shared memory file
if (len_ < size) return false;
memset(buf_, 42, size);
return true;
};

bool Update(void* buf_, size_t len_) override
bool WriteModified(void* buf_, size_t len_) override
{
// update content of the shared memory file
if (len_ < size) return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ void log_struct(const char* action_name, const SSimpleStruct& s)
}

// a binary payload object that handles
// SSimpleStruct Write and Update functionality
// SSimpleStruct WriteFull and WriteModified functionality
class CStructPayload : public eCAL::CPayloadWriter
{
public:
// Write the complete SSimpleStruct to the shared memory
bool Write(void* buf_, size_t len_) override
// WriteFull the complete SSimpleStruct to the shared memory
bool WriteFull(void* buf_, size_t len_) override
{
// check available size and pointer
if (len_ < GetSize() || buf_ == nullptr) return false;
Expand All @@ -75,13 +75,13 @@ class CStructPayload : public eCAL::CPayloadWriter
*static_cast<SSimpleStruct*>(buf_) = simple_struct;

// log action
log_struct("Write SSimpleStruct :", simple_struct);
log_struct("WriteFull SSimpleStruct :", simple_struct);

return true;
};

// Modify the SSimpleStruct in the shared memory
bool Update(void* buf_, size_t len_) override
bool WriteModified(void* buf_, size_t len_) override
{
// check available size and pointer
if (len_ < GetSize() || buf_ == nullptr) return false;
Expand All @@ -90,7 +90,7 @@ class CStructPayload : public eCAL::CPayloadWriter
UpdateStruct(static_cast<SSimpleStruct*>(buf_));

// log action
log_struct("Update SSimpleStruct :", *static_cast<SSimpleStruct*>(buf_));
log_struct("WriteModified SSimpleStruct :", *static_cast<SSimpleStruct*>(buf_));

return true;
};
Expand Down
8 changes: 4 additions & 4 deletions testing/ecal/pubsub_test/src/pubsub_multibuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,23 @@
#define PAYLOAD_SIZE 1024

// a binary payload object for testing
// full (Write) and partial (Update) writing
// full (WriteFull) and partial (WriteModified) writing
class CBinaryPayload : public eCAL::CPayloadWriter
{
public:
CBinaryPayload(size_t size_) : size(size_) {}

bool Write(void* buf_, size_t len_) override
bool WriteFull(void* buf_, size_t len_) override
{
// write complete content to the shared memory file
if (len_ < size) return false;
memset(buf_, 42, size);
return true;
};

bool Update(void* buf_, size_t len_) override
bool WriteModified(void* buf_, size_t len_) override
{
// update content of the shared memory file
// modify content of the shared memory file
if (len_ < size) return false;
const size_t write_idx(clock % len_);
const char write_chr(clock % 100);
Expand Down

0 comments on commit 6f8e730

Please sign in to comment.