diff --git a/run/O2HitMerger.h b/run/O2HitMerger.h
index d32f6370ca2db..30ddd57ba91da 100644
--- a/run/O2HitMerger.h
+++ b/run/O2HitMerger.h
@@ -88,6 +88,34 @@ namespace o2
 namespace devices
 {
 
+// Tells the primary particle server that it is now safe to shutdown.
+// From the perspective of o2-sim, this is the case when all configs have been propagated and the system
+// is running ok: For instance after the HitMerger is initialized and got its first data from Geant workers.
+//
+// Sends an O2PrimaryServerInfoRequest::AllowShutdown request over `channel` and waits for the reply;
+// both steps use a 100 ms timeout. Returns true only if the request was sent and a reply was received.
+// Declared inline because it is defined in a header: a plain definition would break the one-definition
+// rule as soon as the header is included from a second translation unit.
+inline bool primaryServer_sendShutdownPermission(fair::mq::Channel& channel)
+{
+  auto request = channel.NewSimpleMessage(static_cast<int>(o2::O2PrimaryServerInfoRequest::AllowShutdown));
+  auto reply = channel.NewMessage();
+
+  constexpr int timeoutinMS = 100;
+  if (channel.Send(request, timeoutinMS) <= 0) {
+    LOG(error) << "Could not send shutdown permission to particle server";
+    return false;
+  }
+  LOG(info) << "Sending Shutdown permission to particle server";
+  if (channel.Receive(reply, timeoutinMS) <= 0) {
+    LOG(error) << "No answer received within " << timeoutinMS << "ms\n";
+    return false;
+  }
+  // the answer is a simple ack with a status code
+  LOG(info) << "Shutdown permission was acknowledged";
+  return true;
+}
+
 class O2HitMerger : public fair::mq::Device
 {
 
@@ -129,6 +157,9 @@ class O2HitMerger : public fair::mq::Device
     if (o2::devices::O2SimDevice::querySimConfig(GetChannels().at("o2sim-primserv-info").at(0))) {
       outfilename = o2::base::NameConf::getMCKinematicsFileName(o2::conf::SimConfig::Instance().getOutPrefix().c_str());
       mNExpectedEvents = o2::conf::SimConfig::Instance().getNEvents();
+    } else {
+      // we didn't manage to get a configuration --> better to fail
+      LOG(fatal) << "No configuration received. Aborting";
     }
     mAsService = o2::conf::SimConfig::Instance().asService();
     mForwardKine = o2::conf::SimConfig::Instance().forwardKine();
@@ -354,6 +385,15 @@ class O2HitMerger : public fair::mq::Device
       // for the next batch
       return waitForControlInput();
     }
+
+    // On the first pass through here, let the primary server know that it may shut down.
+    // This is attempted exactly once; the helper logs an error itself if the exchange fails.
+    static bool initAcknowledged = false;
+    if (!initAcknowledged) {
+      primaryServer_sendShutdownPermission(GetChannels().at("o2sim-primserv-info").at(0));
+      initAcknowledged = true;
+    }
+
     return more;
   }
 
@@ -413,10 +453,6 @@ class O2HitMerger : public fair::mq::Device
         };
       }
     }
-    if (!expectmore) {
-      // somehow FairMQ has difficulties shutting down; helping manually
-      // raise(SIGINT);
-    }
     return expectmore;
   }
 
diff --git a/run/O2PrimaryServerDevice.h b/run/O2PrimaryServerDevice.h
index 0c09c2105f403..b8703ffcddb28 100644
--- a/run/O2PrimaryServerDevice.h
+++ b/run/O2PrimaryServerDevice.h
@@ -247,11 +247,10 @@ class O2PrimaryServerDevice final : public fair::mq::Device
     }
   }
 
