diff --git a/Framework/Core/include/Framework/CommonServices.h b/Framework/Core/include/Framework/CommonServices.h index 69f3152c0ba76..f5080fcac28db 100644 --- a/Framework/Core/include/Framework/CommonServices.h +++ b/Framework/Core/include/Framework/CommonServices.h @@ -56,7 +56,6 @@ struct CommonServices { return [](InitContext&, void* service) -> void* { return service; }; } - static ServiceSpec deviceContextSpec(); static ServiceSpec dataProcessorContextSpec(); static ServiceSpec driverClientSpec(); static ServiceSpec monitoringSpec(); diff --git a/Framework/Core/include/Framework/DataProcessingDevice.h b/Framework/Core/include/Framework/DataProcessingDevice.h index 67edaa99e532b..b2281274acc87 100644 --- a/Framework/Core/include/Framework/DataProcessingDevice.h +++ b/Framework/Core/include/Framework/DataProcessingDevice.h @@ -77,7 +77,7 @@ struct DeviceConfigurationHelpers { class DataProcessingDevice : public fair::mq::Device { public: - DataProcessingDevice(RunningDeviceRef ref, ServiceRegistry&, ProcessingPolicies& policies); + DataProcessingDevice(RunningDeviceRef ref, ServiceRegistry&); void Init() final; void InitTask() final; void PreRun() final; @@ -112,7 +112,6 @@ class DataProcessingDevice : public fair::mq::Device uint64_t mBeginIterationTimestamp = 0; /// The timestamp of when the current ConditionalRun was started std::vector mPendingRegionInfos; /// A list of the region infos not yet notified. std::mutex mRegionInfoMutex; - ProcessingPolicies mProcessingPolicies; /// User policies related to data processing std::vector mHandles; /// Handles to use to schedule work. std::vector mStreams; /// Information about the task running in the associated mHandle. /// Handle to wake up the main loop from other threads diff --git a/Framework/Core/include/Framework/DeviceContext.h b/Framework/Core/include/Framework/DeviceContext.h index 4593e5e819ccf..a392004c2ffbf 100644 --- a/Framework/Core/include/Framework/DeviceContext.h +++ b/Framework/Core/include/Framework/DeviceContext.h @@ -21,6 +21,7 @@ typedef struct uv_signal_s uv_signal_t; namespace o2::framework { struct ComputingQuotaStats; +struct ProcessingPolicies; /// Stucture which holds the whole runtime context /// of a running device which is not stored as @@ -33,6 +34,7 @@ struct DeviceContext { int expectedRegionCallbacks = 0; int exitTransitionTimeout = 0; int dataProcessingTimeout = 0; + ProcessingPolicies& processingPolicies; }; } // namespace o2::framework diff --git a/Framework/Core/src/CommonServices.cxx b/Framework/Core/src/CommonServices.cxx index 5a2876e074d29..091cd9d4ed0a5 100644 --- a/Framework/Core/src/CommonServices.cxx +++ b/Framework/Core/src/CommonServices.cxx @@ -1237,17 +1237,6 @@ o2::framework::ServiceSpec CommonServices::dataProcessorContextSpec() .kind = ServiceKind::Serial}; } -o2::framework::ServiceSpec CommonServices::deviceContextSpec() -{ - return ServiceSpec{ - .name = "device-context", - .init = [](ServiceRegistryRef, DeviceState&, fair::mq::ProgOptions&) -> ServiceHandle { - return ServiceHandle{TypeIdHelpers::uniqueId(), new DeviceContext()}; - }, - .configure = noConfiguration(), - .kind = ServiceKind::Serial}; -} - o2::framework::ServiceSpec CommonServices::dataAllocatorSpec() { return ServiceSpec{ diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index a41aa3a886d55..aa194b525ca5d 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -17,6 +17,7 @@ #include "Framework/DataProcessor.h" #include "Framework/DataSpecUtils.h" #include "Framework/DeviceState.h" +#include "Framework/DeviceStateEnums.h" #include "Framework/DispatchPolicy.h" #include "Framework/DispatchControl.h" #include "Framework/DanglingContext.h" @@ -196,11 +197,10 @@ struct locked_execution { ~locked_execution() { ref.unlock(); } }; -DataProcessingDevice::DataProcessingDevice(RunningDeviceRef running, ServiceRegistry& registry, ProcessingPolicies& policies) +DataProcessingDevice::DataProcessingDevice(RunningDeviceRef running, ServiceRegistry& registry) : mRunningDevice{running}, mConfigRegistry{nullptr}, - mServiceRegistry{registry}, - mProcessingPolicies{policies} + mServiceRegistry{registry} { GetConfig()->Subscribe("dpl", [®istry = mServiceRegistry](const std::string& key, std::string value) { if (key == "cleanup") { @@ -247,6 +247,7 @@ DataProcessingDevice::DataProcessingDevice(RunningDeviceRef running, ServiceRegi mHandles.resize(1); ServiceRegistryRef ref{mServiceRegistry}; + mAwakeHandle = (uv_async_t*)malloc(sizeof(uv_async_t)); auto& state = ref.get(); assert(state.loop); @@ -1189,18 +1190,18 @@ void DataProcessingDevice::fillContext(DataProcessorContext& context, DeviceCont errorCallback(errorContext); }; } else { - context.errorHandling = [&errorPolicy = mProcessingPolicies.error, - &serviceRegistry = mServiceRegistry](RuntimeErrorRef e, InputRecord& record) { + context.errorHandling = [&serviceRegistry = mServiceRegistry](RuntimeErrorRef e, InputRecord& record) { auto& err = error_from_ref(e); /// FIXME: we should pass the salt in, so that the message /// can access information which were stored in the stream. ServiceRegistryRef ref{serviceRegistry, ServiceRegistry::globalDeviceSalt()}; auto& context = ref.get(); + auto& deviceContext = ref.get(); O2_SIGNPOST_ID_FROM_POINTER(cid, device, &context); BacktraceHelpers::demangled_backtrace_symbols(err.backtrace, err.maxBacktrace, STDERR_FILENO); auto& stats = ref.get(); stats.updateStats({(int)ProcessingStatsId::EXCEPTION_COUNT, DataProcessingStats::Op::Add, 1}); - switch (errorPolicy) { + switch (deviceContext.processingPolicies.error) { case TerminationPolicy::QUIT: O2_SIGNPOST_EVENT_EMIT_ERROR(device, cid, "Run", "Exception while running: %{public}s. Rethrowing.", err.what); throw e; @@ -1211,10 +1212,10 @@ void DataProcessingDevice::fillContext(DataProcessorContext& context, DeviceCont }; } - auto decideEarlyForward = [&context, &spec, this]() -> bool { + auto decideEarlyForward = [&context, &deviceContext, &spec, this]() -> bool { /// We must make sure there is no optional /// if we want to optimize the forwarding - bool canForwardEarly = (spec.forwards.empty() == false) && mProcessingPolicies.earlyForward != EarlyForwardPolicy::NEVER; + bool canForwardEarly = (spec.forwards.empty() == false) && deviceContext.processingPolicies.earlyForward != EarlyForwardPolicy::NEVER; bool onlyConditions = true; bool overriddenEarlyForward = false; for (auto& forwarded : spec.forwards) { @@ -1229,7 +1230,7 @@ void DataProcessingDevice::fillContext(DataProcessorContext& context, DeviceCont break; } #endif - if (DataSpecUtils::partialMatch(forwarded.matcher, o2::header::DataDescription{"RAWDATA"}) && mProcessingPolicies.earlyForward == EarlyForwardPolicy::NORAW) { + if (DataSpecUtils::partialMatch(forwarded.matcher, o2::header::DataDescription{"RAWDATA"}) && deviceContext.processingPolicies.earlyForward == EarlyForwardPolicy::NORAW) { context.canForwardEarly = false; overriddenEarlyForward = true; LOG(detail) << "Cannot forward early because of RAWDATA input: " << DataSpecUtils::describe(forwarded.matcher); @@ -1330,6 +1331,58 @@ void DataProcessingDevice::Reset() ref.get().call(); } +TransitionHandlingState updateStateTransition(ServiceRegistryRef& ref, ProcessingPolicies const& policies) +{ + auto& state = ref.get(); + auto& deviceProxy = ref.get(); + if (state.transitionHandling != TransitionHandlingState::NoTransition || deviceProxy.newStateRequested() == false) { + return state.transitionHandling; + } + O2_SIGNPOST_ID_FROM_POINTER(lid, device, state.loop); + auto& deviceContext = ref.get(); + // Check if we only have timers + auto& spec = ref.get(); + if (hasOnlyTimers(spec)) { + switchState(ref, StreamingState::EndOfStreaming); + } + + // We do not do anything in particular if the data processing timeout would go past the exitTransitionTimeout + if (deviceContext.dataProcessingTimeout > 0 && deviceContext.dataProcessingTimeout < deviceContext.exitTransitionTimeout) { + uv_update_time(state.loop); + O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for dataProcessingTimeout.", deviceContext.dataProcessingTimeout); + uv_timer_start(deviceContext.dataProcessingGracePeriodTimer, on_data_processing_expired, deviceContext.dataProcessingTimeout * 1000, 0); + } + if (deviceContext.exitTransitionTimeout != 0 && state.streaming != StreamingState::Idle) { + ref.get().call(ServiceRegistryRef{ref}); + uv_update_time(state.loop); + O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for exitTransitionTimeout.", + deviceContext.exitTransitionTimeout); + uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, deviceContext.exitTransitionTimeout * 1000, 0); + bool onlyGenerated = hasOnlyGenerated(spec); + int timeout = onlyGenerated ? deviceContext.dataProcessingTimeout : deviceContext.exitTransitionTimeout; + if (policies.termination == TerminationPolicy::QUIT && DefaultsHelpers::onlineDeploymentMode() == false) { + O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. Waiting for %d seconds before quitting.", timeout); + } else { + O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", + "New state requested. Waiting for %d seconds before %{public}s", + timeout, + onlyGenerated ? "dropping remaining input and switching to READY state." : "switching to READY state."); + } + return TransitionHandlingState::Requested; + } else { + if (deviceContext.exitTransitionTimeout == 0 && policies.termination == TerminationPolicy::QUIT) { + O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, quitting immediately as per --completion-policy"); + } else if (deviceContext.exitTransitionTimeout == 0 && policies.termination != TerminationPolicy::QUIT) { + O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, switching to READY state immediately"); + } else if (policies.termination == TerminationPolicy::QUIT) { + O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, quitting immediately as per --completion-policy"); + } else { + O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, switching to READY immediately."); + } + return TransitionHandlingState::Expired; + } +} + void DataProcessingDevice::Run() { ServiceRegistryRef ref{mServiceRegistry}; @@ -1382,51 +1435,7 @@ void DataProcessingDevice::Run() shouldNotWait = true; state.loopReason |= DeviceState::LoopReason::NEW_STATE_PENDING; } - if (state.transitionHandling == TransitionHandlingState::NoTransition && NewStatePending()) { - state.transitionHandling = TransitionHandlingState::Requested; - auto& deviceContext = ref.get(); - // Check if we only have timers - auto& spec = ref.get(); - if (hasOnlyTimers(spec)) { - switchState(ref, StreamingState::EndOfStreaming); - } - - // We do not do anything in particular if the data processing timeout would go past the exitTransitionTimeout - if (deviceContext.dataProcessingTimeout > 0 && deviceContext.dataProcessingTimeout < deviceContext.exitTransitionTimeout) { - uv_update_time(state.loop); - O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for dataProcessingTimeout.", deviceContext.dataProcessingTimeout); - uv_timer_start(deviceContext.dataProcessingGracePeriodTimer, on_data_processing_expired, deviceContext.dataProcessingTimeout * 1000, 0); - } - if (deviceContext.exitTransitionTimeout != 0 && state.streaming != StreamingState::Idle) { - state.transitionHandling = TransitionHandlingState::Requested; - ref.get().call(ServiceRegistryRef{ref}); - uv_update_time(state.loop); - O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for exitTransitionTimeout.", - deviceContext.exitTransitionTimeout); - uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, deviceContext.exitTransitionTimeout * 1000, 0); - bool onlyGenerated = hasOnlyGenerated(spec); - int timeout = onlyGenerated ? deviceContext.dataProcessingTimeout : deviceContext.exitTransitionTimeout; - if (mProcessingPolicies.termination == TerminationPolicy::QUIT && DefaultsHelpers::onlineDeploymentMode() == false) { - O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. Waiting for %d seconds before quitting.", timeout); - } else { - O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", - "New state requested. Waiting for %d seconds before %{public}s", - timeout, - onlyGenerated ? "dropping remaining input and switching to READY state." : "switching to READY state."); - } - } else { - state.transitionHandling = TransitionHandlingState::Expired; - if (deviceContext.exitTransitionTimeout == 0 && mProcessingPolicies.termination == TerminationPolicy::QUIT) { - O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, quitting immediately as per --completion-policy"); - } else if (deviceContext.exitTransitionTimeout == 0 && mProcessingPolicies.termination != TerminationPolicy::QUIT) { - O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. No timeout set, switching to READY state immediately"); - } else if (mProcessingPolicies.termination == TerminationPolicy::QUIT) { - O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, quitting immediately as per --completion-policy"); - } else { - O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, switching to READY immediately."); - } - } - } + state.transitionHandling = updateStateTransition(ref, ref.get().processingPolicies); // If we are Idle, we can then consider the transition to be expired. if (state.transitionHandling == TransitionHandlingState::Requested && state.streaming == StreamingState::Idle) { O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "State transition requested and we are now in Idle. We can consider it to be completed."); @@ -1560,7 +1569,7 @@ void DataProcessingDevice::Run() } } - O2_SIGNPOST_END(device, lid, "run_loop", "Run loop completed. Transition handling state %d.", state.transitionHandling); + O2_SIGNPOST_END(device, lid, "run_loop", "Run loop completed. Transition handling state %d.", (int)state.transitionHandling); auto& spec = ref.get(); /// Cleanup messages which are still pending on exit. for (size_t ci = 0; ci < spec.inputChannels.size(); ++ci) { diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index 900769adb780d..0ea9f18eb65b3 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -1092,13 +1092,13 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry, quotaEvaluator = std::make_unique(serviceRef); serviceRef.registerService(ServiceRegistryHelpers::handleForService(quotaEvaluator.get())); - deviceContext = std::make_unique(); + deviceContext = std::make_unique(DeviceContext{.processingPolicies = processingPolicies}); serviceRef.registerService(ServiceRegistryHelpers::handleForService(&spec)); serviceRef.registerService(ServiceRegistryHelpers::handleForService(&runningWorkflow)); serviceRef.registerService(ServiceRegistryHelpers::handleForService(deviceContext.get())); serviceRef.registerService(ServiceRegistryHelpers::handleForService(&driverConfig)); - auto device = std::make_unique(ref, serviceRegistry, processingPolicies); + auto device = std::make_unique(ref, serviceRegistry); serviceRef.get().setDevice(device.get()); r.fDevice = std::move(device); diff --git a/Framework/Core/test/test_AllCrashTypes.sh b/Framework/Core/test/test_AllCrashTypes.sh index 54898fd9c4c5d..d333cf4252816 100755 --- a/Framework/Core/test/test_AllCrashTypes.sh +++ b/Framework/Core/test/test_AllCrashTypes.sh @@ -1,23 +1,23 @@ #!/bin/sh -e echo $PATH printf "ok\nTesting runtime-init..." -o2-framework-crashing-workflow --crash-type=runtime-init --completion-policy=quit -b --run | grep -q "Exception caught while in Init: This is a std::runtime_error. Exiting with 1." || { printf "runtime error not found" ; exit 1; } +o2-framework-crashing-workflow --crash-type=runtime-init --completion-policy=quit -b --run | tee error.log | grep -q "Exception caught while in Init: This is a std::runtime_error. Exiting with 1." || { printf "runtime error not found" ; cat error.log ; exit 1; } printf "ok\nTesting framework-init..." -o2-framework-crashing-workflow --crash-type=framework-init --completion-policy=quit -b --run | grep -q "Exception caught while in Init: This is a o2::framework::runtime_error. Exiting with 1." || { printf "framework error not found" ; exit 1; } +o2-framework-crashing-workflow --crash-type=framework-init --completion-policy=quit -b --run | tee error.log | grep -q "Exception caught while in Init: This is a o2::framework::runtime_error. Exiting with 1." || { printf "framework error not found" ; cat error.log ; exit 1; } printf "ok\nTesting framework-run..." -o2-framework-crashing-workflow --crash-type=framework-run --completion-policy=quit -b --run | grep -q "Unhandled o2::framework::runtime_error reached the top of main of o2-framework-crashing-workflow, device shutting down. Reason: This is a o2::framework::runtime_error" || { printf "framework error not found" ; exit 1; } +o2-framework-crashing-workflow --crash-type=framework-run --completion-policy=quit -b --run | tee error.log | grep -q "Unhandled o2::framework::runtime_error reached the top of main of [^ ]*o2-framework-crashing-workflow, device shutting down. Reason: This is a o2::framework::runtime_error" || { printf "framework error not found" ; cat error.log ; exit 1; } printf "ok\nTesting runtime-run..." -o2-framework-crashing-workflow --crash-type=runtime-run --completion-policy=quit --run | grep -q "Unhandled o2::framework::runtime_error reached the top of main of o2-framework-crashing-workflow, device shutting down. Reason: This is a std::runtime_error" || { echo "runtime error not found" ; exit 1; } +o2-framework-crashing-workflow --crash-type=runtime-run --completion-policy=quit --run | tee error.log | grep -q "Unhandled o2::framework::runtime_error reached the top of main of [^ ]*o2-framework-crashing-workflow, device shutting down. Reason: This is a std::runtime_error" || { echo "runtime error not found" ; cat error.log ; exit 1; } printf "ok\n" export O2_NO_CATCHALL_EXCEPTIONS=1 echo O2_NO_CATCHALL_EXCEPTIONS enabled printf "ok\nTesting runtime-init..." -o2-framework-crashing-workflow --crash-type=runtime-init --completion-policy=quit -b --run | grep -v -q "Exception caught: This is a std::runtime_error" || { printf "runtime error not found" ; exit 1; } +o2-framework-crashing-workflow --crash-type=runtime-init --completion-policy=quit -b --run | tee error.log | grep -v -q "Exception caught: This is a std::runtime_error" || { printf "runtime error not found" ; cat error.log ; exit 1; } printf "ok\nTesting framework-init..." -o2-framework-crashing-workflow --crash-type=framework-init --completion-policy=quit -b --run | grep -v -q "Exception caught: This is a o2::framework::runtime_error" || { printf "framework error not found" ; exit 1; } +o2-framework-crashing-workflow --crash-type=framework-init --completion-policy=quit -b --run | tee error.log | grep -v -q "Exception caught: This is a o2::framework::runtime_error" || { printf "framework error not found" ; cat error.log ; exit 1; } printf "ok\nTesting framework-run..." -o2-framework-crashing-workflow --crash-type=framework-run --completion-policy=quit -b --run | grep -v -q "Unhandled o2::framework::runtime_error reached the top of main of o2-framework-crashing-workflow, device shutting down. Reason: This is a o2::framework::runtime_error" || { printf "framework error not found" ; exit 1; } +o2-framework-crashing-workflow --crash-type=framework-run --completion-policy=quit -b --run | tee error.log | grep -v -q "Unhandled o2::framework::runtime_error reached the top of main of [^ ]*o2-framework-crashing-workflow, device shutting down. Reason: This is a o2::framework::runtime_error" || { printf "framework error not found" ; cat error.log ; exit 1; } printf "ok\nTesting runtime-run..." -o2-framework-crashing-workflow --crash-type=runtime-run --completion-policy=quit --run | grep -v -q "Unhandled o2::framework::runtime_error reached the top of main of o2-framework-crashing-workflow, device shutting down. Reason: This is a std::runtime_error" || { echo "runtime error not found" ; exit 1; } +o2-framework-crashing-workflow --crash-type=runtime-run --completion-policy=quit --run | tee error.log | grep -v -q "Unhandled o2::framework::runtime_error reached the top of main of [^ ]*o2-framework-crashing-workflow, device shutting down. Reason: This is a std::runtime_error" || { echo "runtime error not found"; cat error.log ; exit 1; } printf "ok"