Skip to content

Commit d7ce64a

Browse files
authored
Merge pull request #734 from ApexAI/iox-#14-make-gateway-aware-of-large-payload-alignments
Iox #14 make gateway aware of large payload alignments
2 parents 8fd54b6 + d6d9813 commit d7ce64a

File tree

21 files changed

+538
-232
lines changed

21 files changed

+538
-232
lines changed

doc/design/chunk_header.md

Lines changed: 5 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ class ChunkHeader
4545
{
4646
uint32_t chunkSize;
4747
uint8_t chunkHeaderVersion;
48-
uint8_t reserved[3];
48+
uint8_t reserved{0};
49+
uint16_t userHeaderId;
4950
uint64_t originId;
5051
uint64_t sequenceNumber;
5152
uint32_t userHeaderSize{0U};
@@ -57,7 +58,8 @@ class ChunkHeader
5758

5859
- **chunkSize** is the size of the whole chunk
5960
- **chunkHeaderVersion** is used to detect incompatibilities for record&replay functionality
60-
- **reserved[3]** is currently not used and set to `0`
61+
- **reserved** is currently not used and set to `0`
62+
- **userHeaderId** is currently not used and set to `NO_USER_HEADER`
6163
- **originId** is the unique identifier of the publisher the chunk was sent from
6264
- **sequenceNumber** is a serial number for the sent chunks
6365
- **userPayloadSize** is the size of the chunk occupied by the user-header
@@ -247,33 +249,7 @@ Furthermore, the `Publisher` and `Subscriber` have access to the `ChunkHeader` a
247249

248250
## Open Issues
249251

250-
- the design was done with the intention to have a user-header of arbitrary size, if the size is limited to e.g. 32 bytes, some things could be simplified
251-
- publisher/subscriber API proposal
252-
```
253-
// publisher
254-
auto pub = iox::popo::Publisher<MyPayload, MyUserHeader>(serviceDescription);
255-
pub.loan()
256-
.and_then([&](auto& sample) {
257-
sample.getHeader()->userHeader<MyUserHeader>()->data = 42;
258-
sample->a = 42;
259-
sample->b = 13;
260-
sample.publish();
261-
})
262-
.or_else([](iox::popo::AllocationError) {
263-
// Do something with error.
264-
});
265-
266-
// subscriber
267-
auto sub = iox::popo::Subscriber<MyPayload, MyUserHeader>(serviceDescription);
268-
sub->take()
269-
.and_then([](auto& sample){
270-
std::cout << "User-Header data: " << sample.getHeader()->userHeader<MyUserHeader>()->data << std::endl;
271-
std::cout << "User-Payload data: " << static_cast<const MyPayload*>(sample->get())->data << std::endl;
272-
});
273-
```
274-
- the publisher/subscriber would have a default parameter for the user-header to be source compatible with our current API
275-
- the drawback is that the user could use the wrong user-header. Maybe `Sample` also needs an additional template parameter
276-
- additionally, a `ChunkHeaderHook` could be used on the publisher side
252+
- a `ChunkHeaderHook` could be used on the publisher side
277253
```
278254
template <typename UserHeader>
279255
class MyChunkHeaderHook : public ChunkHeaderHook
@@ -290,43 +266,6 @@ auto pub = iox::popo::Publisher<MyPayload>(serviceDescription, userHeaderHook);
290266
```
291267
- alternatively, instead of the ChunkHeaderHook class, the publisher could have a method `registerDeliveryHook(std::function<void(ChunkHeader&)>)`
292268
- allow the user only read access to the `ChunkHeader` and write access to the `UserHeader`
293-
- untyped publisher/subscriber API proposal
294-
```
295-
// publisher option 1
296-
auto pub = iox::popo::UntypedPublisher<MyUserHeader>(serviceDescription);
297-
298-
// publisher option 2
299-
auto userHeaderSize = sizeOf(MyUserHeader);
300-
auto pub = iox::popo::UntypedPublisher(serviceDescription, userHeaderSize);
301-
302-
auto payloadSize = sizeof(MyPayload);
303-
auto payloadAlignment = alignof(MyPayload);
304-
pub.loan(payloadSize, payloadAlignment)
305-
.and_then([&](auto& sample) {
306-
sample.getHeader()->userHeader<MyUserHeader>()->data = 42;
307-
auto payload = new (sample.get()) MyPayload();
308-
payload->data = 73;
309-
sample.publish();
310-
})
311-
.or_else([](iox::popo::AllocationError) {
312-
// Do something with error.
313-
});
314-
315-
// subscriber option 1
316-
auto pub = iox::popo::UntypedPublisher<MyUserHeader>(serviceDescription);
317-
318-
// subscriber option 2
319-
auto userHeaderSize = sizeOf(MyUserHeader);
320-
auto pub = iox::popo::UntypedSubscriber(serviceDescription, userHeaderSize);
321-
sub->take()
322-
.and_then([](auto& sample){
323-
std::cout << "User-Header data: " << sample.getHeader()->userHeader<MyUserHeader>()->data << std::endl;
324-
std::cout << "User-Payload data: " << static_cast<const MyPayload*>(sample->get())->data << std::endl;
325-
});
326-
```
327-
- option 1 has the benefit to catch a wrong alignment of the user-header and would be necessary if we make the `Sample` aware of the user-header
328-
- C bindings
329-
- PoC is needed
330269
- user defined sequence number
331270
- this can probably be done by a `ChunkHeaderHook`
332271
- alternatively, a flag could be introduce into the `ChunkHeader`

iceoryx_dds/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ if(USE_CYCLONE_DDS)
9090
source/iceoryx_dds/dds/cyclone_context.cpp
9191
source/iceoryx_dds/dds/cyclone_data_reader.cpp
9292
source/iceoryx_dds/dds/cyclone_data_writer.cpp
93+
source/iceoryx_dds/dds/iox_chunk_datagram_header.cpp
9394
)
9495

