Skip to content

Commit 4b446f8

Browse files
committed
Multi-topic latency measurement example
This adds a simple tool for doing latency measurements between processes when writing multiple topics. It assumes the clocks are synchronised to a high degree so that one-way latencies can be computed directly. It can operate with a number of different types, all very simple: struct Hop128 { @key uint32 key; uint32 seq; octet z[128 - 8]; }; and variants where the total size is 8, 1k, 8k and 128k bytes. A process is either a source of data, writing an each instance for each of N topics every 10ms, or it is a sink, recording for each received sample the latency and the topic. It can publish these N samples as quickly in succession as possible, or it can do it while requesting 100us sleeps in between. Signed-off-by: Erik Boasson <eb@ilities.com>
1 parent f25b0e4 commit 4b446f8

File tree

3 files changed

+323
-0
lines changed

3 files changed

+323
-0
lines changed

examples/hop/CMakeLists.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,10 @@ if(NOT TARGET CycloneDDS-CXX::ddscxx)
2020
endif()
2121

2222
idlcxx_generate(TARGET hop_type FILES hop_type.idl)
23+
idlcxx_generate(TARGET mop_type FILES mop_type.idl)
24+
2325
add_executable(hop hop.cpp)
2426
target_link_libraries(hop CycloneDDS-CXX::ddscxx hop_type)
27+
28+
add_executable(mop mop.cpp)
29+
target_link_libraries(mop CycloneDDS-CXX::ddscxx mop_type)

examples/hop/mop.cpp

