From e46efb63e37b7a8f22813eff18b1e4de3c349b17 Mon Sep 17 00:00:00 2001 From: David Rohr Date: Fri, 29 Aug 2025 18:30:18 +0200 Subject: [PATCH] GPU Workflow: Pop next tf from completion policy queue only when actually running and add sanity checks --- .../Global/GPUChainTrackingClusterizer.cxx | 3 +++ GPU/Workflow/src/GPUWorkflowPipeline.cxx | 14 +++++++++----- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/GPU/GPUTracking/Global/GPUChainTrackingClusterizer.cxx b/GPU/GPUTracking/Global/GPUChainTrackingClusterizer.cxx index 846df352d3a34..c92049b040c46 100644 --- a/GPU/GPUTracking/Global/GPUChainTrackingClusterizer.cxx +++ b/GPU/GPUTracking/Global/GPUChainTrackingClusterizer.cxx @@ -751,6 +751,9 @@ int32_t GPUChainTracking::RunTPCClusterizer(bool synchronizeOutput) if (buildNativeGPU) { AllocateRegisteredMemory(mInputsHost->mResourceClusterNativeBuffer); } + if (mWaitForFinalInputs && GetProcessingSettings().nTPCClustererLanes > 6) { + GPUFatal("ERROR, mWaitForFinalInputs cannot be called with nTPCClustererLanes > 6"); + } if (buildNativeHost && !(buildNativeGPU && GetProcessingSettings().delayedOutput)) { if (mWaitForFinalInputs) { GPUFatal("Cannot use waitForFinalInput callback without delayed output"); diff --git a/GPU/Workflow/src/GPUWorkflowPipeline.cxx b/GPU/Workflow/src/GPUWorkflowPipeline.cxx index a09fdac988d1a..8867b6c336f97 100644 --- a/GPU/Workflow/src/GPUWorkflowPipeline.cxx +++ b/GPU/Workflow/src/GPUWorkflowPipeline.cxx @@ -66,11 +66,7 @@ void GPURecoWorkflowSpec::initPipeline(o2::framework::InitContext& ic) mPolicyOrder = [this](o2::framework::DataProcessingHeader::StartTime timeslice) { std::unique_lock lk(mPipeline->completionPolicyMutex); mPipeline->completionPolicyNotify.wait(lk, [pipeline = mPipeline.get()] { return pipeline->pipelineSenderTerminating || !pipeline->completionPolicyQueue.empty(); }); - if (mPipeline->completionPolicyQueue.front() == timeslice) { - mPipeline->completionPolicyQueue.pop(); - return true; - } - return false; + return !mPipeline->completionPolicyQueue.empty() && mPipeline->completionPolicyQueue.front() == timeslice; }; mPipeline->receiveThread = std::thread([this]() { RunReceiveThread(); }); for (uint32_t i = 0; i < mPipeline->workers.size(); i++) { @@ -175,6 +171,14 @@ int32_t GPURecoWorkflowSpec::handlePipeline(ProcessingContext& pc, GPUTrackingIn tpcZSmeta = std::move(context->tpcZSmeta); tpcZS = context->tpcZS; ptrs.tpcZS = &tpcZS; + + { + std::lock_guard lk(mPipeline->completionPolicyMutex); + if (mPipeline->completionPolicyQueue.empty() || mPipeline->completionPolicyQueue.front() != tinfo.timeslice) { + LOG(fatal) << "Time frame processed does not equal the timeframe at the top of the queue, time frames seem out of sync"; + } + mPipeline->completionPolicyQueue.pop(); + } } if (mSpecConfig.enableDoublePipeline == 2) { auto prepareDummyMessage = pc.outputs().make>(Output{gDataOriginGPU, "PIPELINEPREPARE", 0}, 0u);