9596
# Generate IDL at configure time

iceoryx_dds/include/iceoryx_dds/dds/cyclone_data_reader.hpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
// Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved.
2+
// Copyright (c) 2021 by Apex.AI Inc. All rights reserved.
23
//
34
// Licensed under the Apache License, Version 2.0 (the "License");
45
// you may not use this file except in compliance with the License.
@@ -46,12 +47,11 @@ class CycloneDataReader : public DataReader
4647

4748
void connect() noexcept override;
4849

49-
iox::cxx::optional<uint32_t> peekNextSize() override;
50-
bool hasSamples() override;
51-
iox::cxx::expected<DataReaderError> takeNext(uint8_t* const buffer, const uint64_t& bufferSize) override;
52-
53-
iox::cxx::expected<uint64_t, DataReaderError>
54-
take(uint8_t* const buffer, const uint64_t& bufferSize, const iox::cxx::optional<uint64_t>& maxSamples) override;
50+
iox::cxx::optional<IoxChunkDatagramHeader> peekNextIoxChunkDatagramHeader() noexcept override;
51+
bool hasSamples() noexcept override;
52+
iox::cxx::expected<DataReaderError> takeNext(const IoxChunkDatagramHeader datagramHeader,
53+
uint8_t* const userHeaderBuffer,
54+
uint8_t* const userPayloadBuffer) noexcept override;
5555

5656
capro::IdString_t getServiceId() const noexcept override;
5757
capro::IdString_t getInstanceId() const noexcept override;

iceoryx_dds/include/iceoryx_dds/dds/cyclone_data_writer.hpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
// Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved.
2+
// Copyright (c) 2021 by Apex.AI Inc. All rights reserved.
23
//
34
// Licensed under the Apache License, Version 2.0 (the "License");
45
// you may not use this file except in compliance with the License.
@@ -44,7 +45,9 @@ class CycloneDataWriter : public iox::dds::DataWriter
4445
CycloneDataWriter& operator=(CycloneDataWriter&&) = default;
4546

