Skip to content

Commit

Permalink
DPL use Signposts to debug consumeWhenAll
Browse files Browse the repository at this point in the history
  • Loading branch information
ktf committed Jan 16, 2024
1 parent 9f4370f commit 0766b8a
Showing 1 changed file with 10 additions and 0 deletions.
10 changes: 10 additions & 0 deletions Framework/Core/src/CompletionPolicyHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
#include "Framework/TimesliceIndex.h"
#include "Framework/TimingInfo.h"
#include "DecongestionService.h"
#include "Framework/Signpost.h"

#include <cassert>
#include <regex>

O2_DECLARE_DYNAMIC_LOG(completion);

namespace o2::framework
{

Expand Down Expand Up @@ -108,6 +111,8 @@ CompletionPolicy CompletionPolicyHelpers::consumeWhenAll(const char* name, Compl
{
auto callback = [](InputSpan const& inputs, std::vector<InputSpec> const& specs, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
assert(inputs.size() == specs.size());
O2_SIGNPOST_ID_GENERATE(sid, completion);
O2_SIGNPOST_START(completion, sid, "consumeWhenAll", "Completion policy invoked");

size_t si = 0;
bool missingSporadic = false;
Expand All @@ -116,15 +121,18 @@ CompletionPolicy CompletionPolicyHelpers::consumeWhenAll(const char* name, Compl
assert(si < specs.size());
auto& spec = specs[si++];
if (input.header == nullptr && spec.lifetime != Lifetime::Sporadic) {
O2_SIGNPOST_END(completion, sid, "consumeWhenAll", "Completion policy returned %{public}s due to missing input %lu", "Wait", si);
return CompletionPolicy::CompletionOp::Wait;
}
if (input.header == nullptr && spec.lifetime == Lifetime::Sporadic) {
O2_SIGNPOST_EVENT_EMIT(completion, sid, "consumeWhenAll", "Missing sporadic found for route index %lu", si);
missingSporadic = true;
}
if (input.header != nullptr && currentTimeslice == -1) {
auto const* dph = framework::DataRefUtils::getHeader<o2::framework::DataProcessingHeader*>(input);
if (dph && !TimingInfo::timesliceIsTimer(dph->startTime)) {
currentTimeslice = dph->startTime;
O2_SIGNPOST_EVENT_EMIT(completion, sid, "consumeWhenAll", "currentTimeslice %lu from route index %lu", currentTimeslice, si);
}
}
}
Expand All @@ -134,8 +142,10 @@ CompletionPolicy CompletionPolicyHelpers::consumeWhenAll(const char* name, Compl
auto oldestPossibleTimeslice = timesliceIndex.getOldestPossibleInput().timeslice.value;

if (missingSporadic && currentTimeslice >= oldestPossibleTimeslice) {
O2_SIGNPOST_END(completion, sid, "consumeWhenAll", "Completion policy returned %{public}s for timeslice %lu > oldestPossibleTimeslice %lu", "Retry", currentTimeslice, oldestPossibleTimeslice);
return CompletionPolicy::CompletionOp::Retry;
}
O2_SIGNPOST_END(completion, sid, "consumeWhenAll", "Completion policy returned %{public}s for timeslice %lu <= oldestPossibleTimeslice %lu", "Consume", currentTimeslice, oldestPossibleTimeslice);
return CompletionPolicy::CompletionOp::Consume;
};
return CompletionPolicy{name, matcher, callback};
Expand Down

0 comments on commit 0766b8a

Please sign in to comment.