Lines changed: 276 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,276 @@
1+
/*
2+
* Copyright(c) 2024 ZettaScale Technology and others
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Eclipse Public License v. 2.0 which is available at
6+
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
7+
* v. 1.0 which is available at
8+
* http://www.eclipse.org/org/documents/edl-v10.php.
9+
*
10+
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
11+
*/
12+
13+
#include <algorithm>
14+
#include <iostream>
15+
#include <fstream>
16+
#include <sstream>
17+
#include <iomanip>
18+
#include <chrono>
19+
#include <thread>
20+
#include <string>
21+
#include <cstdlib>
22+
#include <random>
23+
24+
#include <unistd.h>
25+
26+
#include "dds/dds.hpp"
27+
#include "mop_type.hpp"
28+
29+
using namespace org::eclipse::cyclonedds;
30+
using namespace std::chrono_literals;
31+
32+
static bool sleep_between_write = false;
33+
static uint32_t ntopics = 1;
34+
static std::optional<uint32_t> pubidx;
35+
static std::optional<std::string> datafile;
36+
37+
template<typename CLK>
38+
static dds::core::Time mkDDSTime (const std::chrono::time_point<CLK> x)
39+
{
40+
int64_t t = std::chrono::duration_cast<std::chrono::nanoseconds>(x.time_since_epoch()).count();
41+
return dds::core::Time(t / 1000000000, static_cast<uint32_t>(t % 1000000000));
42+
}
43+
44+
static volatile std::atomic<bool> interrupted = false;
45+
static void sigh(int sig)
46+
{
47+
static_cast<void>(sig);
48+
interrupted = true;
49+
}
50+
51+
template<typename T>
52+
static dds::sub::DataReader<T> make_reader(dds::topic::Topic<T> tp)
53+
{
54+
dds::domain::DomainParticipant dp = tp.domain_participant();
55+
std::vector<std::string> spart{"P"};
56+
dds::sub::qos::SubscriberQos sqos = dp.default_subscriber_qos() << dds::core::policy::Partition(spart);
57+
dds::sub::Subscriber sub{dp, sqos};
58+
return dds::sub::DataReader<T>{sub, tp, tp.qos()};
59+
}
60+
61+
template<typename T>
62+
static dds::pub::DataWriter<T> make_writer(dds::topic::Topic<T> tp)
63+
{
64+
dds::domain::DomainParticipant dp = tp.domain_participant();
65+
std::vector<std::string> ppart{"P"};
66+
dds::pub::qos::PublisherQos pqos = dp.default_publisher_qos() << dds::core::policy::Partition(ppart);
67+
dds::pub::Publisher pub{dp, pqos};
68+
return dds::pub::DataWriter<T>{pub, tp, tp.qos()};
69+
}
70+
71+
template<typename T>
72+
static void source(std::vector<dds::topic::Topic<T>>& tps)
73+
{
74+
std::vector<dds::pub::DataWriter<T>> wrs;
75+
for (auto tp : tps)
76+
wrs.push_back(make_writer(tp));
77+
signal(SIGINT, sigh);
78+
T sample{};
79+
sample.k(static_cast<uint32_t>(getpid()));
80+
auto now = std::chrono::high_resolution_clock::now();
81+
// give forwarders and sink time to start & discovery to run
82+
std::cout << "starting in 1s" << std::endl;
83+
now += 1s;
84+
std::this_thread::sleep_until(now);
85+
while (!interrupted)
86+
{
87+
if (pubidx.has_value())
88+
{
89+
wrs[pubidx.value()].write(sample, mkDDSTime(std::chrono::high_resolution_clock::now()));
90+
}
91+
else
92+
{
93+
auto nowx = now;
94+
for (auto wr : wrs)
95+
{
96+
wr.write(sample, mkDDSTime(std::chrono::high_resolution_clock::now()));
97+
if (sleep_between_write)
98+
{
99+
nowx += 100us;
100+
std::this_thread::sleep_until(nowx);
101+
}
102+
}
103+
}
104+
++sample.seq();
105+
now += 10ms;
106+
std::this_thread::sleep_until(now);
107+
}
108+
std::cout << "wrote " << ntopics << " * " << sample.seq() << " samples" << std::endl;
109+
}
110+
111+
template<typename T>
112+
class Sink : public dds::sub::NoOpDataReaderListener<T> {
113+
public:
114+
Sink() = delete;
115+
Sink(size_t idx, std::vector<std::pair<double, size_t>>& lats) : idx_{idx}, lats_{lats} { }
116+
117+
private:
118+
void on_data_available(dds::sub::DataReader<T>& rd)
119+
{
120+
const auto now = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::high_resolution_clock::now().time_since_epoch()).count();
121+
auto xs = rd.take();
122+
for (const auto& x : xs) {
123+
if (x.info().valid()) {
124+
const auto lat = now - (x.info().timestamp().sec() * 1000000000 + x.info().timestamp().nanosec());
125+
lats_.push_back(std::make_pair(lat / 1e3, idx_));
126+
} else {
127+
interrupted = true;
128+
}
129+
};
130+
}
131+
132+
size_t idx_;
133+
std::vector<std::pair<double, size_t>>& lats_;
134+
};
135+
136+
template<typename T>
137+
static void sink(std::vector<dds::topic::Topic<T>>& tps)
138+
{
139+
// latencies in microseconds
140+
std::vector<std::pair<double, size_t>> lats;
141+
// read until source disappears
142+
// always create the "junk reader": it costs us nothing if no junk data is being published
143+
{
144+
std::vector<dds::sub::DataReader<T>> rds;
145+
std::vector<Sink<T>> ls;
146+
for (size_t i = 0; i < tps.size(); i++)
147+
ls.push_back(Sink<T>{i, lats});
148+
for (size_t i = 0; i < tps.size(); i++)
149+
{
150+
rds.push_back(make_reader(tps[i]));
151+
rds[i].listener(&ls[i], dds::core::status::StatusMask::data_available());
152+
}
153+
while (!interrupted)
154+
{
155+
std::this_thread::sleep_for(103ms);
156+
}
157+
for (auto rd : rds)
158+
{
159+
rd.listener();
160+
rd.close();
161+
}
162+
}
163+
// destructors will have run, latencies are ours now
164+
if (datafile.has_value())
165+
{
166+
std::ofstream f;
167+
f.open(datafile.value());
168+
for (const auto& l : lats)
169+
f << l.first << " " << l.second << std::endl;
170+
f.close();
171+
}
172+
const size_t n = lats.size();
173+
if (n < 2) {
174+
std::cout << "insufficient data" << std::endl;
175+
} else {
176+
std::sort(lats.begin(), lats.end());
177+
std::cout << "received " << n << " samples; min " << lats[0].first << " max-1 " << lats[n-2].first << " max " << lats[n-1].first << std::endl;
178+
}
179+
}
180+
181+
enum class Mode { Source, Sink };
182+
183+
template<typename T>
184+
static void run(const Mode mode)
185+
{
186+
dds::domain::DomainParticipant dp{0};
187+
auto tpqos = dp.default_topic_qos()
188+
<< dds::core::policy::Reliability::Reliable(dds::core::Duration::infinite())
189+
<< dds::core::policy::History::KeepLast(1);
190+
std::vector<dds::topic::Topic<T>> tps;
191+
for (uint32_t i = 0; i < ntopics; i++)
192+
tps.push_back(dds::topic::Topic<T>{dp, "Mop" + std::to_string(i), tpqos});
193+
switch (mode)
194+
{
195+
case Mode::Source: source(tps); break;
196+
case Mode::Sink: sink(tps); break;
197+
}
198+
}
199+
200+
[[noreturn]]
201+
static void usage()
202+
{
203+
std::cout
204+
<< "usage: mop {source|sink} [OPTIONS] TYPE" << std::endl
205+
<< "OPTIONS:" << std::endl
206+
<< "-nNTPS use N topics in parallel (def = 1)" << std::endl
207+
<< "-pIDX publish only on topic IDX" << std::endl
208+
<< "-oFILE write latencies to FILE (sink)" << std::endl
209+
<< "-x sleep 100us between successive writes" << std::endl
210+
<< "TYPE: one of 8, 128, 1k, 8k, 128k" << std::endl;
211+
std::exit(1);
212+
}
213+
214+
int main (int argc, char **argv)
215+
{
216+
if (argc < 2)
217+
usage();
218+
const std::string modestr = std::string(argv[1]);
219+
Mode mode;
220+
if (modestr == "source") {
221+
mode = Mode::Source;
222+
} else if (modestr == "sink") {
223+
mode = Mode::Sink;
224+
} else {
225+
std::cout << "invalid mode, should be source or sink" << std::endl;
226+
return 1;
227+
}
228+
229+
optind = 2;
230+
int opt;
231+
while ((opt = getopt (argc, argv, "n:o:p:x")) != EOF)
232+
{
233+
switch (opt)
234+
{
235+
case 'n':
236+
ntopics = static_cast<uint32_t>(std::atoi(optarg));
237+
break;
238+
case 'o':
239+
datafile = std::string(optarg);
240+
break;
241+
case 'p':
242+
pubidx = static_cast<uint32_t>(std::atoi(optarg));
243+
break;
244+
case 'x':
245+
sleep_between_write = true;
246+
break;
247+
default:
248+
usage();
249+
}
250+
}
251+
if (pubidx.has_value() && pubidx.value() >= ntopics)
252+
{
253+
std::cout << "topic index for publishing out of range" << std::endl;
254+
return 1;
255+
}
256+
if (argc - optind != 1)
257+
{
258+
usage();
259+
}
260+
const std::string typestr = std::string(argv[optind]);
261+
if (typestr == "8") {
262+
run<Mop8>(mode);
263+
} else if (typestr == "128") {
264+
run<Mop128>(mode);
265+
} else if (typestr == "1k") {
266+
run<Mop1k>(mode);
267+
} else if (typestr == "8k") {
268+
run<Mop8k>(mode);
269+
} else if (typestr == "128k") {
270+
run<Mop128k>(mode);
271+
} else {
272+
std::cout << "invalid type, should be 8, 128, 1k, 8k, 128k" << std::endl;
273+
return 1;
274+
}
275+
return 0;
276+
}

examples/hop/mop_type.idl

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright(c) 2024 ZettaScale Technology and others
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Eclipse Public License v. 2.0 which is available at
6+
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
7+
* v. 1.0 which is available at
8+
* http://www.eclipse.org/org/documents/edl-v10.php.
9+
*
10+
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
11+
*/
12+
13+
@final @topic
14+
struct Mop8 {
15+
@key uint32 k;
16+
uint32 seq;
17+
//octet z[8 - 8];
18+
};
19+
@final @topic
20+
struct Mop128 {
21+
@key uint32 k;
22+
uint32 seq;
23+
octet z[128 - 8];
24+
};
25+
@final @topic
26+
struct Mop1k {
27+
@key uint32 k;
28+
uint32 seq;
29+
octet z[1024 - 8];
30+
};
31+
@final @topic
32+
struct Mop8k {
33+
@key uint32 k;
34+
uint32 seq;
35+
octet z[8*1024 - 8];
36+
};
37+
@final @topic
38+
struct Mop128k {
39+
@key uint32 k;
40+
uint32 seq;
41+
octet z[128*1024 - 8];
42+
};

0 commit comments

Comments
 (0)