[improve][broker] Cut PendingAcksMap per-entry overhead with primitive storage#26024
Draft
void-ptr974 wants to merge 4 commits into
Draft
[improve][broker] Cut PendingAcksMap per-entry overhead with primitive storage#26024void-ptr974 wants to merge 4 commits into
void-ptr974 wants to merge 4 commits into
Conversation
Contributor
Author
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Motivation
PendingAcksMapis used on the Shared and Key_Shared consumer dispatch path to track entries that have been sent but not fully acknowledged. The current layout stores each ledger bucket as:The outer ledger ordering is still needed for range removal. The inner per-ledger entry map does not need sorted iteration: boundary removal checks
entryId <= markDeleteEntryId, and replay ordering is rebuilt later byMessageRedeliveryController.This PR keeps the outer
TreeMapand replaces the inner map with primitivelong -> longstorage. The value packs(remainingUnacked, stickyKeyHash)into onelong, reducing boxedLong, tree node, andIntIntPairallocation on the hot path.Changes
Long2LongMapandLong2LongOpenHashMap, following the existing primitive collection style.TreeMap<Long, IntIntPair>toLong2LongOpenHashMap.remainingUnackedandstickyKeyHashin onelong; unpacking stays at API and callback boundaries.IntIntPairwhere the caller only needs the count.PendingAcksMapBenchmarkwith old and new implementations in the same JMH class.This PR does not include the separate O(1)
size()change. If that lands first, this PR can be updated to keep the cached size accounting with the primitive inner map.Correctness
Long2LongOpenHashMapTest.testZeroValueCanBeDistinguishedFromMissingKeyandtestRandomOperationsAgainstHashMapcover zero values, missing values, updates, removals, iteration, and randomized behavior againstHashMap.PendingAcksMapTest.packedPendingAckFields_RoundTripThroughPublicAccessors,removeAllUpTo_PreservesPackedFieldsInCallback, andupdateRemainingUnacked_PreservesPackedStickyKeyHash.PendingAcksMapTestcoverage continues to exercise add, contains, remove,removeAllUpTo, callbacks,forEach,forEachAndClear, and size behavior.PendingAcksMapTest.removeAllUpTo_RemovesBoundaryEntriesWithUnorderedInnerMapandremoveAllUpTo_RemovesWholeLedgersAndUnorderedBoundaryEntries.MessageRedeliveryControllerTest.testGetMessagesToReplayNowandConsumerRedeliveryTest.testOrderedRedelivery.PendingAcksMapno longer depends on sorted iteration inside one ledger bucket.removeAllUpTo(...)still uses the outerTreeMapfor ledger ordering; only the boundary ledger is scanned by entry id. Redelivery ordering is handled after collection byMessageRedeliveryController, which stores replay positions in sorted structures and returns ordered positions to the dispatcher.Benchmark
The benchmark uses append-order datasets: entries fill one ledger before rolling over to the next ledger. This matches managed-ledger write order more closely than round-robin distribution.
JMH settings used for the numbers below:
Common pending-ack operations, 50k pending entries:
addOrReplacecontainsHitgetRemainingUnackedHitupdateRemainingUnackedremoveAndAddRemainingRange and iteration operations, 50k pending entries:
forEachAllpopulateremoveAllUpToAllocation examples from
-prof gc:addOrReplaceaddOrReplaceupdateRemainingUnackedremoveAndAddRemainingremoveAndAddRemainingThe main trade-off is
removeAllUpToon a single large ledger. The old innerTreeMapcan remove a sorted prefix withheadMap(...); the new primitive hash map scans the boundary ledger. Once the pending window spans several ledgers, whole-ledger removal through the outerTreeMapdominates and the new layout is faster.Object Size
Measured locally with JOL 0.17
GraphLayout.totalSize()on OpenJDK 25.0.2. The helper source and JOL dependency are not included in this PR.PendingAcksMapstores pending entries, not messages. With batching, the entry count is lower than the unacked message count.The sparse one-entry-per-ledger case is not the expected hot path, but it is useful as a bound. It can be optimized separately if it shows up in production profiles.
Verification
./gradlew :pulsar-broker:test --tests org.apache.pulsar.broker.service.PendingAcksMapTest./gradlew :pulsar-common:test --tests org.apache.pulsar.common.util.collections.Long2LongOpenHashMapTest./gradlew :pulsar-broker:test --tests org.apache.pulsar.broker.service.persistent.MessageRedeliveryControllerTest.testGetMessagesToReplayNow --tests org.apache.pulsar.client.api.ConsumerRedeliveryTest.testOrderedRedelivery -PtestRetryCount=0./gradlew :pulsar-broker:checkstyleMain :pulsar-common:checkstyleMain./gradlew :pulsar-broker:checkstyleTest./gradlew :pulsar-common:checkstyleTest./gradlew :microbench:checkstyleMain./gradlew :microbench:shadowJarGraphLayout.totalSize()run against an out-of-tree helper