4647
void connect() noexcept override;
47-
void write(const uint8_t* const bytes, const uint64_t size) noexcept override;
48+
void write(iox::dds::IoxChunkDatagramHeader datagramHeader,
49+
const uint8_t* const userHeaderBytes,
50+
const uint8_t* const userPayloadBytes) noexcept override;
4851
capro::IdString_t getServiceId() const noexcept override;
4952
capro::IdString_t getInstanceId() const noexcept override;
5053
capro::IdString_t getEventId() const noexcept override;

iceoryx_dds/include/iceoryx_dds/dds/data_reader.hpp

Lines changed: 23 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#ifndef IOX_DDS_DDS_DATA_READER_HPP
1919
#define IOX_DDS_DDS_DATA_READER_HPP
2020

21+
#include "iceoryx_dds/dds/iox_chunk_datagram_header.hpp"
2122
#include "iceoryx_posh/iceoryx_posh_types.hpp"
2223
#include "iceoryx_utils/cxx/expected.hpp"
2324
#include "iceoryx_utils/cxx/optional.hpp"
@@ -30,13 +31,20 @@ enum class DataReaderError : uint8_t
3031
{
3132
INVALID_STATE,
3233
NOT_CONNECTED,
33-
INVALID_RECV_BUFFER,
34+
INVALID_DATAGRAM_HEADER_SIZE,
35+
INVALID_BUFFER_PARAMETER_FOR_USER_HEADER,
36+
INVALID_BUFFER_PARAMETER_FOR_USER_PAYLOAD,
3437
INVALID_DATA,
35-
RECV_BUFFER_TOO_SMALL
38+
BUFFER_SIZE_MISMATCH
3639
};
3740

38-
constexpr char DataReaderErrorString[][64] = {
39-
"NOT_CONNECTED", "INVALID_RECV_BUFFER", "INVALID_DATA", "RECV_BUFFER_TOO_SMALL"};
41+
constexpr const char* DataReaderErrorString[] = {"INVALID_STATE",
42+
"NOT_CONNECTED",
43+
"INVALID_DATAGRAM_HEADER_SIZE",
44+
"INVALID_BUFFER_PARAMETER_FOR_USER_HEADER",
45+
"INVALID_BUFFER_PARAMETER_FOR_USER_PAYLOAD",
46+
"INVALID_DATA",
47+
"BUFFER_SIZE_MISMATCH"};
4048

4149
class DataReader
4250
{
@@ -47,37 +55,27 @@ class DataReader
4755
virtual void connect() noexcept = 0;
4856

4957
///
50-
/// @brief peekNextSize Get the size of the next sample if one is available.
51-
/// @return The size of the next sample if one is available.
58+
/// @brief peekNextIoxChunkDatagramHeader Get the IoxChunkDatagramHeader of the next sample if one is available.
59+
/// @return The IoxChunkDatagramHeader of the next sample if one is available.
5260
///
53-
virtual iox::cxx::optional<uint32_t> peekNextSize() = 0;
61+
virtual iox::cxx::optional<IoxChunkDatagramHeader> peekNextIoxChunkDatagramHeader() noexcept = 0;
5462

5563
///
5664
/// @brief hasSamples Checks if new samples ready to take.
5765
/// @return True if new samples available.
5866
///
59-
virtual bool hasSamples() = 0;
67+
virtual bool hasSamples() noexcept = 0;
6068

6169
///
6270
/// @brief take Take the next available sample from the DDS data space.
63-
/// @param buffer Receive buffer in which sample will be stored.
64-
/// @param bufferSize Size of the provided buffer.
71+
/// @param datagramHeader with size information
72+
/// @param userHeaderBuffer buffer for the user-header
73+
/// @param userPayloadBuffer buffer for the user-payload
6574
/// @return Error if unsuccessful.
6675
///
67-
virtual iox::cxx::expected<DataReaderError> takeNext(uint8_t* const buffer, const uint64_t& bufferSize) = 0;
68-
69-
70-
///
71-
/// @brief take Take up to a maximum number of samples from the DDS data space.
72-
/// @param buffer Receive buffer in which samples will be stored.
73-
/// @param bufferSize The size of the buffer (in bytes).
74-
/// @param maxSamples The maximum number of samples to request from the network.
75-
/// @return Number of samples taken if successful. Number of samples will be in the sange [0,maxSamples].
76-
///
77-
/// @note Sample size must be known ahead of time & can be checked using @ref peekNextSize() .
78-
///
79-
virtual iox::cxx::expected<uint64_t, DataReaderError>
80-
take(uint8_t* const buffer, const uint64_t& bufferSize, const iox::cxx::optional<uint64_t>& maxSamples) = 0;
76+
virtual iox::cxx::expected<DataReaderError> takeNext(const IoxChunkDatagramHeader datagramHeader,
77+
uint8_t* const userHeaderBuffer,
78+
uint8_t* const userPayloadBuffer) noexcept = 0;
8179

8280
///
8381
/// @brief getServiceId
@@ -98,7 +96,7 @@ class DataReader
9896
virtual capro::IdString_t getEventId() const noexcept = 0;
9997

10098
protected:
101-
DataReader() = default;
99+
DataReader() noexcept = default;
102100
};
103101
} // namespace dds
104102
} // namespace iox

