From e65284191e052a54a657ade29664e15f6102ab9e Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Tue, 2 Dec 2025 10:58:39 +0100 Subject: [PATCH] DPL: remove direct dependency on fair::mq::Device from the FairMQDeviceProxy --- .../include/Framework/FairMQDeviceProxy.h | 4 ++- Framework/Core/src/CommonMessageBackends.cxx | 13 ++++++++- .../Core/src/ExternalFairMQDeviceProxy.cxx | 13 ++++++++- Framework/Core/src/FairMQDeviceProxy.cxx | 28 ++++++++----------- 4 files changed, 38 insertions(+), 20 deletions(-) diff --git a/Framework/Core/include/Framework/FairMQDeviceProxy.h b/Framework/Core/include/Framework/FairMQDeviceProxy.h index ab0d094c18486..dbdade465f09c 100644 --- a/Framework/Core/include/Framework/FairMQDeviceProxy.h +++ b/Framework/Core/include/Framework/FairMQDeviceProxy.h @@ -38,7 +38,9 @@ class FairMQDeviceProxy FairMQDeviceProxy() = default; FairMQDeviceProxy(FairMQDeviceProxy const&) = delete; void bind(std::vector const& outputs, std::vector const& inputs, - std::vector const& forwards, fair::mq::Device& device); + std::vector const& forwards, + std::function bindChannelByName, + std::function newStateRequestedCallback); /// Retrieve the transport associated to a given route. [[nodiscard]] OutputRoute const& getOutputRoute(RouteIndex routeIndex) const { return mOutputs.at(routeIndex.value); } diff --git a/Framework/Core/src/CommonMessageBackends.cxx b/Framework/Core/src/CommonMessageBackends.cxx index 79bd84307df15..25bf6a138dee4 100644 --- a/Framework/Core/src/CommonMessageBackends.cxx +++ b/Framework/Core/src/CommonMessageBackends.cxx @@ -57,7 +57,18 @@ o2::framework::ServiceSpec CommonMessageBackends::fairMQDeviceProxy() /// some of the channels are added only later on to the party, /// (e.g. by ECS) and Init might not be late enough to /// account for them. - proxy->bind(outputs, inputs, forwards, *device); }, + std::function bindByName = [device](std::string const& channelName) -> fair::mq::Channel& { + auto channel = device->GetChannels().find(channelName); + if (channel == device->GetChannels().end()) { + LOGP(fatal, "Expected channel {} not configured.", channelName); + } + return channel->second.at(0); + }; + + std::function newStateCallback = [device]() -> bool { + return device->NewStatePending(); + }; + proxy->bind(outputs, inputs, forwards, bindByName, newStateCallback); }, }; } diff --git a/Framework/Core/src/ExternalFairMQDeviceProxy.cxx b/Framework/Core/src/ExternalFairMQDeviceProxy.cxx index 99176de0d9db6..b4bfc991db9ae 100644 --- a/Framework/Core/src/ExternalFairMQDeviceProxy.cxx +++ b/Framework/Core/src/ExternalFairMQDeviceProxy.cxx @@ -1090,7 +1090,18 @@ DataProcessorSpec specifyFairMQDeviceMultiOutputProxy(char const* name, channelNames->emplace_back(std::move(channel)); } - proxy.bind(mutableDeviceSpec.outputs, mutableDeviceSpec.inputs, mutableDeviceSpec.forwards, *device); + std::function bindByName = [device](std::string const& channelName) -> fair::mq::Channel& { + auto channel = device->GetChannels().find(channelName); + if (channel == device->GetChannels().end()) { + LOGP(fatal, "Expected channel {} not configured.", channelName); + } + return channel->second.at(0); + }; + + std::function newStateCallback = [device]() -> bool { + return device->NewStatePending(); + }; + proxy.bind(mutableDeviceSpec.outputs, mutableDeviceSpec.inputs, mutableDeviceSpec.forwards, bindByName, newStateCallback); }; // We need to clear the channels on stop, because we will check and add them auto channelConfigurationDisposer = [&deviceSpec]() { diff --git a/Framework/Core/src/FairMQDeviceProxy.cxx b/Framework/Core/src/FairMQDeviceProxy.cxx index bdffddd5a4d1a..e121084b866a2 100644 --- a/Framework/Core/src/FairMQDeviceProxy.cxx +++ b/Framework/Core/src/FairMQDeviceProxy.cxx @@ -230,7 +230,8 @@ std::unique_ptr FairMQDeviceProxy::createForwardMessage(Route void FairMQDeviceProxy::bind(std::vector const& outputs, std::vector const& inputs, std::vector const& forwards, - fair::mq::Device& device) + std::function bindChannelByName, + std::function newStatePending) { mOutputs.clear(); mOutputRoutes.clear(); @@ -258,14 +259,11 @@ void FairMQDeviceProxy::bind(std::vector const& outputs, std::vecto if (channelPos == channelNameToChannel.end()) { channelIndex = ChannelIndex{(int)mOutputChannelInfos.size()}; ChannelAccountingType dplChannel = (route.channel.rfind("from_", 0) == 0) ? ChannelAccountingType::DPL : ChannelAccountingType::RAWFMQ; - auto channel = device.GetChannels().find(route.channel); - if (channel == device.GetChannels().end()) { - LOGP(fatal, "Expected channel {} not configured.", route.channel); - } + auto& channel = bindChannelByName(route.channel); OutputChannelInfo info{ .name = route.channel, .channelType = dplChannel, - .channel = channel->second.at(0), + .channel = channel, .policy = route.policy, .index = channelIndex, }; @@ -305,11 +303,9 @@ void FairMQDeviceProxy::bind(std::vector const& outputs, std::vecto if (channelPos == channelNameToChannel.end()) { channelIndex = ChannelIndex{(int)mInputChannels.size()}; - auto channel = device.GetChannels().find(route.sourceChannel); - if (channel == device.GetChannels().end()) { - LOGP(fatal, "Expected channel {} not configured.", route.sourceChannel); - } - mInputChannels.push_back(&channel->second.at(0)); + fair::mq::Channel& channel = bindChannelByName(route.sourceChannel); + + mInputChannels.push_back(&channel); mInputChannelNames.push_back(route.sourceChannel); channelNameToChannel[route.sourceChannel] = channelIndex; LOGP(detail, "Binding channel {} to channel index {}", route.sourceChannel, channelIndex.value); @@ -341,12 +337,10 @@ void FairMQDeviceProxy::bind(std::vector const& outputs, std::vecto if (channelPos == channelNameToChannel.end()) { channelIndex = ChannelIndex{(int)mForwardChannelInfos.size()}; - auto channel = device.GetChannels().find(route.channel); - if (channel == device.GetChannels().end()) { - LOGP(fatal, "Expected channel {} not configured.", route.channel); - } + auto& channel = bindChannelByName(route.channel); + ChannelAccountingType dplChannel = (route.channel.rfind("from_", 0) == 0) ? ChannelAccountingType::DPL : ChannelAccountingType::RAWFMQ; - mForwardChannelInfos.push_back(ForwardChannelInfo{.name = route.channel, .channelType = dplChannel, .channel = channel->second.at(0), .policy = route.policy, .index = channelIndex}); + mForwardChannelInfos.push_back(ForwardChannelInfo{.name = route.channel, .channelType = dplChannel, .channel = channel, .policy = route.policy, .index = channelIndex}); mForwardChannelStates.push_back(ForwardChannelState{0}); channelNameToChannel[route.channel] = channelIndex; LOGP(detail, "Binding forward channel {} to channel index {}", route.channel, channelIndex.value); @@ -368,6 +362,6 @@ void FairMQDeviceProxy::bind(std::vector const& outputs, std::vecto LOGP(detail, "Forward route {}@{}%{} to index {} and channelIndex {}", DataSpecUtils::describe(route.matcher), route.timeslice, route.maxTimeslices, fi, state.channel.value); } } - mStateChangeCallback = [&device]() -> bool { return device.NewStatePending(); }; + mStateChangeCallback = newStatePending; } } // namespace o2::framework