From f22bacf11e8517a7edd7db336488d00a3f8bf3cb Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Wed, 13 Nov 2024 14:06:56 +0100 Subject: [PATCH] DPL: move writers to plugin This will allow us at some point to remove the dependency on TTree in the framework, hopefully saving memory and allowing us to more easily customize the writing backend. --- Framework/AnalysisSupport/CMakeLists.txt | 1 + .../AnalysisSupport/src/AODWriterHelpers.cxx | 414 +++++++++++++++ .../AnalysisSupport/src/AODWriterHelpers.h | 28 + Framework/AnalysisSupport/src/Plugin.cxx | 17 + .../Core/include/Framework/AnalysisContext.h | 58 +++ .../Framework}/AnalysisSupportHelpers.h | 40 +- .../Core/include/Framework/ConfigContext.h | 15 +- .../include/Framework/runDataProcessing.h | 4 +- Framework/Core/src/AnalysisSupportHelpers.cxx | 484 ++++-------------- Framework/Core/src/ArrowSupport.cxx | 10 +- Framework/Core/src/ConfigContext.cxx | 3 + Framework/Core/src/WorkflowHelpers.cxx | 149 +----- Framework/Core/src/WorkflowHelpers.h | 4 +- Framework/Core/test/Mocking.h | 3 +- .../Core/test/benchmark_WorkflowHelpers.cxx | 3 +- Framework/Core/test/test_OverrideLabels.cxx | 3 +- .../TestWorkflows/src/o2TestHistograms.cxx | 5 +- 17 files changed, 688 insertions(+), 553 deletions(-) create mode 100644 Framework/AnalysisSupport/src/AODWriterHelpers.cxx create mode 100644 Framework/AnalysisSupport/src/AODWriterHelpers.h create mode 100644 Framework/Core/include/Framework/AnalysisContext.h rename Framework/Core/{src => include/Framework}/AnalysisSupportHelpers.h (71%) diff --git a/Framework/AnalysisSupport/CMakeLists.txt b/Framework/AnalysisSupport/CMakeLists.txt index eb5706817704b..5fb1282469711 100644 --- a/Framework/AnalysisSupport/CMakeLists.txt +++ b/Framework/AnalysisSupport/CMakeLists.txt @@ -20,6 +20,7 @@ o2_add_library(FrameworkAnalysisSupport SOURCES src/Plugin.cxx src/DataInputDirector.cxx src/AODJAlienReaderHelpers.cxx + src/AODWriterHelpers.cxx PRIVATE_INCLUDE_DIRECTORIES ${CMAKE_CURRENT_LIST_DIR}/src PUBLIC_LINK_LIBRARIES O2::Framework ${EXTRA_TARGETS} ROOT::TreePlayer) diff --git a/Framework/AnalysisSupport/src/AODWriterHelpers.cxx b/Framework/AnalysisSupport/src/AODWriterHelpers.cxx new file mode 100644 index 0000000000000..fa10d4661f537 --- /dev/null +++ b/Framework/AnalysisSupport/src/AODWriterHelpers.cxx @@ -0,0 +1,414 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. 
+#include "Framework/AnalysisContext.h" +#include "Framework/ConfigContext.h" +#include "Framework/ControlService.h" +#include "AODWriterHelpers.h" +#include "Framework/OutputObjHeader.h" +#include "Framework/EndOfStreamContext.h" +#include "Framework/ProcessingContext.h" +#include "Framework/InitContext.h" +#include "Framework/CallbackService.h" +#include "Framework/AnalysisSupportHelpers.h" +#include "Framework/TableConsumer.h" +#include "Framework/DataOutputDirector.h" +#include "Framework/TableTreeHelpers.h" + +#include +#include +#include +#include +#include +#include + +namespace o2::framework::writers +{ + +struct InputObjectRoute { + std::string name; + uint32_t uniqueId; + std::string directory; + uint32_t taskHash; + OutputObjHandlingPolicy policy; + OutputObjSourceType sourceType; +}; + +struct InputObject { + TClass* kind = nullptr; + void* obj = nullptr; + std::string name; + int count = -1; +}; + +const static std::unordered_map ROOTfileNames = {{OutputObjHandlingPolicy::AnalysisObject, "AnalysisResults.root"}, + {OutputObjHandlingPolicy::QAObject, "QAResults.root"}}; + +AlgorithmSpec AODWriterHelpers::getOutputTTreeWriter(ConfigContext const& ctx) +{ + auto& ac = ctx.services().get(); + auto dod = AnalysisSupportHelpers::getDataOutputDirector(ctx); + int compressionLevel = 505; + if (ctx.options().hasOption("aod-writer-compression")) { + compressionLevel = ctx.options().get("aod-writer-compression"); + } + return AlgorithmSpec{[dod, outputInputs = ac.outputsInputsAOD, compressionLevel](InitContext& ic) -> std::function { + LOGP(debug, "======== getGlobalAODSink::Init =========="); + + // find out if any table needs to be saved + bool hasOutputsToWrite = false; + for (auto& outobj : outputInputs) { + auto ds = dod->getDataOutputDescriptors(outobj); + if (ds.size() > 0) { + hasOutputsToWrite = true; + break; + } + } + + // if nothing needs to be saved then return a trivial functor + // this happens when nothing needs to be saved but there are dangling outputs + if (!hasOutputsToWrite) { + return [](ProcessingContext&) mutable -> void { + static bool once = false; + if (!once) { + LOG(info) << "No AODs to be saved."; + once = true; + } + }; + } + + // end of data functor is called at the end of the data stream + auto endofdatacb = [dod](EndOfStreamContext& context) { + dod->closeDataFiles(); + context.services().get().readyToQuit(QuitRequest::Me); + }; + + auto& callbacks = ic.services().get(); + callbacks.set(endofdatacb); + + // prepare map(startTime, tfNumber) + std::map tfNumbers; + std::map tfFilenames; + + std::vector aodMetaDataKeys; + std::vector aodMetaDataVals; + + // this functor is called once per time frame + return [dod, tfNumbers, tfFilenames, aodMetaDataKeys, aodMetaDataVals, compressionLevel](ProcessingContext& pc) mutable -> void { + LOGP(debug, "======== getGlobalAODSink::processing =========="); + LOGP(debug, " processing data set with {} entries", pc.inputs().size()); + + // return immediately if pc.inputs() is empty. This should never happen! 
+ if (pc.inputs().size() == 0) { + LOGP(info, "No inputs available!"); + return; + } + + // update tfNumbers + uint64_t startTime = 0; + uint64_t tfNumber = 0; + auto ref = pc.inputs().get("tfn"); + if (ref.spec && ref.payload) { + startTime = DataRefUtils::getHeader(ref)->startTime; + tfNumber = pc.inputs().get("tfn"); + tfNumbers.insert(std::pair(startTime, tfNumber)); + } + // update tfFilenames + std::string aodInputFile; + auto ref2 = pc.inputs().get("tff"); + if (ref2.spec && ref2.payload) { + startTime = DataRefUtils::getHeader(ref2)->startTime; + aodInputFile = pc.inputs().get("tff"); + tfFilenames.insert(std::pair(startTime, aodInputFile)); + } + + // close all output files if one has reached size limit + dod->checkFileSizes(); + + // loop over the DataRefs which are contained in pc.inputs() + for (const auto& ref : pc.inputs()) { + if (!ref.spec) { + LOGP(debug, "Invalid input will be skipped!"); + continue; + } + + // get metadata + if (DataSpecUtils::partialMatch(*ref.spec, header::DataDescription("AODMetadataKeys"))) { + aodMetaDataKeys = pc.inputs().get>(ref.spec->binding); + } + if (DataSpecUtils::partialMatch(*ref.spec, header::DataDescription("AODMetadataVals"))) { + aodMetaDataVals = pc.inputs().get>(ref.spec->binding); + } + + // skip non-AOD refs + if (!DataSpecUtils::partialMatch(*ref.spec, writableAODOrigins)) { + continue; + } + startTime = DataRefUtils::getHeader(ref)->startTime; + + // does this need to be saved? + auto dh = DataRefUtils::getHeader(ref); + auto tableName = dh->dataDescription.as(); + auto ds = dod->getDataOutputDescriptors(*dh); + if (ds.empty()) { + continue; + } + + // get TF number from startTime + auto it = tfNumbers.find(startTime); + if (it != tfNumbers.end()) { + tfNumber = (it->second / dod->getNumberTimeFramesToMerge()) * dod->getNumberTimeFramesToMerge(); + } else { + LOGP(fatal, "No time frame number found for output with start time {}", startTime); + throw std::runtime_error("Processing is stopped!"); + } + // get aod input file from startTime + auto it2 = tfFilenames.find(startTime); + if (it2 != tfFilenames.end()) { + aodInputFile = it2->second; + } + + // get the TableConsumer and corresponding arrow table + auto msg = pc.inputs().get(ref.spec->binding); + if (msg.header == nullptr) { + LOGP(error, "No header for message {}:{}", ref.spec->binding, DataSpecUtils::describe(*ref.spec)); + continue; + } + auto s = pc.inputs().get(ref.spec->binding); + auto table = s->asArrowTable(); + if (!table->Validate().ok()) { + LOGP(warning, "The table \"{}\" is not valid and will not be saved!", tableName); + continue; + } + if (table->schema()->fields().empty()) { + LOGP(debug, "The table \"{}\" is empty but will be saved anyway!", tableName); + } + + // loop over all DataOutputDescriptors + // a table can be saved in multiple ways + // e.g. 
different selections of columns to different files + for (auto d : ds) { + auto fileAndFolder = dod->getFileFolder(d, tfNumber, aodInputFile, compressionLevel); + auto treename = fileAndFolder.folderName + "/" + d->treename; + TableToTree ta2tr(table, + fileAndFolder.file, + treename.c_str()); + + // update metadata + if (fileAndFolder.file->FindObjectAny("metaData")) { + LOGF(debug, "Metadata: target file %s already has metadata, preserving it", fileAndFolder.file->GetName()); + } else if (!aodMetaDataKeys.empty() && !aodMetaDataVals.empty()) { + TMap aodMetaDataMap; + for (uint32_t imd = 0; imd < aodMetaDataKeys.size(); imd++) { + aodMetaDataMap.Add(new TObjString(aodMetaDataKeys[imd]), new TObjString(aodMetaDataVals[imd])); + } + fileAndFolder.file->WriteObject(&aodMetaDataMap, "metaData", "Overwrite"); + } + + if (!d->colnames.empty()) { + for (auto& cn : d->colnames) { + auto idx = table->schema()->GetFieldIndex(cn); + auto col = table->column(idx); + auto field = table->schema()->field(idx); + if (idx != -1) { + ta2tr.addBranch(col, field); + } + } + } else { + ta2tr.addAllBranches(); + } + ta2tr.process(); + } + } + }; + } + + }; +} + +AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx) +{ + auto& ac = ctx.services().get(); + auto tskmap = ac.outTskMap; + auto objmap = ac.outObjHistMap; + + return AlgorithmSpec{[objmap, tskmap](InitContext& ic) -> std::function { + auto& callbacks = ic.services().get(); + auto inputObjects = std::make_shared>>(); + + static TFile* f[OutputObjHandlingPolicy::numPolicies]; + for (auto i = 0u; i < OutputObjHandlingPolicy::numPolicies; ++i) { + f[i] = nullptr; + } + + static std::string currentDirectory = ""; + static std::string currentFile = ""; + + auto endofdatacb = [inputObjects](EndOfStreamContext& context) { + LOG(debug) << "Writing merged objects and histograms to file"; + if (inputObjects->empty()) { + LOG(error) << "Output object map is empty!"; + context.services().get().readyToQuit(QuitRequest::Me); + return; + } + for (auto i = 0u; i < OutputObjHandlingPolicy::numPolicies; ++i) { + if (f[i] != nullptr) { + f[i]->Close(); + } + } + LOG(debug) << "All outputs merged in their respective target files"; + context.services().get().readyToQuit(QuitRequest::Me); + }; + + callbacks.set(endofdatacb); + return [inputObjects, objmap, tskmap](ProcessingContext& pc) mutable -> void { + auto const& ref = pc.inputs().get("x"); + if (!ref.header) { + LOG(error) << "Header not found"; + return; + } + if (!ref.payload) { + LOG(error) << "Payload not found"; + return; + } + auto datah = o2::header::get(ref.header); + if (!datah) { + LOG(error) << "No data header in stack"; + return; + } + + auto objh = o2::header::get(ref.header); + if (!objh) { + LOG(error) << "No output object header in stack"; + return; + } + + InputObject obj; + FairInputTBuffer tm(const_cast(ref.payload), static_cast(datah->payloadSize)); + tm.InitMap(); + obj.kind = tm.ReadClass(); + tm.SetBufferOffset(0); + tm.ResetMap(); + if (obj.kind == nullptr) { + LOG(error) << "Cannot read class info from buffer."; + return; + } + + auto policy = objh->mPolicy; + auto sourceType = objh->mSourceType; + auto hash = objh->mTaskHash; + + obj.obj = tm.ReadObjectAny(obj.kind); + auto* named = static_cast(obj.obj); + obj.name = named->GetName(); + auto hpos = std::find_if(tskmap.begin(), tskmap.end(), [&](auto&& x) { return x.id == hash; }); + if (hpos == tskmap.end()) { + LOG(error) << "No task found for hash " << hash; + return; + } + auto taskname = hpos->name; + auto opos 
= std::find_if(objmap.begin(), objmap.end(), [&](auto&& x) { return x.id == hash; }); + if (opos == objmap.end()) { + LOG(error) << "No object list found for task " << taskname << " (hash=" << hash << ")"; + return; + } + auto objects = opos->bindings; + if (std::find(objects.begin(), objects.end(), obj.name) == objects.end()) { + LOG(error) << "No object " << obj.name << " in map for task " << taskname; + return; + } + auto nameHash = runtime_hash(obj.name.c_str()); + InputObjectRoute key{obj.name, nameHash, taskname, hash, policy, sourceType}; + auto existing = std::find_if(inputObjects->begin(), inputObjects->end(), [&](auto&& x) { return (x.first.uniqueId == nameHash) && (x.first.taskHash == hash); }); + // If it's the first one, we just add it to the list. + if (existing == inputObjects->end()) { + obj.count = objh->mPipelineSize; + inputObjects->push_back(std::make_pair(key, obj)); + existing = inputObjects->end() - 1; + } else { + obj.count = existing->second.count; + // Otherwise, we merge it with the existing one. + auto merger = existing->second.kind->GetMerge(); + if (!merger) { + LOG(error) << "Already one unmergeable object found for " << obj.name; + return; + } + TList coll; + coll.Add(static_cast(obj.obj)); + merger(existing->second.obj, &coll, nullptr); + } + // We expect as many objects as the pipeline size, for + // a given object name and task hash. + existing->second.count -= 1; + + if (existing->second.count != 0) { + return; + } + // Write the object here. + auto route = existing->first; + auto entry = existing->second; + auto file = ROOTfileNames.find(route.policy); + if (file == ROOTfileNames.end()) { + return; + } + auto filename = file->second; + if (f[route.policy] == nullptr) { + f[route.policy] = TFile::Open(filename.c_str(), "RECREATE"); + } + auto nextDirectory = route.directory; + if ((nextDirectory != currentDirectory) || (filename != currentFile)) { + if (!f[route.policy]->FindKey(nextDirectory.c_str())) { + f[route.policy]->mkdir(nextDirectory.c_str()); + } + currentDirectory = nextDirectory; + currentFile = filename; + } + + // translate the list-structure created by the registry into a directory structure within the file + std::function writeListToFile; + writeListToFile = [&](TList* list, TDirectory* parentDir) { + TIter next(list); + TObject* object = nullptr; + while ((object = next())) { + if (object->InheritsFrom(TList::Class())) { + writeListToFile(static_cast(object), parentDir->mkdir(object->GetName(), object->GetName(), true)); + } else { + parentDir->WriteObjectAny(object, object->Class(), object->GetName()); + auto* written = list->Remove(object); + delete written; + } + } + }; + + TDirectory* currentDir = f[route.policy]->GetDirectory(currentDirectory.c_str()); + if (route.sourceType == OutputObjSourceType::HistogramRegistrySource) { + auto* outputList = static_cast(entry.obj); + outputList->SetOwner(false); + + // if registry should live in dedicated folder a TNamed object is appended to the list + if (outputList->Last() && outputList->Last()->IsA() == TNamed::Class()) { + delete outputList->Last(); + outputList->RemoveLast(); + currentDir = currentDir->mkdir(outputList->GetName(), outputList->GetName(), true); + } + + writeListToFile(outputList, currentDir); + outputList->SetOwner(); + delete outputList; + entry.obj = nullptr; + } else { + currentDir->WriteObjectAny(entry.obj, entry.kind, entry.name.c_str()); + delete (TObject*)entry.obj; + entry.obj = nullptr; + } + }; + }}; +} +} // namespace o2::framework::writers diff --git 
a/Framework/AnalysisSupport/src/AODWriterHelpers.h b/Framework/AnalysisSupport/src/AODWriterHelpers.h new file mode 100644 index 0000000000000..7ae59a5cf3b01 --- /dev/null +++ b/Framework/AnalysisSupport/src/AODWriterHelpers.h @@ -0,0 +1,28 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#ifndef O2_FRAMEWORK_AODROOTWRITERHELPERS_H_ +#define O2_FRAMEWORK_AODROOTWRITERHELPERS_H_ + +#include "Framework/AlgorithmSpec.h" +#include + +namespace o2::framework::writers +{ + +struct AODWriterHelpers { + static AlgorithmSpec getOutputObjHistWriter(ConfigContext const& context); + static AlgorithmSpec getOutputTTreeWriter(ConfigContext const& context); +}; + +} // namespace o2::framework::writers + +#endif // O2_FRAMEWORK_AODROOTWRITERHELPERS_H_ diff --git a/Framework/AnalysisSupport/src/Plugin.cxx b/Framework/AnalysisSupport/src/Plugin.cxx index bba3499286e08..52435375d7e9e 100644 --- a/Framework/AnalysisSupport/src/Plugin.cxx +++ b/Framework/AnalysisSupport/src/Plugin.cxx @@ -16,6 +16,7 @@ #include "Framework/Capability.h" #include "Framework/Signpost.h" #include "AODJAlienReaderHelpers.h" +#include "AODWriterHelpers.h" #include #include #include @@ -33,6 +34,20 @@ struct ROOTFileReader : o2::framework::AlgorithmPlugin { } }; +struct ROOTObjWriter : o2::framework::AlgorithmPlugin { + o2::framework::AlgorithmSpec create(o2::framework::ConfigContext const& config) override + { + return o2::framework::writers::AODWriterHelpers::getOutputObjHistWriter(config); + } +}; + +struct ROOTTTreeWriter : o2::framework::AlgorithmPlugin { + o2::framework::AlgorithmSpec create(o2::framework::ConfigContext const& config) override + { + return o2::framework::writers::AODWriterHelpers::getOutputTTreeWriter(config); + } +}; + using namespace o2::framework; struct RunSummary : o2::framework::ServicePlugin { o2::framework::ServiceSpec* create() final @@ -211,6 +226,8 @@ struct DiscoverMetadataInAOD : o2::framework::ConfigDiscoveryPlugin { DEFINE_DPL_PLUGINS_BEGIN DEFINE_DPL_PLUGIN_INSTANCE(ROOTFileReader, CustomAlgorithm); +DEFINE_DPL_PLUGIN_INSTANCE(ROOTObjWriter, CustomAlgorithm); +DEFINE_DPL_PLUGIN_INSTANCE(ROOTTTreeWriter, CustomAlgorithm); DEFINE_DPL_PLUGIN_INSTANCE(RunSummary, CustomService); DEFINE_DPL_PLUGIN_INSTANCE(DiscoverMetadataInAOD, ConfigDiscovery); DEFINE_DPL_PLUGINS_END diff --git a/Framework/Core/include/Framework/AnalysisContext.h b/Framework/Core/include/Framework/AnalysisContext.h new file mode 100644 index 0000000000000..0f62f952d0aaa --- /dev/null +++ b/Framework/Core/include/Framework/AnalysisContext.h @@ -0,0 +1,58 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". 
+//
+// In applying this license CERN does not waive the privileges and immunities
+// granted to it by virtue of its status as an Intergovernmental Organization
+// or submit itself to any jurisdiction.
+#ifndef O2_FRAMEWORK_ANALYSISCONTEXT_H_
+#define O2_FRAMEWORK_ANALYSISCONTEXT_H_
+
+#include 
+#include "Framework/InputSpec.h"
+#include "Framework/OutputSpec.h"
+
+namespace o2::framework
+{
+class DataOutputDirector;
+
+struct OutputTaskInfo {
+  uint32_t id;
+  std::string name;
+};
+
+struct OutputObjectInfo {
+  uint32_t id;
+  std::vector bindings;
+};
+
+//
+struct AnalysisContext {
+  std::vector requestedAODs;
+  std::vector providedAODs;
+  std::vector requestedDYNs;
+  std::vector providedDYNs;
+  std::vector requestedIDXs;
+  std::vector providedOutputObjHist;
+  std::vector spawnerInputs;
+
+  // Needed to create the hist writer
+  std::vector outTskMap;
+  std::vector outObjHistMap;
+
+  // Needed to create the output director
+  std::vector outputsInputs;
+  std::vector isDangling;
+
+  // Needed to create the AOD writer
+  std::vector outputsInputsAOD;
+};
+} // namespace o2::framework
+
+extern template class std::vector;
+extern template class std::vector;
+
+#endif // O2_FRAMEWORK_ANALYSISCONTEXT_H_
diff --git a/Framework/Core/src/AnalysisSupportHelpers.h b/Framework/Core/include/Framework/AnalysisSupportHelpers.h
similarity index 71%
rename from Framework/Core/src/AnalysisSupportHelpers.h
rename to Framework/Core/include/Framework/AnalysisSupportHelpers.h
index ba5bcedb4bc67..4ae601dc9e4a2 100644
--- a/Framework/Core/src/AnalysisSupportHelpers.h
+++ b/Framework/Core/include/Framework/AnalysisSupportHelpers.h
@@ -14,6 +14,7 @@
 #include "Framework/OutputSpec.h"
 #include "Framework/InputSpec.h"
 #include "Framework/DataProcessorSpec.h"
+#include "Framework/AnalysisContext.h"
 #include "Headers/DataHeader.h"
 #include 

@@ -24,36 +25,7 @@ static constexpr std::array extendedAODOrigins{header::Da
 static constexpr std::array writableAODOrigins{header::DataOrigin{"AOD"}, header::DataOrigin{"AOD1"}, header::DataOrigin{"AOD2"}, header::DataOrigin{"DYN"}};
 class DataOutputDirector;
-
-struct OutputTaskInfo {
-  uint32_t id;
-  std::string name;
-};
-
-struct OutputObjectInfo {
-  uint32_t id;
-  std::vector bindings;
-};
-} // namespace o2::framework
-
-extern template class std::vector;
-extern template class std::vector;
-
-namespace o2::framework
-{
-//
-struct AnalysisContext {
-  std::vector requestedAODs;
-  std::vector providedAODs;
-  std::vector requestedDYNs;
-  std::vector providedDYNs;
-  std::vector requestedIDXs;
-  std::vector providedOutputObjHist;
-  std::vector spawnerInputs;
-
-  std::vector outTskMap;
-  std::vector outObjHistMap;
-};
+class ConfigContext;

 // Helper class to be moved in the AnalysisSupport plugin at some point
 struct AnalysisSupportHelpers {
@@ -74,11 +46,11 @@ struct AnalysisSupportHelpers {
   /// Match all inputs of kind ATSK and write them to a ROOT file,
   /// one root file per originating task.
- static DataProcessorSpec getOutputObjHistSink(std::vector const& objmap, - std::vector const& tskmap); + static DataProcessorSpec getOutputObjHistSink(ConfigContext const&); /// writes inputs of kind AOD to file - static DataProcessorSpec getGlobalAODSink(std::shared_ptr dod, - std::vector const& outputInputs, int compression); + static DataProcessorSpec getGlobalAODSink(ConfigContext const&); + /// Get the data director + static std::shared_ptr getDataOutputDirector(ConfigContext const& ctx); }; }; // namespace o2::framework diff --git a/Framework/Core/include/Framework/ConfigContext.h b/Framework/Core/include/Framework/ConfigContext.h index 5790699fe68bb..87259f0519915 100644 --- a/Framework/Core/include/Framework/ConfigContext.h +++ b/Framework/Core/include/Framework/ConfigContext.h @@ -8,11 +8,11 @@ // In applying this license CERN does not waive the privileges and immunities // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. -#ifndef FRAMEWORK_CONFIG_CONTEXT_H -#define FRAMEWORK_CONFIG_CONTEXT_H +#ifndef O2_FRAMEWORK_CONFIG_CONTEXT_H_ +#define O2_FRAMEWORK_CONFIG_CONTEXT_H_ #include "Framework/ConfigParamRegistry.h" -#include "Framework/ServiceRegistry.h" +#include "Framework/ServiceRegistryRef.h" namespace o2::framework { @@ -23,9 +23,10 @@ namespace o2::framework class ConfigContext { public: - ConfigContext(ConfigParamRegistry& options, int argc, char** argv) : mOptions{options}, mArgc{argc}, mArgv{argv} {} + ConfigContext(ConfigParamRegistry& options, ServiceRegistryRef services, int argc, char** argv); [[nodiscard]] ConfigParamRegistry& options() const { return mOptions; } + [[nodiscard]] ServiceRegistryRef services() const { return mServices; } [[nodiscard]] bool helpOnCommandLine() const; @@ -34,11 +35,13 @@ class ConfigContext private: ConfigParamRegistry& mOptions; + + ServiceRegistryRef mServices; // additionaly keep information about the original command line int mArgc = 0; char** mArgv = nullptr; }; -} // namespace o2 +} // namespace o2::framework -#endif +#endif // O2_FRAMEWORK_CONFIG_CONTEXT_H_ diff --git a/Framework/Core/include/Framework/runDataProcessing.h b/Framework/Core/include/Framework/runDataProcessing.h index eee4c4b6583d3..8293bf0cf7039 100644 --- a/Framework/Core/include/Framework/runDataProcessing.h +++ b/Framework/Core/include/Framework/runDataProcessing.h @@ -30,6 +30,7 @@ #include "Framework/CheckTypes.h" #include "Framework/StructToTuple.h" #include "Framework/ConfigParamDiscovery.h" +#include "ServiceRegistryRef.h" #include namespace o2::framework @@ -198,7 +199,8 @@ int mainNoCatch(int argc, char** argv) workflowOptions.push_back(extra); } - ConfigContext configContext(workflowOptionsRegistry, argc, argv); + ServiceRegistry configRegistry; + ConfigContext configContext(workflowOptionsRegistry, ServiceRegistryRef{configRegistry}, argc, argv); o2::framework::WorkflowSpec specs = defineDataProcessing(configContext); overrideCloning(configContext, specs); overridePipeline(configContext, specs); diff --git a/Framework/Core/src/AnalysisSupportHelpers.cxx b/Framework/Core/src/AnalysisSupportHelpers.cxx index e949f27a6eed6..eb17566fd6d31 100644 --- a/Framework/Core/src/AnalysisSupportHelpers.cxx +++ b/Framework/Core/src/AnalysisSupportHelpers.cxx @@ -9,18 +9,16 @@ // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. 
-#include "AnalysisSupportHelpers.h" +#include "Framework/AnalysisSupportHelpers.h" #include "Framework/DataOutputDirector.h" #include "Framework/OutputObjHeader.h" #include "Framework/ControlService.h" #include "Framework/EndOfStreamContext.h" #include "Framework/DeviceSpec.h" #include "Framework/TableTreeHelpers.h" - -#include "TFile.h" -#include "TTree.h" -#include "TMap.h" -#include "TObjString.h" +#include "Framework/PluginManager.h" +#include "Framework/ConfigContext.h" +#include "WorkflowHelpers.h" template class std::vector; template class std::vector; @@ -28,21 +26,105 @@ template class std::vector; namespace o2::framework { -struct InputObjectRoute { - std::string name; - uint32_t uniqueId; - std::string directory; - uint32_t taskHash; - OutputObjHandlingPolicy policy; - OutputObjSourceType sourceType; -}; +std::shared_ptr AnalysisSupportHelpers::getDataOutputDirector(ConfigContext const& ctx) +{ + auto const& options = ctx.options(); + auto const& OutputsInputs = ctx.services().get().outputsInputs; + auto const& isDangling = ctx.services().get().isDangling; + + std::shared_ptr dod = std::make_shared(); + + // analyze options and take actions accordingly + // default values + std::string rdn, resdir("./"); + std::string fnb, fnbase("AnalysisResults_trees"); + float mfs, maxfilesize(-1.); + std::string fmo, filemode("RECREATE"); + int ntfm, ntfmerge = 1; + + // values from json + if (options.isSet("aod-writer-json")) { + auto fnjson = options.get("aod-writer-json"); + if (!fnjson.empty()) { + std::tie(rdn, fnb, fmo, mfs, ntfm) = dod->readJson(fnjson); + if (!rdn.empty()) { + resdir = rdn; + } + if (!fnb.empty()) { + fnbase = fnb; + } + if (!fmo.empty()) { + filemode = fmo; + } + if (mfs > 0.) { + maxfilesize = mfs; + } + if (ntfm > 0) { + ntfmerge = ntfm; + } + } + } + + // values from command line options, information from json is overwritten + if (options.isSet("aod-writer-resdir")) { + rdn = options.get("aod-writer-resdir"); + if (!rdn.empty()) { + resdir = rdn; + } + } + if (options.isSet("aod-writer-resfile")) { + fnb = options.get("aod-writer-resfile"); + if (!fnb.empty()) { + fnbase = fnb; + } + } + if (options.isSet("aod-writer-resmode")) { + fmo = options.get("aod-writer-resmode"); + if (!fmo.empty()) { + filemode = fmo; + } + } + if (options.isSet("aod-writer-maxfilesize")) { + mfs = options.get("aod-writer-maxfilesize"); + if (mfs > 0) { + maxfilesize = mfs; + } + } + if (options.isSet("aod-writer-ntfmerge")) { + ntfm = options.get("aod-writer-ntfmerge"); + if (ntfm > 0) { + ntfmerge = ntfm; + } + } + // parse the keepString + if (options.isSet("aod-writer-keep")) { + auto keepString = options.get("aod-writer-keep"); + if (!keepString.empty()) { + dod->reset(); + std::string d("dangling"); + if (d.find(keepString) == 0) { + // use the dangling outputs + std::vector danglingOutputs; + for (auto ii = 0u; ii < OutputsInputs.size(); ii++) { + if (DataSpecUtils::partialMatch(OutputsInputs[ii], writableAODOrigins) && isDangling[ii]) { + danglingOutputs.emplace_back(OutputsInputs[ii]); + } + } + dod->readSpecs(danglingOutputs); + } else { + // use the keep string + dod->readString(keepString); + } + } + } + dod->setResultDir(resdir); + dod->setFilenameBase(fnbase); + dod->setFileMode(filemode); + dod->setMaximumFileSize(maxfilesize); + dod->setNumberTimeFramesToMerge(ntfmerge); -struct InputObject { - TClass* kind = nullptr; - void* obj = nullptr; - std::string name; - int count = -1; -}; + return dod; +} void AnalysisSupportHelpers::addMissingOutputsToReader(std::vector 
const& providedOutputs, std::vector const& requestedInputs, @@ -125,191 +207,16 @@ void AnalysisSupportHelpers::addMissingOutputsToBuilder(std::vector c } } -const static std::unordered_map ROOTfileNames = {{OutputObjHandlingPolicy::AnalysisObject, "AnalysisResults.root"}, - {OutputObjHandlingPolicy::QAObject, "QAResults.root"}}; - // ============================================================================= -DataProcessorSpec AnalysisSupportHelpers::getOutputObjHistSink(std::vector const& objmap, std::vector const& tskmap) +DataProcessorSpec AnalysisSupportHelpers::getOutputObjHistSink(ConfigContext const& ctx) { - auto writerFunction = [objmap, tskmap](InitContext& ic) -> std::function { - auto& callbacks = ic.services().get(); - auto inputObjects = std::make_shared>>(); - - static TFile* f[OutputObjHandlingPolicy::numPolicies]; - for (auto i = 0u; i < OutputObjHandlingPolicy::numPolicies; ++i) { - f[i] = nullptr; - } - - static std::string currentDirectory = ""; - static std::string currentFile = ""; - - auto endofdatacb = [inputObjects](EndOfStreamContext& context) { - LOG(debug) << "Writing merged objects and histograms to file"; - if (inputObjects->empty()) { - LOG(error) << "Output object map is empty!"; - context.services().get().readyToQuit(QuitRequest::Me); - return; - } - for (auto i = 0u; i < OutputObjHandlingPolicy::numPolicies; ++i) { - if (f[i] != nullptr) { - f[i]->Close(); - } - } - LOG(debug) << "All outputs merged in their respective target files"; - context.services().get().readyToQuit(QuitRequest::Me); - }; - - callbacks.set(endofdatacb); - return [inputObjects, objmap, tskmap](ProcessingContext& pc) mutable -> void { - auto const& ref = pc.inputs().get("x"); - if (!ref.header) { - LOG(error) << "Header not found"; - return; - } - if (!ref.payload) { - LOG(error) << "Payload not found"; - return; - } - auto datah = o2::header::get(ref.header); - if (!datah) { - LOG(error) << "No data header in stack"; - return; - } - - auto objh = o2::header::get(ref.header); - if (!objh) { - LOG(error) << "No output object header in stack"; - return; - } - - InputObject obj; - FairInputTBuffer tm(const_cast(ref.payload), static_cast(datah->payloadSize)); - tm.InitMap(); - obj.kind = tm.ReadClass(); - tm.SetBufferOffset(0); - tm.ResetMap(); - if (obj.kind == nullptr) { - LOG(error) << "Cannot read class info from buffer."; - return; - } - - auto policy = objh->mPolicy; - auto sourceType = objh->mSourceType; - auto hash = objh->mTaskHash; - - obj.obj = tm.ReadObjectAny(obj.kind); - auto* named = static_cast(obj.obj); - obj.name = named->GetName(); - auto hpos = std::find_if(tskmap.begin(), tskmap.end(), [&](auto&& x) { return x.id == hash; }); - if (hpos == tskmap.end()) { - LOG(error) << "No task found for hash " << hash; - return; - } - auto taskname = hpos->name; - auto opos = std::find_if(objmap.begin(), objmap.end(), [&](auto&& x) { return x.id == hash; }); - if (opos == objmap.end()) { - LOG(error) << "No object list found for task " << taskname << " (hash=" << hash << ")"; - return; - } - auto objects = opos->bindings; - if (std::find(objects.begin(), objects.end(), obj.name) == objects.end()) { - LOG(error) << "No object " << obj.name << " in map for task " << taskname; - return; - } - auto nameHash = runtime_hash(obj.name.c_str()); - InputObjectRoute key{obj.name, nameHash, taskname, hash, policy, sourceType}; - auto existing = std::find_if(inputObjects->begin(), inputObjects->end(), [&](auto&& x) { return (x.first.uniqueId == nameHash) && (x.first.taskHash == hash); }); - 
// If it's the first one, we just add it to the list. - if (existing == inputObjects->end()) { - obj.count = objh->mPipelineSize; - inputObjects->push_back(std::make_pair(key, obj)); - existing = inputObjects->end() - 1; - } else { - obj.count = existing->second.count; - // Otherwise, we merge it with the existing one. - auto merger = existing->second.kind->GetMerge(); - if (!merger) { - LOG(error) << "Already one unmergeable object found for " << obj.name; - return; - } - TList coll; - coll.Add(static_cast(obj.obj)); - merger(existing->second.obj, &coll, nullptr); - } - // We expect as many objects as the pipeline size, for - // a given object name and task hash. - existing->second.count -= 1; - - if (existing->second.count != 0) { - return; - } - // Write the object here. - auto route = existing->first; - auto entry = existing->second; - auto file = ROOTfileNames.find(route.policy); - if (file == ROOTfileNames.end()) { - return; - } - auto filename = file->second; - if (f[route.policy] == nullptr) { - f[route.policy] = TFile::Open(filename.c_str(), "RECREATE"); - } - auto nextDirectory = route.directory; - if ((nextDirectory != currentDirectory) || (filename != currentFile)) { - if (!f[route.policy]->FindKey(nextDirectory.c_str())) { - f[route.policy]->mkdir(nextDirectory.c_str()); - } - currentDirectory = nextDirectory; - currentFile = filename; - } - - // translate the list-structure created by the registry into a directory structure within the file - std::function writeListToFile; - writeListToFile = [&](TList* list, TDirectory* parentDir) { - TIter next(list); - TObject* object = nullptr; - while ((object = next())) { - if (object->InheritsFrom(TList::Class())) { - writeListToFile(static_cast(object), parentDir->mkdir(object->GetName(), object->GetName(), true)); - } else { - parentDir->WriteObjectAny(object, object->Class(), object->GetName()); - auto* written = list->Remove(object); - delete written; - } - } - }; - - TDirectory* currentDir = f[route.policy]->GetDirectory(currentDirectory.c_str()); - if (route.sourceType == OutputObjSourceType::HistogramRegistrySource) { - auto* outputList = static_cast(entry.obj); - outputList->SetOwner(false); - - // if registry should live in dedicated folder a TNamed object is appended to the list - if (outputList->Last() && outputList->Last()->IsA() == TNamed::Class()) { - delete outputList->Last(); - outputList->RemoveLast(); - currentDir = currentDir->mkdir(outputList->GetName(), outputList->GetName(), true); - } - - writeListToFile(outputList, currentDir); - outputList->SetOwner(); - delete outputList; - entry.obj = nullptr; - } else { - currentDir->WriteObjectAny(entry.obj, entry.kind, entry.name.c_str()); - delete (TObject*)entry.obj; - entry.obj = nullptr; - } - }; - }; - - char const* name = "internal-dpl-aod-global-analysis-file-sink"; // Lifetime is sporadic because we do not ask each analysis task to send its // results every timeframe. 
DataProcessorSpec spec{ - .name = name, + .name = "internal-dpl-aod-global-analysis-file-sink", .inputs = {InputSpec("x", DataSpecUtils::dataDescriptorMatcherFrom(header::DataOrigin{"ATSK"}), Lifetime::Sporadic)}, - .algorithm = {writerFunction}, + .outputs = {}, + .algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkAnalysisSupport", "ROOTObjWriter", ctx), }; return spec; @@ -317,188 +224,17 @@ DataProcessorSpec AnalysisSupportHelpers::getOutputObjHistSink(std::vector dod, - std::vector const& outputInputs, int compressionLevel) + AnalysisSupportHelpers::getGlobalAODSink(ConfigContext const& ctx) { - - auto writerFunction = [dod, outputInputs, compressionLevel](InitContext& ic) -> std::function { - LOGP(debug, "======== getGlobalAODSink::Init =========="); - - // find out if any table needs to be saved - bool hasOutputsToWrite = false; - for (auto& outobj : outputInputs) { - auto ds = dod->getDataOutputDescriptors(outobj); - if (ds.size() > 0) { - hasOutputsToWrite = true; - break; - } - } - - // if nothing needs to be saved then return a trivial functor - // this happens when nothing needs to be saved but there are dangling outputs - if (!hasOutputsToWrite) { - return [](ProcessingContext&) mutable -> void { - static bool once = false; - if (!once) { - LOG(info) << "No AODs to be saved."; - once = true; - } - }; - } - - // end of data functor is called at the end of the data stream - auto endofdatacb = [dod](EndOfStreamContext& context) { - dod->closeDataFiles(); - context.services().get().readyToQuit(QuitRequest::Me); - }; - - auto& callbacks = ic.services().get(); - callbacks.set(endofdatacb); - - // prepare map(startTime, tfNumber) - std::map tfNumbers; - std::map tfFilenames; - - std::vector aodMetaDataKeys; - std::vector aodMetaDataVals; - - // this functor is called once per time frame - return [dod, tfNumbers, tfFilenames, aodMetaDataKeys, aodMetaDataVals, compressionLevel](ProcessingContext& pc) mutable -> void { - LOGP(debug, "======== getGlobalAODSink::processing =========="); - LOGP(debug, " processing data set with {} entries", pc.inputs().size()); - - // return immediately if pc.inputs() is empty. This should never happen! 
- if (pc.inputs().size() == 0) { - LOGP(info, "No inputs available!"); - return; - } - - // update tfNumbers - uint64_t startTime = 0; - uint64_t tfNumber = 0; - auto ref = pc.inputs().get("tfn"); - if (ref.spec && ref.payload) { - startTime = DataRefUtils::getHeader(ref)->startTime; - tfNumber = pc.inputs().get("tfn"); - tfNumbers.insert(std::pair(startTime, tfNumber)); - } - // update tfFilenames - std::string aodInputFile; - auto ref2 = pc.inputs().get("tff"); - if (ref2.spec && ref2.payload) { - startTime = DataRefUtils::getHeader(ref2)->startTime; - aodInputFile = pc.inputs().get("tff"); - tfFilenames.insert(std::pair(startTime, aodInputFile)); - } - - // close all output files if one has reached size limit - dod->checkFileSizes(); - - // loop over the DataRefs which are contained in pc.inputs() - for (const auto& ref : pc.inputs()) { - if (!ref.spec) { - LOGP(debug, "Invalid input will be skipped!"); - continue; - } - - // get metadata - if (DataSpecUtils::partialMatch(*ref.spec, header::DataDescription("AODMetadataKeys"))) { - aodMetaDataKeys = pc.inputs().get>(ref.spec->binding); - } - if (DataSpecUtils::partialMatch(*ref.spec, header::DataDescription("AODMetadataVals"))) { - aodMetaDataVals = pc.inputs().get>(ref.spec->binding); - } - - // skip non-AOD refs - if (!DataSpecUtils::partialMatch(*ref.spec, writableAODOrigins)) { - continue; - } - startTime = DataRefUtils::getHeader(ref)->startTime; - - // does this need to be saved? - auto dh = DataRefUtils::getHeader(ref); - auto tableName = dh->dataDescription.as(); - auto ds = dod->getDataOutputDescriptors(*dh); - if (ds.empty()) { - continue; - } - - // get TF number from startTime - auto it = tfNumbers.find(startTime); - if (it != tfNumbers.end()) { - tfNumber = (it->second / dod->getNumberTimeFramesToMerge()) * dod->getNumberTimeFramesToMerge(); - } else { - LOGP(fatal, "No time frame number found for output with start time {}", startTime); - throw std::runtime_error("Processing is stopped!"); - } - // get aod input file from startTime - auto it2 = tfFilenames.find(startTime); - if (it2 != tfFilenames.end()) { - aodInputFile = it2->second; - } - - // get the TableConsumer and corresponding arrow table - auto msg = pc.inputs().get(ref.spec->binding); - if (msg.header == nullptr) { - LOGP(error, "No header for message {}:{}", ref.spec->binding, DataSpecUtils::describe(*ref.spec)); - continue; - } - auto s = pc.inputs().get(ref.spec->binding); - auto table = s->asArrowTable(); - if (!table->Validate().ok()) { - LOGP(warning, "The table \"{}\" is not valid and will not be saved!", tableName); - continue; - } - if (table->schema()->fields().empty()) { - LOGP(debug, "The table \"{}\" is empty but will be saved anyway!", tableName); - } - - // loop over all DataOutputDescriptors - // a table can be saved in multiple ways - // e.g. 
different selections of columns to different files - for (auto d : ds) { - auto fileAndFolder = dod->getFileFolder(d, tfNumber, aodInputFile, compressionLevel); - auto treename = fileAndFolder.folderName + "/" + d->treename; - TableToTree ta2tr(table, - fileAndFolder.file, - treename.c_str()); - - // update metadata - if (fileAndFolder.file->FindObjectAny("metaData")) { - LOGF(debug, "Metadata: target file %s already has metadata, preserving it", fileAndFolder.file->GetName()); - } else if (!aodMetaDataKeys.empty() && !aodMetaDataVals.empty()) { - TMap aodMetaDataMap; - for (uint32_t imd = 0; imd < aodMetaDataKeys.size(); imd++) { - aodMetaDataMap.Add(new TObjString(aodMetaDataKeys[imd]), new TObjString(aodMetaDataVals[imd])); - } - fileAndFolder.file->WriteObject(&aodMetaDataMap, "metaData", "Overwrite"); - } - - if (!d->colnames.empty()) { - for (auto& cn : d->colnames) { - auto idx = table->schema()->GetFieldIndex(cn); - auto col = table->column(idx); - auto field = table->schema()->field(idx); - if (idx != -1) { - ta2tr.addBranch(col, field); - } - } - } else { - ta2tr.addAllBranches(); - } - ta2tr.process(); - } - } - }; - }; // end of writerFunction + auto& ac = ctx.services().get(); // the command line options relevant for the writer are global // see runDataProcessing.h DataProcessorSpec spec{ .name = "internal-dpl-aod-writer", - .inputs = outputInputs, + .inputs = ac.outputsInputsAOD, .outputs = {}, - .algorithm = AlgorithmSpec{writerFunction}, + .algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkAnalysisSupport", "ROOTTTreeWriter", ctx), }; return spec; diff --git a/Framework/Core/src/ArrowSupport.cxx b/Framework/Core/src/ArrowSupport.cxx index 1a656e4d60080..230d708b47dc7 100644 --- a/Framework/Core/src/ArrowSupport.cxx +++ b/Framework/Core/src/ArrowSupport.cxx @@ -30,7 +30,7 @@ #include "Framework/ServiceMetricsInfo.h" #include "WorkflowHelpers.h" #include "Framework/WorkflowSpecNode.h" -#include "AnalysisSupportHelpers.h" +#include "Framework/AnalysisSupportHelpers.h" #include "CommonMessageBackendsHelpers.h" #include @@ -516,7 +516,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() auto [outputsInputs, isDangling] = WorkflowHelpers::analyzeOutputs(workflow); // create DataOutputDescriptor - std::shared_ptr dod = WorkflowHelpers::getDataOutputDirector(ctx.options(), outputsInputs, isDangling); + std::shared_ptr dod = AnalysisSupportHelpers::getDataOutputDirector(ctx); // select outputs of type AOD which need to be saved // ATTENTION: if there are dangling outputs the getGlobalAODSink @@ -537,11 +537,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() // add TFNumber and TFFilename as input to the writer outputsInputsAOD.emplace_back("tfn", "TFN", "TFNumber"); outputsInputsAOD.emplace_back("tff", "TFF", "TFFilename"); - int compression = 505; - if (ctx.options().hasOption("aod-writer-compression")) { - compression = ctx.options().get("aod-writer-compression"); - } - workflow.push_back(AnalysisSupportHelpers::getGlobalAODSink(dod, outputsInputsAOD, compression)); + workflow.push_back(AnalysisSupportHelpers::getGlobalAODSink(ctx)); } // Move the dummy sink at the end, if needed for (size_t i = 0; i < workflow.size(); ++i) { diff --git a/Framework/Core/src/ConfigContext.cxx b/Framework/Core/src/ConfigContext.cxx index 726332e1d0ae3..9b121b1884998 100644 --- a/Framework/Core/src/ConfigContext.cxx +++ b/Framework/Core/src/ConfigContext.cxx @@ -14,6 +14,9 @@ namespace o2::framework { +ConfigContext::ConfigContext(ConfigParamRegistry& options, 
ServiceRegistryRef services, int argc, char** argv) + : mOptions{options}, mServices{services}, mArgc{argc}, mArgv{argv} {} + bool ConfigContext::helpOnCommandLine() const { bool helpasked = false; diff --git a/Framework/Core/src/WorkflowHelpers.cxx b/Framework/Core/src/WorkflowHelpers.cxx index da9a135dc5eb8..3782c48e81c56 100644 --- a/Framework/Core/src/WorkflowHelpers.cxx +++ b/Framework/Core/src/WorkflowHelpers.cxx @@ -9,7 +9,7 @@ // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. #include "WorkflowHelpers.h" -#include "AnalysisSupportHelpers.h" +#include "Framework/AnalysisSupportHelpers.h" #include "Framework/AlgorithmSpec.h" #include "Framework/AODReaderHelpers.h" #include "Framework/ConfigParamSpec.h" @@ -153,7 +153,7 @@ int defaultConditionQueryRateMultiplier() return getenv("DPL_CONDITION_QUERY_RATE_MULTIPLIER") ? std::stoi(getenv("DPL_CONDITION_QUERY_RATE_MULTIPLIER")) : 1; } -void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext const& ctx) +void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext& ctx) { auto fakeCallback = AlgorithmSpec{[](InitContext& ic) { LOG(info) << "This is not a real device, merely a placeholder for external inputs"; @@ -241,7 +241,9 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext aodReader.options.emplace_back(ConfigParamSpec{"channel-config", VariantType::String, rateLimitingChannelConfigInput, {"how many timeframes can be in flight at the same time"}}); } - AnalysisContext ac; + ctx.services().registerService(ServiceRegistryHelpers::handleForService(new AnalysisContext)); + auto& ac = ctx.services().get(); + std::vector requestedCCDBs; std::vector providedCCDBs; @@ -573,7 +575,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext // This is to inject a file sink so that any dangling ATSK object is written // to a ROOT file. if (ac.providedOutputObjHist.empty() == false) { - auto rootSink = AnalysisSupportHelpers::getOutputObjHistSink(ac.outObjHistMap, ac.outTskMap); + auto rootSink = AnalysisSupportHelpers::getOutputObjHistSink(ctx); extraSpecs.push_back(rootSink); } @@ -581,41 +583,38 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext extraSpecs.clear(); /// Analyze all ouputs - auto [outputsInputs, isDangling] = analyzeOutputs(workflow); + auto [outputsInputsTmp, isDanglingTmp] = analyzeOutputs(workflow); + ac.isDangling = isDanglingTmp; + ac.outputsInputs = outputsInputsTmp; // create DataOutputDescriptor - std::shared_ptr dod = getDataOutputDirector(ctx.options(), outputsInputs, isDangling); + std::shared_ptr dod = AnalysisSupportHelpers::getDataOutputDirector(ctx); // select outputs of type AOD which need to be saved // ATTENTION: if there are dangling outputs the getGlobalAODSink // has to be created in any case! 
- std::vector outputsInputsAOD; - for (auto ii = 0u; ii < outputsInputs.size(); ii++) { - if (DataSpecUtils::partialMatch(outputsInputs[ii], extendedAODOrigins)) { - auto ds = dod->getDataOutputDescriptors(outputsInputs[ii]); - if (ds.size() > 0 || isDangling[ii]) { - outputsInputsAOD.emplace_back(outputsInputs[ii]); + for (auto ii = 0u; ii < ac.outputsInputs.size(); ii++) { + if (DataSpecUtils::partialMatch(ac.outputsInputs[ii], extendedAODOrigins)) { + auto ds = dod->getDataOutputDescriptors(ac.outputsInputs[ii]); + if (ds.size() > 0 || ac.isDangling[ii]) { + ac.outputsInputsAOD.emplace_back(ac.outputsInputs[ii]); } } } // file sink for any AOD output - if (outputsInputsAOD.size() > 0) { + if (ac.outputsInputsAOD.size() > 0) { // add TFNumber and TFFilename as input to the writer - outputsInputsAOD.emplace_back(InputSpec{"tfn", "TFN", "TFNumber"}); - outputsInputsAOD.emplace_back(InputSpec{"tff", "TFF", "TFFilename"}); - int compressionLevel = 505; - if (ctx.options().hasOption("aod-writer-compression")) { - compressionLevel = ctx.options().get("aod-writer-compression"); - } - auto fileSink = AnalysisSupportHelpers::getGlobalAODSink(dod, outputsInputsAOD, compressionLevel); + ac.outputsInputsAOD.emplace_back(InputSpec{"tfn", "TFN", "TFNumber"}); + ac.outputsInputsAOD.emplace_back(InputSpec{"tff", "TFF", "TFFilename"}); + auto fileSink = AnalysisSupportHelpers::getGlobalAODSink(ctx); extraSpecs.push_back(fileSink); - auto it = std::find_if(outputsInputs.begin(), outputsInputs.end(), [](InputSpec& spec) -> bool { + auto it = std::find_if(ac.outputsInputs.begin(), ac.outputsInputs.end(), [](InputSpec& spec) -> bool { return DataSpecUtils::partialMatch(spec, o2::header::DataOrigin("TFN")); }); - size_t ii = std::distance(outputsInputs.begin(), it); - isDangling[ii] = false; + size_t ii = std::distance(ac.outputsInputs.begin(), it); + ac.isDangling[ii] = false; } workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end()); @@ -623,20 +622,20 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext // Select dangling outputs which are not of type AOD std::vector redirectedOutputsInputs; - for (auto ii = 0u; ii < outputsInputs.size(); ii++) { + for (auto ii = 0u; ii < ac.outputsInputs.size(); ii++) { if (ctx.options().get("forwarding-policy") == "none") { continue; } // We forward to the output proxy all the inputs only if they are dangling // or if the forwarding policy is "proxy". - if (!isDangling[ii] && (ctx.options().get("forwarding-policy") != "all")) { + if (!ac.isDangling[ii] && (ctx.options().get("forwarding-policy") != "all")) { continue; } // AODs are skipped in any case. 
- if (DataSpecUtils::partialMatch(outputsInputs[ii], extendedAODOrigins)) { + if (DataSpecUtils::partialMatch(ac.outputsInputs[ii], extendedAODOrigins)) { continue; } - redirectedOutputsInputs.emplace_back(outputsInputs[ii]); + redirectedOutputsInputs.emplace_back(ac.outputsInputs[ii]); } std::vector unmatched; @@ -985,102 +984,6 @@ struct DataMatcherId { size_t id; }; -std::shared_ptr WorkflowHelpers::getDataOutputDirector(ConfigParamRegistry const& options, std::vector const& OutputsInputs, std::vector const& isDangling) -{ - std::shared_ptr dod = std::make_shared(); - - // analyze options and take actions accordingly - // default values - std::string rdn, resdir("./"); - std::string fnb, fnbase("AnalysisResults_trees"); - float mfs, maxfilesize(-1.); - std::string fmo, filemode("RECREATE"); - int ntfm, ntfmerge = 1; - - // values from json - if (options.isSet("aod-writer-json")) { - auto fnjson = options.get("aod-writer-json"); - if (!fnjson.empty()) { - std::tie(rdn, fnb, fmo, mfs, ntfm) = dod->readJson(fnjson); - if (!rdn.empty()) { - resdir = rdn; - } - if (!fnb.empty()) { - fnbase = fnb; - } - if (!fmo.empty()) { - filemode = fmo; - } - if (mfs > 0.) { - maxfilesize = mfs; - } - if (ntfm > 0) { - ntfmerge = ntfm; - } - } - } - - // values from command line options, information from json is overwritten - if (options.isSet("aod-writer-resdir")) { - rdn = options.get("aod-writer-resdir"); - if (!rdn.empty()) { - resdir = rdn; - } - } - if (options.isSet("aod-writer-resfile")) { - fnb = options.get("aod-writer-resfile"); - if (!fnb.empty()) { - fnbase = fnb; - } - } - if (options.isSet("aod-writer-resmode")) { - fmo = options.get("aod-writer-resmode"); - if (!fmo.empty()) { - filemode = fmo; - } - } - if (options.isSet("aod-writer-maxfilesize")) { - mfs = options.get("aod-writer-maxfilesize"); - if (mfs > 0) { - maxfilesize = mfs; - } - } - if (options.isSet("aod-writer-ntfmerge")) { - ntfm = options.get("aod-writer-ntfmerge"); - if (ntfm > 0) { - ntfmerge = ntfm; - } - } - // parse the keepString - if (options.isSet("aod-writer-keep")) { - auto keepString = options.get("aod-writer-keep"); - if (!keepString.empty()) { - dod->reset(); - std::string d("dangling"); - if (d.find(keepString) == 0) { - // use the dangling outputs - std::vector danglingOutputs; - for (auto ii = 0u; ii < OutputsInputs.size(); ii++) { - if (DataSpecUtils::partialMatch(OutputsInputs[ii], writableAODOrigins) && isDangling[ii]) { - danglingOutputs.emplace_back(OutputsInputs[ii]); - } - } - dod->readSpecs(danglingOutputs); - } else { - // use the keep string - dod->readString(keepString); - } - } - } - dod->setResultDir(resdir); - dod->setFilenameBase(fnbase); - dod->setFileMode(filemode); - dod->setMaximumFileSize(maxfilesize); - dod->setNumberTimeFramesToMerge(ntfmerge); - - return dod; -} - std::tuple, std::vector> WorkflowHelpers::analyzeOutputs(WorkflowSpec const& workflow) { // compute total number of input/output diff --git a/Framework/Core/src/WorkflowHelpers.h b/Framework/Core/src/WorkflowHelpers.h index b20249b99edc8..b2a4d4cab55df 100644 --- a/Framework/Core/src/WorkflowHelpers.h +++ b/Framework/Core/src/WorkflowHelpers.h @@ -180,7 +180,7 @@ struct WorkflowHelpers { // dangling inputs are satisfied. 
// @a workflow the workflow to decorate // @a ctx the context for the configuration phase - static void injectServiceDevices(WorkflowSpec& workflow, ConfigContext const& ctx); + static void injectServiceDevices(WorkflowSpec& workflow, ConfigContext& ctx); // Final adjustments to @a workflow after service devices have been injected. static void adjustTopology(WorkflowSpec& workflow, ConfigContext const& ctx); @@ -204,8 +204,6 @@ struct WorkflowHelpers { const std::vector& edges, const std::vector& index); - static std::shared_ptr getDataOutputDirector(ConfigParamRegistry const& options, std::vector const& OutputsInputs, std::vector const& outputTypes); - /// Given @a workflow it gathers all the OutputSpec and in addition provides /// the information whether and output is dangling and/or of type AOD /// An Output is dangling if it does not have a corresponding InputSpec. diff --git a/Framework/Core/test/Mocking.h b/Framework/Core/test/Mocking.h index b3e48ad3b2d0f..3b9a86b46e89c 100644 --- a/Framework/Core/test/Mocking.h +++ b/Framework/Core/test/Mocking.h @@ -34,7 +34,8 @@ std::unique_ptr makeEmptyConfigContext() store->preload(); store->activate(); static ConfigParamRegistry registry(std::move(store)); - auto context = std::make_unique(registry, 0, nullptr); + static ServiceRegistry services; + auto context = std::make_unique(registry, ServiceRegistryRef{services}, 0, nullptr); return context; } diff --git a/Framework/Core/test/benchmark_WorkflowHelpers.cxx b/Framework/Core/test/benchmark_WorkflowHelpers.cxx index f1c070d8a0f4e..09a9ae0cca923 100644 --- a/Framework/Core/test/benchmark_WorkflowHelpers.cxx +++ b/Framework/Core/test/benchmark_WorkflowHelpers.cxx @@ -30,7 +30,8 @@ std::unique_ptr makeEmptyConfigContext() store->preload(); store->activate(); static ConfigParamRegistry registry(std::move(store)); - auto context = std::make_unique(registry, 0, nullptr); + static ServiceRegistry services; + auto context = std::make_unique(registry, ServiceRegistryRef{services}, 0, nullptr); return context; } diff --git a/Framework/Core/test/test_OverrideLabels.cxx b/Framework/Core/test/test_OverrideLabels.cxx index 573bd13be797a..c5134c0c169c0 100644 --- a/Framework/Core/test/test_OverrideLabels.cxx +++ b/Framework/Core/test/test_OverrideLabels.cxx @@ -31,7 +31,8 @@ std::unique_ptr mockupLabels(std::string labelArg) store->preload(); store->activate(); registry = ConfigParamRegistry(std::move(store)); - auto context = std::make_unique(registry, 0, nullptr); + static ServiceRegistry services; + auto context = std::make_unique(registry, ServiceRegistryRef{services}, 0, nullptr); return context; } diff --git a/Framework/TestWorkflows/src/o2TestHistograms.cxx b/Framework/TestWorkflows/src/o2TestHistograms.cxx index 9986f52a1d940..efac16f6da4f0 100644 --- a/Framework/TestWorkflows/src/o2TestHistograms.cxx +++ b/Framework/TestWorkflows/src/o2TestHistograms.cxx @@ -17,6 +17,7 @@ #include "Framework/AnalysisTask.h" #include #include +#include using namespace o2; using namespace o2::framework; @@ -43,7 +44,7 @@ struct EtaAndClsHistogramsSimple { { LOGP(info, "Invoking the simple one"); for (auto& track : tracks) { - etaClsH->Fill(track.eta(), track.pt(), 0); + etaClsH->Fill(track.eta(), track.pt()); skimEx(track.pt(), track.eta()); } } @@ -57,7 +58,7 @@ struct EtaAndClsHistogramsIUSimple { { LOGP(info, "Invoking the simple one"); for (auto& track : tracks) { - etaClsH->Fill(track.eta(), track.pt(), 0); + etaClsH->Fill(track.eta(), track.pt()); skimEx(track.pt(), track.eta()); } }
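Note on the pattern this patch introduces: the writers are now ordinary DPL algorithm plugins, so an alternative writing backend (e.g. something other than TTree, as mentioned in the commit message) can be supplied by registering another AlgorithmPlugin in a plugin library and loading it by name, exactly as getOutputObjHistSink/getGlobalAODSink now do via PluginManager::loadAlgorithmFromPlugin. The sketch below is illustrative only and not part of the patch: the plugin name MyCustomWriter, the library name O2FrameworkMyWriters and the include paths are assumptions; only the AlgorithmPlugin/AlgorithmSpec/DEFINE_DPL_PLUGIN_INSTANCE machinery is taken from the patch itself.

// Illustrative sketch (not part of this patch): a hypothetical alternative
// writer backend registered through the same plugin mechanism used above.
// Names and include paths are assumptions.
#include "Framework/AlgorithmSpec.h"
#include "Framework/ConfigContext.h"
#include "Framework/InitContext.h"
#include "Framework/ProcessingContext.h"
#include "Framework/Plugins.h"

#include <functional>

namespace o2::framework::writers
{
// Same shape as ROOTTTreeWriter in Plugin.cxx: the plugin only turns a
// ConfigContext into an AlgorithmSpec.
struct MyCustomWriter : o2::framework::AlgorithmPlugin {
  o2::framework::AlgorithmSpec create(o2::framework::ConfigContext const&) override
  {
    // Init callback returning the per-timeframe processing callback,
    // mirroring the structure of AODWriterHelpers::getOutputTTreeWriter.
    return o2::framework::AlgorithmSpec{
      [](o2::framework::InitContext&) -> std::function<void(o2::framework::ProcessingContext&)> {
        return [](o2::framework::ProcessingContext&) {
          // write the incoming AOD tables with a non-TTree backend here
        };
      }};
  }
};
} // namespace o2::framework::writers

DEFINE_DPL_PLUGINS_BEGIN
DEFINE_DPL_PLUGIN_INSTANCE(MyCustomWriter, CustomAlgorithm);
DEFINE_DPL_PLUGINS_END

A DataProcessorSpec would then pick such a writer up at configuration time the same way this patch does for ROOTTTreeWriter, e.g. .algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkMyWriters", "MyCustomWriter", ctx) (library and plugin names hypothetical).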