[client] Replace Netty PooledByteBufAllocator with bump-pointer ChunkedAllocationManager for Arrow memory allocation. #3026
Conversation
Pull request overview
This PR addresses Arrow memory fragmentation/OOM during high-column-count decompression by replacing Netty-backed pooled allocations with a bump-pointer, chunk-recycling allocation strategy and wiring it into client read/write paths.
Changes:
- Introduces `ChunkedAllocationManager` (bump-pointer sub-allocation within reusable native chunks) for Arrow allocations.
- Adds allocator construction utilities (`AllocatorUtil`) and a custom rounding policy (`FlussRoundingPolicy`) used by the new allocator setup.
- Updates client read/write components to use the new allocation manager (e.g., `LogRecordReadContext`, `RecordAccumulator`, `LogFetcher`, `LimitBatchScanner`).
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/AllocatorUtil.java | Centralizes creation of a RootAllocator configured with a custom AllocationManager.Factory. |
| fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java | Allows injecting an AllocationManager.Factory and defaults Arrow read contexts to ChunkedFactory. |
| fluss-common/src/main/java/org/apache/fluss/record/FlussRoundingPolicy.java | Adds a custom rounding policy used in allocator configuration. |
| fluss-common/src/main/java/org/apache/fluss/compression/ChunkedAllocationManager.java | Implements the new chunked bump-pointer allocation manager + factory/pool. |
| fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java | Switches Arrow allocator used for writing to the new chunked allocator. |
| fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java | Shares a ChunkedFactory across local/remote read contexts for scanning. |
| fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LimitBatchScanner.java | Uses chunked allocator in read context for limit scans (Arrow path). |
```java
// Align to 8 bytes for safe direct-memory access.
long alignedSize = (size + ALIGNMENT - 1) & ~(ALIGNMENT - 1);

if (activeChunk == null || !activeChunk.hasRoom(alignedSize)) {
    // Current chunk is full or doesn't exist — obtain a recycled or new chunk.
    activeChunk = obtainChunk();
}

long offset = activeChunk.bumpAllocate(alignedSize);
return new ChunkedAllocationManager(accountingAllocator, activeChunk, offset, size);
}
```
ChunkedFactory.create() aligns allocations to 8 bytes (alignedSize) but the created ChunkedAllocationManager reports allocatedSize = size (unaligned). This can make Arrow's allocator accounting inconsistent with the actual bytes carved out of the chunk (e.g., size=1/2/4 becomes 8 bytes reserved) and may also return a buffer size that doesn't reflect the backing slice. Consider tracking and reporting the aligned size (or otherwise ensuring accounting matches the real reserved bytes).
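The arithmetic behind this mismatch can be checked in isolation. This is a minimal sketch of the same 8-byte rounding with an illustrative class name (not the actual Fluss class):

```java
public class AlignmentSketch {
    static final long ALIGNMENT = 8;

    // Same rounding as the create() snippet above: round up to a multiple of 8.
    static long align(long size) {
        return (size + ALIGNMENT - 1) & ~(ALIGNMENT - 1);
    }

    public static void main(String[] args) {
        // A 1-byte request reserves 8 bytes in the chunk, but a manager that
        // reports allocatedSize = 1 leaves a 7-byte accounting gap.
        System.out.println(align(1));  // 8
        System.out.println(align(9));  // 16
        System.out.println(align(16)); // 16
    }
}
```

Reporting `alignedSize` instead of `size` would keep the allocator accounting consistent with the bytes actually carved out of the chunk.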
```java
/**
 * Closes this factory, freeing all cached chunks. Active chunks with outstanding
 * sub-allocations will be freed when their last ArrowBuf is released.
 */
public synchronized void close() {
    while (!freeChunks.isEmpty()) {
        freeChunks.poll().destroy();
    }
    if (activeChunk != null && activeChunk.subAllocCount.get() == 0) {
        activeChunk.destroy();
    }
    activeChunk = null;
}
```
ChunkedFactory allocates and caches native chunks (Unsafe.allocateMemory) and provides a close() method to free cached chunks, but no call sites invoke ChunkedFactory.close() (and it is not an @Override). If Arrow/RootAllocator does not automatically close the factory, this will retain native memory even after BufferAllocator.close(). Please wire factory closure into the owning component lifecycle (e.g., keep a reference and close it alongside the allocator) or implement the appropriate closeable interface expected by Arrow so it is closed automatically.
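One hedged way to wire this, sketched with stand-in types rather than the actual Fluss classes: the owning component retains the factory as a field and closes it after the allocator-backed resources.

```java
import java.util.ArrayDeque;

// Sketch only: ChunkPool stands in for ChunkedFactory's native chunk cache,
// and Owner for a component (e.g., a fetcher) that should retain and close it.
class ChunkPool implements AutoCloseable {
    final ArrayDeque<byte[]> freeChunks = new ArrayDeque<>();
    boolean closed;

    @Override
    public void close() {
        freeChunks.clear(); // free cached chunks (native memory in the real class)
        closed = true;
    }
}

class Owner implements AutoCloseable {
    private final ChunkPool pool = new ChunkPool(); // retained as a field
    // read contexts / allocator would be built from `pool` here

    @Override
    public void close() {
        // close allocator-backed resources first, then the pool behind them
        pool.close();
    }

    boolean poolClosed() {
        return pool.closed;
    }
}
```

Alternatively, implementing whatever closeable contract Arrow expects of an `AllocationManager.Factory` (if one exists) would let allocator shutdown close it automatically.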
```java
 * @param chunkSize maximum size of each chunk (bytes). Allocations >= this go direct.
 * @param maxFreeChunks maximum number of empty chunks to keep cached for reuse.
 */
public ChunkedFactory(long chunkSize, int maxFreeChunks) {
```
ChunkedFactory(long chunkSize, int maxFreeChunks) does not validate its inputs. Passing chunkSize <= 0 or maxFreeChunks < 0 can lead to Unsafe.allocateMemory errors or a factory that never recycles/frees chunks as intended. Please add argument validation (and fail fast with a clear exception).
Suggested change:

```diff
     public ChunkedFactory(long chunkSize, int maxFreeChunks) {
+        if (chunkSize <= 0) {
+            throw new IllegalArgumentException(
+                    "chunkSize must be > 0, but was " + chunkSize);
+        }
+        if (maxFreeChunks < 0) {
+            throw new IllegalArgumentException(
+                    "maxFreeChunks must be >= 0, but was " + maxFreeChunks);
+        }
```
```java
}

private static long validateAndCalculateChunkSize(long pageSize, int maxOrder) {
    if (maxOrder > 14) {
```
validateAndCalculateChunkSize rejects maxOrder > 14 but does not reject negative values, even though the error message says the expected range is 0-14. This can silently produce an invalid chunkSize when -Dorg.apache.memory.allocator.maxOrder is negative. Please validate maxOrder >= 0 as well.
Suggested change:

```diff
-        if (maxOrder > 14) {
+        if (maxOrder < 0 || maxOrder > 14) {
```
```java
 * A custom rounding policy that reduces Arrow's chunk size from 16MB to 4MB to align with Netty
 * 4.1+ memory allocation behavior.
 *
 * <p>Arrow's default maxOrder=11 (16MB chunks) can cause memory inefficiency when used with Netty's
 * maxOrder=9 (4MB chunks). This class patches the default by using maxOrder=9.
 *
```
The JavaDoc for FlussRoundingPolicy is now misleading in the context of this PR: it describes aligning Arrow to Netty’s chunking behavior and “reducing Arrow’s chunk size”, but the PR is explicitly replacing Netty pooling with ChunkedAllocationManager. Please update the documentation to reflect the new allocator strategy (or, if this rounding policy is still needed, explain why it’s still relevant without Netty).
Suggested change:

```diff
- * A custom rounding policy that reduces Arrow's chunk size from 16MB to 4MB to align with Netty
- * 4.1+ memory allocation behavior.
- *
- * <p>Arrow's default maxOrder=11 (16MB chunks) can cause memory inefficiency when used with Netty's
- * maxOrder=9 (4MB chunks). This class patches the default by using maxOrder=9.
- *
+ * A custom rounding policy that uses Fluss's configured Arrow chunk size instead of relying on
+ * Arrow's historical default chunk sizing.
+ *
+ * <p>By default, Arrow uses {@code maxOrder=11}, which yields 16MB chunks for the default page
+ * size. Fluss defaults {@code org.apache.memory.allocator.maxOrder} to {@code 9}, which yields
+ * 4MB chunks and keeps rounding behavior aligned with the chunk size used by Fluss's allocation
+ * strategy.
+ *
+ * <p>This policy is still needed without Netty because requests smaller than the configured chunk
+ * size are rounded to the next power of two, while larger requests are left unchanged. Using the
+ * configured chunk size here avoids Arrow rounding small allocations as if the chunk were larger
+ * than the allocator actually uses.
+ *
```
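The rounding rule the suggested text describes can be sketched as follows; the class, constant, and method names here are illustrative, not Arrow's actual `RoundingPolicy` API:

```java
class RoundingSketch {
    // 4MB chunk: the value maxOrder=9 yields for the default page size.
    static final long CHUNK_SIZE = 4L * 1024 * 1024;

    static long round(long requested) {
        if (requested >= CHUNK_SIZE) {
            return requested; // large requests pass through unchanged
        }
        // round small requests up to the next power of two
        long p = Long.highestOneBit(requested);
        return p == requested ? p : p << 1;
    }
}
```

With a 16MB chunk configured, a 5MB request would be rounded up to 8MB; with the 4MB chunk it passes through unchanged, which is the waste this policy avoids.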
```java
package org.apache.fluss.shaded.arrow.org.apache.arrow.memory;

import org.apache.fluss.record.FlussRoundingPolicy;

import static org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.configBuilder;

/** Util to create UNSAFE_ALLOCATOR. */
public class AllocatorUtil {

    private AllocatorUtil() {}

    public static BufferAllocator createBufferAllocator(
            AllocationManager.Factory allocationManagerFactory) {
        return new RootAllocator(
                configBuilder()
                        .listener(AllocationListener.NOOP)
                        .maxAllocation(Long.MAX_VALUE)
                        .roundingPolicy(FlussRoundingPolicy.DEFAULT_ROUNDING_POLICY)
                        .allocationManagerFactory(allocationManagerFactory)
```
AllocatorUtil is placed under the shaded Arrow package but imports org.apache.fluss.record.FlussRoundingPolicy. This introduces a dependency from shaded Arrow code back into Fluss packages (unlike the other shaded Arrow sources under org/apache/fluss/shaded/arrow/...), which can undermine the purpose of shading and complicate dependency isolation. Consider moving this utility to a non-shaded Fluss package, or keep it in the shaded package but avoid referencing Fluss classes directly (e.g., accept a RoundingPolicy/config as a parameter).
```diff
         LogRecordReadContext readContext =
-                LogRecordReadContext.createReadContext(tableInfo, false, null, schemaGetter);
+                LogRecordReadContext.createReadContext(
+                        tableInfo,
+                        false,
+                        null,
+                        schemaGetter,
+                        new ChunkedAllocationManager.ChunkedFactory());
         LogRecords records = MemoryLogRecords.pointToByteBuffer(recordsBuffer);
```
In parseLimitScanResponse, the newly created LogRecordReadContext is never closed. Since LogRecordReadContext owns Arrow resources (VectorSchemaRoots + BufferAllocator), this will leak direct memory and native buffers. Please wrap the LogRecordReadContext in a try-with-resources (or otherwise ensure it’s closed) around the iteration over batches/records.
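A sketch of the suggested fix, using a stand-in `ReadContext` instead of the real `LogRecordReadContext`: try-with-resources guarantees `close()` runs after iteration, even on exceptions.

```java
import java.util.List;

class ParseSketch {
    // Stand-in for LogRecordReadContext, which owns Arrow buffers.
    static class ReadContext implements AutoCloseable {
        boolean closed;

        @Override
        public void close() {
            closed = true; // the real context would release Arrow buffers here
        }
    }

    static int parse(List<Integer> batches) {
        try (ReadContext ctx = new ReadContext()) {
            int parsed = 0;
            for (int b : batches) {
                parsed += b; // iterate batches while ctx (and its buffers) is live
            }
            return parsed;
        } // ctx.close() runs even if iteration throws
    }
}
```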
```diff
+        ChunkedAllocationManager.ChunkedFactory chunkedFactory =
+                new ChunkedAllocationManager.ChunkedFactory();
         this.readContext =
-                LogRecordReadContext.createReadContext(tableInfo, false, projection, schemaGetter);
+                LogRecordReadContext.createReadContext(
+                        tableInfo, false, projection, schemaGetter, chunkedFactory);
         this.remoteReadContext =
-                LogRecordReadContext.createReadContext(tableInfo, true, projection, schemaGetter);
+                LogRecordReadContext.createReadContext(
+                        tableInfo, true, projection, schemaGetter, chunkedFactory);
```
LogFetcher creates a ChunkedFactory instance and shares it across two LogRecordReadContexts, but it is not retained as a field and never closed. If ChunkedFactory retains native chunks (as its close() method suggests), those chunks may remain allocated after readContext/remoteReadContext are closed. Consider storing the factory as a member and closing it in LogFetcher.close() (or ensure allocator shutdown closes the factory automatically).
```java
public class ChunkedAllocationManager extends AllocationManager {

    /** 8-byte alignment for all sub-allocations within a chunk. */
    private static final long ALIGNMENT = 8;

    /** Default chunk size: 4MB (matches Netty 4.1+ maxOrder=9). */
    private static final long DEFAULT_CHUNK_SIZE = 4L * 1024 * 1024;

    /** Default maximum number of empty chunks to keep in the free-list. */
    private static final int DEFAULT_MAX_FREE_CHUNKS = 3;
```
This PR is titled as a [client] change, but it introduces new allocator infrastructure in fluss-common (e.g., ChunkedAllocationManager, FlussRoundingPolicy, AllocatorUtil) in addition to the client wiring. Please consider adjusting the PR title/component to reflect the broader scope, or split common vs client wiring if that matches project norms.
```java
/**
 * Factory that creates {@link ChunkedAllocationManager} instances.
 *
 * <p>Small allocations (< chunkSize) are packed into the current active chunk via bump-pointer.
 * When the active chunk is full, a recycled or freshly-allocated chunk is used. Large
 * allocations (>= chunkSize) get their own dedicated memory region.
 *
 * <p>This factory is thread-safe: {@link #create} and {@link #onChunkDrained} are {@code
 * synchronized}.
 */
public static class ChunkedFactory implements AllocationManager.Factory {
```
ChunkedAllocationManager is a new, complex native-memory allocator (chunk pooling, ref-counting, concurrency). There are existing tests under fluss-common/src/test/java/org/apache/fluss/compression/* but no tests exercising this allocator’s core behaviors (chunk reuse, drain/recycle, direct-allocation path, and concurrent release race described in onChunkDrained). Adding focused unit tests here would help prevent regressions and validate the OOM fix scenario from #3025.
+1 please add UT for this class
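A test along the lines the reviewers request could target the drain/recycle invariant. This sketch exercises a tiny in-heap model of the bump-pointer bookkeeping, not the real native-memory class:

```java
// Illustrative model of one chunk's bump-pointer state.
class ChunkModel {
    final long size;
    long bump;         // next free offset
    int subAllocCount; // outstanding sub-allocations

    ChunkModel(long size) {
        this.size = size;
    }

    boolean hasRoom(long n) {
        return bump + n <= size;
    }

    long bumpAllocate(long n) {
        long offset = bump;
        bump += n;
        subAllocCount++;
        return offset;
    }

    // Returns true when the last sub-allocation is released and the
    // chunk becomes eligible for recycling.
    boolean release() {
        if (--subAllocCount == 0) {
            bump = 0; // drained: reset for reuse
            return true;
        }
        return false;
    }
}
```

Real tests would additionally cover the direct-allocation path for requests at or above the chunk size, and the concurrent-release race mentioned for `onChunkDrained`.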
wuchong left a comment:
I remain concerned about modifying such a low-level memory allocator, especially given its complexity and the lack of unit test coverage. Could we investigate whether existing configuration options in the current allocator could mitigate this issue? Additionally, have we evaluated Arrow’s UnsafeAllocationManager and NettyAllocationManager as alternatives?
Furthermore, please note that KvManager still initializes RootAllocator with Long.MAX_VALUE.
```java
 * <p>For allocations >= chunkSize, a dedicated memory region is allocated directly (no bump
 * pointer), behaving identically to {@link UnsafeAllocationManager}.
 */
public class ChunkedAllocationManager extends AllocationManager {
```
move this class to package org.apache.fluss.row.arrow.memory, along with FlussRoundingPolicy and AllocatorUtil
```java
 * }</pre>
 *
 * <p>For allocations >= chunkSize, a dedicated memory region is allocated directly (no bump
 * pointer), behaving identically to {@link UnsafeAllocationManager}.
```
{@link UnsafeAllocationManager} -> {@code UnsafeAllocationManager}
```java
public static class ChunkedFactory implements AllocationManager.Factory {

    private static final ArrowBuf EMPTY =
            new ArrowBuf(ReferenceManager.NO_OP, null, 0, MemoryUtil.UNSAFE.allocateMemory(0));
```
UNSAFE.allocateMemory(0) may return a non-zero address on some platforms (implementation-defined). The allocation is never freed, since it is a static field with a NO_OP reference manager. Although the size is 0 (or very small), it is technically a native-memory leak. Consider following the approach of NettyAllocationManager#EMPTY_BUFFER instead.
```java
            selectedFields,
            false,
            schemaGetter,
            new ChunkedAllocationManager.ChunkedFactory());
```
This creates a new ChunkedAllocationManager.ChunkedFactory() per call, and therefore a new chunk pool per call, whereas the default allocation manager is shared JVM-wide (and thus so is its pool).
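The cost of a per-call factory can be illustrated with a toy pool; names here are hypothetical, not the Fluss API:

```java
import java.util.ArrayDeque;

// Toy chunk pool: each Factory owns its own free-list, so a factory created
// per call can never reuse chunks recycled through another call's factory.
class FactorySketch {
    static class Factory {
        final ArrayDeque<byte[]> free = new ArrayDeque<>();
        int freshAllocations;

        byte[] obtain() {
            byte[] chunk = free.poll();
            if (chunk == null) {
                freshAllocations++; // cache miss: allocate a new chunk
                chunk = new byte[4096];
            }
            return chunk;
        }

        void recycle(byte[] chunk) {
            free.offer(chunk);
        }
    }
}
```

With one shared factory, an obtain/recycle/obtain sequence allocates only once; with a fresh factory per call, every call pays a fresh chunk allocation.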
Purpose
Linked issue: close #3025
Brief change log
Tests
API and Format
Documentation