From f3e5292547b489c93b599148c93cf0b4c440e180 Mon Sep 17 00:00:00 2001 From: Philip Wernersbach Date: Mon, 9 Mar 2026 15:55:40 -0400 Subject: [PATCH] fix: Properly shutdown quickwit-serve when subcomponents panic or otherwise error. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Before this change, the `if let Err` block silently swallows the error and logs it. The code continues on to the `shutdown_handle.await` call. In the case where the `tokio::try_join!` returns an error (such as when any of the three components for the three `JoinHandle` arguments panic), the `shutdown_handle` is not guaranteed to have completed, so the program sits there waiting for a SIGTERM, even though some components aren’t running. --- quickwit/quickwit-serve/src/lib.rs | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 676febe224a..0e9b420928c 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -54,6 +54,8 @@ use anyhow::{Context, bail}; use bytesize::ByteSize; pub(crate) use decompression::Body; pub use format::BodyFormat; +use futures::stream::Abortable; +use futures::stream::AbortHandle; use futures::StreamExt; use itertools::Itertools; use once_cell::sync::Lazy; @@ -396,14 +398,14 @@ fn start_shard_positions_service( /// Usually called when receiving a SIGTERM signal, e.g. k8s trying to /// decomission a pod. async fn shutdown_signal_handler( - shutdown_signal: BoxFutureInfaillible<()>, + shutdown_signal: Abortable>, universe: Universe, ingester_opt: Option, grpc_shutdown_trigger_tx: oneshot::Sender<()>, rest_shutdown_trigger_tx: oneshot::Sender<()>, cluster: Cluster, ) -> HashMap { - shutdown_signal.await; + let _ = shutdown_signal.await; // We must decommission the ingester first before terminating the indexing pipelines that // may consume from it. We also need to keep the gRPC server running while doing so. if let Some(ingester) = ingester_opt @@ -825,8 +827,10 @@ pub async fn serve_quickwit( "node_readiness_reporting", ); + let (shutdown_signal_abort_handle, shutdown_signal_abort_reg) = AbortHandle::new_pair(); + let shutdown_signal_abortable = Abortable::new(shutdown_signal, shutdown_signal_abort_reg); let shutdown_handle = tokio::spawn(shutdown_signal_handler( - shutdown_signal, + shutdown_signal_abortable, universe, ingester_opt, grpc_shutdown_trigger_tx, @@ -851,8 +855,12 @@ pub async fn serve_quickwit( if let Err(err) = tokio::try_join!(grpc_join_handle, rest_join_handle, chitchat_server_handle) { error!("server failed: {err:?}"); + + // Trigger a shutdown by completing the shutdown_signal handle. + shutdown_signal_abort_handle.abort(); } + info!("waiting for services to shutdown"); let actor_exit_statuses = shutdown_handle .await .context("failed to gracefully shutdown services")?;