From ab3a965c4ec76bdee1c70e77688da311cdd2f9f8 Mon Sep 17 00:00:00 2001 From: Sandro Wenzel Date: Sun, 23 Nov 2025 17:10:33 +0100 Subject: [PATCH] Stability improvement for o2-sim startup Fixing a zeromq communication problem when the O2PrimaryServerDevice is too fast and quitting before O2HitMerge is even initialized. This can happen rarely, for instance when startup needs long due to CVMFS latency. This situation is now avoided by waiting for the permission to shutdown. The permission is given by O2HitMerger once the system is up and running (first actual hits have been received). Some minor cleanup (comment removal) in addition. --- run/O2HitMerger.h | 37 +++++++++++++++++++++++++++++++++---- run/O2PrimaryServerDevice.h | 19 ++++++++++++++----- run/O2SimDevice.h | 3 --- run/PrimaryServerState.h | 7 ++++--- 4 files changed, 51 insertions(+), 15 deletions(-) diff --git a/run/O2HitMerger.h b/run/O2HitMerger.h index d32f6370ca2db..30ddd57ba91da 100644 --- a/run/O2HitMerger.h +++ b/run/O2HitMerger.h @@ -88,6 +88,29 @@ namespace o2 namespace devices { +// Function communicating to primary particle server that it is now safe to shutdown. +// From the perspective of o2-sim, this is the case when all configs have been propagated and the system +// is running ok: For instance after the HitMerger is initialized and got it's first data from Geant workers. +bool primaryServer_sendShutdownPermission(fair::mq::Channel& channel) +{ + std::unique_ptr request(channel.NewSimpleMessage((int)o2::O2PrimaryServerInfoRequest::AllowShutdown)); + std::unique_ptr reply(channel.NewMessage()); + + int timeoutinMS = 100; + if (channel.Send(request, timeoutinMS) > 0) { + LOG(info) << "Sending Shutdown permission to particle server"; + if (channel.Receive(reply, timeoutinMS) > 0) { + // the answer is a simple ack with a status code + LOG(info) << "Shutdown permission was acknowledged"; + } else { + LOG(error) << "No answer received within " << timeoutinMS << "ms\n"; + return false; + } + return true; + } + return false; +} + class O2HitMerger : public fair::mq::Device { @@ -129,6 +152,9 @@ class O2HitMerger : public fair::mq::Device if (o2::devices::O2SimDevice::querySimConfig(GetChannels().at("o2sim-primserv-info").at(0))) { outfilename = o2::base::NameConf::getMCKinematicsFileName(o2::conf::SimConfig::Instance().getOutPrefix().c_str()); mNExpectedEvents = o2::conf::SimConfig::Instance().getNEvents(); + } else { + // we didn't manage to get a configuration --> better to fail + LOG(fatal) << "No configuration received. Aborting"; } mAsService = o2::conf::SimConfig::Instance().asService(); mForwardKine = o2::conf::SimConfig::Instance().forwardKine(); @@ -354,6 +380,13 @@ class O2HitMerger : public fair::mq::Device // for the next batch return waitForControlInput(); } + + static bool initAcknowledged = false; + if (!initAcknowledged) { + primaryServer_sendShutdownPermission(GetChannels().at("o2sim-primserv-info").at(0)); + initAcknowledged = true; + } + return more; } @@ -413,10 +446,6 @@ class O2HitMerger : public fair::mq::Device }; } } - if (!expectmore) { - // somehow FairMQ has difficulties shutting down; helping manually - // raise(SIGINT); - } return expectmore; } diff --git a/run/O2PrimaryServerDevice.h b/run/O2PrimaryServerDevice.h index 0c09c2105f403..b8703ffcddb28 100644 --- a/run/O2PrimaryServerDevice.h +++ b/run/O2PrimaryServerDevice.h @@ -247,11 +247,10 @@ class O2PrimaryServerDevice final : public fair::mq::Device } } - // launches a thread that listens for status requests from outside asynchronously + // launches a thread that listens for status/config/shutdown requests from outside asynchronously void launchInfoThread() { static std::vector threads; - auto sendErrorReply = [](fair::mq::Channel& channel) { LOG(error) << "UNKNOWN REQUEST"; std::unique_ptr reply(channel.NewSimpleMessage((int)(404))); @@ -260,7 +259,9 @@ class O2PrimaryServerDevice final : public fair::mq::Device LOG(info) << "LAUNCHING STATUS THREAD"; auto lambda = [this, sendErrorReply]() { - while (mState != O2PrimaryServerState::Stopped) { + bool canShutdown{false}; + // Exit only when both: serving stopped and allowed from outside. + while (!(mState == O2PrimaryServerState::Stopped && canShutdown)) { auto& channel = GetChannels().at("o2sim-primserv-info").at(0); if (!channel.IsValid()) { LOG(error) << "channel primserv-info not valid"; @@ -285,6 +286,11 @@ class O2PrimaryServerDevice final : public fair::mq::Device } } else if (request_payload == (int)O2PrimaryServerInfoRequest::Config) { HandleConfigRequest(channel); + } else if (request_payload == (int)O2PrimaryServerInfoRequest::AllowShutdown) { + LOG(info) << "Got info that we may shutdown"; + std::unique_ptr ack(channel.NewSimpleMessage(200)); + channel.Send(ack); + canShutdown = true; } else { sendErrorReply(channel); } @@ -518,10 +524,13 @@ class O2PrimaryServerDevice final : public fair::mq::Device void PostRun() override { + // We shouldn't shut down immediately when all events have been served + // Instead we also need to wait until the info thread running some communication server + // with other processes is finished. while (!mInfoThreadStopped) { LOG(info) << "Waiting info thread"; using namespace std::chrono_literals; - std::this_thread::sleep_for(100ms); + std::this_thread::sleep_for(1000ms); } } @@ -534,7 +543,7 @@ class O2PrimaryServerDevice final : public fair::mq::Device if (mEventCounter >= mMaxEvents && mNeedNewEvent) { workavailable = false; } - if (!(mState == O2PrimaryServerState::ReadyToServe || mState == O2PrimaryServerState::WaitingEvent)) { + if (!(mState.load() == O2PrimaryServerState::ReadyToServe || mState.load() == O2PrimaryServerState::WaitingEvent)) { // send a zero answer workavailable = false; } diff --git a/run/O2SimDevice.h b/run/O2SimDevice.h index 35a0c31986702..9256734cce487 100644 --- a/run/O2SimDevice.h +++ b/run/O2SimDevice.h @@ -95,9 +95,6 @@ class O2SimDevice final : public fair::mq::Device // returns true if successful / false if not static bool querySimConfig(fair::mq::Channel& channel) { - // auto text = new std::string("configrequest"); - // std::unique_ptr request(channel.NewMessage(const_cast(text->c_str()), - // text->length(), CustomCleanup, text)); std::unique_ptr request(channel.NewSimpleMessage((int)O2PrimaryServerInfoRequest::Config)); std::unique_ptr reply(channel.NewMessage()); diff --git a/run/PrimaryServerState.h b/run/PrimaryServerState.h index 5a15cca12b9b1..4bae1d566dc60 100644 --- a/run/PrimaryServerState.h +++ b/run/PrimaryServerState.h @@ -25,10 +25,11 @@ enum class O2PrimaryServerState { }; static const char* PrimStateToString[5] = {"INIT", "SERVING", "WAITEVENT", "IDLE", "STOPPED"}; -/// enum class for type of info request +/// enum class for request to o2sim-primserv-info channel of the O2PrimaryServerDevice enum class O2PrimaryServerInfoRequest { - Status = 1, - Config = 2 + Status = 1, // asks to retrieve current status of O2PrimaryServerDevice --> will send O2PrimaryServerState + Config = 2, // asks for o2-sim config reply + AllowShutdown = 3 // can be used to let particle server know that shutdown is now safe (once all components initialized) }; /// Struct to be used as payload when making a request