Skip to content

Commit 7999763

Browse files
committed
DPL: wait as long as possible for Sporadic inputs
Right now if we have the standard consumeWhenAll policy and we have a sporadic input, it will wait indefinitely until all the inputs arrive or it will drop timeframes without that sporadic input. This changes the behavior and waits only until the oldest possible timeframe does not allow the Sporadic input to be there. At which point, it schedules the processing in any case, under the assumption that a task declaring a sporadic input knows what to do in case it's not there.
1 parent 99d40d8 commit 7999763

File tree

4 files changed

+123
-1
lines changed

4 files changed

+123
-1
lines changed

Framework/Core/src/CompletionPolicyHelpers.cxx

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include "Framework/DeviceSpec.h"
1616
#include "Framework/CompilerBuiltins.h"
1717
#include "Framework/Logger.h"
18+
#include "Framework/TimesliceIndex.h"
1819
#include "Framework/TimingInfo.h"
1920
#include "DecongestionService.h"
2021

@@ -107,10 +108,33 @@ CompletionPolicy CompletionPolicyHelpers::consumeWhenAll(const char* name, Compl
107108
{
108109
auto callback = [](InputSpan const& inputs, std::vector<InputSpec> const& specs, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
109110
assert(inputs.size() == specs.size());
111+
112+
size_t si = 0;
113+
bool missingSporadic = false;
114+
size_t currentTimeslice = -1;
110115
for (auto& input : inputs) {
111-
if (input.header == nullptr) {
116+
assert(si < specs.size());
117+
auto& spec = specs[si++];
118+
if (input.header == nullptr && spec.lifetime != Lifetime::Sporadic) {
112119
return CompletionPolicy::CompletionOp::Wait;
113120
}
121+
if (input.header == nullptr && spec.lifetime == Lifetime::Sporadic) {
122+
missingSporadic = true;
123+
}
124+
if (input.header != nullptr && currentTimeslice == -1) {
125+
auto const* dph = framework::DataRefUtils::getHeader<o2::framework::DataProcessingHeader*>(input);
126+
if (dph && !TimingInfo::timesliceIsTimer(dph->startTime)) {
127+
currentTimeslice = dph->startTime;
128+
}
129+
}
130+
}
131+
// If some sporadic inputs are missing, we wait for them util we are sure they will not come,
132+
// i.e. until the oldest possible timeslice is beyond the timeslice of the input.
133+
auto& timesliceIndex = ref.get<TimesliceIndex>();
134+
auto oldestPossibleTimeslice = timesliceIndex.getOldestPossibleInput().timeslice.value;
135+
136+
if (missingSporadic && currentTimeslice >= oldestPossibleTimeslice) {
137+
return CompletionPolicy::CompletionOp::Retry;
114138
}
115139
return CompletionPolicy::CompletionOp::Consume;
116140
};

Framework/Core/test/test_DataRelayer.cxx

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ TEST_CASE("DataRelayer")
7676
std::vector<ForwardRoute> forwards;
7777
std::vector<InputChannelInfo> infos{1};
7878
TimesliceIndex index{1, infos};
79+
ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
7980

8081
auto policy = CompletionPolicyHelpers::consumeWhenAny();
8182
DataRelayer relayer(policy, inputs, index, {registry});
@@ -124,6 +125,7 @@ TEST_CASE("DataRelayer")
124125
std::vector<ForwardRoute> forwards;
125126
std::vector<InputChannelInfo> infos{1};
126127
TimesliceIndex index{1, infos};
128+
ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
127129

128130
auto policy = CompletionPolicyHelpers::consumeWhenAny();
129131
DataRelayer relayer(policy, inputs, index, {registry});
@@ -184,6 +186,7 @@ TEST_CASE("DataRelayer")
184186

185187
std::vector<InputChannelInfo> infos{1};
186188
TimesliceIndex index{1, infos};
189+
ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
187190

188191
auto policy = CompletionPolicyHelpers::consumeWhenAll();
189192
DataRelayer relayer(policy, inputs, index, {registry});
@@ -263,6 +266,7 @@ TEST_CASE("DataRelayer")
263266

264267
std::vector<InputChannelInfo> infos{1};
265268
TimesliceIndex index{1, infos};
269+
ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
266270

267271
auto policy = CompletionPolicyHelpers::consumeWhenAll();
268272
DataRelayer relayer(policy, inputs, index, {registry});
@@ -346,6 +350,7 @@ TEST_CASE("DataRelayer")
346350
auto policy = CompletionPolicyHelpers::consumeWhenAll();
347351
std::vector<InputChannelInfo> infos{1};
348352
TimesliceIndex index{1, infos};
353+
ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
349354
DataRelayer relayer(policy, inputs, index, {registry});
350355
// Only two messages to fill the cache.
351356
relayer.setPipelineLength(2);
@@ -420,6 +425,7 @@ TEST_CASE("DataRelayer")
420425
std::vector<ForwardRoute> forwards;
421426
std::vector<InputChannelInfo> infos{1};
422427
TimesliceIndex index{1, infos};
428+
ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
423429

424430
auto policy = CompletionPolicyHelpers::processWhenAny();
425431
DataRelayer relayer(policy, inputs, index, {registry});
@@ -490,6 +496,7 @@ TEST_CASE("DataRelayer")
490496
std::vector<ForwardRoute> forwards;
491497
std::vector<InputChannelInfo> infos{1};
492498
TimesliceIndex index{1, infos};
499+
ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
493500

494501
auto policy = CompletionPolicyHelpers::processWhenAny();
495502
DataRelayer relayer(policy, inputs, index, {registry});
@@ -547,6 +554,7 @@ TEST_CASE("DataRelayer")
547554
std::vector<ForwardRoute> forwards;
548555
std::vector<InputChannelInfo> infos{1};
549556
TimesliceIndex index{1, infos};
557+
ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
550558

551559
auto policy = CompletionPolicyHelpers::processWhenAny();
552560
DataRelayer relayer(policy, inputs, index, {registry});
@@ -605,6 +613,7 @@ TEST_CASE("DataRelayer")
605613
std::vector<ForwardRoute> forwards;
606614
std::vector<InputChannelInfo> infos{1};
607615
TimesliceIndex index{1, infos};
616+
ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
608617

609618
auto policy = CompletionPolicyHelpers::processWhenAny();
610619
DataRelayer relayer(policy, inputs, index, {registry});
@@ -670,6 +679,7 @@ TEST_CASE("DataRelayer")
670679
std::vector<ForwardRoute> forwards;
671680
std::vector<InputChannelInfo> infos{1};
672681
TimesliceIndex index{1, infos};
682+
ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
673683

674684
auto policy = CompletionPolicyHelpers::consumeWhenAny();
675685
DataRelayer relayer(policy, inputs, index, {registry});
@@ -722,6 +732,7 @@ TEST_CASE("DataRelayer")
722732
std::vector<ForwardRoute> forwards;
723733
std::vector<InputChannelInfo> infos{1};
724734
TimesliceIndex index{1, infos};
735+
ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
725736

726737
auto policy = CompletionPolicyHelpers::consumeWhenAny();
727738
DataRelayer relayer(policy, inputs, index, {registry});

Framework/TestWorkflows/CMakeLists.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,15 @@
1212
o2_add_dpl_workflow(dummy-workflow
1313
SOURCES src/o2DummyWorkflow.cxx
1414
COMPONENT_NAME TestWorkflows)
15+
1516
o2_add_dpl_workflow(detect-missing-timeframe
1617
SOURCES test/test_DetectMissingTimeframe.cxx
1718
COMPONENT_NAME TestWorkflows)
1819

20+
o2_add_dpl_workflow(wait-until-possible
21+
SOURCES test/test_WaitUntilPossible.cxx
22+
COMPONENT_NAME TestWorkflows)
23+
1924
o2_add_dpl_workflow(o2rootmessage-workflow
2025
SOURCES "src/test_o2RootMessageWorkflow.cxx"
2126
COMPONENT_NAME TestWorkflows)
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
#include "Framework/ConfigParamSpec.h"
12+
#include "Framework/DataTakingContext.h"
13+
#include "Framework/CompletionPolicyHelpers.h"
14+
#include "Framework/DeviceSpec.h"
15+
#include "Framework/RawDeviceService.h"
16+
#include "Framework/ControlService.h"
17+
#include "Framework/Configurable.h"
18+
#include "Framework/RunningWorkflowInfo.h"
19+
#include "Framework/RateLimiter.h"
20+
#include <fairmq/Device.h>
21+
22+
#include <iostream>
23+
#include <chrono>
24+
#include <thread>
25+
#include <vector>
26+
27+
using namespace o2::framework;
28+
29+
#include "Framework/runDataProcessing.h"
30+
31+
// This is how you can define your processing in a declarative way
32+
WorkflowSpec defineDataProcessing(ConfigContext const& specs)
33+
{
34+
DataProcessorSpec a{
35+
.name = "A",
36+
.outputs = {OutputSpec{{"data"}, "TST", "A1", 0}},
37+
.algorithm = AlgorithmSpec{adaptStateless(
38+
[](DataAllocator& outputs, RawDeviceService& device, DataTakingContext& context, ProcessingContext& pcx) {
39+
LOG(info) << "Data TST/A1/0 created";
40+
outputs.make<int>(OutputRef{"data"}, 1);
41+
})},
42+
};
43+
DataProcessorSpec b{
44+
.name = "B",
45+
.outputs = {OutputSpec{{"sporadic"}, "TST", "B1", 0, Lifetime::Sporadic}},
46+
.algorithm = AlgorithmSpec{adaptStateless(
47+
[](DataAllocator& outputs, RawDeviceService& device, DataTakingContext& context, ProcessingContext& pcx) {
48+
// This will always be late, however since the oldest possible timeframe
49+
// will be used to decide the scheduling, it will not be dropped.
50+
sleep(1);
51+
// We also create it only every second time, so that we can check that
52+
// the sporadic output is not mandatory.
53+
static int i = 0;
54+
if (i++ % 2 == 0) {
55+
LOG(info) << "Data TST/B1/0 created";
56+
outputs.make<int>(OutputRef{"sporadic"}, 1);
57+
}
58+
})},
59+
};
60+
DataProcessorSpec d{
61+
.name = "D",
62+
.inputs = {InputSpec{"a1", "TST", "A1", 0, Lifetime::Timeframe},
63+
InputSpec{"b1", "TST", "B1", 0, Lifetime::Sporadic}},
64+
.algorithm = AlgorithmSpec{adaptStateless(
65+
[](InputRecord& inputs) {
66+
auto refA = inputs.get("a1");
67+
auto headerA = o2::header::get<const DataProcessingHeader*>(refA.header);
68+
LOG(info) << "Start time: " << headerA->startTime;
69+
auto refB = inputs.get("b1");
70+
if (!refB.header) {
71+
LOG(info) << "No sporadic input for start time " << headerA->startTime;
72+
return;
73+
}
74+
auto headerB = o2::header::get<const DataProcessingHeader*>(refB.header);
75+
LOG(info) << "Start time: " << headerB->startTime;
76+
})},
77+
};
78+
79+
return workflow::concat(WorkflowSpec{a},
80+
WorkflowSpec{b},
81+
WorkflowSpec{d});
82+
}

0 commit comments

Comments
 (0)