Skip to content

Commit 21e89b0

Browse files
committed
Some refinements to "mop"
The sinks now record triplets: latency, topic idx and source id. Sources now try to agree on a reference time, then start publishing at configurable delay from that reference time. This allows running two sources publishing at exactly the same time (if you have enough cores), or rather preventing them from publishing at the same time. It adds a run script that does a small 2x2 matrix of tests with two sources and two sinks: starting at the same time or the second delayed by 2ms, and with/without sleep 100us in between samples. It also fixes a potential race condition on appending latencies in the listener. Since the listeners in this experiment always get invoked on the same thread it didn't cause any problems. Signed-off-by: Erik Boasson <eb@ilities.com>
1 parent 4b446f8 commit 21e89b0

File tree

3 files changed

+225
-70
lines changed

3 files changed

+225
-70
lines changed

examples/hop/mop.cpp

Lines changed: 190 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,23 @@
2828

2929
using namespace org::eclipse::cyclonedds;
3030
using namespace std::chrono_literals;
31+
using namespace std::chrono;
3132

33+
using CLK = high_resolution_clock;
34+
35+
enum class Type { T8, T128, T1k, T8k, T128k };
36+
37+
static Type type = Type::T128;
3238
static bool sleep_between_write = false;
33-
static uint32_t ntopics = 1;
39+
static uint32_t keyval = static_cast<uint32_t>(getpid());
40+
static uint32_t ntopics = 10;
41+
static uint32_t stagger = 0; // ms
3442
static std::optional<uint32_t> pubidx;
3543
static std::optional<std::string> datafile;
3644

37-
template<typename CLK>
38-
static dds::core::Time mkDDSTime (const std::chrono::time_point<CLK> x)
45+
static dds::core::Time mkDDSTime (const time_point<CLK> x)
3946
{
40-
int64_t t = std::chrono::duration_cast<std::chrono::nanoseconds>(x.time_since_epoch()).count();
47+
int64_t t = duration_cast<nanoseconds>(x.time_since_epoch()).count();
4148
return dds::core::Time(t / 1000000000, static_cast<uint32_t>(t % 1000000000));
4249
}
4350

@@ -68,32 +75,74 @@ static dds::pub::DataWriter<T> make_writer(dds::topic::Topic<T> tp)
6875
return dds::pub::DataWriter<T>{pub, tp, tp.qos()};
6976
}
7077

