[improve][broker] Optimize TripleLongPriorityQueue heap operations #26010

Open
nodece wants to merge 1 commit into apache:master from nodece:optimize-TripleLongPriorityQueue-heap-operations

@nodece nodece commented Jun 12, 2026

Motivation

TripleLongPriorityQueue is the core data structure for Pulsar's delayed message delivery, storing (deliverAt, ledgerId, entryId) tuples in a min-heap backed by SegmentedLongArray (direct memory, 16MB segments). CPU profiling shows siftUp/siftDown consuming ~37% of hot thread time during delayed message processing.

The original implementation has two performance issues:

  1. Redundant readLong calls: compare(tupleIdx1, tupleIdx2) reads 6 longs per comparison, but the current node's values are already known from the prior iteration — they're re-read from SegmentedLongArray unnecessarily.
  2. Swap-based sift: Each heap layer swap performs 6 readLong + 6 writeLong, but a hole-based approach only needs 3 writeLong per layer.

Each readLong on SegmentedLongArray incurs 3 layers of indirection (division + ArrayList.get() + ByteBuf.getLong()), so reducing readLong calls directly translates to throughput gains.
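
To make the per-read cost concrete, here is a minimal sketch of a segmented read path. It is a simplified stand-in, not Pulsar's SegmentedLongArray: the class name, the constructor parameters, and the on-heap long[] segments (in place of direct-memory ByteBuf segments) are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for a segmented long array; NOT Pulsar's SegmentedLongArray.
// On-heap long[] segments replace the direct-memory ByteBuf segments for brevity.
final class SegmentedLongsSketch {
    private final int segmentSize;                       // longs per segment (hypothetical parameter)
    private final List<long[]> segments = new ArrayList<>();

    SegmentedLongsSketch(int segmentSize, int segmentCount) {
        this.segmentSize = segmentSize;
        for (int i = 0; i < segmentCount; i++) {
            segments.add(new long[segmentSize]);
        }
    }

    long readLong(long idx) {
        int seg = (int) (idx / segmentSize);             // 1. division to locate the segment
        int off = (int) (idx % segmentSize);
        long[] segment = segments.get(seg);              // 2. list lookup (bounds check + pointer chase)
        return segment[off];                             // 3. load from the segment itself
    }

    void writeLong(long idx, long value) {
        segments.get((int) (idx / segmentSize))[(int) (idx % segmentSize)] = value;
    }
}
```

Every heap comparison pays these three steps once per long it reads, which is why cutting the number of reads per layer matters more than the comparison logic itself.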

Modifications

Rewrote siftUp and siftDown using the hole-based algorithm with register-cached values:

  • siftUp(tupleIdx, n1, n2, n3): Passes the inserted element's values as parameters. Each layer only reads the parent's 3 longs (was: 6 longs for self + parent), then writes the parent down. The displaced value lives in local variables and is written once at the final position.
  • siftDown(tupleIdx, val0, val1, val2): Pop reads the last tuple and passes it directly. Each layer reads left + right child's 3 longs each (was: 6 longs for self + left + right), promotes the smaller child, and writes the displaced value at the leaf.
  • add(): Writes tuple to array directly, passes cached values to siftUp.
  • pop(): Reads last tuple, passes to siftDown(0, n1, n2, n3).
  • Removed compare(), swap(), put() methods.
  • Replaced / 2 with the unsigned right shift >>> 1 when halving indices.
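
The hole-based approach described above can be sketched roughly as follows. This is an illustration on a flat long[] rather than the PR's actual code on SegmentedLongArray; the class name HoleHeapSketch, the growth policy, and the lexicographic comparison over all three longs are assumptions.

```java
import java.util.Arrays;

// Hole-based min-heap of (n1, n2, n3) tuples on a flat long[].
// A sketch only: the PR works on SegmentedLongArray, and this is not its code.
final class HoleHeapSketch {
    private long[] a = new long[3 * 16];
    private int count;                                   // number of tuples stored

    int size() {
        return count;
    }

    void add(long n1, long n2, long n3) {
        if (3 * (count + 1) > a.length) {
            a = Arrays.copyOf(a, a.length * 2);
        }
        siftUp(count++, n1, n2, n3);                     // the new slot starts out as the hole
    }

    long[] pop() {
        if (count == 0) {
            throw new IllegalStateException("queue is empty");
        }
        long[] top = {a[0], a[1], a[2]};
        int last = 3 * --count;
        if (count > 0) {                                 // nothing to re-place when the heap is now empty
            siftDown(0, a[last], a[last + 1], a[last + 2]);
        }
        return top;
    }

    private void siftUp(int idx, long n1, long n2, long n3) {
        while (idx > 0) {
            int parent = (idx - 1) >>> 1;
            int p = 3 * parent;
            long p1 = a[p], p2 = a[p + 1], p3 = a[p + 2]; // only the parent is read
            if (compare(n1, n2, n3, p1, p2, p3) >= 0) {
                break;
            }
            write(idx, p1, p2, p3);                      // move the parent down into the hole
            idx = parent;
        }
        write(idx, n1, n2, n3);                          // single write of the carried value
    }

    private void siftDown(int idx, long n1, long n2, long n3) {
        int half = count >>> 1;                          // indices >= half have no children
        while (idx < half) {
            int child = 2 * idx + 1;
            int c = 3 * child;
            long c1 = a[c], c2 = a[c + 1], c3 = a[c + 2];
            int right = child + 1;
            if (right < count) {
                int r = 3 * right;
                long r1 = a[r], r2 = a[r + 1], r3 = a[r + 2];
                if (compare(r1, r2, r3, c1, c2, c3) < 0) {
                    child = right;
                    c1 = r1;
                    c2 = r2;
                    c3 = r3;
                }
            }
            if (compare(n1, n2, n3, c1, c2, c3) <= 0) {
                break;
            }
            write(idx, c1, c2, c3);                      // promote the smaller child into the hole
            idx = child;
        }
        write(idx, n1, n2, n3);
    }

    private void write(int idx, long v1, long v2, long v3) {
        int i = 3 * idx;
        a[i] = v1;
        a[i + 1] = v2;
        a[i + 2] = v3;
    }

    // Assumed ordering: lexicographic over the three longs.
    private static int compare(long x1, long x2, long x3, long y1, long y2, long y3) {
        int c = Long.compare(x1, y1);
        if (c != 0) {
            return c;
        }
        c = Long.compare(x2, y2);
        return c != 0 ? c : Long.compare(x3, y3);
    }
}
```

The carried tuple stays in local variables for the whole walk, so each layer costs one tuple read per node inspected and one tuple write, instead of the read-both-then-write-both pattern of a swap.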

Per-layer readLong reduction:

| Operation | Before | After |
| --- | --- | --- |
| siftUp per layer | 12 | 3 |
| siftDown per layer | 24 | 6 |

Benchmark results on SegmentedLongArray:

| Dataset size | Before (AVG) | After (AVG) | Improvement |
| --- | --- | --- | --- |
| 50K | 38,690 us | 34,220 us | 11.5% |
| 500K | 525,517 us | 461,174 us | 12.2% |
| 2M | 2,136,791 us | 1,704,437 us | 20.2% |

Further optimization

The remaining performance gap is dominated by SegmentedLongArray's per-readLong indirection cost. Replacing SegmentedLongArray with a flat long[] eliminates the segment lookup entirely — each readLong becomes a single array load with hardware prefetcher support. Benchmark comparison shows long[]-backed implementation is ~3x faster than SegmentedLongArray at the same heap operations. This could be considered as a follow-up if the direct memory trade-off (GC pressure vs. off-heap) is acceptable for the delayed delivery use case.

@dao-jun dao-jun left a comment

LGTM

@lhotari lhotari left a comment

Didn't go through deeply, but one question about swapping.

One possible optimization for swapping would be to read the 3 longs in one shot and write them to the other location in one shot, using a ByteBuf directly. For comparisons, reading and comparing the keys alone should be sufficient.
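
One way to read this suggestion, sketched with java.nio.ByteBuffer as a stand-in for Netty's ByteBuf so the example stays self-contained. The class and method names are hypothetical, and the comparison assumes the later longs only need to be read when the earlier ones tie.

```java
import java.nio.ByteBuffer;

// Stand-in sketch using java.nio.ByteBuffer; the PR's storage is Netty ByteBuf segments.
final class TupleBufferSketch {
    static final int TUPLE_BYTES = 3 * Long.BYTES;

    // Copy one 24-byte tuple with a single bulk transfer instead of 3 reads + 3 writes.
    static void copyTuple(ByteBuffer buf, int fromTuple, int toTuple) {
        if (fromTuple == toTuple) {
            return;
        }
        ByteBuffer src = buf.duplicate();                // shares content, independent position/limit
        src.position(fromTuple * TUPLE_BYTES);
        src.limit(fromTuple * TUPLE_BYTES + TUPLE_BYTES);
        ByteBuffer dst = buf.duplicate();
        dst.position(toTuple * TUPLE_BYTES);
        dst.put(src);                                    // bulk copy of exactly one tuple
    }

    // Compare the leading long first; later longs are read only on a tie.
    static int compareTuples(ByteBuffer buf, int i, int j) {
        int bi = i * TUPLE_BYTES;
        int bj = j * TUPLE_BYTES;
        int c = Long.compare(buf.getLong(bi), buf.getLong(bj));
        if (c != 0) {
            return c;
        }
        c = Long.compare(buf.getLong(bi + Long.BYTES), buf.getLong(bj + Long.BYTES));
        if (c != 0) {
            return c;
        }
        return Long.compare(buf.getLong(bi + 2 * Long.BYTES), buf.getLong(bj + 2 * Long.BYTES));
    }
}
```

With segmented storage, a bulk copy like this would only apply when a tuple does not straddle a segment boundary, which would need checking against the actual segment layout.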

lhotari commented Jun 12, 2026

> Benchmark comparison shows long[]-backed implementation is ~3x faster than SegmentedLongArray at the same heap operations. This could be considered as a follow-up if the direct memory trade-off (GC pressure vs. off-heap) is acceptable for the delayed delivery use case.

could you add the benchmarks to the microbench module?

lhotari commented Jun 13, 2026

I performed a local review with Claude Code and these were the findings:

  1. [QUALITY] Dead writes in add()
    The three array.writeLong(arrayIdx, …) calls before siftUp(tuplesCount, n1, n2, n3) are redundant. siftUp never reads the slot at tupleIdx (it only reads parents) and unconditionally writes (n1, n2, n3) at the final hole position — which equals arrayIdx when no movement occurs. Dropping them saves 3 writeLong calls per add(), exactly the kind of saving this PR is after. (SegmentedLongArray.writeLong is a plain setLong with no size/writer-index tracking, so nothing depends on the pre-write.)

  2. [QUALITY] Single-element pop() does wasted I/O
    pop() reads the last (= only) tuple and siftDown writes it back to slot 0, which is now beyond tuplesCount. Harmless (the old code's swap(0, 0) was worse), but an early return when tuplesCount == 0 after the decrement would skip 3 reads + 3 writes. Very minor.

  3. [QUALITY] No test changes for a heap-algorithm rewrite
    Existing coverage (testLargeQueue, testCompareWithSamePrefix) does exercise ordering, but for a from-scratch sift rewrite a randomized differential test against java.util.PriorityQueue<long[]> (interleaved add/pop, duplicates, same-prefix tuples) would be cheap insurance. Worth considering, not blocking.
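
A randomized differential check along the lines of finding 3 could look roughly like this. It is a sketch, not a test from the PR: the TripleQueue interface is a hypothetical adapter that would need to wrap the real queue, and the seed, operation mix, and value ranges are arbitrary choices.

```java
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.Random;

final class DifferentialHeapCheck {
    /** Hypothetical minimal view of the queue under test; adapt it to the real class. */
    interface TripleQueue {
        void add(long n1, long n2, long n3);

        long[] pop();   // removes and returns the smallest tuple as {n1, n2, n3}

        int size();
    }

    // Assumed ordering: lexicographic over the three longs.
    static final Comparator<long[]> LEXICOGRAPHIC = (x, y) -> {
        for (int i = 0; i < 3; i++) {
            int c = Long.compare(x[i], y[i]);
            if (c != 0) {
                return c;
            }
        }
        return 0;
    };

    /** Interleaves random adds and pops, comparing against java.util.PriorityQueue. */
    static void run(TripleQueue queue, long seed, int operations) {
        Random rnd = new Random(seed);
        PriorityQueue<long[]> reference = new PriorityQueue<>(LEXICOGRAPHIC);
        for (int op = 0; op < operations; op++) {
            if (reference.isEmpty() || rnd.nextInt(3) != 0) {
                // Tiny value ranges force duplicates and same-prefix tuples.
                long[] t = {rnd.nextInt(4), rnd.nextInt(4), rnd.nextInt(4)};
                reference.add(t);
                queue.add(t[0], t[1], t[2]);
            } else {
                check(reference.poll(), queue.pop(), op);
            }
            if (reference.size() != queue.size()) {
                throw new AssertionError("size mismatch at op " + op);
            }
        }
        while (!reference.isEmpty()) {                   // drain and compare the remainder
            check(reference.poll(), queue.pop(), operations);
        }
    }

    private static void check(long[] expected, long[] actual, int op) {
        if (actual == null || actual.length != 3 || LEXICOGRAPHIC.compare(expected, actual) != 0) {
            throw new AssertionError("pop mismatch at op " + op);
        }
    }
}
```

Running it with a few fixed seeds keeps failures reproducible; the adapter for the real queue is left out here because it depends on that class's accessors.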
