diff --git a/Framework/Core/include/Framework/AnalysisSupportHelpers.h b/Framework/Core/include/Framework/AnalysisSupportHelpers.h index cc4d45a46c8bc..c0eeb3bd9697d 100644 --- a/Framework/Core/include/Framework/AnalysisSupportHelpers.h +++ b/Framework/Core/include/Framework/AnalysisSupportHelpers.h @@ -39,11 +39,6 @@ struct AnalysisSupportHelpers { std::vector const& requestedSpecials, std::vector& requestedAODs, DataProcessorSpec& publisher); - static void addMissingOutputsToAnalysisCCDBFetcher(std::vector const& providedSpecials, - std::vector const& requestedSpecials, - std::vector& requestedAODs, - std::vector& requestedDYNs, - DataProcessorSpec& publisher); static void addMissingOutputsToBuilder(std::vector const& requestedSpecials, std::vector& requestedAODs, std::vector& requestedDYNs, diff --git a/Framework/Core/include/Framework/DataSpecViews.h b/Framework/Core/include/Framework/DataSpecViews.h index 0782cefd0f632..162a12419594e 100644 --- a/Framework/Core/include/Framework/DataSpecViews.h +++ b/Framework/Core/include/Framework/DataSpecViews.h @@ -31,6 +31,32 @@ static auto filter_not_matching(auto const& provided) return std::views::filter([&provided](auto const& input) { return std::none_of(provided.begin(), provided.end(), [&input](auto const& output) { return DataSpecUtils::match(input, output); }); }); } +static auto filter_matching(auto const& provided) +{ + return std::views::filter([&provided](auto const& input) { return std::any_of(provided.begin(), provided.end(), [&input](auto const& output) { return DataSpecUtils::match(input, output); }); }); +} + +static auto filter_string_params_with(std::string match) +{ + return std::views::filter([match](auto const& param) { + return (param.type == VariantType::String) && (param.name.find(match) != std::string::npos); + }); +} + +static auto input_to_output_specs() +{ + return std::views::transform([](auto const& input) { + auto concrete = DataSpecUtils::asConcreteDataMatcher(input); + return OutputSpec{concrete.origin, concrete.description, concrete.subSpec, input.lifetime, input.metadata}; + }); +} + +static auto params_to_input_specs() +{ + return std::views::transform([](auto const& param) { + return DataSpecUtils::fromMetadataString(param.defaultValue.template get()); + }); +} } // namespace o2::framework::views // namespace o2::framework::sinks @@ -54,7 +80,7 @@ struct update_input_list { template friend Container& operator|(R&& r, update_input_list self) { - for (auto& item : r) { + for (auto const& item : r) { auto copy = item; DataSpecUtils::updateInputList(self.c, std::move(copy)); } @@ -62,6 +88,21 @@ struct update_input_list { } }; +template +struct update_output_list { + Container& c; + // ends the pipeline, returns the container + template + friend Container& operator|(R&& r, update_output_list self) + { + for (auto const& item : r) { + auto copy = item; + DataSpecUtils::updateOutputList(self.c, std::move(copy)); + } + return self.c; + } +}; + } // namespace o2::framework::sinks #endif // O2_FRAMEWORK_DATASPECVIEWS_H_ diff --git a/Framework/Core/src/AnalysisSupportHelpers.cxx b/Framework/Core/src/AnalysisSupportHelpers.cxx index b5c898faa515a..e59f36c72bdab 100644 --- a/Framework/Core/src/AnalysisSupportHelpers.cxx +++ b/Framework/Core/src/AnalysisSupportHelpers.cxx @@ -11,10 +11,7 @@ #include "Framework/AnalysisSupportHelpers.h" #include "Framework/DataOutputDirector.h" -#include "Framework/OutputObjHeader.h" -#include "Framework/ControlService.h" -#include "Framework/EndOfStreamContext.h" -#include "Framework/DeviceSpec.h" +#include "Framework/DataSpecViews.h" #include "Framework/PluginManager.h" #include "Framework/ConfigContext.h" #include "WorkflowHelpers.h" @@ -129,30 +126,11 @@ void AnalysisSupportHelpers::addMissingOutputsToReader(std::vector c std::vector const& requestedInputs, DataProcessorSpec& publisher) { - auto matchingOutputFor = [](InputSpec const& requested) { - return [&requested](OutputSpec const& provided) { - return DataSpecUtils::match(requested, provided); - }; - }; - for (InputSpec const& requested : requestedInputs) { - auto provided = std::find_if(providedOutputs.begin(), - providedOutputs.end(), - matchingOutputFor(requested)); - - if (provided != providedOutputs.end()) { - continue; - } - - auto inList = std::find_if(publisher.outputs.begin(), - publisher.outputs.end(), - matchingOutputFor(requested)); - if (inList != publisher.outputs.end()) { - continue; - } - - auto concrete = DataSpecUtils::asConcreteDataMatcher(requested); - publisher.outputs.emplace_back(concrete.origin, concrete.description, concrete.subSpec, requested.lifetime, requested.metadata); - } + requestedInputs | + views::filter_not_matching(providedOutputs) | // filter the inputs that are already provided + views::filter_not_matching(publisher.outputs) | // filter the inputs that are already covered + views::input_to_output_specs() | + sinks::append_to{publisher.outputs}; // append them to the publisher outputs } void AnalysisSupportHelpers::addMissingOutputsToSpawner(std::vector const& providedSpecials, @@ -160,25 +138,20 @@ void AnalysisSupportHelpers::addMissingOutputsToSpawner(std::vector std::vector& requestedAODs, DataProcessorSpec& publisher) { - for (auto& input : requestedSpecials) { - if (std::any_of(providedSpecials.begin(), providedSpecials.end(), [&input](auto const& x) { - return DataSpecUtils::match(input, x); - })) { - continue; - } - auto concrete = DataSpecUtils::asConcreteDataMatcher(input); - publisher.outputs.emplace_back(concrete.origin, concrete.description, concrete.subSpec); - for (auto& i : input.metadata) { - if ((i.type == VariantType::String) && (i.name.find("input:") != std::string::npos)) { - auto spec = DataSpecUtils::fromMetadataString(i.defaultValue.get()); - auto j = std::find(publisher.inputs.begin(), publisher.inputs.end(), spec); - if (j == publisher.inputs.end()) { - publisher.inputs.push_back(spec); - } - DataSpecUtils::updateInputList(requestedAODs, std::move(spec)); - } - } + requestedSpecials | + views::filter_not_matching(providedSpecials) | // filter the inputs that are already provided + views::input_to_output_specs() | + sinks::append_to{publisher.outputs}; // append them to the publisher outputs + + std::vector additionalInputs; + for (auto& input : requestedSpecials | views::filter_not_matching(providedSpecials)) { + input.metadata | + views::filter_string_params_with("input:") | + views::params_to_input_specs() | + sinks::update_input_list{additionalInputs}; // store into a temporary } + additionalInputs | sinks::update_input_list{requestedAODs}; // update requestedAODs + additionalInputs | sinks::update_input_list{publisher.inputs}; // update publisher inputs } void AnalysisSupportHelpers::addMissingOutputsToBuilder(std::vector const& requestedSpecials, @@ -186,52 +159,26 @@ void AnalysisSupportHelpers::addMissingOutputsToBuilder(std::vector c std::vector& requestedDYNs, DataProcessorSpec& publisher) { - for (auto& input : requestedSpecials) { - auto concrete = DataSpecUtils::asConcreteDataMatcher(input); - publisher.outputs.emplace_back(concrete.origin, concrete.description, concrete.subSpec); - for (auto& i : input.metadata) { - if ((i.type == VariantType::String) && (i.name.find("input:") != std::string::npos)) { - auto spec = DataSpecUtils::fromMetadataString(i.defaultValue.get()); - auto j = std::find_if(publisher.inputs.begin(), publisher.inputs.end(), [&](auto x) { return x.binding == spec.binding; }); - if (j == publisher.inputs.end()) { - publisher.inputs.push_back(spec); - } - if (DataSpecUtils::partialMatch(spec, AODOrigins)) { - DataSpecUtils::updateInputList(requestedAODs, std::move(spec)); - } else if (DataSpecUtils::partialMatch(spec, header::DataOrigin{"DYN"})) { - DataSpecUtils::updateInputList(requestedDYNs, std::move(spec)); - } - } - } + requestedSpecials | + views::input_to_output_specs() | + sinks::append_to{publisher.outputs}; // append them to the publisher outputs + + std::vector additionalInputs; + for (auto const& input : requestedSpecials) { + input.metadata | + views::filter_string_params_with("input:") | + views::params_to_input_specs() | + sinks::update_input_list{additionalInputs}; // store into a temporary } -} -void AnalysisSupportHelpers::addMissingOutputsToAnalysisCCDBFetcher( - std::vector const& providedSpecials, - std::vector const& requestedSpecials, - std::vector& requestedAODs, - std::vector& requestedDYNs, - DataProcessorSpec& publisher) -{ - for (auto& input : requestedSpecials) { - auto concrete = DataSpecUtils::asConcreteDataMatcher(input); - publisher.outputs.emplace_back(concrete.origin, concrete.description, concrete.subSpec); - // FIXME: good enough for now... - for (auto& i : input.metadata) { - if ((i.type == VariantType::String) && (i.name.find("input:") != std::string::npos)) { - auto spec = DataSpecUtils::fromMetadataString(i.defaultValue.get()); - auto j = std::find_if(publisher.inputs.begin(), publisher.inputs.end(), [&](auto x) { return x.binding == spec.binding; }); - if (j == publisher.inputs.end()) { - publisher.inputs.push_back(spec); - } - if (DataSpecUtils::partialMatch(spec, AODOrigins)) { - DataSpecUtils::updateInputList(requestedAODs, std::move(spec)); - } else if (DataSpecUtils::partialMatch(spec, header::DataOrigin{"DYN"})) { - DataSpecUtils::updateInputList(requestedDYNs, std::move(spec)); - } - } - } - } + additionalInputs | sinks::update_input_list{publisher.inputs}; // update publisher inputs + // FIXME: until we have a single list of pairs + additionalInputs | + views::partial_match_filter(AODOrigins) | + sinks::update_input_list{requestedAODs}; // update requestedAODs + additionalInputs | + views::partial_match_filter(header::DataOrigin{"DYN"}) | + sinks::update_input_list{requestedDYNs}; // update requestedDYNs } // ============================================================================= diff --git a/Framework/Core/src/ArrowSupport.cxx b/Framework/Core/src/ArrowSupport.cxx index cf2d364027932..26594252e888b 100644 --- a/Framework/Core/src/ArrowSupport.cxx +++ b/Framework/Core/src/ArrowSupport.cxx @@ -595,23 +595,16 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() ac.providedTIMs.clear(); ac.requestedTIMs.clear(); - auto inputSpecLessThan = [](InputSpec const& lhs, InputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); }; auto outputSpecLessThan = [](OutputSpec const& lhs, OutputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); }; if (builder != workflow.end()) { // collect currently requested IDXs ac.requestedIDXs.clear(); - for (auto& d : workflow) { - if (d.name == builder->name) { - continue; - } - for (auto& i : d.inputs) { - if (DataSpecUtils::partialMatch(i, header::DataOrigin{"IDX"})) { - auto copy = i; - DataSpecUtils::updateInputList(ac.requestedIDXs, std::move(copy)); - } - } + for (auto& d : workflow | views::exclude_by_name(builder->name)) { + d.inputs | + views::partial_match_filter(header::DataOrigin{"IDX"}) | + sinks::update_input_list{ac.requestedIDXs}; } // recreate inputs and outputs builder->inputs.clear(); @@ -624,37 +617,27 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() if (spawner != workflow.end()) { // collect currently requested DYNs - for (auto& d : workflow) { - if (d.name == spawner->name) { - continue; - } - for (auto const& i : d.inputs) { - if (DataSpecUtils::partialMatch(i, header::DataOrigin{"DYN"})) { - auto copy = i; - DataSpecUtils::updateInputList(ac.requestedDYNs, std::move(copy)); - } - } - for (auto const& o : d.outputs) { - if (DataSpecUtils::partialMatch(o, header::DataOrigin{"DYN"})) { - ac.providedDYNs.emplace_back(o); - } - } + for (auto& d : workflow | views::exclude_by_name(spawner->name)) { + d.inputs | + views::partial_match_filter(header::DataOrigin{"DYN"}) | + sinks::update_input_list{ac.requestedDYNs}; + d.outputs | + views::partial_match_filter(header::DataOrigin{"DYN"}) | + sinks::append_to{ac.providedDYNs}; } std::sort(ac.requestedDYNs.begin(), ac.requestedDYNs.end(), inputSpecLessThan); std::sort(ac.providedDYNs.begin(), ac.providedDYNs.end(), outputSpecLessThan); ac.spawnerInputs.clear(); - for (auto& input : ac.requestedDYNs) { - if (std::none_of(ac.providedDYNs.begin(), ac.providedDYNs.end(), [&input](auto const& x) { return DataSpecUtils::match(input, x); })) { - ac.spawnerInputs.emplace_back(input); - } - } + ac.requestedDYNs | + views::filter_not_matching(ac.providedDYNs) | + sinks::append_to{ac.spawnerInputs}; // recreate inputs and outputs spawner->outputs.clear(); spawner->inputs.clear(); + AnalysisSupportHelpers::addMissingOutputsToSpawner({}, ac.spawnerInputs, ac.requestedAODs, *spawner); // replace AlgorithmSpec // FIXME: it should be made more generic, so it does not need replacement... spawner->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkOnDemandTablesSupport", "ExtendedTableSpawner", ctx); - AnalysisSupportHelpers::addMissingOutputsToSpawner({}, ac.spawnerInputs, ac.requestedAODs, *spawner); } if (analysisCCDB != workflow.end()) { @@ -675,7 +658,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() // FIXME: it should be made more generic, so it does not need replacement... // FIXME how can I make the lookup depend on DYN tables as well?? analysisCCDB->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "AnalysisCCDBFetcherPlugin", ctx); - AnalysisSupportHelpers::addMissingOutputsToAnalysisCCDBFetcher({}, ac.analysisCCDBInputs, ac.requestedAODs, ac.requestedDYNs, *analysisCCDB); + AnalysisSupportHelpers::addMissingOutputsToBuilder(ac.analysisCCDBInputs, ac.requestedAODs, ac.requestedDYNs, *analysisCCDB); } if (writer != workflow.end()) { @@ -686,12 +669,9 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() // If reader and/or builder were adjusted, remove unneeded outputs // update currently requested AODs for (auto& d : workflow) { - for (auto const& i : d.inputs) { - if (DataSpecUtils::partialMatch(i, AODOrigins)) { - auto copy = i; - DataSpecUtils::updateInputList(ac.requestedAODs, std::move(copy)); - } - } + d.inputs | + views::partial_match_filter(AODOrigins) | + sinks::update_input_list{ac.requestedAODs}; } // remove unmatched outputs @@ -705,8 +685,6 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() } } - - // replace writer as some outputs may have become dangling and some are now consumed auto [outputsInputs, isDangling] = WorkflowHelpers::analyzeOutputs(workflow); diff --git a/Framework/Core/src/WorkflowHelpers.cxx b/Framework/Core/src/WorkflowHelpers.cxx index 17f6c9eb7ddb6..02141678fec7c 100644 --- a/Framework/Core/src/WorkflowHelpers.cxx +++ b/Framework/Core/src/WorkflowHelpers.cxx @@ -19,7 +19,6 @@ #include "Framework/DataSpecUtils.h" #include "Framework/DataSpecViews.h" #include "Framework/DataAllocator.h" -#include "Framework/ControlService.h" #include "Framework/RawDeviceService.h" #include "Framework/StringHelpers.h" #include "Framework/ChannelSpecHelpers.h" @@ -157,18 +156,6 @@ int defaultConditionQueryRateMultiplier() void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext& ctx) { - auto fakeCallback = AlgorithmSpec{[](InitContext& ic) { - LOG(info) << "This is not a real device, merely a placeholder for external inputs"; - LOG(info) << "To be hidden / removed at some point."; - // mark this dummy process as ready-to-quit - ic.services().get().readyToQuit(QuitRequest::Me); - - return [](ProcessingContext& pc) { - // this callback is never called since there is no expiring input - pc.services().get().waitFor(2000); - }; - }}; - DataProcessorSpec ccdbBackend{ .name = "internal-dpl-ccdb-backend", .outputs = {}, @@ -281,20 +268,9 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext processor.options.push_back(ConfigParamSpec{"end-value-enumeration", VariantType::Int64, -1ll, {"final value for the enumeration"}}); processor.options.push_back(ConfigParamSpec{"step-value-enumeration", VariantType::Int64, 1ll, {"step between one value and the other"}}); } - bool hasTimeframeInputs = false; - for (auto& input : processor.inputs) { - if (input.lifetime == Lifetime::Timeframe) { - hasTimeframeInputs = true; - break; - } - } - bool hasTimeframeOutputs = false; - for (auto& output : processor.outputs) { - if (output.lifetime == Lifetime::Timeframe) { - hasTimeframeOutputs = true; - break; - } - } + bool hasTimeframeInputs = std::any_of(processor.inputs.begin(), processor.inputs.end(), [](auto const& input) { return input.lifetime == Lifetime::Timeframe; }); + bool hasTimeframeOutputs = std::any_of(processor.outputs.begin(), processor.outputs.end(), [](auto const& output) { return output.lifetime == Lifetime::Timeframe; }); + // A timeframeSink consumes timeframes without creating new // timeframe data. bool timeframeSink = hasTimeframeInputs && !hasTimeframeOutputs; @@ -304,14 +280,13 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext uint32_t hash = runtime_hash(processor.name.c_str()); bool hasMatch = false; ConcreteDataMatcher summaryMatcher = ConcreteDataMatcher{"DPL", "SUMMARY", static_cast(hash)}; - for (auto& output : processor.outputs) { - if (DataSpecUtils::match(output, summaryMatcher)) { - O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "%{public}s already there in %{public}s", - DataSpecUtils::describe(output).c_str(), processor.name.c_str()); - hasMatch = true; - break; - } + auto summaryOutput = std::find_if(processor.outputs.begin(), processor.outputs.end(), [&summaryMatcher](auto const& output) { return DataSpecUtils::match(output, summaryMatcher); }); + if (summaryOutput != processor.outputs.end()) { + O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "%{public}s already there in %{public}s", + DataSpecUtils::describe(*summaryOutput).c_str(), processor.name.c_str()); + hasMatch = true; } + if (!hasMatch) { O2_SIGNPOST_EVENT_EMIT(workflow_helpers, sid, "output enumeration", "Adding DPL/SUMMARY/%d to %{public}s", hash, processor.name.c_str()); processor.outputs.push_back(OutputSpec{{"dpl-summary"}, ConcreteDataMatcher{"DPL", "SUMMARY", static_cast(hash)}}); @@ -339,18 +314,12 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration}); } break; case Lifetime::Condition: { - for (auto& option : processor.options) { - if (option.name == "condition-backend") { - hasConditionOption = true; - break; - } - } - if (hasConditionOption == false) { + requestedCCDBs.emplace_back(input); + if ((hasConditionOption == false) && std::none_of(processor.options.begin(), processor.options.end(), [](auto const& option) { return (option.name.compare("condition-backend") == 0); })) { processor.options.emplace_back(ConfigParamSpec{"condition-backend", VariantType::String, defaultConditionBackend(), {"URL for CCDB"}}); processor.options.emplace_back(ConfigParamSpec{"condition-timestamp", VariantType::Int64, 0ll, {"Force timestamp for CCDB lookup"}}); hasConditionOption = true; } - requestedCCDBs.emplace_back(input); } break; case Lifetime::OutOfBand: { auto concrete = DataSpecUtils::asConcreteDataMatcher(input); @@ -422,14 +391,10 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext ac.requestedTIMs | views::filter_not_matching(ac.providedTIMs) | sinks::append_to{ac.analysisCCDBInputs}; DeploymentMode deploymentMode = DefaultsHelpers::deploymentMode(); if (deploymentMode != DeploymentMode::OnlineDDS && deploymentMode != DeploymentMode::OnlineECS) { - AnalysisSupportHelpers::addMissingOutputsToAnalysisCCDBFetcher({}, ac.analysisCCDBInputs, ac.requestedAODs, ac.requestedTIMs, analysisCCDBBackend); + AnalysisSupportHelpers::addMissingOutputsToBuilder(ac.analysisCCDBInputs, ac.requestedAODs, ac.requestedTIMs, analysisCCDBBackend); } - for (auto& input : ac.requestedDYNs) { - if (std::none_of(ac.providedDYNs.begin(), ac.providedDYNs.end(), [&input](auto const& x) { return DataSpecUtils::match(input, x); })) { - ac.spawnerInputs.emplace_back(input); - } - } + ac.requestedDYNs | views::filter_not_matching(ac.providedDYNs) | sinks::append_to{ac.spawnerInputs}; DataProcessorSpec aodSpawner{ "internal-dpl-aod-spawner", @@ -440,6 +405,9 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext AnalysisSupportHelpers::addMissingOutputsToSpawner({}, ac.spawnerInputs, ac.requestedAODs, aodSpawner); AnalysisSupportHelpers::addMissingOutputsToReader(ac.providedAODs, ac.requestedAODs, aodReader); + + std::sort(requestedCCDBs.begin(), requestedCCDBs.end(), inputSpecLessThan); + std::sort(providedCCDBs.begin(), providedCCDBs.end(), outputSpecLessThan); AnalysisSupportHelpers::addMissingOutputsToReader(providedCCDBs, requestedCCDBs, ccdbBackend); std::vector extraSpecs;