From 8642aeeac6d26535a84e2dd101a0bd148e5f417c Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Wed, 13 Dec 2023 21:45:27 +0100 Subject: [PATCH] DPL: wait as long as possible for Sporadic inputs Right now if we have the standard consumeWhenAll policy and we have a sporadic input, it will wait indefinitely until all the inputs arrive or it will drop timeframes without that sporadic input. This changes the behavior and waits only until the oldest possible timeframe does not allow the Sporadic input to be there. At which point, it schedules the processing in any case, under the assumption that a task declaring a sporadic input knows what to do in case it's not there. --- .../Core/src/CompletionPolicyHelpers.cxx | 25 ++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/Framework/Core/src/CompletionPolicyHelpers.cxx b/Framework/Core/src/CompletionPolicyHelpers.cxx index b3e0621bf984e..ed18d89eea013 100644 --- a/Framework/Core/src/CompletionPolicyHelpers.cxx +++ b/Framework/Core/src/CompletionPolicyHelpers.cxx @@ -15,6 +15,7 @@ #include "Framework/DeviceSpec.h" #include "Framework/CompilerBuiltins.h" #include "Framework/Logger.h" +#include "Framework/TimesliceIndex.h" #include "Framework/TimingInfo.h" #include "DecongestionService.h" @@ -107,10 +108,32 @@ CompletionPolicy CompletionPolicyHelpers::consumeWhenAll(const char* name, Compl { auto callback = [](InputSpan const& inputs, std::vector const& specs, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp { assert(inputs.size() == specs.size()); + + size_t si = 0; + bool missingSporadic = false; + size_t currentTimeslice = -1; for (auto& input : inputs) { - if (input.header == nullptr) { + assert(si < specs.size()); + auto& spec = specs[si++]; + if (input.header == nullptr && spec.lifetime != Lifetime::Sporadic) { return CompletionPolicy::CompletionOp::Wait; } + if (input.header == nullptr && spec.lifetime == Lifetime::Sporadic) { + missingSporadic |= true; + } + if (input.header != nullptr) { + auto const* dph = framework::DataRefUtils::getHeader(input); + if (dph && !TimingInfo::timesliceIsTimer(dph->startTime)) { + currentTimeslice = dph->startTime; + break; + } + } + } + // If some sporadic inputs are missing, we wait for them util we are sure they will not come, + // i.e. until the oldest possible timeslice is beyond the timeslice of the input. + auto& timesliceIndex = ref.get(); + if (missingSporadic && currentTimeslice >= timesliceIndex.getOldestPossibleInput().timeslice.value) { + return CompletionPolicy::CompletionOp::Wait; } return CompletionPolicy::CompletionOp::Consume; };