
[improve][broker] Cut PendingAcksMap per-entry overhead with primitive storage#26024

Draft
void-ptr974 wants to merge 4 commits into apache:master from void-ptr974:pending-acks-local-primitive-map

void-ptr974 commented Jun 13, 2026

Contributor

Motivation

PendingAcksMap is used on the Shared and Key_Shared consumer dispatch path to track entries that have been sent but not fully acknowledged. The current layout stores each ledger bucket as:

TreeMap<Long, TreeMap<Long, IntIntPair>>

The outer ledger ordering is still needed for range removal. The inner per-ledger entry map does not need sorted iteration: boundary removal checks entryId <= markDeleteEntryId, and replay ordering is rebuilt later by MessageRedeliveryController.

This PR keeps the outer TreeMap and replaces the inner map with primitive long -> long storage. The value packs (remainingUnacked, stickyKeyHash) into one long, reducing boxed Long, tree node, and IntIntPair allocation on the hot path.
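
The packing described above can be sketched as follows. This is a minimal illustration, not the PR's code: the class name `PackedPendingAck`, the helper names, and the bit layout (count in the high 32 bits, hash in the low 32 bits) are all assumptions made for the example.

```java
public final class PackedPendingAck {
    private PackedPendingAck() {}

    // Assumed layout: remainingUnacked in the high 32 bits, stickyKeyHash in the low 32 bits.
    public static long pack(int remainingUnacked, int stickyKeyHash) {
        // Mask the hash so a negative int does not sign-extend into the high half.
        return ((long) remainingUnacked << 32) | (stickyKeyHash & 0xFFFFFFFFL);
    }

    // Count-only accessor: no pair object is allocated.
    public static int remainingUnacked(long packed) {
        return (int) (packed >> 32);
    }

    public static int stickyKeyHash(long packed) {
        return (int) packed;
    }

    // Replace the count while leaving the hash bits untouched.
    public static long withRemainingUnacked(long packed, int remainingUnacked) {
        return ((long) remainingUnacked << 32) | (packed & 0xFFFFFFFFL);
    }

    public static void main(String[] args) {
        long packed = pack(10, -123456);
        long updated = withRemainingUnacked(packed, 3);
        // prints "3 -123456"
        System.out.println(remainingUnacked(updated) + " " + stickyKeyHash(updated));
    }
}
```

Keeping both fields in one primitive means an update is a single `long` store into the inner map, which is consistent with the near-zero allocation figures reported for the new layout.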

Changes

| Area | Change |
| --- | --- |
| Primitive map | Adds Long2LongMap and Long2LongOpenHashMap, following the existing primitive collection style. |
| Pending ack storage | Changes each ledger bucket from TreeMap<Long, IntIntPair> to Long2LongOpenHashMap. |
| Packed value | Stores remainingUnacked and stickyKeyHash in one long; unpacking stays at API and callback boundaries. |
| Hot-path accessors | Adds remaining-unacked-only accessors to avoid materializing IntIntPair where the caller only needs the count. |
| Benchmark | Adds PendingAcksMapBenchmark with old and new implementations in the same JMH class. |

This PR does not include the separate O(1) size() change. If that lands first, this PR can be updated to keep the cached size accounting with the primitive inner map.

Correctness

| Concern | Coverage |
| --- | --- |
| Primitive map behavior | Long2LongOpenHashMapTest.testZeroValueCanBeDistinguishedFromMissingKey and testRandomOperationsAgainstHashMap cover zero values, missing values, updates, removals, iteration, and randomized behavior against HashMap. |
| Packed value preservation | PendingAcksMapTest.packedPendingAckFields_RoundTripThroughPublicAccessors, removeAllUpTo_PreservesPackedFieldsInCallback, and updateRemainingUnacked_PreservesPackedStickyKeyHash. |
| Pending ack operations | Existing PendingAcksMapTest coverage continues to exercise add, contains, remove, removeAllUpTo, callbacks, forEach, forEachAndClear, and size behavior. |
| Inner map no longer sorted | PendingAcksMapTest.removeAllUpTo_RemovesBoundaryEntriesWithUnorderedInnerMap and removeAllUpTo_RemovesWholeLedgersAndUnorderedBoundaryEntries. |
| Replay ordering | MessageRedeliveryControllerTest.testGetMessagesToReplayNow and ConsumerRedeliveryTest.testOrderedRedelivery. |
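
The zero-value concern in the first row exists because a primitive long -> long map has no null to signal absence. The sketch below is a toy open-addressing map, written only to illustrate one way of keeping "stored zero" and "missing key" apart. It is not the PR's Long2LongOpenHashMap: the class name `TinyLong2LongMap`, the separate occupancy array, the hash mixing constant, and the omission of removal are all simplifying assumptions.

```java
public final class TinyLong2LongMap {
    private long[] keys = new long[16];
    private long[] values = new long[16];
    private boolean[] used = new boolean[16]; // occupancy flags, so key 0 and value 0 are both legal
    private int size;

    private static int slot(long key, int mask) {
        long h = key * 0x9E3779B97F4A7C15L; // multiplicative mixing; the constant is an arbitrary choice
        return ((int) (h >>> 32)) & mask;
    }

    public void put(long key, long value) {
        if ((size + 1) * 2 > keys.length) {
            resize(); // keep the load factor at or below 0.5 so probing always finds a free slot
        }
        int mask = keys.length - 1;
        int i = slot(key, mask);
        while (used[i] && keys[i] != key) {
            i = (i + 1) & mask; // linear probing
        }
        if (!used[i]) {
            used[i] = true;
            keys[i] = key;
            size++;
        }
        values[i] = value;
    }

    public boolean containsKey(long key) {
        return find(key) >= 0;
    }

    public long getOrDefault(long key, long defaultValue) {
        int i = find(key);
        return i >= 0 ? values[i] : defaultValue;
    }

    public int size() {
        return size;
    }

    private int find(long key) {
        int mask = keys.length - 1;
        int i = slot(key, mask);
        while (used[i]) {
            if (keys[i] == key) {
                return i;
            }
            i = (i + 1) & mask;
        }
        return -1; // hit an empty slot: the key is absent
    }

    private void resize() {
        long[] oldKeys = keys;
        long[] oldValues = values;
        boolean[] oldUsed = used;
        keys = new long[oldKeys.length * 2];
        values = new long[oldKeys.length * 2];
        used = new boolean[oldKeys.length * 2];
        size = 0;
        for (int j = 0; j < oldKeys.length; j++) {
            if (oldUsed[j]) {
                put(oldKeys[j], oldValues[j]);
            }
        }
    }
}
```

With this shape, `containsKey(k)` is the presence check, and a caller that receives `0` from `getOrDefault(k, 0)` cannot tell the two cases apart without it. A randomized comparison against `java.util.HashMap`, as the listed test does for the real class, is a natural way to check such a structure.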

PendingAcksMap no longer depends on sorted iteration inside one ledger bucket. removeAllUpTo(...) still uses the outer TreeMap for ledger ordering; only the boundary ledger is scanned by entry id. Redelivery ordering is handled after collection by MessageRedeliveryController, which stores replay positions in sorted structures and returns ordered positions to the dispatcher.
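
A simplified model of that removal path is sketched below. It uses `java.util.HashMap` as an unordered stand-in for the primitive inner map and leaves out the removal callbacks and locking that the real class has; the class and method shapes here are illustrative assumptions, not the PR's implementation.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

public final class RemoveAllUpToSketch {
    // Outer map stays sorted by ledgerId; each bucket is unordered (stand-in for the primitive map).
    private final TreeMap<Long, Map<Long, Long>> ledgers = new TreeMap<>();

    public void add(long ledgerId, long entryId, long packedValue) {
        ledgers.computeIfAbsent(ledgerId, k -> new HashMap<>()).put(entryId, packedValue);
    }

    /** Removes every entry at or before (markDeleteLedgerId, markDeleteEntryId); returns the count removed. */
    public int removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) {
        int removed = 0;

        // 1) Ledgers strictly before the boundary are dropped whole via the sorted outer map.
        Map<Long, Map<Long, Long>> older = ledgers.headMap(markDeleteLedgerId, false);
        for (Map<Long, Long> bucket : older.values()) {
            removed += bucket.size();
        }
        older.clear(); // clearing the view removes those ledgers from the backing TreeMap

        // 2) Only the boundary ledger is scanned, comparing each entry id to the mark-delete entry id.
        Map<Long, Long> boundary = ledgers.get(markDeleteLedgerId);
        if (boundary != null) {
            Iterator<Long> it = boundary.keySet().iterator();
            while (it.hasNext()) {
                if (it.next() <= markDeleteEntryId) {
                    it.remove();
                    removed++;
                }
            }
            if (boundary.isEmpty()) {
                ledgers.remove(markDeleteLedgerId);
            }
        }
        return removed;
    }

    public int size() {
        int total = 0;
        for (Map<Long, Long> bucket : ledgers.values()) {
            total += bucket.size();
        }
        return total;
    }
}
```

The scan in step 2 gives the same result regardless of the bucket's iteration order, which is the property the unordered-inner-map tests check.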

Benchmark

The benchmark uses append-order datasets: entries fill one ledger before rolling over to the next ledger. This matches managed-ledger write order more closely than round-robin distribution.
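
To make the difference between the two layouts concrete, here is a small sketch of how each would assign (ledgerId, entryId) positions. It is an illustration under assumptions, not the code in PendingAcksMapBenchmark: the class name, the method names, and the array-of-pairs representation are invented for the example.

```java
public final class DatasetLayoutSketch {
    private DatasetLayoutSketch() {}

    // Append order: fill one ledger completely before rolling over to the next.
    public static long[][] appendOrder(int totalEntries, int ledgers) {
        int perLedger = (totalEntries + ledgers - 1) / ledgers; // ceiling division
        long[][] positions = new long[totalEntries][2];
        for (int i = 0; i < totalEntries; i++) {
            positions[i][0] = i / perLedger; // ledgerId
            positions[i][1] = i % perLedger; // entryId within that ledger
        }
        return positions;
    }

    // Round-robin: consecutive entries are spread across all ledgers in turn.
    public static long[][] roundRobin(int totalEntries, int ledgers) {
        long[][] positions = new long[totalEntries][2];
        for (int i = 0; i < totalEntries; i++) {
            positions[i][0] = i % ledgers; // ledgerId
            positions[i][1] = i / ledgers; // entryId within that ledger
        }
        return positions;
    }
}
```

In the append-order layout, consecutive operations hit the same ledger bucket until it is full, which is closer to how a dispatcher walks entries written by a managed ledger.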

JMH settings used for the numbers below:

./gradlew :microbench:shadowJar

java -jar microbench/build/libs/microbench-5.0.0-M1-SNAPSHOT-benchmarks.jar \
  'PendingAcksMapBenchmark\.(addOrReplace|containsHit|getRemainingUnackedHit|updateRemainingUnacked|removeAndAddRemaining)' \
  -p implementation=oldProduction,production \
  -p dataset=defaultUnacked50kEntries1Ledger,defaultUnacked50kEntries5Ledgers,defaultUnacked50kEntries10Ledgers,defaultUnacked50kEntries20Ledgers \
  -wi 2 -i 2 -w 1s -r 1s -f 1 -prof gc \
  -rf csv -rff /private/tmp/pending_acks_ledger_gradient_common.csv

java -jar microbench/build/libs/microbench-5.0.0-M1-SNAPSHOT-benchmarks.jar \
  'PendingAcksMapBenchmark\.(forEachAll|removeAllUpTo|populate)' \
  -p implementation=oldProduction,production \
  -p dataset=defaultUnacked50kEntries1Ledger,defaultUnacked50kEntries5Ledgers,defaultUnacked50kEntries10Ledgers,defaultUnacked50kEntries20Ledgers \
  -wi 2 -i 2 -w 1s -r 1s -f 1 -prof gc \
  -rf csv -rff /private/tmp/pending_acks_ledger_gradient_range.csv

Common pending-ack operations, 50k pending entries:

| Benchmark | 1 ledger | 5 ledgers | 10 ledgers | 20 ledgers |
| --- | --- | --- | --- | --- |
| addOrReplace | 86.6 -> 27.5 ns/op (-68.3%) | 85.9 -> 31.2 ns/op (-63.7%) | 86.3 -> 33.7 ns/op (-60.9%) | 82.8 -> 32.7 ns/op (-60.5%) |
| containsHit | 89.0 -> 27.1 ns/op (-69.5%) | 85.3 -> 29.0 ns/op (-65.9%) | 84.6 -> 30.2 ns/op (-64.3%) | 81.1 -> 31.8 ns/op (-60.8%) |
| getRemainingUnackedHit | 97.5 -> 29.8 ns/op (-69.5%) | 81.9 -> 30.2 ns/op (-63.1%) | 79.7 -> 35.4 ns/op (-55.6%) | 80.3 -> 32.4 ns/op (-59.6%) |
| updateRemainingUnacked | 154.8 -> 33.2 ns/op (-78.6%) | 140.5 -> 34.5 ns/op (-75.4%) | 140.6 -> 35.3 ns/op (-74.9%) | 133.8 -> 36.9 ns/op (-72.4%) |
| removeAndAddRemaining | 181.0 -> 74.0 ns/op (-59.1%) | 162.4 -> 92.4 ns/op (-43.1%) | 166.8 -> 109.2 ns/op (-34.6%) | 161.8 -> 97.1 ns/op (-40.0%) |

Range and iteration operations, 50k pending entries:

| Benchmark | 1 ledger | 5 ledgers | 10 ledgers | 20 ledgers |
| --- | --- | --- | --- | --- |
| forEachAll | 455.7 -> 468.6 us/op (+2.8%) | 477.9 -> 121.2 us/op (-74.6%) | 530.9 -> 66.3 us/op (-87.5%) | 447.0 -> 65.0 us/op (-85.5%) |
| populate | 4.687 -> 3.174 ms/op (-32.3%) | 4.078 -> 2.570 ms/op (-37.0%) | 4.024 -> 2.574 ms/op (-36.0%) | 3.743 -> 2.361 ms/op (-36.9%) |
| removeAllUpTo | 669.7 us/op -> 1.769 ms/op (+164.2%) | 478.9 -> 358.8 us/op (-25.1%) | 497.6 -> 205.5 us/op (-58.7%) | 451.1 -> 109.8 us/op (-75.7%) |

Allocation examples from -prof gc:

| Benchmark | Ledgers | Old | New |
| --- | --- | --- | --- |
| addOrReplace | 1 | 47.939 B/op | ~0 B/op |
| addOrReplace | 20 | 46.772 B/op | ~0 B/op |
| updateRemainingUnacked | 1 | 47.940 B/op | ~0 B/op |
| removeAndAddRemaining | 1 | 87.940 B/op | ~0 B/op |
| removeAndAddRemaining | 20 | 86.773 B/op | ~0 B/op |

The main trade-off is removeAllUpTo on a single large ledger. The old inner TreeMap can remove a sorted prefix with headMap(...); the new primitive hash map scans the boundary ledger. Once the pending window spans several ledgers, whole-ledger removal through the outer TreeMap dominates and the new layout is faster.
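
The two boundary-ledger strategies can be contrasted with standard collections. This is a sketch under assumptions: `HashMap` stands in for the primitive hash map, and the class and method names are made up for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public final class BoundaryRemovalComparison {
    private BoundaryRemovalComparison() {}

    // Old layout: the sorted inner map exposes the prefix as a view, so only prefix entries are touched.
    public static void removeSortedPrefix(TreeMap<Long, Long> sortedBucket, long markDeleteEntryId) {
        sortedBucket.headMap(markDeleteEntryId, true).clear();
    }

    // New layout: an unordered bucket has no prefix view, so every entry id in the bucket is tested.
    public static void removeByScan(Map<Long, Long> unorderedBucket, long markDeleteEntryId) {
        unorderedBucket.keySet().removeIf(entryId -> entryId <= markDeleteEntryId);
    }

    public static void main(String[] args) {
        TreeMap<Long, Long> sorted = new TreeMap<>();
        Map<Long, Long> unordered = new HashMap<>();
        for (long entryId = 0; entryId < 10; entryId++) {
            sorted.put(entryId, 0L);
            unordered.put(entryId, 0L);
        }
        removeSortedPrefix(sorted, 3);
        removeByScan(unordered, 3);
        // Both strategies leave entry ids 4..9; prints "true"
        System.out.println(sorted.keySet().equals(unordered.keySet()));
    }
}
```

Both leave the same entries behind. The difference is the amount of work: the scan is proportional to the size of the boundary bucket even when few entries qualify, which is why the single-ledger 50k case regresses while multi-ledger cases do not.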

Object Size

Measured locally with JOL 0.17 GraphLayout.totalSize() on OpenJDK 25.0.2. The helper source and JOL dependency are not included in this PR.

PendingAcksMap stores pending entries, not messages. With batching, the entry count is lower than the unacked message count.

| Scenario | Entries / ledgers | Old | New | Change |
| --- | --- | --- | --- | --- |
| Healthy consumer window | 1,000 / 1 | 86.3 KiB | 34.4 KiB | -60.2% |
| Healthy window, avg batch size 10 | 100 / 1 | 8.9 KiB | 4.6 KiB | -48.3% |
| Slow consumer, half of default consumer unacked limit | 25,000 / 1 | 2.10 MiB | 1.06 MiB | -49.3% |
| Slow consumer, default consumer unacked limit | 50,000 / 1 | 4.20 MiB | 2.13 MiB | -49.4% |
| Default consumer unacked limit across 5 ledgers | 50,000 / 5 | 4.20 MiB | 1.33 MiB | -68.3% |
| Default consumer unacked limit across 10 ledgers | 50,000 / 10 | 4.20 MiB | 1.33 MiB | -68.3% |
| Default consumer unacked limit across 20 ledgers | 50,000 / 20 | 4.20 MiB | 1.33 MiB | -68.3% |
| Residual pending entries across 100 ledgers | 1,000 / 100 | 97.1 KiB | 68.2 KiB | -29.8% |
| Sparse worst case, one entry per ledger | 1,000 / 1,000 | 195.5 KiB | 679.9 KiB | +247.7% |

The sparse one-entry-per-ledger case is not the expected hot path, but it is useful as a bound. It can be optimized separately if it shows up in production profiles.

Verification

  • ./gradlew :pulsar-broker:test --tests org.apache.pulsar.broker.service.PendingAcksMapTest
  • ./gradlew :pulsar-common:test --tests org.apache.pulsar.common.util.collections.Long2LongOpenHashMapTest
  • ./gradlew :pulsar-broker:test --tests org.apache.pulsar.broker.service.persistent.MessageRedeliveryControllerTest.testGetMessagesToReplayNow --tests org.apache.pulsar.client.api.ConsumerRedeliveryTest.testOrderedRedelivery -PtestRetryCount=0
  • ./gradlew :pulsar-broker:checkstyleMain :pulsar-common:checkstyleMain
  • ./gradlew :pulsar-broker:checkstyleTest
  • ./gradlew :pulsar-common:checkstyleTest
  • ./gradlew :microbench:checkstyleMain
  • ./gradlew :microbench:shadowJar
  • JMH commands listed above
  • Local JOL GraphLayout.totalSize() run against an out-of-tree helper

void-ptr974 marked this pull request as ready for review June 13, 2026 15:31
@void-ptr974

Contributor Author

This PR intentionally does not include the O(1) PendingAcksMap.size() change from #26019. If #26019 is merged first, I will update this PR on top of it and keep the cached size accounting in the primitive-map implementation.

void-ptr974 marked this pull request as draft June 14, 2026 02:57