Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Framework/Core/include/Framework/CommonServices.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ struct CommonServices {
return [](InitContext&, void* service) -> void* { return service; };
}

static ServiceSpec deviceContextSpec();
static ServiceSpec dataProcessorContextSpec();
static ServiceSpec driverClientSpec();
static ServiceSpec monitoringSpec();
Expand Down
3 changes: 1 addition & 2 deletions Framework/Core/include/Framework/DataProcessingDevice.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ struct DeviceConfigurationHelpers {
class DataProcessingDevice : public fair::mq::Device
{
public:
DataProcessingDevice(RunningDeviceRef ref, ServiceRegistry&, ProcessingPolicies& policies);
DataProcessingDevice(RunningDeviceRef ref, ServiceRegistry&);
void Init() final;
void InitTask() final;
void PreRun() final;
Expand Down Expand Up @@ -112,7 +112,6 @@ class DataProcessingDevice : public fair::mq::Device
uint64_t mBeginIterationTimestamp = 0; /// The timestamp of when the current ConditionalRun was started
std::vector<fair::mq::RegionInfo> mPendingRegionInfos; /// A list of the region infos not yet notified.
std::mutex mRegionInfoMutex;
ProcessingPolicies mProcessingPolicies; /// User policies related to data processing
std::vector<uv_work_t> mHandles; /// Handles to use to schedule work.
std::vector<TaskStreamInfo> mStreams; /// Information about the task running in the associated mHandle.
/// Handle to wake up the main loop from other threads
Expand Down
2 changes: 2 additions & 0 deletions Framework/Core/include/Framework/DeviceContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ typedef struct uv_signal_s uv_signal_t;
namespace o2::framework
{
struct ComputingQuotaStats;
struct ProcessingPolicies;

/// Structure which holds the whole runtime context
/// of a running device which is not stored as
Expand All @@ -33,6 +34,7 @@ struct DeviceContext {
int expectedRegionCallbacks = 0;
int exitTransitionTimeout = 0;
int dataProcessingTimeout = 0;
ProcessingPolicies& processingPolicies;
};

} // namespace o2::framework
Expand Down
11 changes: 0 additions & 11 deletions Framework/Core/src/CommonServices.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1237,17 +1237,6 @@ o2::framework::ServiceSpec CommonServices::dataProcessorContextSpec()
.kind = ServiceKind::Serial};
}

