Rebalance shards when ingester status changes #6185
nadav-govari merged 18 commits into quickwit-oss:main from
Conversation
Technically, the ingest router should just retry when that happens, and there should be a path for the router to open a new shard if the ingester being decommissioned was the only one to have shard(s) for this index. Is that not what you observed?

@ncoiffier-celonis I'm giving you write access to the repo so next time you can push directly to this repo rather than our fork. It makes it easier for me to check out your changes locally. Though I learnt how to use
guilload
left a comment
I think we're close but we need to fix a few issues.
}

#[cfg(any(test, feature = "testsuite"))]
pub async fn for_test_with_ingester_status(
nit: seems a bit overkill to me
Addressed with 0c1c82b: I unified ClusterNode::for_test_with_ingester_status into ClusterNode::for_test
  pub struct IngestController {
-     ingester_pool: IngesterPool,
+     pub(crate) ingester_pool: IngesterPool,
+     pub(crate) stats: IngestControllerStats,
let Some(mailbox) = weak_mailbox.upgrade() else {
    return;
};
let mut trigger_rebalance = false;
@ncoiffier-celonis please review this tricky logic thoroughly. I'm the initial author of this change and now I'm also reviewing it so I'm more likely to miss something. I could use a second pair of eyes.
Yeah, this logic definitely needs a comment. Here, we're considering both indexers and ingesters. Indexers run indexing pipelines: when they join the cluster and are ready, they are ready to index, so we want to rebuild the indexing plan. Same thing when they leave.
In addition, we're considering ingesters (technically, all indexers are ingesters and vice versa, because we didn't want to expose users to a new service; service as in metastore, janitor, control-plane, etc., not microservice as in router, ingester, debug info, etc.).
Ingesters have two levels of readiness. The first is the same as for indexers: "I'm up and running, I can connect to the metastore". The second: "I have loaded my WAL".
So we want to rebalance when the ingester is ready-ready, which, from the perspective of the stream of events, can happen as:
- Add(ready, ready)
OR
- Add(ready, not ready)
- Update(ready, ready)
The logic below tries to implement that.
That's my understanding too. In 678bef5 I've added some comments, improved the tests, and fixed something that I believe to be a bug: Add(ready, IngesterStatus::Initializing) should not trigger a rebalance. Not a big deal, but we would have triggered an extra rebalance for nothing.
Technically, Add(ready, IngesterStatus::Initializing) should not trigger a rebalance (the ingester is not ready to ingest) but should trigger rebuilding the indexing plan (the indexer is ready to index). Today, rebalancing and rebuilding the indexing plan are coupled, so we have no way to handle this edge case perfectly, and I think that's fine if we do both when the ingester becomes ready.
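For readers following along, the trigger rule discussed in this thread can be sketched roughly like this. The enum and function names below are illustrative, not Quickwit's actual types:

```rust
#[derive(Clone, Copy, PartialEq)]
enum IngesterStatus {
    Initializing,
    Ready,
    Decommissioning,
}

// Hypothetical event type standing in for chitchat membership events.
#[allow(dead_code)]
enum ClusterEvent {
    Add { node_ready: bool, status: IngesterStatus },
    Update { prev_status: IngesterStatus, new_status: IngesterStatus },
    Remove,
}

/// Returns true when the event means the ingester just became "ready-ready".
fn should_rebalance(event: &ClusterEvent) -> bool {
    match event {
        // Add(ready, ready): the ingester joined already fully ready.
        ClusterEvent::Add { node_ready: true, status: IngesterStatus::Ready } => true,
        // Add(ready, not ready): wait for the later Update(ready, ready).
        ClusterEvent::Add { .. } => false,
        // Update to Ready from a non-Ready status: the WAL finished loading.
        ClusterEvent::Update { prev_status, new_status: IngesterStatus::Ready }
            if *prev_status != IngesterStatus::Ready => true,
        _ => false,
    }
}
```

In particular, Add(ready, Initializing) returns false here, matching the bugfix described above.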
}

#[tokio::test]
async fn test_wait_for_ingester_decommission_elapsed_timeout_not_zero() {
// Ingest docs with auto-commit. With a 5s commit timeout, these documents
// sit uncommitted in the ingesters' WAL - exactly the in-flight state we
// want to exercise during draining.
ingest(
How do we know the shard for this index is always going to be created on the indexer that we're about to shut down?
Nice catch; I've modified the sandbox to dynamically add nodes with e02cc5e, and I've changed the test with 8815224 to start with a single indexer (to force the shard creation there) before adding a second indexer and decommissioning the first one.
Hopefully that ensures that we're creating the shard on the indexer we're decommissioning.
/// Tests that the graceful shutdown sequence works correctly in a multi-indexer
/// cluster: shutting down one indexer does NOT cause 500 errors or data loss,
/// and the cluster eventually rebalances. see #6158
#[tokio::test]
Very very nice! Let's make sure this is not flaky, though. Run it 1,000 times! This is how I do it (fish):
while true
    c t --manifest-path quickwit/Cargo.toml -p quickwit-integration-tests --nocapture -- test_graceful_shutdown_no_data_loss
end
Thank you for the suggestion; nextest has a built-in method for that (run it in the folder that contains the Cargo.toml):
cargo nextest run test_graceful_shutdown_no_data_loss --stress-count 1000 --max-fail 1
1000 iterations will take 15s * 1000 ~ 4h, it's still running for me. But so far I haven't observed any failure after 100 iterations.
TIL nextest can do that. Sweet.
I wasn't able to run for 1000 iterations. The test is somewhat flaky, I've observed 1 failure in ~300 iterations. However, the build is configured with --retries 5, so it's unlikely to break the build, and I think it brings more value to keep it enabled. What do you think?
Some kind of timeout:
stacktrace
running 1 test
test tests::ingest_v2_tests::test_graceful_shutdown_no_data_loss has been running for over 60 seconds
test tests::ingest_v2_tests::test_graceful_shutdown_no_data_loss ... FAILED
failures:
failures:
tests::ingest_v2_tests::test_graceful_shutdown_no_data_loss
test result: FAILED. 0 passed; 1 failed; 0 ignored; 0 measured; 44 filtered out; finished in 62.47s
stderr ───
thread 'tests::ingest_v2_tests::test_graceful_shutdown_no_data_loss' (50872832) panicked at quickwit-integration-tests/src/tests/ingest_v2_tests.rs:961:19:
ingest during shutdown should succeed (no 500 errors): client middleware error: error sending request for url (http://127.0.0.1:64377/api/v1/test_graceful_shutdown_no_data_loss/ingest)
Caused by:
operation timed out
Stack backtrace:
0: std::backtrace::Backtrace::create
1: anyhow::error::<impl core::convert::From<E> for anyhow::Error>::from
2: <T as core::convert::Into<U>>::into
3: reqwest_retry::middleware::RetryTransientMiddleware<T,R>::execute_with_retry::{{closure}}::{{closure}}
4: core::result::Result<T,E>::map_err
5: reqwest_retry::middleware::RetryTransientMiddleware<T,R>::execute_with_retry::{{closure}}
6: <reqwest_retry::middleware::RetryTransientMiddleware<T,R> as reqwest_middleware::middleware::Middleware>::handle::{{closure}}
7: <core::pin::Pin<P> as core::future::future::Future>::poll
8: reqwest_middleware::client::ClientWithMiddleware::execute_with_extensions::{{closure}}
9: reqwest_middleware::client::RequestBuilder::send::{{closure}}
10: quickwit_rest_client::rest_client::Transport::send::{{closure}}
11: quickwit_rest_client::rest_client::QuickwitClient::ingest::{{closure}}
12: quickwit_integration_tests::test_utils::cluster_sandbox::ingest::{{closure}}
13: quickwit_integration_tests::tests::ingest_v2_tests::test_graceful_shutdown_no_data_loss::{{closure}}::{{closure}}
14: <tokio::future::maybe_done::MaybeDone<Fut> as core::future::future::Future>::poll
15: quickwit_integration_tests::tests::ingest_v2_tests::test_graceful_shutdown_no_data_loss::{{closure}}::{{closure}}
16: <core::future::poll_fn::PollFn<F> as core::future::future::Future>::poll
17: quickwit_integration_tests::tests::ingest_v2_tests::test_graceful_shutdown_no_data_loss::{{closure}}
18: <core::pin::Pin<P> as core::future::future::Future>::poll
19: <core::pin::Pin<P> as core::future::future::Future>::poll
20: tokio::runtime::scheduler::current_thread::CoreGuard::block_on::{{closure}}::{{closure}}::{{closure}}
21: tokio::runtime::scheduler::current_thread::CoreGuard::block_on::{{closure}}::{{closure}}
22: tokio::runtime::scheduler::current_thread::Context::enter
23: tokio::runtime::scheduler::current_thread::CoreGuard::block_on::{{closure}}
24: tokio::runtime::scheduler::current_thread::CoreGuard::enter::{{closure}}
25: tokio::runtime::context::scoped::Scoped<T>::set
26: tokio::runtime::context::set_scheduler::{{closure}}
27: std::thread::local::LocalKey<T>::try_with
28: std::thread::local::LocalKey<T>::with
29: tokio::runtime::context::set_scheduler
30: tokio::runtime::scheduler::current_thread::CoreGuard::enter
31: tokio::runtime::scheduler::current_thread::CoreGuard::block_on
32: tokio::runtime::scheduler::current_thread::CurrentThread::block_on::{{closure}}
33: tokio::runtime::context::runtime::enter_runtime
34: tokio::runtime::scheduler::current_thread::CurrentThread::block_on
35: tokio::runtime::runtime::Runtime::block_on_inner
36: tokio::runtime::runtime::Runtime::block_on
37: quickwit_integration_tests::tests::ingest_v2_tests::test_graceful_shutdown_no_data_loss
38: quickwit_integration_tests::tests::ingest_v2_tests::test_graceful_shutdown_no_data_loss::{{closure}}
39: core::ops::function::FnOnce::call_once
40: test::__rust_begin_short_backtrace
41: test::run_test::{{closure}}
42: std::sys::backtrace::__rust_begin_short_backtrace
43: core::ops::function::FnOnce::call_once{{vtable.shim}}
44: std::sys::thread::unix::Thread::new::thread_start
45: __pthread_cond_wait
Stack backtrace:
0: std::backtrace::Backtrace::create
1: anyhow::error::<impl core::convert::From<E> for anyhow::Error>::from
2: <core::result::Result<T,F> as core::ops::try_trait::FromResidual<core::result::Result<core::convert::Infallible,E>>>::from_residual
3: quickwit_integration_tests::test_utils::cluster_sandbox::ingest::{{closure}}
4: quickwit_integration_tests::tests::ingest_v2_tests::test_graceful_shutdown_no_data_loss::{{closure}}::{{closure}}
5: <tokio::future::maybe_done::MaybeDone<Fut> as core::future::future::Future>::poll
6: quickwit_integration_tests::tests::ingest_v2_tests::test_graceful_shutdown_no_data_loss::{{closure}}::{{closure}}
7: <core::future::poll_fn::PollFn<F> as core::future::future::Future>::poll
8: quickwit_integration_tests::tests::ingest_v2_tests::test_graceful_shutdown_no_data_loss::{{closure}}
9: <core::pin::Pin<P> as core::future::future::Future>::poll
10: <core::pin::Pin<P> as core::future::future::Future>::poll
11: tokio::runtime::scheduler::current_thread::CoreGuard::block_on::{{closure}}::{{closure}}::{{closure}}
12: tokio::runtime::scheduler::current_thread::CoreGuard::block_on::{{closure}}::{{closure}}
13: tokio::runtime::scheduler::current_thread::Context::enter
14: tokio::runtime::scheduler::current_thread::CoreGuard::block_on::{{closure}}
15: tokio::runtime::scheduler::current_thread::CoreGuard::enter::{{closure}}
16: tokio::runtime::context::scoped::Scoped<T>::set
17: tokio::runtime::context::set_scheduler::{{closure}}
18: std::thread::local::LocalKey<T>::try_with
19: std::thread::local::LocalKey<T>::with
20: tokio::runtime::context::set_scheduler
21: tokio::runtime::scheduler::current_thread::CoreGuard::enter
22: tokio::runtime::scheduler::current_thread::CoreGuard::block_on
23: tokio::runtime::scheduler::current_thread::CurrentThread::block_on::{{closure}}
24: tokio::runtime::context::runtime::enter_runtime
25: tokio::runtime::scheduler::current_thread::CurrentThread::block_on
26: tokio::runtime::runtime::Runtime::block_on_inner
27: tokio::runtime::runtime::Runtime::block_on
28: quickwit_integration_tests::tests::ingest_v2_tests::test_graceful_shutdown_no_data_loss
29: quickwit_integration_tests::tests::ingest_v2_tests::test_graceful_shutdown_no_data_loss::{{closure}}
30: core::ops::function::FnOnce::call_once
31: test::__rust_begin_short_backtrace
32: test::run_test::{{closure}}
33: std::sys::backtrace::__rust_begin_short_backtrace
34: core::ops::function::FnOnce::call_once{{vtable.shim}}
35: std::sys::thread::unix::Thread::new::thread_start
36: __pthread_cond_wait
stack backtrace:
0: __rustc::rust_begin_unwind
1: core::panicking::panic_fmt
2: core::result::unwrap_failed
3: core::result::Result<T,E>::expect
4: quickwit_integration_tests::tests::ingest_v2_tests::test_graceful_shutdown_no_data_loss::{{closure}}
5: <core::pin::Pin<P> as core::future::future::Future>::poll
6: <core::pin::Pin<P> as core::future::future::Future>::poll
7: tokio::runtime::scheduler::current_thread::CoreGuard::block_on::{{closure}}::{{closure}}::{{closure}}
8: tokio::runtime::scheduler::current_thread::CoreGuard::block_on::{{closure}}::{{closure}}
9: tokio::runtime::scheduler::current_thread::Context::enter
10: tokio::runtime::scheduler::current_thread::CoreGuard::block_on::{{closure}}
11: tokio::runtime::scheduler::current_thread::CoreGuard::enter::{{closure}}
12: tokio::runtime::context::scoped::Scoped<T>::set
13: tokio::runtime::context::set_scheduler::{{closure}}
14: std::thread::local::LocalKey<T>::try_with
15: std::thread::local::LocalKey<T>::with
16: tokio::runtime::context::set_scheduler
17: tokio::runtime::scheduler::current_thread::CoreGuard::enter
18: tokio::runtime::scheduler::current_thread::CoreGuard::block_on
19: tokio::runtime::scheduler::current_thread::CurrentThread::block_on::{{closure}}
20: tokio::runtime::context::runtime::enter_runtime
21: tokio::runtime::scheduler::current_thread::CurrentThread::block_on
22: tokio::runtime::runtime::Runtime::block_on_inner
23: tokio::runtime::runtime::Runtime::block_on
24: quickwit_integration_tests::tests::ingest_v2_tests::test_graceful_shutdown_no_data_loss
25: quickwit_integration_tests::tests::ingest_v2_tests::test_graceful_shutdown_no_data_loss::{{closure}}
26: core::ops::function::FnOnce::call_once
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
    Ok((ingest_router, ingest_router_service, ingester_opt))
}

fn setup_ingester_pool(
Same here, we need to be extremely careful about this convoluted logic.
Now that I've thought more about this, I think we have an issue with this logic. This creates a pool of write-only ingesters, which is great for the logic in quickwit-ingest, but in quickwit-indexing, the source also holds an ingester pool, and we still want to be able to read and truncate from ingesters while they are in the retiring and decommissioning statuses. I don't think we actually want to create and manage those distinct pools, so we should maybe restrict this pool to non-initializing ingesters and push the additional filtering logic wherever needed (router, control plane).
I'm not sure I follow you, could you explain a bit more?
From what I can see, for the ingestion paths, we're already guarding against the ingester status:
- has_open_shards: https://github.com/ncoiffier-celonis/quickwit/blob/ingester-status-rebased/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs#L134
- next_open_shard_round_robin: https://github.com/ncoiffier-celonis/quickwit/blob/ingester-status-rebased/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs#L173
- allocate_shards https://github.com/ncoiffier-celonis/quickwit/blob/ingester-status-rebased/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs#L555
- compute_shards_to_rebalance https://github.com/ncoiffier-celonis/quickwit/blob/ingester-status-rebased/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs#L1127
And for the read/truncate paths, we're not guarding on the ingester status (as expected):
- truncate: https://github.com/ncoiffier-celonis/quickwit/blob/ingester-status-rebased/quickwit/quickwit-indexing/src/source/ingest/mod.rs#L348
- fault_tolerant_fetch_stream: https://github.com/ncoiffier-celonis/quickwit/blob/ingester-status-rebased/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs#L458
- sync_with_all_ingesters: https://github.com/ncoiffier-celonis/quickwit/blob/ingester-status-rebased/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs#L319
- try_scale_down_shards: https://github.com/ncoiffier-celonis/quickwit/blob/ingester-status-rebased/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs#L904
Are you saying that setup_ingester_pool should update the ingester pool regardless of the status? Something like 84880a1 maybe?
Yeah, exactly like 84880a1. Actually, I would also not include Initializing ingesters in the pool because they're useless, but either way is fine.
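A minimal sketch of the pool policy agreed on here, using hypothetical `Ingester`/`IngesterStatus` types (not Quickwit's actual API): the shared pool keeps every non-initializing ingester so sources can still read and truncate from retiring or decommissioning nodes, and the stricter write-side check is applied by the call sites that need it.

```rust
#[derive(Clone, Copy, PartialEq)]
enum IngesterStatus {
    Initializing,
    Ready,
    Retiring,
    Decommissioning,
}

// Illustrative stand-in for an entry in the ingester pool.
#[allow(dead_code)]
struct Ingester {
    node_id: String,
    status: IngesterStatus,
}

/// Pool membership: everything except Initializing stays in the pool,
/// so readers/truncators can still reach draining ingesters.
fn belongs_in_pool(ingester: &Ingester) -> bool {
    ingester.status != IngesterStatus::Initializing
}

/// Write-side filter, applied where needed (router, control plane):
/// only fully ready ingesters accept new writes.
fn can_accept_writes(ingester: &Ingester) -> bool {
    ingester.status == IngesterStatus::Ready
}
```

The two-predicate split keeps a single pool while letting the read and write paths disagree about which ingesters are usable.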
@nadav-govari, I need your eyes on this because:
81e493d to
6bebf0d
Compare
(I took the liberty to force-push after signing all the individual commits, no code change)
… a node to a running cluster
…raceful_shutdown_no_data_loss integration test
…); improve comments and tests for it
59f585c to
678bef5
Compare
@guilload I think I've addressed all your comments, would you be able to give it a second look? Thank you!! (Feel free to close the obsolete ones)
Looking good. @nadav-govari please take a look on Monday. |
nadav-govari
left a comment
Generally looks good to me (pending the comment).
The way I see this PR, a lot of this logic is avoided with the changes that come in my in-development change on the branch nadav/feature-node-based-routing. It refactors (and greatly simplifies) the routing layer to only deal with ingesters rather than with shards.
A big part of this change is broadcasting routing updates directly (rather than shard updates). Additionally, routing updates are also returned as part of every persist request; with enough load, the cluster eventually converges to having a very accurate picture of the state of other ingesters.
I believe we could accomplish what this PR is trying to do by piggybacking onto that change: add INGESTER_STATUS_DECOMMISSIONING to the persist response, and then update the ingester pool from there. We'd still need the rebalancing logic in the control plane, however, to ensure that the same shard isn't scheduled on the retiring ingester.
I'm not convinced that it's necessarily better, and this seems to work, so I think this is a reasonable way to start. We can revisit it later if we believe there's something more efficient we can do.
-     .filter(|ingester| !unavailable_leaders.contains(ingester))
-     .map(|ingester| (ingester, 0))
+     .filter(|(ingester_id, ingester)| {
+         ingester.status.is_ready() && !unavailable_leaders.contains(ingester_id)
I want to confirm this is correct - wouldn't we want to capture open shards on ingesters with other statuses, so that we can re-allocate them below? And then we can apply the ingester status when allocating?
From my understanding, this function is only used when opening new shards (called from try_open_shards); here, the counts are used purely for load balancing new allocations among eligible ingesters. Including non-ready ingesters would incorrectly make them allocation targets. Or did I misunderstand something?
Nope, you're right. Sorry for the confusion.
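As a rough illustration of that reading (hypothetical types, not the actual `ingest_controller.rs` code): only ready ingesters that are not unavailable leaders are seeded as allocation candidates with a count of 0; the real code then increments counts per open shard so new shards land on the least-loaded leader.

```rust
use std::collections::{HashMap, HashSet};

#[derive(Clone, Copy, PartialEq)]
enum IngesterStatus {
    Initializing,
    Ready,
    Decommissioning,
}

/// Seeds the per-ingester shard counts used for load-balancing new shard
/// allocations. Non-ready ingesters and unavailable leaders are excluded
/// up front so they can never become allocation targets.
fn allocation_candidates(
    ingesters: &HashMap<String, IngesterStatus>,
    unavailable_leaders: &HashSet<String>,
) -> HashMap<String, usize> {
    ingesters
        .iter()
        .filter(|(id, status)| {
            **status == IngesterStatus::Ready && !unavailable_leaders.contains(*id)
        })
        .map(|(id, _)| (id.clone(), 0))
        .collect()
}
```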
nadav-govari
left a comment
Thanks for making this change!
Description
Attempt to fix #6158
Following @guilload's suggestion here, this PR:
(no open shard found on ingester error)

With this approach, even with some 10s propagation delay before decommissioning, it is still possible to fail to ingest some documents if chitchat takes longer than expected to gossip the ingester status to the control plane.
Any feedback is welcome!!
How was this PR tested?
In addition to the unit and integration tests, I've run it against a local cluster with 2 indexers and observed that the number of errors reported in #6158 decreases from a few hundred to zero.
Other approaches
This PR is fairly identical to the branch
guilload/ingester-status, rebased on main and with some additional bugfixes: