[fix][broker] Don't let a stuck or aborted topic policies cache init make a namespace's topics unloadable#26025
Open
lhotari wants to merge 1 commit into
Open
Conversation
c94b39b to
93fa3ba
Compare
…make a namespace's topics unloadable Topic loading waits for the namespace's topic policies cache to be initialized by reading the __change_events system topic to the end (SystemTopicBasedTopicPoliciesService#initPolicesCache), which completes a shared per-namespace future in policyCacheInitMap that every topic load awaits. That future could be left pending forever, leaving every topic in the namespace stuck and unloadable until the broker was restarted (issue apache#25294), in two ways: 1. The read loop had no timeout: a system-topic reader that reconnected but stopped making progress pinned the future indefinitely. 2. Several cleanup paths removed the future from policyCacheInitMap without ever completing it (most importantly the namespace-bundle unload path, removeOwnedNamespaceBundleAsync), relying on the reader being closed and the init chain failing to complete it indirectly. Modifications: - Add topicPoliciesCacheInitTimeoutSeconds (default 60s, dynamic). prepareInitPoliciesCacheAsync now schedules a timeout that fails the init future, and via an identity-guarded cleanup (cleanupAfterPolicyCacheInitTimeout) clears the cached state and closes the stuck reader only when the timed-out future is still the current one, so a concurrent retry/unload is never clobbered. A new metric pulsar.broker.topic.policies.cache.init.timeout.count counts these events. - cleanPoliciesCacheInitMap and close() now complete any pending init future they drop (exceptionally), so awaiting topic loads fail fast and retry instead of hanging. Completion happens outside the ConcurrentHashMap compute() remapping to avoid a recursive update / deadlock (apache#24977). Assisted-by: Claude Code (Opus 4.8)
93fa3ba to
33f4ddd
Compare
merlimat
approved these changes
Jun 14, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Fixes #25294
Motivation
When a topic is loaded, the broker first waits for the namespace's topic policies cache to be
initialized. Initialization (
SystemTopicBasedTopicPoliciesService#initPolicesCache) reads thenamespace's
__change_eventssystem topic to the end and completes a shared, per-namespacefuture (
policyCacheInitMap) that every topic load in the namespace awaits.That shared future could be left pending forever, leaving every topic in the namespace stuck and
unloadable until the broker was restarted (issue #25294), in two distinct ways:
__change_eventsreader reconnects but then stops makingprogress — e.g. after
__change_eventsis unloaded/moved and the reconnected reader gets stuck(see the compacted-read stuck-reader bug fixed in [fix][broker] Fix compacted read could be stuck forever or message loss due to cursor mark delete #25998) — the read loop never finishes and the
future stays pending. The 60s
topicLoadTimeoutSecondsonly fails the individual topic-loadfuture; it does not clear the poisoned
policyCacheInitMapentry or close the stuck reader, so thenamespace stays poisoned and every later load times out the same way.
policyCacheInitMapbut never complete the future — most importantly the namespace-bundle unloadpath (
removeOwnedNamespaceBundleAsync). They relied on the reader being closed and the init chainfailing to complete the future indirectly; if that didn't happen, awaiting topic loads hung. This
matches the "futures accumulate until the broker is restarted" symptom in the report.
This is defense-in-depth that is complementary to #25998 (which fixes one concrete stuck-reader
trigger on the broker/cursor side): it guarantees the per-namespace init future is always completed,
so a single stuck/aborted initialization can no longer take a whole namespace's topics down until
restart.
Modifications
topicPoliciesCacheInitTimeoutSeconds(default60, dynamic). It bounds topic policies cacheinitialization for a namespace. Set to
0/negative to disable (previous unbounded behavior).prepareInitPoliciesCacheAsyncschedules a timeout that fails the init future. On timeout, anidentity-guarded cleanup (
cleanupAfterPolicyCacheInitTimeout) clears the cached state andcloses the stuck reader only when the timed-out future is still the current one — it captures
the reader before the gate and uses
policyCacheInitMap.remove(ns, future)/readerCaches.remove(ns, reader), so a concurrent retry (or an unload that already replaced theentry) is never clobbered. A new
pulsar.broker.topic.policies.cache.init.timeout.countOpenTelemetry counter records these events. The timeout task is cancelled as soon as initialization
completes, so it adds no overhead on the normal path.
cleanPoliciesCacheInitMapandclose()now complete any pending init future they drop(exceptionally), so awaiting topic loads fail fast and retry with a fresh reader instead of hanging.
The completion is done outside the
ConcurrentHashMap#computeremapping function, becausecompleting the future can run the awaiting topic-load callbacks synchronously and doing that while
holding the bin lock risks a recursive map update / deadlock (the hazard addressed in [Bug] [broker] Concurrent error in SystemTopicBasedTopicPoliciesService#prepareInitPoliciesCacheAsync #24977).
Verifying this change
This change added tests and can be verified as follows:
...TopicPoliciesServiceTest#testPrepareInitPoliciesCacheAsyncTimesOutWhenReaderStuck: spies the__change_eventsreader so it reports more events but never delivers one (a stuck reader), thenasserts
prepareInitPoliciesCacheAsyncfails with aTimeoutExceptioninstead of hanging, that thepoisoned
policyCacheInitMapentry is cleared, and that the stuck reader is closed. Verified redwithout the fix and green with it.
...TopicPoliciesServiceTest#testCleanPoliciesCacheInitMapCompletesPendingInitFuture: asserts thatdropping a pending init future (both the reader-close and non-reader-close branches) completes it
exceptionally and removes it from the map, and that an already-completed future is left untouched.
SystemTopicBasedTopicPoliciesServiceTestsuite passes, including the existinginit/cleanup tests (cleanup call counts and behavior unchanged on the normal path).
Does this pull request potentially affect one of the following parts:
topicPoliciesCacheInitTimeoutSeconds, default 60s; topic policies cache initialization is now bounded by default instead of unbounded)pulsar.broker.topic.policies.cache.init.timeout.count)