diff --git a/Framework/Core/src/CompletionPolicyHelpers.cxx b/Framework/Core/src/CompletionPolicyHelpers.cxx index b3e0621bf984e..ed18d89eea013 100644 --- a/Framework/Core/src/CompletionPolicyHelpers.cxx +++ b/Framework/Core/src/CompletionPolicyHelpers.cxx @@ -15,6 +15,7 @@ #include "Framework/DeviceSpec.h" #include "Framework/CompilerBuiltins.h" #include "Framework/Logger.h" +#include "Framework/TimesliceIndex.h" #include "Framework/TimingInfo.h" #include "DecongestionService.h" @@ -107,10 +108,32 @@ CompletionPolicy CompletionPolicyHelpers::consumeWhenAll(const char* name, Compl { auto callback = [](InputSpan const& inputs, std::vector const& specs, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp { assert(inputs.size() == specs.size()); + + size_t si = 0; + bool missingSporadic = false; + size_t currentTimeslice = -1; for (auto& input : inputs) { - if (input.header == nullptr) { + assert(si < specs.size()); + auto& spec = specs[si++]; + if (input.header == nullptr && spec.lifetime != Lifetime::Sporadic) { return CompletionPolicy::CompletionOp::Wait; } + if (input.header == nullptr && spec.lifetime == Lifetime::Sporadic) { + missingSporadic |= true; + } + if (input.header != nullptr) { + auto const* dph = framework::DataRefUtils::getHeader(input); + if (dph && !TimingInfo::timesliceIsTimer(dph->startTime)) { + currentTimeslice = dph->startTime; + break; + } + } + } + // If some sporadic inputs are missing, we wait for them util we are sure they will not come, + // i.e. until the oldest possible timeslice is beyond the timeslice of the input. + auto& timesliceIndex = ref.get(); + if (missingSporadic && currentTimeslice >= timesliceIndex.getOldestPossibleInput().timeslice.value) { + return CompletionPolicy::CompletionOp::Wait; } return CompletionPolicy::CompletionOp::Consume; };