iceoryx_dds/include/iceoryx_dds/dds/data_writer.hpp

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
// Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved.
2+
// Copyright (c) 2021 by Apex.AI Inc. All rights reserved.
23
//
34
// Licensed under the Apache License, Version 2.0 (the "License");
45
// you may not use this file except in compliance with the License.
@@ -17,6 +18,8 @@
1718
#ifndef IOX_DDS_DDS_DATA_WRITER_HPP
1819
#define IOX_DDS_DDS_DATA_WRITER_HPP
1920

21+
#include "iceoryx_dds/dds/iox_chunk_datagram_header.hpp"
22+
2023
#include "iceoryx_posh/iceoryx_posh_types.hpp"
2124

2225
#include <cstdint>
@@ -40,11 +43,14 @@ class DataWriter
4043
virtual void connect() noexcept = 0;
4144

4245
///
43-
/// @brief write Write the provided bytes on the DDS network on the topic: serviceId/instanceId/eventId
44-
/// @param bytes
45-
/// @param size
46+
/// @brief write Write the provided header and bytes on the DDS network on the topic: serviceId/instanceId/eventId
47+
/// @param datagramHeader with size information
48+
/// @param userHeaderBytes buffer with the user-header
49+
/// @param userPayloadBytes buffer with the user-payload
4650
///
47-
virtual void write(const uint8_t* const bytes, const uint64_t size) noexcept = 0;
51+
virtual void write(iox::dds::IoxChunkDatagramHeader datagramHeader,
52+
const uint8_t* const userHeaderBytes,
53+
const uint8_t* const userPayloadBytes) noexcept = 0;
4854