78+
static void make_start_entities (dds::domain::DomainParticipant dp, dds::sub::DataReader<MopSync>& rd, dds::pub::DataWriter<MopSync>& wr)
79+
{
80+
auto tpqos = dp.default_topic_qos()
81+
<< dds::core::policy::Reliability::Reliable(dds::core::Duration::infinite())
82+
<< dds::core::policy::Durability::TransientLocal()
83+
<< dds::core::policy::History::KeepLast(1);
84+
auto tp = dds::topic::Topic<MopSync>(dp, "MopSync", tpqos);
85+
rd = make_reader(tp);
86+
wr = make_writer(tp);
87+
}
88+
89+
static time_point<CLK> get_start_time(dds::sub::DataReader<MopSync> rd, dds::pub::DataWriter<MopSync> wr)
90+
{
91+
auto tstart = CLK::now() + 1s;
92+
int64_t tstart_int64 = duration_cast<nanoseconds>(tstart.time_since_epoch()).count();
93+
std::cout << keyval << " proposing " << tstart.time_since_epoch().count() << std::endl;
94+
wr << MopSync{keyval, tstart_int64};
95+
while (CLK::now() < tstart - 2ms)
96+
{
97+
auto ms = rd.take();
98+
for (const auto& m : ms)
99+
{
100+
if (!m.info().valid())
101+
continue;
102+
auto prop = time_point<CLK>(nanoseconds(m.data().tstart()));
103+
if (prop < tstart)
104+
{
105+
tstart = prop;
106+
std::cout << keyval << " updating to " << tstart.time_since_epoch().count() << " from " << m.data().k() << std::endl;
107+
}
108+
}
109+
std::this_thread::sleep_for(1ms);
110+
}
111+
tstart += milliseconds(stagger);
112+
std::cout << keyval << " starting at " << tstart.time_since_epoch().count() << std::endl;
113+
return tstart;
114+
}
115+
71116
template<typename T>
72117
static void source(std::vector<dds::topic::Topic<T>>& tps)
73118
{
119+
// make entities for synchronised start first, make sure they stay around while
120+
// we measure to avoid disturbing the measurement with the entity deletion and
121+
// associated discovery work
122+
dds::sub::DataReader<MopSync> start_rd = dds::core::null;
123+
dds::pub::DataWriter<MopSync> start_wr = dds::core::null;
124+
make_start_entities(tps[0].domain_participant(), start_rd, start_wr);
74125
std::vector<dds::pub::DataWriter<T>> wrs;
75126
for (auto tp : tps)
76127
wrs.push_back(make_writer(tp));
77128
signal(SIGINT, sigh);
129+
signal(SIGTERM, sigh);
78130
T sample{};
79-
sample.k(static_cast<uint32_t>(getpid()));
80-
auto now = std::chrono::high_resolution_clock::now();
81-
// give forwarders and sink time to start & discovery to run
82-
std::cout << "starting in 1s" << std::endl;
83-
now += 1s;
131+
sample.k(keyval);
132+
auto now = get_start_time (start_rd, start_wr);
84133
std::this_thread::sleep_until(now);
85134
while (!interrupted)
86135
{
87136
if (pubidx.has_value())
88137
{
89-
wrs[pubidx.value()].write(sample, mkDDSTime(std::chrono::high_resolution_clock::now()));
138+
wrs[pubidx.value()].write(sample, mkDDSTime(CLK::now()));
90139
}
91140
else
92141
{
93142
auto nowx = now;
94143
for (auto wr : wrs)
95144
{
96-
wr.write(sample, mkDDSTime(std::chrono::high_resolution_clock::now()));
145+
wr.write(sample, mkDDSTime(CLK::now()));
97146
if (sleep_between_write)
98147
{
99148
nowx += 100us;
@@ -105,76 +154,89 @@ static void source(std::vector<dds::topic::Topic<T>>& tps)
105154
now += 10ms;
106155
std::this_thread::sleep_until(now);
107156
}
108-
std::cout << "wrote " << ntopics << " * " << sample.seq() << " samples" << std::endl;
157+
std::cout << keyval << "wrote " << ntopics << " * " << sample.seq() << " samples" << std::endl;
109158
}
110159

160+
// t = reception time, l = latency, i = topic index, k = source key
161+
struct TLK { int64_t t; double l; uint32_t k; };
162+
struct TLIK { int64_t t; double l; size_t i; uint32_t k; };
163+
struct LIK { double l; size_t i; uint32_t k; };
164+
111165
template<typename T>
112166
class Sink : public dds::sub::NoOpDataReaderListener<T> {
113167
public:
114-
Sink() = delete;
115-
Sink(size_t idx, std::vector<std::pair<double, size_t>>& lats) : idx_{idx}, lats_{lats} { }
168+
Sink() = default;
169+
170+
const std::vector<TLK>& lats() const {
171+
return lats_;
172+
};
116173

117174
private:
118175
void on_data_available(dds::sub::DataReader<T>& rd)
119176
{
120-
const auto now = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::high_resolution_clock::now().time_since_epoch()).count();
177+
const auto now_clk = CLK::now();
178+
const int64_t now = duration_cast<nanoseconds>(now_clk.time_since_epoch()).count();
121179
auto xs = rd.take();
122180
for (const auto& x : xs) {
123181
if (x.info().valid()) {
124182
const auto lat = now - (x.info().timestamp().sec() * 1000000000 + x.info().timestamp().nanosec());
125-
lats_.push_back(std::make_pair(lat / 1e3, idx_));
183+
lats_.push_back(TLK{now, lat / 1e3, x.data().k()});
126184
} else {
127185
interrupted = true;
128186
}
129187
};
130188
}
131189

132-
size_t idx_;
133-
std::vector<std::pair<double, size_t>>& lats_;
190+
std::vector<TLK> lats_;
134191
};
135192

136193
template<typename T>
137194
static void sink(std::vector<dds::topic::Topic<T>>& tps)
138195
{
139196
// latencies in microseconds
140-
std::vector<std::pair<double, size_t>> lats;
197+
std::vector<LIK> lats;
141198
// read until source disappears
142-
// always create the "junk reader": it costs us nothing if no junk data is being published
143199
{
144200
std::vector<dds::sub::DataReader<T>> rds;
145201
std::vector<Sink<T>> ls;
146202
for (size_t i = 0; i < tps.size(); i++)
147-
ls.push_back(Sink<T>{i, lats});
203+
ls.push_back(Sink<T>{});
148204
for (size_t i = 0; i < tps.size(); i++)
149-
{
150205
rds.push_back(make_reader(tps[i]));
206+
for (size_t i = 0; i < tps.size(); i++)
151207
rds[i].listener(&ls[i], dds::core::status::StatusMask::data_available());
152-
}
153208
while (!interrupted)
154-
{
155209
std::this_thread::sleep_for(103ms);
156-
}
157210
for (auto rd : rds)
158-
{
159-
rd.listener();
160211
rd.close();
161-
}
212+
// collect latencies for all topics and sort by reception time
213+
std::vector<TLIK> tlats;
214+
for (size_t i = 0; i < ls.size(); i++)
215+
for (const auto& x : ls[i].lats())
216+
tlats.push_back(TLIK{x.t, x.l, i, x.k});
217+
std::sort(tlats.begin(), tlats.end(), [](const TLIK& a, const TLIK& b) -> bool { return a.t < b.t; });
218+
// then reduce to just latency, topic and key
219+
for (const auto& x : tlats)
220+
lats.push_back(LIK{x.l, x.i, x.k});
162221
}
163-
// destructors will have run, latencies are ours now
164222
if (datafile.has_value())
165223
{
166224
std::ofstream f;
167225
f.open(datafile.value());
168226
for (const auto& l : lats)
169-
f << l.first << " " << l.second << std::endl;
227+
f << l.l << " " << l.i << " " << l.k << std::endl;
170228
f.close();
171229
}
172230
const size_t n = lats.size();
173231
if (n < 2) {
174232
std::cout << "insufficient data" << std::endl;
175233
} else {
176-
std::sort(lats.begin(), lats.end());
177-
std::cout << "received " << n << " samples; min " << lats[0].first << " max-1 " << lats[n-2].first << " max " << lats[n-1].first << std::endl;
234+
std::sort(lats.begin(), lats.end(), [](const LIK& a, const LIK& b) -> bool { return a.l < b.l; });
235+
std::cout
236+
<< "received " << n
237+
<< " samples; min " << lats[0].l
238+
<< " max-1 " << lats[n-2].l
239+
<< " max " << lats[n-1].l << std::endl;
178240
}
179241
}
180242

@@ -201,16 +263,58 @@ static void run(const Mode mode)
201263
static void usage()
202264
{
203265
std::cout
204-
<< "usage: mop {source|sink} [OPTIONS] TYPE" << std::endl
205-
<< "OPTIONS:" << std::endl
206-
<< "-nNTPS use N topics in parallel (def = 1)" << std::endl
266+
<< "usage: mop {source|sink} [OPTIONS]" << std::endl
267+
<< std::endl
268+
<< "COMMON OPTIONS:" << std::endl
269+
<< "-tTYPE type to use one of 8, 128 (def), 1k, 8k, 128k" << std::endl
270+
<< "-nNTPS use N (def = 10) topics in parallel" << std::endl
271+
<< std::endl
272+
<< "SOURCE OPTIONS:" << std::endl
273+
<< "-kKVAL use KVAL as key value instead of process id" << std::endl
207274
<< "-pIDX publish only on topic IDX" << std::endl
208-
<< "-oFILE write latencies to FILE (sink)" << std::endl
275+
<< "-sDELAY stagger: offset by DELAY ms (def = 0)" << std::endl
209276
<< "-x sleep 100us between successive writes" << std::endl
210-
<< "TYPE: one of 8, 128, 1k, 8k, 128k" << std::endl;
277+
<< std::endl
278+
<< "SINK OPTIONS:" << std::endl
279+
<< "-oFILE write latencies to FILE" << std::endl;
211280
std::exit(1);
212281
}
213282

283+
static Type convert_typestr (const std::string& typestr)
284+
{
285+
if (typestr == "8") {
286+
return Type::T8;
287+
} else if (typestr == "128") {
288+
return Type::T128;
289+
} else if (typestr == "1k") {
290+
return Type::T1k;
291+
} else if (typestr == "8k") {
292+
return Type::T8k;
293+
} else if (typestr == "128k") {
294+
return Type::T128k;
295+
} else {
296+
std::cout << "invalid type, should be 8, 128, 1k, 8k, 128k" << std::endl;
297+
std::exit(1);
298+
return Type::T128;
299+
}
300+
}
301+
302+
static bool handle_common_opt (int opt)
303+
{
304+
switch (opt)
305+
{
306+
case 'n':
307+
ntopics = static_cast<uint32_t>(std::atoi(optarg));
308+
return true;
309+
case 't':
310+
type = convert_typestr(std::string(optarg));
311+
return true;
312+
default:
313+
// not a common option
314+
return false;
315+
}
316+
}
317+
214318
int main (int argc, char **argv)
215319
{
216320
if (argc < 2)
@@ -226,51 +330,67 @@ int main (int argc, char **argv)
226330
return 1;
227331
}
228332

333+
const std::string common_opt = "n:t:";
229334
optind = 2;
230335
int opt;
231-
while ((opt = getopt (argc, argv, "n:o:p:x")) != EOF)
336+
switch (mode)
232337
{
233-
switch (opt)
234-
{
235-
case 'n':
236-
ntopics = static_cast<uint32_t>(std::atoi(optarg));
237-
break;
238-
case 'o':
239-
datafile = std::string(optarg);
240-
break;
241-
case 'p':
242-
pubidx = static_cast<uint32_t>(std::atoi(optarg));
243-
break;
244-
case 'x':
245-
sleep_between_write = true;
246-
break;
247-
default:
248-
usage();
249-
}
338+
case Mode::Source:
339+
while ((opt = getopt (argc, argv, (common_opt + "k:p:s:x").c_str())) != EOF)
340+
{
341+
if (handle_common_opt (opt))
342+
continue;
343+
switch (opt)
344+
{
345+
case 'k':
346+
keyval = static_cast<uint32_t>(std::atoi(optarg));
347+
break;
348+
case 'p':
349+
pubidx = static_cast<uint32_t>(std::atoi(optarg));
350+
break;
351+
case 's':
352+
stagger = static_cast<uint32_t>(std::atoi(optarg));
353+
break;
354+
case 'x':
355+
sleep_between_write = true;
356+
break;
357+
default:
358+
usage();
359+
}
360+
}
361+
break;
362+
case Mode::Sink:
363+
while ((opt = getopt (argc, argv, (common_opt + "o:").c_str())) != EOF)
364+
{
365+
if (handle_common_opt (opt))
366+
continue;
367+
switch (opt)
368+
{
369+
case 'o':
370+
datafile = std::string(optarg);
371+
break;
372+
default:
373+
usage();
374+
}
375+
}
376+
break;
250377
}
251378
if (pubidx.has_value() && pubidx.value() >= ntopics)
252379
{
253380
std::cout << "topic index for publishing out of range" << std::endl;
254381
return 1;
255382
}
256-
if (argc - optind != 1)
383+
if (argc - optind != 0)
257384
{
258385
usage();
259386
}
260-
const std::string typestr = std::string(argv[optind]);
261-
if (typestr == "8") {
262-
run<Mop8>(mode);
263-
} else if (typestr == "128") {
264-
run<Mop128>(mode);
265-
} else if (typestr == "1k") {
266-
run<Mop1k>(mode);
267-
} else if (typestr == "8k") {
268-
run<Mop8k>(mode);
269-
} else if (typestr == "128k") {
270-
run<Mop128k>(mode);
271-
} else {
272-
std::cout << "invalid type, should be 8, 128, 1k, 8k, 128k" << std::endl;
273-
return 1;
387+
switch (type)
388+
{
389+
case Type::T8: run<Mop8>(mode); break;
390+
case Type::T128: run<Mop128>(mode); break;
391+
case Type::T1k: run<Mop1k>(mode); break;
392+
case Type::T8k: run<Mop8k>(mode); break;
393+
case Type::T128k: run<Mop128k>(mode); break;
274394
}
275395
return 0;
276396
}

examples/hop/mop_type.idl

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,12 @@
1010
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
1111
*/
1212

13+
@final @topic
14+
struct MopSync {
15+
@key uint32 k;
16+
int64 tstart;
17+
};
18+
1319
@final @topic
1420
struct Mop8 {
1521
@key uint32 k;

0 commit comments

Comments
 (0)