[improve][broker] Optimize TripleLongPriorityQueue heap operations#26010
nodece wants to merge 1 commit into
Conversation
lhotari
left a comment
Didn't go through deeply, but one question about swapping.
One possible optimization for swapping would be to read the 3 longs in one shot and write them to the other location in one shot, using a ByteBuf directly. For comparisons, reading and comparing the keys alone should be sufficient.
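To make the swapping idea concrete, here is a minimal sketch, not code from this PR. It uses `java.nio.ByteBuffer` as a stand-in for Netty's `ByteBuf` so that it runs without dependencies, and it assumes each tuple is 3 contiguous longs that never straddle two buffers (which may not hold for `SegmentedLongArray` segments). `TupleSwapSketch` and its method names are hypothetical.

```java
import java.nio.ByteBuffer;

/** Hypothetical illustration only; not part of Pulsar or Netty. */
final class TupleSwapSketch {
    // One tuple is 3 longs = 24 bytes.
    static final int TUPLE_BYTES = 3 * Long.BYTES;

    /**
     * Swaps two whole tuples using one bulk read and one bulk write per tuple,
     * instead of six single-long reads and six single-long writes.
     * tmpA and tmpB are caller-provided scratch arrays of at least TUPLE_BYTES.
     */
    static void swap(ByteBuffer buf, int idxA, int idxB, byte[] tmpA, byte[] tmpB) {
        // duplicate() shares the content but has its own position,
        // so the caller's buffer position is left untouched.
        ByteBuffer view = buf.duplicate();

        view.position(idxA * TUPLE_BYTES);
        view.get(tmpA, 0, TUPLE_BYTES);      // bulk read of tuple A
        view.position(idxB * TUPLE_BYTES);
        view.get(tmpB, 0, TUPLE_BYTES);      // bulk read of tuple B

        view.position(idxA * TUPLE_BYTES);
        view.put(tmpB, 0, TUPLE_BYTES);      // bulk write of B into A's slot
        view.position(idxB * TUPLE_BYTES);
        view.put(tmpA, 0, TUPLE_BYTES);      // bulk write of A into B's slot
    }
}
```

With Netty, the equivalent bulk calls would presumably be `getBytes`/`setBytes` on the `ByteBuf`; whether this beats per-long access would need to be confirmed with the benchmark.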
could you add the benchmarks to the
I performed a local review with Claude Code and these were the findings:
Motivation
`TripleLongPriorityQueue` is the core data structure for Pulsar's delayed message delivery, storing `(deliverAt, ledgerId, entryId)` tuples in a min-heap backed by `SegmentedLongArray` (direct memory, 16MB segments). CPU profiling shows `siftUp`/`siftDown` consuming ~37% of hot thread time during delayed message processing.

The original implementation has two performance issues:

- `compare(tupleIdx1, tupleIdx2)` reads 6 longs per comparison, but the current node's values are already known from the prior iteration, so they are re-read from `SegmentedLongArray` unnecessarily.
- Each `readLong` on `SegmentedLongArray` incurs 3 layers of indirection (division + `ArrayList.get()` + `ByteBuf.getLong()`), so reducing `readLong` calls directly translates to throughput gains.

Modifications

Rewrote `siftUp` and `siftDown` using the hole-based algorithm with register-cached values:

- `siftUp(tupleIdx, n1, n2, n3)`: Passes the inserted element's values as parameters. Each layer only reads the parent's 3 longs (was: 6 longs for self + parent), then writes the parent down. The displaced value lives in local variables and is written once at the final position.
- `siftDown(tupleIdx, val0, val1, val2)`: Pop reads the last tuple and passes it directly. Each layer reads left + right child's 3 longs each (was: 6 longs for self + left + right), promotes the smaller child, and writes the displaced value at the leaf.
- `add()`: Writes tuple to array directly, passes cached values to `siftUp`.
- `pop()`: Reads last tuple, passes to `siftDown(0, n1, n2, n3)`.
- `compare()`, `swap()`, `put()` methods.
- `>>> 1` instead of `/ 2` for unsigned shift.

Per-layer `readLong` reduction:
Benchmark results on `SegmentedLongArray`:

Further optimization

The remaining performance gap is dominated by `SegmentedLongArray`'s per-`readLong` indirection cost. Replacing `SegmentedLongArray` with a flat `long[]` eliminates the segment lookup entirely: each `readLong` becomes a single array load with hardware prefetcher support. Benchmark comparison shows the `long[]`-backed implementation is ~3x faster than `SegmentedLongArray` at the same heap operations. This could be considered as a follow-up if the direct memory trade-off (GC pressure vs. off-heap) is acceptable for the delayed delivery use case.
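For readers unfamiliar with the hole-based approach described under Modifications, the following is a minimal sketch of the idea on a flat `long[]`. It is not the code in this PR: `TripleHeapSketch`, its growth policy, and its method shapes are assumptions chosen to keep the example self-contained. The moving value stays in local variables, each layer reads only the parent (sift-up) or the children (sift-down), and the value is written once at its final slot.

```java
import java.util.Arrays;
import java.util.NoSuchElementException;

/** Hypothetical min-heap of (n1, n2, n3) triples on a flat long[]; illustration only. */
final class TripleHeapSketch {
    private long[] data = new long[3 * 16]; // 3 longs per tuple
    private int size;                       // number of tuples stored

    int size() { return size; }

    void add(long n1, long n2, long n3) {
        if ((size + 1) * 3 > data.length) {
            data = Arrays.copyOf(data, data.length * 2);
        }
        siftUp(size++, n1, n2, n3);
    }

    /** Removes and returns the smallest tuple as a 3-element array. */
    long[] pop() {
        if (size == 0) throw new NoSuchElementException();
        long[] min = {data[0], data[1], data[2]};
        int last = --size;
        if (last > 0) {
            // Read the last tuple once and carry it down from the root.
            siftDown(0, data[3 * last], data[3 * last + 1], data[3 * last + 2]);
        }
        return min;
    }

    private void siftUp(int idx, long v1, long v2, long v3) {
        while (idx > 0) {
            int parent = (idx - 1) >>> 1;
            int p = parent * 3;
            long p1 = data[p], p2 = data[p + 1], p3 = data[p + 2]; // 3 reads per layer
            if (compare(v1, v2, v3, p1, p2, p3) >= 0) break;
            write(idx, p1, p2, p3); // move the parent down into the hole
            idx = parent;
        }
        write(idx, v1, v2, v3);     // single write of the carried value
    }

    private void siftDown(int idx, long v1, long v2, long v3) {
        int half = size >>> 1;      // indices >= half have no children
        while (idx < half) {
            int child = 2 * idx + 1;
            int c = child * 3;
            long c1 = data[c], c2 = data[c + 1], c3 = data[c + 2];
            int right = child + 1;
            if (right < size) {
                int r = right * 3;
                long r1 = data[r], r2 = data[r + 1], r3 = data[r + 2];
                if (compare(r1, r2, r3, c1, c2, c3) < 0) {
                    child = right; c1 = r1; c2 = r2; c3 = r3;
                }
            }
            if (compare(v1, v2, v3, c1, c2, c3) <= 0) break;
            write(idx, c1, c2, c3); // promote the smaller child into the hole
            idx = child;
        }
        write(idx, v1, v2, v3);
    }

    private void write(int idx, long a, long b, long c) {
        int i = idx * 3;
        data[i] = a; data[i + 1] = b; data[i + 2] = c;
    }

    // Lexicographic comparison on values already held in locals.
    private static int compare(long a1, long a2, long a3, long b1, long b2, long b3) {
        int c = Long.compare(a1, b1);
        if (c != 0) return c;
        c = Long.compare(a2, b2);
        return c != 0 ? c : Long.compare(a3, b3);
    }
}
```

Because the sketch is backed by a heap-allocated `long[]`, it also illustrates the follow-up trade-off: reads become plain array loads, but the storage moves on-heap and grows by copying.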