4955
///
5056
/// @brief getServiceId
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
// Copyright (c) 2021 by Apex.AI Inc. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// SPDX-License-Identifier: Apache-2.0
16+
17+
#ifndef IOX_DDS_DDS_IOX_CHUNK_DATAGRAM_HEADER_HPP
18+
#define IOX_DDS_DDS_IOX_CHUNK_DATAGRAM_HEADER_HPP
19+
20+
#include "iceoryx_utils/cxx/vector.hpp"
21+
22+
#include <cstdint>
23+
24+
namespace iox
25+
{
26+
namespace dds
27+
{
28+
/// @brief the endianess of the serialized data
29+
enum class Endianess : uint8_t
30+
{
31+
UNDEFINED,
32+
LITTLE,
33+
BIG,
34+
MIXED,
35+
};
36+
37+
constexpr const char* EndianessString[] = {"UNDEFINED", "LITTLE", "BIG", "MIXED"};
38+
39+
/// @brief Detects the endianness of the system
40+
Endianess getEndianess();
41+
42+
/// @brief The datagram header with chunk metadata for user-header and user-payload
43+
struct IoxChunkDatagramHeader
44+
{
45+
using Serialized_t = iox::cxx::vector<uint8_t, 16U>;
46+
47+
/// @brief Serializes a IoxChunkDatagramHeader into a vector of uint8_t
48+
/// @param[in] datagramHeader to serialize
49+
/// @return the serialized IoxChunkDatagramHeader
50+
static Serialized_t serialize(const IoxChunkDatagramHeader& datagramHeader);
51+
52+
/// @brief Deserializes a vector of uint8_t into a IoxChunkDatagramHeader
53+
/// @param[in] serializedDatagram is the serialized IoxChunkDatagramHeader
54+
/// @return the deserialized IoxChunkDatagramHeader
55+
static IoxChunkDatagramHeader deserialize(const Serialized_t& serializedDatagramHeader);
56+
57+
/// @brief From the 1.0 release onward, this must be incremented for each incompatible change, e.g.
58+
/// - data width of members changes
59+
/// - members are rearranged
60+
/// - semantic meaning of a member changes
61+
static constexpr uint8_t DATAGRAM_VERSION{1U};
62+
63+
/// @note This must always be the first member and always 1 bytes in order to prevent issues with endianess when
64+
/// deserialized or incorrectly detected versions due to different size
65+
uint8_t datagramVersion{DATAGRAM_VERSION};
66+
/// @note This must always be 1 byte in order to prevent issues with endianess when deserialized
67+
Endianess endianness{Endianess::UNDEFINED};
68+
uint16_t userHeaderId{0xFFFF};
69+
uint32_t userHeaderSize{0U};
70+
uint32_t userPayloadSize{0U};
71+
uint32_t userPayloadAlignment{0U};
72+
};
73+
74+
} // namespace dds
75+
} // namespace iox
76+
77+
#endif // IOX_DDS_DDS_IOX_CHUNK_DATAGRAM_HEADER_HPP

iceoryx_dds/include/iceoryx_dds/internal/gateway/dds_to_iox.inl

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -64,16 +64,34 @@ inline void DDS2IceoryxGateway<channel_t, gateway_t>::forward(const channel_t& c
6464

6565
while (reader->hasSamples())
6666
{
67-
reader->peekNextSize().and_then([&](auto size) {
68-
publisher->loan(size).and_then([&](auto chunk) {
69-
reader->takeNext(static_cast<uint8_t*>(chunk), size)
70-
.and_then([&]() { publisher->publish(chunk); })
71-
.or_else([&](DataReaderError err) {
72-
publisher->release(chunk);
73-
LogWarn() << "[DDS2IceoryxGateway] Encountered error reading from DDS network: "
74-
<< dds::DataReaderErrorString[static_cast<uint8_t>(err)];
75-
});
76-
});
67+
reader->peekNextIoxChunkDatagramHeader().and_then([&](auto datagramHeader) {
68+
// this is safe, it is just used to check if the alignment doesn't exceed the
69+
// alignment of the ChunkHeader but since this is data from a previously valid
70+
// chunk, we can assume that the alignment was correct and use this value
71+
constexpr uint32_t USER_HEADER_ALIGNMENT{1U};
72+
publisher
73+
->loan(datagramHeader.userPayloadSize,
74+
datagramHeader.userPayloadAlignment,
75+
datagramHeader.userHeaderSize,
76+
USER_HEADER_ALIGNMENT)
77+
.and_then([&](auto userPayload) {
78+
auto chunkHeader = iox::mepoo::ChunkHeader::fromUserPayload(userPayload);
79+
reader
80+
->takeNext(datagramHeader,
81+
static_cast<uint8_t*>(chunkHeader->userHeader()),
82+
static_cast<uint8_t*>(chunkHeader->userPayload()))
83+
.and_then([&]() { publisher->publish(userPayload); })
84+
.or_else([&](DataReaderError err) {
85+
publisher->release(userPayload);
86+
LogWarn() << "[DDS2IceoryxGateway] Encountered error reading from DDS network: "
87+
<< dds::DataReaderErrorString[static_cast<uint8_t>(err)];
88+
});
89+
})
90+
.or_else([](auto& error) {
91+
LogError() << "[DDS2IceoryxGateway] Could not loan chunk! Error code: "
92+
<< static_cast<uint64_t>(error);
93+
});
94+
;
7795
});
7896
}
7997
}

0 commit comments

Comments
 (0)