Skip to content

Commit 3882264

Browse files
committed
DPL: detect when a Lifetime::Timeframe output is missing
1 parent 5182053 commit 3882264

File tree

5 files changed

+102
-0
lines changed

5 files changed

+102
-0
lines changed

Framework/Core/include/Framework/StreamContext.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,13 @@ struct StreamContext {
6363
// Notice that in such a case all the services will be created upfront, so
6464
// the callback will be called for all of them.
6565
std::vector<ServiceStartStreamHandle> preStartStreamHandles;
66+
67+
// Information on wether or not all the required routes have been created.
68+
// This is used to check if the LifetimeTimeframe routes were all created
69+
// for a given iteration.
70+
// This is in the stream context to allow tracking data creation on a per thread
71+
// basis.
72+
std::vector<bool> routeCreated;
6673
};
6774

6875
} // namespace o2::framework

Framework/Core/src/CommonServices.cxx

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,33 @@ o2::framework::ServiceSpec CommonServices::streamContextSpec()
162162
.uniqueId = simpleServiceId<StreamContext>(),
163163
.init = simpleServiceInit<StreamContext, StreamContext, ServiceKind::Stream>(),
164164
.configure = noConfiguration(),
165+
.preProcessing = [](ProcessingContext& context, void* service) {
166+
auto* stream = (StreamContext*)service;
167+
auto& routes = context.services().get<DeviceSpec const>().outputs;
168+
// Notice I need to do this here, because different invocation for
169+
// the same stream might be referring to different data processors.
170+
// We should probably have a context which is per stream of a specific
171+
// data processor.
172+
stream->routeCreated.resize(routes.size());
173+
// Reset the routeCreated at every processing step
174+
std::fill(stream->routeCreated.begin(), stream->routeCreated.end(), false); },
175+
.postProcessing = [](ProcessingContext& processingContext, void* service) {
176+
auto* stream = (StreamContext*)service;
177+
auto& routes = processingContext.services().get<DeviceSpec const>().outputs;
178+
auto& timeslice = processingContext.services().get<TimingInfo>().timeslice;
179+
for (size_t ri = 0; ri < routes.size(); ++ri) {
180+
if (stream->routeCreated[ri] == true) {
181+
continue;
182+
}
183+
auto &route = routes[ri];
184+
auto &matcher = route.matcher;
185+
if ((timeslice % route.maxTimeslices) != route.timeslice) {
186+
continue;
187+
}
188+
if (matcher.lifetime == Lifetime::Timeframe) {
189+
LOGP(error, "Expected Lifetime::Timeframe data {} was not created for timeslice {} and might result in dropped timeframes", DataSpecUtils::describe(matcher), timeslice);
190+
}
191+
} },
165192
.kind = ServiceKind::Stream};
166193
}
167194

Framework/Core/src/DataAllocator.cxx

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include "Framework/FairMQResizableBuffer.h"
2020
#include "Framework/DataProcessingContext.h"
2121
#include "Framework/DeviceSpec.h"
22+
#include "Framework/StreamContext.h"
2223
#include "Headers/Stack.h"
2324

2425
#include <fairmq/Device.h>
@@ -47,10 +48,12 @@ DataAllocator::DataAllocator(ServiceRegistryRef contextRegistry)
4748
RouteIndex DataAllocator::matchDataHeader(const Output& spec, size_t timeslice)
4849
{
4950
auto& allowedOutputRoutes = mRegistry.get<DeviceSpec const>().outputs;
51+
auto& stream = mRegistry.get<o2::framework::StreamContext>();
5052
// FIXME: we should take timeframeId into account as well.
5153
for (auto ri = 0; ri < allowedOutputRoutes.size(); ++ri) {
5254
auto& route = allowedOutputRoutes[ri];
5355
if (DataSpecUtils::match(route.matcher, spec.origin, spec.description, spec.subSpec) && ((timeslice % route.maxTimeslices) == route.timeslice)) {
56+
stream.routeCreated[ri] = true;
5457
return RouteIndex{ri};
5558
}
5659
}

Framework/TestWorkflows/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
o2_add_dpl_workflow(dummy-workflow
1313
SOURCES src/o2DummyWorkflow.cxx
1414
COMPONENT_NAME TestWorkflows)
15+
o2_add_dpl_workflow(detect-missing-timeframe
16+
SOURCES test/test_DetectMissingTimeframe.cxx
17+
COMPONENT_NAME TestWorkflows)
1518

1619
o2_add_dpl_workflow(o2rootmessage-workflow
1720
SOURCES "src/test_o2RootMessageWorkflow.cxx"
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
#include "Framework/ConfigParamSpec.h"
12+
#include "Framework/DataTakingContext.h"
13+
#include "Framework/CompletionPolicyHelpers.h"
14+
#include "Framework/DeviceSpec.h"
15+
#include "Framework/RawDeviceService.h"
16+
#include "Framework/ControlService.h"
17+
#include "Framework/Configurable.h"
18+
#include "Framework/RunningWorkflowInfo.h"
19+
#include "Framework/RateLimiter.h"
20+
#include <fairmq/Device.h>
21+
22+
#include <iostream>
23+
#include <chrono>
24+
#include <thread>
25+
#include <vector>
26+
27+
using namespace o2::framework;
28+
29+
#include "Framework/runDataProcessing.h"
30+
31+
// This is how you can define your processing in a declarative way
32+
WorkflowSpec defineDataProcessing(ConfigContext const& specs)
33+
{
34+
DataProcessorSpec a{
35+
.name = "A",
36+
.outputs = {OutputSpec{{"a1"}, "TST", "A1"},
37+
OutputSpec{{"a2"}, "TST", "A2"}},
38+
.algorithm = AlgorithmSpec{adaptStateless(
39+
[](DataAllocator& outputs, RawDeviceService& device, DataTakingContext& context, ProcessingContext& pcx) {
40+
outputs.make<int>(OutputRef{"a1"}, 1);
41+
static int i = 0;
42+
outputs.make<int>(OutputRef{"a1"}, 1);
43+
if (i++ % 2 == 0) {
44+
outputs.make<int>(OutputRef{"a2"}, 1);
45+
}
46+
})},
47+
};
48+
DataProcessorSpec d{
49+
.name = "D",
50+
.inputs = {InputSpec{"a1", "TST", "A1"},
51+
InputSpec{"a2", "TST", "A2"}},
52+
.algorithm = AlgorithmSpec{adaptStateless(
53+
[](InputRecord& inputs) {
54+
auto ref = inputs.get("a1");
55+
auto header = o2::header::get<const DataProcessingHeader*>(ref.header);
56+
LOG(info) << "Start time: " << header->startTime;
57+
})},
58+
};
59+
60+
return workflow::concat(WorkflowSpec{a},
61+
WorkflowSpec{d});
62+
}

0 commit comments

Comments
 (0)