From a55f48275dd78f0453cf8c0b168928228375ae95 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Thu, 7 Nov 2024 15:40:41 +0100 Subject: [PATCH 1/3] DPL: pass ConfigContext to the PluginManager when creating AlgorithmSpec This way a plugin can create more complex AlgorithmSpecs which depend on the workflow options. This will be needed to properly read metadata from parent files, and it opens the way to more service devices to be moved in a plugin. --- Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx | 3 ++- Framework/AnalysisSupport/src/AODJAlienReaderHelpers.h | 2 +- Framework/AnalysisSupport/src/Plugin.cxx | 4 ++-- Framework/CCDBSupport/src/Plugin.cxx | 2 +- Framework/Core/include/Framework/AlgorithmSpec.h | 2 +- Framework/Core/include/Framework/PluginManager.h | 5 +++-- Framework/Core/src/PluginManager.cxx | 6 +++--- Framework/Core/src/WorkflowHelpers.cxx | 4 ++-- 8 files changed, 15 insertions(+), 13 deletions(-) diff --git a/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx b/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx index a8b708668dae1..016ed4f1df1ef 100644 --- a/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx +++ b/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx @@ -22,6 +22,7 @@ #include "Framework/DeviceSpec.h" #include "Framework/RawDeviceService.h" #include "Framework/DataSpecUtils.h" +#include "Framework/ConfigContext.h" #include "DataInputDirector.h" #include "Framework/SourceInfoHeader.h" #include "Framework/ChannelInfo.h" @@ -117,7 +118,7 @@ static inline auto extractOriginalsTuple(framework::pack, ProcessingConte return std::make_tuple(extractTypedOriginal(pc)...); } -AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback() +AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const& config) { auto callback = AlgorithmSpec{adaptStateful([](ConfigParamRegistry const& options, DeviceSpec const& spec, diff --git a/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.h b/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.h index 4b9fd710aca14..e8d663d8fe0bb 100644 --- a/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.h +++ b/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.h @@ -24,7 +24,7 @@ namespace o2::framework::readers { struct AODJAlienReaderHelpers { - static AlgorithmSpec rootFileReaderCallback(); + static AlgorithmSpec rootFileReaderCallback(ConfigContext const&context); static void dumpFileMetrics(o2::monitoring::Monitoring& monitoring, TFile* currentFile, uint64_t startedAt, uint64_t ioTime, int tfPerFile, int tfRead); }; diff --git a/Framework/AnalysisSupport/src/Plugin.cxx b/Framework/AnalysisSupport/src/Plugin.cxx index 32a86d37aebb9..9ab4dfa0a2a9f 100644 --- a/Framework/AnalysisSupport/src/Plugin.cxx +++ b/Framework/AnalysisSupport/src/Plugin.cxx @@ -26,9 +26,9 @@ O2_DECLARE_DYNAMIC_LOG(analysis_support); struct ROOTFileReader : o2::framework::AlgorithmPlugin { - o2::framework::AlgorithmSpec create() override + o2::framework::AlgorithmSpec create(o2::framework::ConfigContext const& config) override { - return o2::framework::readers::AODJAlienReaderHelpers::rootFileReaderCallback(); + return o2::framework::readers::AODJAlienReaderHelpers::rootFileReaderCallback(config); } }; diff --git a/Framework/CCDBSupport/src/Plugin.cxx b/Framework/CCDBSupport/src/Plugin.cxx index 8769511f4849a..18aabc07ae4a4 100644 --- a/Framework/CCDBSupport/src/Plugin.cxx +++ b/Framework/CCDBSupport/src/Plugin.cxx @@ -13,7 +13,7 @@ #include "CCDBHelpers.h" struct CCDBFetcherPlugin : o2::framework::AlgorithmPlugin { - o2::framework::AlgorithmSpec create() final + o2::framework::AlgorithmSpec create(o2::framework::ConfigContext const&) final { return o2::framework::CCDBHelpers::fetchFromCCDB(); } diff --git a/Framework/Core/include/Framework/AlgorithmSpec.h b/Framework/Core/include/Framework/AlgorithmSpec.h index e08d829e489bd..7d56ba9f6ce68 100644 --- a/Framework/Core/include/Framework/AlgorithmSpec.h +++ b/Framework/Core/include/Framework/AlgorithmSpec.h @@ -81,7 +81,7 @@ struct AlgorithmSpec { /// Helper class for an algorithm which is loaded as a plugin. struct AlgorithmPlugin { - virtual AlgorithmSpec create() = 0; + virtual AlgorithmSpec create(ConfigContext const&) = 0; }; // Allow fetching inputs from the context using a string literal. template diff --git a/Framework/Core/include/Framework/PluginManager.h b/Framework/Core/include/Framework/PluginManager.h index 4c6e965502500..d6b16f01ad713 100644 --- a/Framework/Core/include/Framework/PluginManager.h +++ b/Framework/Core/include/Framework/PluginManager.h @@ -51,8 +51,9 @@ struct PluginManager { /// the DPLPluginHandle provided by the library. static void load(std::vector& infos, const char* dso, std::function& onSuccess); /// Load an called @plugin from a library called @a library and - /// return the associtated AlgorithmSpec. - static auto loadAlgorithmFromPlugin(std::string library, std::string plugin) -> AlgorithmSpec; + /// @return the associated AlgorithmSpec. + /// The config @a context can be used to determine the workflow options which affect such plugin. + static auto loadAlgorithmFromPlugin(std::string library, std::string plugin, ConfigContext const& context) -> AlgorithmSpec; /// Wrap an algorithm with some lambda @wrapper which will be called /// with the original callback and the ProcessingContext. static auto wrapAlgorithm(AlgorithmSpec const& spec, WrapperProcessCallback&& wrapper) -> AlgorithmSpec; diff --git a/Framework/Core/src/PluginManager.cxx b/Framework/Core/src/PluginManager.cxx index 96666722fc169..9faea85ad65e7 100644 --- a/Framework/Core/src/PluginManager.cxx +++ b/Framework/Core/src/PluginManager.cxx @@ -101,10 +101,10 @@ void PluginManager::load(std::vector& libs, const char* dso, std::fu onSuccess(pluginInstance); } -auto PluginManager::loadAlgorithmFromPlugin(std::string library, std::string plugin) -> AlgorithmSpec +auto PluginManager::loadAlgorithmFromPlugin(std::string library, std::string plugin, ConfigContext const& context) -> AlgorithmSpec { std::shared_ptr algorithm{nullptr}; - return AlgorithmSpec{[algorithm, library, plugin](InitContext& ic) mutable -> AlgorithmSpec::ProcessCallback { + return AlgorithmSpec{[algorithm, library, plugin, &context](InitContext& ic) mutable -> AlgorithmSpec::ProcessCallback { if (algorithm.get()) { return algorithm->onInit(ic); } @@ -134,7 +134,7 @@ auto PluginManager::loadAlgorithmFromPlugin(std::string library, std::string plu if (!creator) { LOGP(fatal, "Could not find the {} plugin in {}.", plugin, libName); } - algorithm = std::make_shared(creator->create()); + algorithm = std::make_shared(creator->create(context)); return algorithm->onInit(ic); }}; }; diff --git a/Framework/Core/src/WorkflowHelpers.cxx b/Framework/Core/src/WorkflowHelpers.cxx index 3fe8fae19a3b5..0366e39cf8976 100644 --- a/Framework/Core/src/WorkflowHelpers.cxx +++ b/Framework/Core/src/WorkflowHelpers.cxx @@ -432,7 +432,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext auto mctracks2aod = std::find_if(workflow.begin(), workflow.end(), [](auto const& x) { return x.name == "mctracks-to-aod"; }); if (mctracks2aod == workflow.end()) { // add normal reader - auto&& algo = PluginManager::loadAlgorithmFromPlugin("O2FrameworkAnalysisSupport", "ROOTFileReader"); + auto&& algo = PluginManager::loadAlgorithmFromPlugin("O2FrameworkAnalysisSupport", "ROOTFileReader", ctx); if (internalRateLimiting) { aodReader.algorithm = CommonDataProcessors::wrapWithRateLimiting(algo); } else { @@ -520,7 +520,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext } // Load the CCDB backend from the plugin - ccdbBackend.algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "CCDBFetcherPlugin"); + ccdbBackend.algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "CCDBFetcherPlugin", ctx); extraSpecs.push_back(ccdbBackend); } else { // If there is no CCDB requested, but we still ask for a FLP/DISTSUBTIMEFRAME/0xccdb From 2d015adce39c98bbf385f98564a0144fcdf5415f Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Thu, 7 Nov 2024 15:53:03 +0100 Subject: [PATCH 2/3] DPL: allow plugins to know about discoveries of other plugins --- Framework/Core/include/Framework/runDataProcessing.h | 1 - Framework/Core/src/ConfigParamDiscovery.cxx | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/Framework/Core/include/Framework/runDataProcessing.h b/Framework/Core/include/Framework/runDataProcessing.h index fbf2843d4db01..eee4c4b6583d3 100644 --- a/Framework/Core/include/Framework/runDataProcessing.h +++ b/Framework/Core/include/Framework/runDataProcessing.h @@ -197,7 +197,6 @@ int mainNoCatch(int argc, char** argv) for (auto& extra : extraOptions) { workflowOptions.push_back(extra); } - workflowOptionsRegistry.loadExtra(extraOptions); ConfigContext configContext(workflowOptionsRegistry, argc, argv); o2::framework::WorkflowSpec specs = defineDataProcessing(configContext); diff --git a/Framework/Core/src/ConfigParamDiscovery.cxx b/Framework/Core/src/ConfigParamDiscovery.cxx index 9673f77ed0e42..63c38b7f0ac6c 100644 --- a/Framework/Core/src/ConfigParamDiscovery.cxx +++ b/Framework/Core/src/ConfigParamDiscovery.cxx @@ -75,6 +75,7 @@ std::vector ConfigParamDiscovery::discover(ConfigParamRegistry& for (auto& extra : extras) { result.push_back(extra); } + registry.loadExtra(extras); } return result; } From ed328b50751686876d2c0b320ded2115fa20b6e9 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Thu, 7 Nov 2024 15:53:21 +0100 Subject: [PATCH 3/3] DPL: read metadata from parent files If the metadata is not found in the main file and if there is a list of parent files, try those as well. --- .../src/AODJAlienReaderHelpers.cxx | 20 ++-- Framework/AnalysisSupport/src/Plugin.cxx | 102 +++++++++++++----- Framework/Core/src/ConfigParamDiscovery.cxx | 2 +- Framework/Core/src/Plugin.cxx | 9 +- Framework/Core/src/WorkflowHelpers.cxx | 1 - 5 files changed, 96 insertions(+), 38 deletions(-) diff --git a/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx b/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx index 016ed4f1df1ef..90d88cb43626e 100644 --- a/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx +++ b/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx @@ -120,10 +120,17 @@ static inline auto extractOriginalsTuple(framework::pack, ProcessingConte AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const& config) { - auto callback = AlgorithmSpec{adaptStateful([](ConfigParamRegistry const& options, - DeviceSpec const& spec, - Monitoring& monitoring, - DataProcessingStats& stats) { + // aod-parent-base-path-replacement is now a workflow option, so it needs to be + // retrieved from the ConfigContext. This is because we do not allow workflow options + // to change over start-stop-start because they can affect the topology generation. + std::string parentFileReplacement; + if (config.options().isSet("aod-parent-base-path-replacement")) { + parentFileReplacement = config.options().get("aod-parent-base-path-replacement"); + } + auto callback = AlgorithmSpec{adaptStateful([parentFileReplacement](ConfigParamRegistry const& options, + DeviceSpec const& spec, + Monitoring& monitoring, + DataProcessingStats& stats) { // FIXME: not actually needed, since data processing stats can specify that we should // send the initial value. stats.updateStats({static_cast(ProcessingStatsId::ARROW_BYTES_CREATED), DataProcessingStats::Op::Set, 0}); @@ -141,11 +148,6 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const auto maxRate = options.get("aod-max-io-rate"); - std::string parentFileReplacement; - if (options.isSet("aod-parent-base-path-replacement")) { - parentFileReplacement = options.get("aod-parent-base-path-replacement"); - } - int parentAccessLevel = 0; if (options.isSet("aod-parent-access-level")) { parentAccessLevel = options.get("aod-parent-access-level"); diff --git a/Framework/AnalysisSupport/src/Plugin.cxx b/Framework/AnalysisSupport/src/Plugin.cxx index 9ab4dfa0a2a9f..b899a52206422 100644 --- a/Framework/AnalysisSupport/src/Plugin.cxx +++ b/Framework/AnalysisSupport/src/Plugin.cxx @@ -22,6 +22,7 @@ #include #include #include +#include O2_DECLARE_DYNAMIC_LOG(analysis_support); @@ -65,7 +66,7 @@ struct RunSummary : o2::framework::ServicePlugin { } }; -std::vector getListOfTables(TFile* f) +std::vector getListOfTables(std::unique_ptr& f) { std::vector r; TList* keyList = f->GetListOfKeys(); @@ -83,6 +84,32 @@ std::vector getListOfTables(TFile* f) } return r; } +auto readMetadata(std::unique_ptr& currentFile) -> std::vector +{ + // Get the metadata, if any + auto m = (TMap*)currentFile->Get("metaData"); + if (!m) { + return {}; + } + std::vector results; + auto it = m->MakeIterator(); + + // Serialise metadata into a ; separated string with : separating key and value + bool first = true; + while (auto obj = it->Next()) { + if (first) { + LOGP(info, "Metadata for file \"{}\":", currentFile->GetName()); + first = false; + } + auto objString = (TObjString*)m->GetValue(obj); + LOGP(info, "- {}: {}", obj->GetName(), objString->String().Data()); + std::string key = "aod-metadata-" + std::string(obj->GetName()); + char const* value = strdup(objString->String()); + results.push_back(ConfigParamSpec{key, VariantType::String, value, {"Metadata in AOD"}}); + } + + return results; +} struct DiscoverMetadataInAOD : o2::framework::ConfigDiscoveryPlugin { ConfigDiscovery* create() override @@ -94,8 +121,6 @@ struct DiscoverMetadataInAOD : o2::framework::ConfigDiscoveryPlugin { if (filename.empty()) { return {}; } - std::vector results; - TFile* currentFile = nullptr; if (filename.at(0) == '@') { filename.erase(0, 1); // read the text file and set filename to the contents of the first line @@ -110,39 +135,64 @@ struct DiscoverMetadataInAOD : o2::framework::ConfigDiscoveryPlugin { TGrid::Connect("alien://"); } LOGP(info, "Loading metadata from file {} in PID {}", filename, getpid()); - currentFile = TFile::Open(filename.c_str()); - if (!currentFile) { + std::unique_ptr currentFile{TFile::Open(filename.c_str())}; + if (currentFile.get() == nullptr) { LOGP(fatal, "Couldn't open file \"{}\"!", filename); } + std::vector results = readMetadata(currentFile); + // Found metadata already in the main file. + if (!results.empty()) { + auto tables = getListOfTables(currentFile); + if (tables.empty() == false) { + results.push_back(ConfigParamSpec{"aod-metadata-tables", VariantType::ArrayString, tables, {"Tables in first AOD"}}); + } + results.push_back(ConfigParamSpec{"aod-metadata-source", VariantType::String, filename, {"File from which the metadata was extracted."}}); + return results; + } - // Get the metadata, if any - auto m = (TMap*)currentFile->Get("metaData"); - if (!m) { + // Lets try in parent files + auto parentFiles = (TMap*)currentFile->Get("parentFiles"); + if (!parentFiles) { LOGP(info, "No metadata found in file \"{}\"", filename); results.push_back(ConfigParamSpec{"aod-metadata-disable", VariantType::String, "1", {"Metadata not found in AOD"}}); return results; } - auto it = m->MakeIterator(); - - // Serialise metadata into a ; separated string with : separating key and value - bool first = true; - while (auto obj = it->Next()) { - if (first) { - LOGP(info, "Metadata for file \"{}\":", filename); - first = false; + for (auto* p : *parentFiles) { + std::string parentFilename = ((TPair*)p)->Value()->GetName(); + // Do the replacement. Notice this will require changing aod-parent-base-path-replacement to be + // a workflow option (because the metadata itself is potentially changing the topology). + if (registry.isSet("aod-parent-base-path-replacement")) { + auto parentFileReplacement = registry.get("aod-parent-base-path-replacement"); + auto pos = parentFileReplacement.find(';'); + if (pos == std::string::npos) { + throw std::runtime_error(fmt::format("Invalid syntax in aod-parent-base-path-replacement: \"{}\"", parentFileReplacement.c_str())); + } + auto from = parentFileReplacement.substr(0, pos); + auto to = parentFileReplacement.substr(pos + 1); + pos = parentFilename.find(from); + if (pos != std::string::npos) { + parentFilename.replace(pos, from.length(), to); + } } - auto objString = (TObjString*)m->GetValue(obj); - LOGP(info, "- {}: {}", obj->GetName(), objString->String().Data()); - std::string key = "aod-metadata-" + std::string(obj->GetName()); - char const* value = strdup(objString->String()); - results.push_back(ConfigParamSpec{key, VariantType::String, value, {"Metadata in AOD"}}); - } - auto tables = getListOfTables(currentFile); - if (tables.empty() == false) { - results.push_back(ConfigParamSpec{"aod-metadata-tables", VariantType::ArrayString, tables, {"Tables in first AOD"}}); + std::unique_ptr parentFile{TFile::Open(parentFilename.c_str())}; + if (parentFile.get() == nullptr) { + LOGP(fatal, "Couldn't open derived file \"{}\"!", parentFilename); + } + results = readMetadata(parentFile); + // Found metadata already in the main file. + if (!results.empty()) { + auto tables = getListOfTables(parentFile); + if (tables.empty() == false) { + results.push_back(ConfigParamSpec{"aod-metadata-tables", VariantType::ArrayString, tables, {"Tables in first AOD"}}); + } + results.push_back(ConfigParamSpec{"aod-metadata-source", VariantType::String, filename, {"File from which the metadata was extracted."}}); + return results; + } + LOGP(info, "No metadata found in file \"{}\" nor in its parent file \"{}\"", filename, parentFilename); + break; } - currentFile->Close(); + results.push_back(ConfigParamSpec{"aod-metadata-disable", VariantType::String, "1", {"Metadata not found in AOD"}}); return results; }}; } diff --git a/Framework/Core/src/ConfigParamDiscovery.cxx b/Framework/Core/src/ConfigParamDiscovery.cxx index 63c38b7f0ac6c..fc8d6f2600bb4 100644 --- a/Framework/Core/src/ConfigParamDiscovery.cxx +++ b/Framework/Core/src/ConfigParamDiscovery.cxx @@ -25,9 +25,9 @@ namespace o2::framework std::vector ConfigParamDiscovery::discover(ConfigParamRegistry& registry, int argc, char** argv) { std::vector capabilitiesSpecs = { + "O2Framework:DiscoverAODOptionsInCommandLineCapability", "O2Framework:DiscoverMetadataInAODCapability", "O2Framework:DiscoverMetadataInCommandLineCapability", - "O2Framework:DiscoverAODOptionsInCommandLineCapability", }; // Load all the requested plugins and discover what we can do. diff --git a/Framework/Core/src/Plugin.cxx b/Framework/Core/src/Plugin.cxx index a98771a913d01..91c74bafff5ad 100644 --- a/Framework/Core/src/Plugin.cxx +++ b/Framework/Core/src/Plugin.cxx @@ -57,6 +57,10 @@ auto lookForCommandLineAODOptions = [](ConfigParamRegistry& registry, int argc, O2_SIGNPOST_EVENT_EMIT(capabilities, sid, "DiscoverAODOptionsInCommandLineCapability", "AOD options found in arguments. Populating from them."); return true; } + if (arg.starts_with("--aod-parent-base-path-replacement")) { + O2_SIGNPOST_EVENT_EMIT(capabilities, sid, "DiscoverAODOptionsInCommandLineCapability", "AOD options found in arguments. Populating from them."); + return true; + } } return false; }; @@ -137,7 +141,7 @@ struct DiscoverAODOptionsInCommandLine : o2::framework::ConfigDiscoveryPlugin { bool injectOption = true; for (size_t i = 0; i < argc; i++) { std::string_view arg = argv[i]; - if (!arg.starts_with("--aod-writer-")) { + if (!arg.starts_with("--aod-writer-") && arg != "--aod-parent-base-path-replacement") { continue; } std::string key = arg.data() + 2; @@ -149,6 +153,9 @@ struct DiscoverAODOptionsInCommandLine : o2::framework::ConfigDiscoveryPlugin { results.push_back(ConfigParamSpec{"aod-writer-compression", VariantType::Int, numericValue, {"AOD Compression options"}}); injectOption = false; } + if (key == "aod-parent-base-path-replacement") { + results.push_back(ConfigParamSpec{"aod-parent-base-path-replacement", VariantType::String, value, {R"(Replace base path of parent files. Syntax: FROM;TO. E.g. "alien:///path/in/alien;/local/path". Enclose in "" on the command line.)"}}); + } } if (injectOption) { results.push_back(ConfigParamSpec{"aod-writer-compression", VariantType::Int, 505, {"AOD Compression options"}}); diff --git a/Framework/Core/src/WorkflowHelpers.cxx b/Framework/Core/src/WorkflowHelpers.cxx index 0366e39cf8976..56e9930e3b655 100644 --- a/Framework/Core/src/WorkflowHelpers.cxx +++ b/Framework/Core/src/WorkflowHelpers.cxx @@ -217,7 +217,6 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext ConfigParamSpec{"aod-max-io-rate", VariantType::Float, 0.f, {"Maximum I/O rate in MB/s"}}, ConfigParamSpec{"aod-reader-json", VariantType::String, {"json configuration file"}}, ConfigParamSpec{"aod-parent-access-level", VariantType::String, {"Allow parent file access up to specified level. Default: no (0)"}}, - ConfigParamSpec{"aod-parent-base-path-replacement", VariantType::String, {R"(Replace base path of parent files. Syntax: FROM;TO. E.g. "alien:///path/in/alien;/local/path". Enclose in "" on the command line.)"}}, ConfigParamSpec{"time-limit", VariantType::Int64, 0ll, {"Maximum run time limit in seconds"}}, ConfigParamSpec{"orbit-offset-enumeration", VariantType::Int64, 0ll, {"initial value for the orbit"}}, ConfigParamSpec{"orbit-multiplier-enumeration", VariantType::Int64, 0ll, {"multiplier to get the orbit from the counter"}},