From 1bac664a6bafef7f2475d0a887a4b1597d6e6140 Mon Sep 17 00:00:00 2001 From: Piotr Konopka Date: Tue, 16 Dec 2025 16:42:24 +0100 Subject: [PATCH] DPL: allow to disable oldest possible timeframe propagation with a label This allows to disable all DomainInfoHeader propagation with a corresponding DataProcessorLabel. It addresses the issue reported in QC-1320, where remote QC workflows were getting flooded with a DIH for each QC task instance in the setup. --- Framework/Core/CMakeLists.txt | 1 + .../Core/include/Framework/CommonLabels.h | 26 +++++++++++++++++++ Framework/Core/src/CommonLabels.cxx | 19 ++++++++++++++ Framework/Core/src/CommonServices.cxx | 7 +++++ Framework/Core/src/DataProcessingHelpers.cxx | 7 +++++ Framework/Core/src/DecongestionService.h | 2 ++ 6 files changed, 62 insertions(+) create mode 100644 Framework/Core/include/Framework/CommonLabels.h create mode 100644 Framework/Core/src/CommonLabels.cxx diff --git a/Framework/Core/CMakeLists.txt b/Framework/Core/CMakeLists.txt index fe8a91eaa0449..1daba5dbc9798 100644 --- a/Framework/Core/CMakeLists.txt +++ b/Framework/Core/CMakeLists.txt @@ -108,6 +108,7 @@ o2_add_library(Framework src/SimpleOptionsRetriever.cxx src/O2ControlHelpers.cxx src/O2ControlLabels.cxx + src/CommonLabels.cxx src/O2ControlParameters.cxx src/O2DataModelHelpers.cxx src/OutputSpec.cxx diff --git a/Framework/Core/include/Framework/CommonLabels.h b/Framework/Core/include/Framework/CommonLabels.h new file mode 100644 index 0000000000000..8be41a33af41d --- /dev/null +++ b/Framework/Core/include/Framework/CommonLabels.h @@ -0,0 +1,26 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#ifndef O2_FRAMEWORK_COMMONLABELS_H +#define O2_FRAMEWORK_COMMONLABELS_H + +#include "Framework/DataProcessorLabel.h" + +namespace o2::framework +{ + +// Label to disable forwarding/advertising of DomainInfoHeader (oldest possible outputs) +// When present on a DataProcessor, no DomainInfoHeader messages will be sent downstream. +const extern DataProcessorLabel suppressDomainInfoLabel; + +} // namespace o2::framework + +#endif // O2_FRAMEWORK_COMMONLABELS_H diff --git a/Framework/Core/src/CommonLabels.cxx b/Framework/Core/src/CommonLabels.cxx new file mode 100644 index 0000000000000..f728e194f611b --- /dev/null +++ b/Framework/Core/src/CommonLabels.cxx @@ -0,0 +1,19 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#include "Framework/CommonLabels.h" + +namespace o2::framework +{ + +const DataProcessorLabel suppressDomainInfoLabel = {"suppress-domain-info"}; + +} // namespace o2::framework diff --git a/Framework/Core/src/CommonServices.cxx b/Framework/Core/src/CommonServices.cxx index 3aa46269bdd7e..f786d99fd2c0d 100644 --- a/Framework/Core/src/CommonServices.cxx +++ b/Framework/Core/src/CommonServices.cxx @@ -45,6 +45,7 @@ #include "Framework/DefaultsHelpers.h" #include "Framework/Signpost.h" #include "Framework/DriverConfig.h" +#include "Framework/CommonLabels.h" #include "TextDriverClient.h" #include "WSDriverClient.h" @@ -604,6 +605,12 @@ o2::framework::ServiceSpec break; } } + for (const auto& label : services.get().labels) { + if (label == suppressDomainInfoLabel) { + decongestion->suppressDomainInfo = true; + break; + } + } auto& queue = services.get(); decongestion->oldestPossibleTimesliceTask = AsyncQueueHelpers::create(queue, {.name = "oldest-possible-timeslice", .score = 100}); return ServiceHandle{TypeIdHelpers::uniqueId(), decongestion, ServiceKind::Serial}; diff --git a/Framework/Core/src/DataProcessingHelpers.cxx b/Framework/Core/src/DataProcessingHelpers.cxx index 9c53bbf8b2c10..aea682a8d00c3 100644 --- a/Framework/Core/src/DataProcessingHelpers.cxx +++ b/Framework/Core/src/DataProcessingHelpers.cxx @@ -34,6 +34,7 @@ #include "Framework/DeviceStateEnums.h" #include "Headers/DataHeader.h" #include "Framework/DataProcessingHeader.h" +#include "DecongestionService.h" #include #include @@ -83,6 +84,9 @@ void doSendOldestPossibleTimeframe(ServiceRegistryRef ref, fair::mq::TransportFa bool DataProcessingHelpers::sendOldestPossibleTimeframe(ServiceRegistryRef const& ref, ForwardChannelInfo const& info, ForwardChannelState& state, size_t timeslice) { + if (ref.get().suppressDomainInfo) { + return false; + } if (state.oldestForChannel.value >= timeslice) { return false; } @@ -93,6 +97,9 @@ bool DataProcessingHelpers::sendOldestPossibleTimeframe(ServiceRegistryRef const bool DataProcessingHelpers::sendOldestPossibleTimeframe(ServiceRegistryRef const& ref, OutputChannelInfo const& info, OutputChannelState& state, size_t timeslice) { + if (ref.get().suppressDomainInfo) { + return false; + } if (state.oldestForChannel.value >= timeslice) { return false; } diff --git a/Framework/Core/src/DecongestionService.h b/Framework/Core/src/DecongestionService.h index c45e9a36217ec..1a42d3577bc0a 100644 --- a/Framework/Core/src/DecongestionService.h +++ b/Framework/Core/src/DecongestionService.h @@ -18,6 +18,8 @@ namespace o2::framework struct DecongestionService { /// Wether we are a source in the processing chain bool isFirstInTopology = true; + /// do not advertise/forward DomainInfoHeader from this device + bool suppressDomainInfo = false; /// The last timeslice which the ExpirationHandler::Creator callback /// created. This can be used to skip dummy iterations. size_t nextEnumerationTimeslice = 0;