Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions quickwit/quickwit-serve/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ use anyhow::{Context, bail};
use bytesize::ByteSize;
pub(crate) use decompression::Body;
pub use format::BodyFormat;
use futures::stream::Abortable;
use futures::stream::AbortHandle;
use futures::StreamExt;
use itertools::Itertools;
use once_cell::sync::Lazy;
Expand Down Expand Up @@ -396,14 +398,14 @@ fn start_shard_positions_service(
/// Usually called when receiving a SIGTERM signal, e.g. k8s trying to
/// decomission a pod.
async fn shutdown_signal_handler(
shutdown_signal: BoxFutureInfaillible<()>,
shutdown_signal: Abortable<BoxFutureInfaillible<()>>,
universe: Universe,
ingester_opt: Option<Ingester>,
grpc_shutdown_trigger_tx: oneshot::Sender<()>,
rest_shutdown_trigger_tx: oneshot::Sender<()>,
cluster: Cluster,
) -> HashMap<String, ActorExitStatus> {
shutdown_signal.await;
let _ = shutdown_signal.await;
// We must decommission the ingester first before terminating the indexing pipelines that
// may consume from it. We also need to keep the gRPC server running while doing so.
if let Some(ingester) = ingester_opt
Expand Down Expand Up @@ -825,8 +827,10 @@ pub async fn serve_quickwit(
"node_readiness_reporting",
);

let (shutdown_signal_abort_handle, shutdown_signal_abort_reg) = AbortHandle::new_pair();
let shutdown_signal_abortable = Abortable::new(shutdown_signal, shutdown_signal_abort_reg);
let shutdown_handle = tokio::spawn(shutdown_signal_handler(
shutdown_signal,
shutdown_signal_abortable,
universe,
ingester_opt,
grpc_shutdown_trigger_tx,
Expand All @@ -851,8 +855,12 @@ pub async fn serve_quickwit(

if let Err(err) = tokio::try_join!(grpc_join_handle, rest_join_handle, chitchat_server_handle) {
error!("server failed: {err:?}");

// Trigger a shutdown by completing the shutdown_signal handle.
shutdown_signal_abort_handle.abort();
}

info!("waiting for services to shutdown");
let actor_exit_statuses = shutdown_handle
.await
.context("failed to gracefully shutdown services")?;
Expand Down