-  // launches a thread that listens for status requests from outside asynchronously
+  // launches a thread that listens for status/config/shutdown requests from outside asynchronously
   void launchInfoThread()
   {
     static std::vector threads;
-
     auto sendErrorReply = [](fair::mq::Channel& channel) {
       LOG(error) << "UNKNOWN REQUEST";
       std::unique_ptr reply(channel.NewSimpleMessage((int)(404)));
@@ -260,7 +259,9 @@ class O2PrimaryServerDevice final : public fair::mq::Device
 
     LOG(info) << "LAUNCHING STATUS THREAD";
     auto lambda = [this, sendErrorReply]() {
-      while (mState != O2PrimaryServerState::Stopped) {
+      bool canShutdown{false};
+      // Exit only when both: serving stopped and allowed from outside.
+      while (!(mState == O2PrimaryServerState::Stopped && canShutdown)) {
         auto& channel = GetChannels().at("o2sim-primserv-info").at(0);
         if (!channel.IsValid()) {
           LOG(error) << "channel primserv-info not valid";
@@ -285,6 +286,11 @@ class O2PrimaryServerDevice final : public fair::mq::Device
             }
           } else if (request_payload == (int)O2PrimaryServerInfoRequest::Config) {
             HandleConfigRequest(channel);
+          } else if (request_payload == static_cast<int>(O2PrimaryServerInfoRequest::AllowShutdown)) {
+            LOG(info) << "Got info that we may shutdown";
+            auto ack = channel.NewSimpleMessage(200);
+            channel.Send(ack);
+            canShutdown = true;
           } else {
             sendErrorReply(channel);
           }
@@ -518,10 +524,13 @@ class O2PrimaryServerDevice final : public fair::mq::Device
 
   void PostRun() override
   {
+    // We shouldn't shut down immediately when all events have been served
+    // Instead we also need to wait until the info thread running some communication server
+    // with other processes is finished.
     while (!mInfoThreadStopped) {
       LOG(info) << "Waiting info thread";
       using namespace std::chrono_literals;
-      std::this_thread::sleep_for(100ms);
+      std::this_thread::sleep_for(1000ms);
     }
   }
 
@@ -534,7 +543,9 @@ class O2PrimaryServerDevice final : public fair::mq::Device
     if (mEventCounter >= mMaxEvents && mNeedNewEvent) {
       workavailable = false;
     }
-    if (!(mState == O2PrimaryServerState::ReadyToServe || mState == O2PrimaryServerState::WaitingEvent)) {
+    // read the state once so that both comparisons are made against the same value
+    const auto serverState = mState.load();
+    if (!(serverState == O2PrimaryServerState::ReadyToServe || serverState == O2PrimaryServerState::WaitingEvent)) {
       // send a zero answer
       workavailable = false;
     }
diff --git a/run/O2SimDevice.h b/run/O2SimDevice.h
index 35a0c31986702..9256734cce487 100644
--- a/run/O2SimDevice.h
+++ b/run/O2SimDevice.h
@@ -95,9 +95,6 @@ class O2SimDevice final : public fair::mq::Device
   // returns true if successful / false if not
   static bool querySimConfig(fair::mq::Channel& channel)
   {
-    // auto text = new std::string("configrequest");
-    // std::unique_ptr request(channel.NewMessage(const_cast(text->c_str()),
-    // text->length(), CustomCleanup, text));
     std::unique_ptr request(channel.NewSimpleMessage((int)O2PrimaryServerInfoRequest::Config));
     std::unique_ptr reply(channel.NewMessage());
 
diff --git a/run/PrimaryServerState.h b/run/PrimaryServerState.h
index 5a15cca12b9b1..4bae1d566dc60 100644
--- a/run/PrimaryServerState.h
+++ b/run/PrimaryServerState.h
@@ -25,10 +25,11 @@ enum class O2PrimaryServerState {
 };
 static const char* PrimStateToString[5] = {"INIT", "SERVING", "WAITEVENT", "IDLE", "STOPPED"};
 
-/// enum class for type of info request
+/// enum class for request to o2sim-primserv-info channel of the O2PrimaryServerDevice
 enum class O2PrimaryServerInfoRequest {
-  Status = 1,
-  Config = 2
+  Status = 1,       // asks to retrieve current status of O2PrimaryServerDevice --> will send O2PrimaryServerState
+  Config = 2,       // asks for o2-sim config reply
+  AllowShutdown = 3 // can be used to let particle server know that shutdown is now safe (once all components initialized)
 };
 
 /// Struct to be used as payload when making a request