/// Describe the "device-context" service.
///
/// The init callback ignores all of its arguments and returns a handle which
/// pairs the DeviceContext type id with a newly allocated, default-constructed
/// DeviceContext. The configure hook is whatever noConfiguration() provides and
/// the service is declared as ServiceKind::Serial.
o2::framework::ServiceSpec CommonServices::deviceContextSpec()
{
  // Captureless, so it converts to whatever callable type ServiceSpec::init expects.
  auto const createDeviceContext = [](ServiceRegistryRef, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle {
    return ServiceHandle{TypeIdHelpers::uniqueId<DeviceContext>(), new DeviceContext()};
  };

  return ServiceSpec{
    .name = "device-context",
    .init = createDeviceContext,
    .configure = noConfiguration(),
    .kind = ServiceKind::Serial};
}

o2::framework::ServiceSpec CommonServices::dataAllocatorSpec()
{
return ServiceSpec{
Expand Down
119 changes: 64 additions & 55 deletions Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "Framework/DataProcessor.h"
#include "Framework/DataSpecUtils.h"
#include "Framework/DeviceState.h"
#include "Framework/DeviceStateEnums.h"
#include "Framework/DispatchPolicy.h"
#include "Framework/DispatchControl.h"
#include "Framework/DanglingContext.h"
Expand Down Expand Up @@ -196,11 +197,10 @@ struct locked_execution {
~locked_execution() { ref.unlock(); }
};

DataProcessingDevice::DataProcessingDevice(RunningDeviceRef running, ServiceRegistry& registry, ProcessingPolicies& policies)
DataProcessingDevice::DataProcessingDevice(RunningDeviceRef running, ServiceRegistry& registry)
: mRunningDevice{running},
mConfigRegistry{nullptr},
mServiceRegistry{registry},
mProcessingPolicies{policies}
mServiceRegistry{registry}
{
GetConfig()->Subscribe<std::string>("dpl", [&registry = mServiceRegistry](const std::string& key, std::string value) {
if (key == "cleanup") {
Expand Down Expand Up @@ -247,6 +247,7 @@ DataProcessingDevice::DataProcessingDevice(RunningDeviceRef running, ServiceRegi
mHandles.resize(1);

ServiceRegistryRef ref{mServiceRegistry};

mAwakeHandle = (uv_async_t*)malloc(sizeof(uv_async_t));
auto& state = ref.get<DeviceState>();
assert(state.loop);
Expand Down Expand Up @@ -1189,18 +1190,18 @@ void DataProcessingDevice::fillContext(DataProcessorContext& context, DeviceCont
errorCallback(errorContext);
};
} else {
context.errorHandling = [&errorPolicy = mProcessingPolicies.error,
&serviceRegistry = mServiceRegistry](RuntimeErrorRef e, InputRecord& record) {
context.errorHandling = [&serviceRegistry = mServiceRegistry](RuntimeErrorRef e, InputRecord& record) {
auto& err = error_from_ref(e);
/// FIXME: we should pass the salt in, so that the message
/// can access information which were stored in the stream.
ServiceRegistryRef ref{serviceRegistry, ServiceRegistry::globalDeviceSalt()};
auto& context = ref.get<DataProcessorContext>();
auto& deviceContext = ref.get<DeviceContext>();
O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context);
BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO);
auto& stats = ref.get<DataProcessingStats>();
stats.updateStats({(int)ProcessingStatsId::EXCEPTION_COUNT, DataProcessingStats::Op::Add, 1});
switch (errorPolicy) {
switch (deviceContext.processingPolicies.error) {
case TerminationPolicy::QUIT:
O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Run", "Exception while running: %{public}s. Rethrowing.", err.what);
throw e;
Expand All @@ -1211,10 +1212,10 @@ void DataProcessingDevice::fillContext(DataProcessorContext& context, DeviceCont
};
}

auto decideEarlyForward = [&context, &spec, this]() -> bool {
auto decideEarlyForward = [&context, &deviceContext, &spec, this]() -> bool {
/// We must make sure there is no optional
/// if we want to optimize the forwarding
bool canForwardEarly = (spec.forwards.empty() == false) && mProcessingPolicies.earlyForward != EarlyForwardPolicy::NEVER;
bool canForwardEarly = (spec.forwards.empty() == false) && deviceContext.processingPolicies.earlyForward != EarlyForwardPolicy::NEVER;
bool onlyConditions = true;
bool overriddenEarlyForward = false;
for (auto& forwarded : spec.forwards) {
Expand All @@ -1229,7 +1230,7 @@ void DataProcessingDevice::fillContext(DataProcessorContext& context, DeviceCont
break;
}
#endif
if (DataSpecUtils::partialMatch(forwarded.matcher, o2::header::DataDescription{"RAWDATA"}) && mProcessingPolicies.earlyForward == EarlyForwardPolicy::NORAW) {
if (DataSpecUtils::partialMatch(forwarded.matcher, o2::header::DataDescription{"RAWDATA"}) && deviceContext.processingPolicies.earlyForward == EarlyForwardPolicy::NORAW) {
context.canForwardEarly = false;
overriddenEarlyForward = true;
LOG(detail) << "Cannot forward early because of RAWDATA input: " << DataSpecUtils::describe(forwarded.matcher);
Expand Down Expand Up @@ -1330,6 +1331,58 @@ void DataProcessingDevice::Reset()
ref.get<CallbackService>().call<CallbackService::Id::Reset>();
}

/// Work out how a newly requested device state transition has to be handled.
///
/// If a transition is already being handled, or the device proxy does not
/// report a newly requested state, the `transitionHandling` value currently
/// stored in DeviceState is returned as is and nothing else happens.
/// Otherwise the relevant grace period timers are started according to the
/// timeouts found in DeviceContext and the outcome is reported to the caller.
///
/// @param ref registry used to retrieve DeviceState, FairMQDeviceProxy,
///            DeviceContext, DeviceSpec and CallbackService
/// @param policies processing policies; only `termination` is looked at, and
///                 only to choose which informational message gets emitted
/// @return `Requested` when an exit transition timeout is set and we are not
///         yet Idle, `Expired` when the transition can happen right away, or
///         the unchanged current value when there is nothing new to handle
TransitionHandlingState updateStateTransition(ServiceRegistryRef& ref, ProcessingPolicies const& policies)
{
  auto& state = ref.get<DeviceState>();
  auto& deviceProxy = ref.get<FairMQDeviceProxy>();
  // Either we are already dealing with a transition or nobody asked for one.
  bool const alreadyHandling = state.transitionHandling != TransitionHandlingState::NoTransition;
  if (alreadyHandling || !deviceProxy.newStateRequested()) {
    return state.transitionHandling;
  }

  O2_SIGNPOST_ID_FROM_POINTER(lid, device, state.loop);
  auto& deviceContext = ref.get<DeviceContext>();
  auto& spec = ref.get<DeviceSpec const>();
  // Devices which only have timers go straight to end of streaming.
  if (hasOnlyTimers(spec)) {
    switchState(ref, StreamingState::EndOfStreaming);
  }

  // The data processing timer is armed only when it would fire strictly
  // before the exit transition one; otherwise it would bring nothing.
  bool const processingTimerUseful = deviceContext.dataProcessingTimeout > 0 &&
                                     deviceContext.dataProcessingTimeout < deviceContext.exitTransitionTimeout;
  if (processingTimerUseful) {
    uv_update_time(state.loop);
    O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for dataProcessingTimeout.", deviceContext.dataProcessingTimeout);
    uv_timer_start(deviceContext.dataProcessingGracePeriodTimer, on_data_processing_expired, deviceContext.dataProcessingTimeout * 1000, 0);
  }

  // Evaluated only now, since switchState above may have changed the streaming state.
  bool const mustWaitForExit = deviceContext.exitTransitionTimeout != 0 && state.streaming != StreamingState::Idle;
  if (!mustWaitForExit) {
    // No grace period applies: just tell the user why we move on immediately.
    bool const quitting = policies.termination == TerminationPolicy::QUIT;
    if (deviceContext.exitTransitionTimeout == 0) {
      if (quitting) {
        O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, quitting immediately as per --completion-policy");
      } else {
        O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, switching to READY state immediately");
      }
    } else {
      if (quitting) {
        O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, quitting immediately as per --completion-policy");
      } else {
        O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, switching to READY immediately.");
      }
    }
    return TransitionHandlingState::Expired;
  }

  // Notify whoever is interested that an exit was requested, then start the grace period.
  ref.get<CallbackService>().call<CallbackService::Id::ExitRequested>(ServiceRegistryRef{ref});
  uv_update_time(state.loop);
  O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for exitTransitionTimeout.",
                         deviceContext.exitTransitionTimeout);
  uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, deviceContext.exitTransitionTimeout * 1000, 0);

  // The timeout reported in the message depends on whether all the inputs are generated.
  bool onlyGenerated = hasOnlyGenerated(spec);
  int timeout = deviceContext.exitTransitionTimeout;
  if (onlyGenerated) {
    timeout = deviceContext.dataProcessingTimeout;
  }
  if (policies.termination == TerminationPolicy::QUIT && !DefaultsHelpers::onlineDeploymentMode()) {
    O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. Waiting for %d seconds before quitting.", timeout);
  } else {
    O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop",
                                "New state requested. Waiting for %d seconds before %{public}s",
                                timeout,
                                onlyGenerated ? "dropping remaining input and switching to READY state." : "switching to READY state.");
  }
  return TransitionHandlingState::Requested;
}

void DataProcessingDevice::Run()
{
ServiceRegistryRef ref{mServiceRegistry};
Expand Down Expand Up @@ -1382,51 +1435,7 @@ void DataProcessingDevice::Run()
shouldNotWait = true;
state.loopReason |= DeviceState::LoopReason::NEW_STATE_PENDING;
}
if (state.transitionHandling == TransitionHandlingState::NoTransition && NewStatePending()) {
state.transitionHandling = TransitionHandlingState::Requested;
auto& deviceContext = ref.get<DeviceContext>();
// Check if we only have timers
auto& spec = ref.get<DeviceSpec const>();
if (hasOnlyTimers(spec)) {
switchState(ref, StreamingState::EndOfStreaming);
}

// We do not do anything in particular if the data processing timeout would go past the exitTransitionTimeout
if (deviceContext.dataProcessingTimeout > 0 && deviceContext.dataProcessingTimeout < deviceContext.exitTransitionTimeout) {
uv_update_time(state.loop);
O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for dataProcessingTimeout.", deviceContext.dataProcessingTimeout);
uv_timer_start(deviceContext.dataProcessingGracePeriodTimer, on_data_processing_expired, deviceContext.dataProcessingTimeout * 1000, 0);
}
if (deviceContext.exitTransitionTimeout != 0 && state.streaming != StreamingState::Idle) {
state.transitionHandling = TransitionHandlingState::Requested;
ref.get<CallbackService>().call<CallbackService::Id::ExitRequested>(ServiceRegistryRef{ref});
uv_update_time(state.loop);
O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for exitTransitionTimeout.",
deviceContext.exitTransitionTimeout);
uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, deviceContext.exitTransitionTimeout * 1000, 0);
bool onlyGenerated = hasOnlyGenerated(spec);
int timeout = onlyGenerated ? deviceContext.dataProcessingTimeout : deviceContext.exitTransitionTimeout;
if (mProcessingPolicies.termination == TerminationPolicy::QUIT && DefaultsHelpers::onlineDeploymentMode() == false) {
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. Waiting for %d seconds before quitting.", timeout);
} else {
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop",
"New state requested. Waiting for %d seconds before %{public}s",
timeout,
onlyGenerated ? "dropping remaining input and switching to READY state." : "switching to READY state.");
}
} else {
state.transitionHandling = TransitionHandlingState::Expired;
if (deviceContext.exitTransitionTimeout == 0 && mProcessingPolicies.termination == TerminationPolicy::QUIT) {
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, quitting immediately as per --completion-policy");
} else if (deviceContext.exitTransitionTimeout == 0 && mProcessingPolicies.termination != TerminationPolicy::QUIT) {
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, switching to READY state immediately");
} else if (mProcessingPolicies.termination == TerminationPolicy::QUIT) {
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, quitting immediately as per --completion-policy");
} else {
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, switching to READY immediately.");
}
}
}
state.transitionHandling = updateStateTransition(ref, ref.get<DeviceContext>().processingPolicies);
// If we are Idle, we can then consider the transition to be expired.
if (state.transitionHandling == TransitionHandlingState::Requested && state.streaming == StreamingState::Idle) {
O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "State transition requested and we are now in Idle. We can consider it to be completed.");
Expand Down Expand Up @@ -1560,7 +1569,7 @@ void DataProcessingDevice::Run()
}
}

O2_SIGNPOST_END(device, lid, "run_loop", "Run loop completed. Transition handling state %d.", state.transitionHandling);
O2_SIGNPOST_END(device, lid, "run_loop", "Run loop completed. Transition handling state %d.", (int)state.transitionHandling);
auto& spec = ref.get<DeviceSpec const>();
/// Cleanup messages which are still pending on exit.
for (size_t ci = 0; ci < spec.inputChannels.size(); ++ci) {
Expand Down
4 changes: 2 additions & 2 deletions Framework/Core/src/runDataProcessing.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1092,13 +1092,13 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry,
quotaEvaluator = std::make_unique<ComputingQuotaEvaluator>(serviceRef);
serviceRef.registerService(ServiceRegistryHelpers::handleForService<ComputingQuotaEvaluator>(quotaEvaluator.get()));

deviceContext = std::make_unique<DeviceContext>();
deviceContext = std::make_unique<DeviceContext>(DeviceContext{.processingPolicies = processingPolicies});
serviceRef.registerService(ServiceRegistryHelpers::handleForService<DeviceSpec const>(&spec));
serviceRef.registerService(ServiceRegistryHelpers::handleForService<RunningWorkflowInfo const>(&runningWorkflow));
serviceRef.registerService(ServiceRegistryHelpers::handleForService<DeviceContext>(deviceContext.get()));
serviceRef.registerService(ServiceRegistryHelpers::handleForService<DriverConfig const>(&driverConfig));

auto device = std::make_unique<DataProcessingDevice>(ref, serviceRegistry, processingPolicies);
auto device = std::make_unique<DataProcessingDevice>(ref, serviceRegistry);

serviceRef.get<RawDeviceService>().setDevice(device.get());
r.fDevice = std::move(device);
Expand Down
Loading