Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Framework/Core/include/Framework/FairMQDeviceProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ class FairMQDeviceProxy
FairMQDeviceProxy() = default;
FairMQDeviceProxy(FairMQDeviceProxy const&) = delete;
void bind(std::vector<OutputRoute> const& outputs, std::vector<InputRoute> const& inputs,
std::vector<ForwardRoute> const& forwards, fair::mq::Device& device);
std::vector<ForwardRoute> const& forwards,
std::function<fair::mq::Channel&(std::string const&)> bindChannelByName,
std::function<bool(void)> newStateRequestedCallback);

/// Retrieve the transport associated to a given route.
[[nodiscard]] OutputRoute const& getOutputRoute(RouteIndex routeIndex) const { return mOutputs.at(routeIndex.value); }
Expand Down
13 changes: 12 additions & 1 deletion Framework/Core/src/CommonMessageBackends.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,18 @@ o2::framework::ServiceSpec CommonMessageBackends::fairMQDeviceProxy()
/// some of the channels are added only later on to the party,
/// (e.g. by ECS) and Init might not be late enough to
/// account for them.
proxy->bind(outputs, inputs, forwards, *device); },
std::function<fair::mq::Channel&(std::string const&)> bindByName = [device](std::string const& channelName) -> fair::mq::Channel& {
auto channel = device->GetChannels().find(channelName);
if (channel == device->GetChannels().end()) {
LOGP(fatal, "Expected channel {} not configured.", channelName);
}
return channel->second.at(0);
};

std::function<bool()> newStateCallback = [device]() -> bool {
return device->NewStatePending();
};
proxy->bind(outputs, inputs, forwards, bindByName, newStateCallback); },
};
}

Expand Down
13 changes: 12 additions & 1 deletion Framework/Core/src/ExternalFairMQDeviceProxy.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1090,7 +1090,18 @@ DataProcessorSpec specifyFairMQDeviceMultiOutputProxy(char const* name,

channelNames->emplace_back(std::move(channel));
}
proxy.bind(mutableDeviceSpec.outputs, mutableDeviceSpec.inputs, mutableDeviceSpec.forwards, *device);
std::function<fair::mq::Channel&(std::string const&)> bindByName = [device](std::string const& channelName) -> fair::mq::Channel& {
auto channel = device->GetChannels().find(channelName);
if (channel == device->GetChannels().end()) {
LOGP(fatal, "Expected channel {} not configured.", channelName);
}
return channel->second.at(0);
};

std::function<bool()> newStateCallback = [device]() -> bool {
return device->NewStatePending();
};
proxy.bind(mutableDeviceSpec.outputs, mutableDeviceSpec.inputs, mutableDeviceSpec.forwards, bindByName, newStateCallback);
};
// We need to clear the channels on stop, because we will check and add them
auto channelConfigurationDisposer = [&deviceSpec]() {
Expand Down
28 changes: 11 additions & 17 deletions Framework/Core/src/FairMQDeviceProxy.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ std::unique_ptr<fair::mq::Message> FairMQDeviceProxy::createForwardMessage(Route

void FairMQDeviceProxy::bind(std::vector<OutputRoute> const& outputs, std::vector<InputRoute> const& inputs,
std::vector<ForwardRoute> const& forwards,
fair::mq::Device& device)
std::function<fair::mq::Channel&(std::string const&)> bindChannelByName,
std::function<bool(void)> newStatePending)
{
mOutputs.clear();
mOutputRoutes.clear();
Expand Down Expand Up @@ -258,14 +259,11 @@ void FairMQDeviceProxy::bind(std::vector<OutputRoute> const& outputs, std::vecto
if (channelPos == channelNameToChannel.end()) {
channelIndex = ChannelIndex{(int)mOutputChannelInfos.size()};
ChannelAccountingType dplChannel = (route.channel.rfind("from_", 0) == 0) ? ChannelAccountingType::DPL : ChannelAccountingType::RAWFMQ;
auto channel = device.GetChannels().find(route.channel);
if (channel == device.GetChannels().end()) {
LOGP(fatal, "Expected channel {} not configured.", route.channel);
}
auto& channel = bindChannelByName(route.channel);
OutputChannelInfo info{
.name = route.channel,
.channelType = dplChannel,
.channel = channel->second.at(0),
.channel = channel,
.policy = route.policy,
.index = channelIndex,
};
Expand Down Expand Up @@ -305,11 +303,9 @@ void FairMQDeviceProxy::bind(std::vector<OutputRoute> const& outputs, std::vecto

if (channelPos == channelNameToChannel.end()) {
channelIndex = ChannelIndex{(int)mInputChannels.size()};
auto channel = device.GetChannels().find(route.sourceChannel);
if (channel == device.GetChannels().end()) {
LOGP(fatal, "Expected channel {} not configured.", route.sourceChannel);
}
mInputChannels.push_back(&channel->second.at(0));
fair::mq::Channel& channel = bindChannelByName(route.sourceChannel);

mInputChannels.push_back(&channel);
mInputChannelNames.push_back(route.sourceChannel);
channelNameToChannel[route.sourceChannel] = channelIndex;
LOGP(detail, "Binding channel {} to channel index {}", route.sourceChannel, channelIndex.value);
Expand Down Expand Up @@ -341,12 +337,10 @@ void FairMQDeviceProxy::bind(std::vector<OutputRoute> const& outputs, std::vecto

if (channelPos == channelNameToChannel.end()) {
channelIndex = ChannelIndex{(int)mForwardChannelInfos.size()};
auto channel = device.GetChannels().find(route.channel);
if (channel == device.GetChannels().end()) {
LOGP(fatal, "Expected channel {} not configured.", route.channel);
}
auto& channel = bindChannelByName(route.channel);

ChannelAccountingType dplChannel = (route.channel.rfind("from_", 0) == 0) ? ChannelAccountingType::DPL : ChannelAccountingType::RAWFMQ;
mForwardChannelInfos.push_back(ForwardChannelInfo{.name = route.channel, .channelType = dplChannel, .channel = channel->second.at(0), .policy = route.policy, .index = channelIndex});
mForwardChannelInfos.push_back(ForwardChannelInfo{.name = route.channel, .channelType = dplChannel, .channel = channel, .policy = route.policy, .index = channelIndex});
mForwardChannelStates.push_back(ForwardChannelState{0});
channelNameToChannel[route.channel] = channelIndex;
LOGP(detail, "Binding forward channel {} to channel index {}", route.channel, channelIndex.value);
Expand All @@ -368,6 +362,6 @@ void FairMQDeviceProxy::bind(std::vector<OutputRoute> const& outputs, std::vecto
LOGP(detail, "Forward route {}@{}%{} to index {} and channelIndex {}", DataSpecUtils::describe(route.matcher), route.timeslice, route.maxTimeslices, fi, state.channel.value);
}
}
mStateChangeCallback = [&device]() -> bool { return device.NewStatePending(); };
mStateChangeCallback = newStatePending;
}
} // namespace o2::framework