[client] Replace Netty PooledByteBufAllocator with bump-pointer ChunkedAllocationManager for Arrow memory allocation.#3026

Open
loserwang1024 wants to merge 2 commits into apache:main from loserwang1024:arrow-memory-manager

Conversation

@loserwang1024
Contributor


Purpose

Linked issue: close #3025



Copilot AI left a comment


Pull request overview

This PR addresses Arrow memory fragmentation/OOM during high-column-count decompression by replacing Netty-backed pooled allocations with a bump-pointer, chunk-recycling allocation strategy and wiring it into client read/write paths.

Changes:

  • Introduces ChunkedAllocationManager (bump-pointer sub-allocation within reusable native chunks) for Arrow allocations.
  • Adds allocator construction utilities (AllocatorUtil) and a custom rounding policy (FlussRoundingPolicy) used by the new allocator setup.
  • Updates client read/write components to use the new allocation manager (e.g., LogRecordReadContext, RecordAccumulator, LogFetcher, LimitBatchScanner).

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 10 comments.

Summary per file:

  • fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/AllocatorUtil.java: Centralizes creation of a RootAllocator configured with a custom AllocationManager.Factory.
  • fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java: Allows injecting an AllocationManager.Factory and defaults Arrow read contexts to ChunkedFactory.
  • fluss-common/src/main/java/org/apache/fluss/record/FlussRoundingPolicy.java: Adds a custom rounding policy used in allocator configuration.
  • fluss-common/src/main/java/org/apache/fluss/compression/ChunkedAllocationManager.java: Implements the new chunked bump-pointer allocation manager plus its factory/pool.
  • fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java: Switches the Arrow allocator used for writing to the new chunked allocator.
  • fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java: Shares a ChunkedFactory across local/remote read contexts for scanning.
  • fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LimitBatchScanner.java: Uses the chunked allocator in the read context for limit scans (Arrow path).


Comment on lines +297 to +307
    // Align to 8 bytes for safe direct-memory access.
    long alignedSize = (size + ALIGNMENT - 1) & ~(ALIGNMENT - 1);

    if (activeChunk == null || !activeChunk.hasRoom(alignedSize)) {
        // Current chunk is full or doesn't exist — obtain a recycled or new chunk.
        activeChunk = obtainChunk();
    }

    long offset = activeChunk.bumpAllocate(alignedSize);
    return new ChunkedAllocationManager(accountingAllocator, activeChunk, offset, size);
}

Copilot AI Apr 11, 2026


ChunkedFactory.create() aligns allocations to 8 bytes (alignedSize) but the created ChunkedAllocationManager reports allocatedSize = size (unaligned). This can make Arrow's allocator accounting inconsistent with the actual bytes carved out of the chunk (e.g., size=1/2/4 becomes 8 bytes reserved) and may also return a buffer size that doesn't reflect the backing slice. Consider tracking and reporting the aligned size (or otherwise ensuring accounting matches the real reserved bytes).
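The accounting gap described here is pure arithmetic; this standalone sketch (hypothetical names, not the PR's code) shows the mask-based rounding and the undercount that occurs if the unaligned size is reported:

```java
public class AlignmentSketch {
    static final long ALIGNMENT = 8;

    // Round size up to the next multiple of ALIGNMENT (power-of-two mask trick,
    // same expression as in the quoted snippet).
    static long align(long size) {
        return (size + ALIGNMENT - 1) & ~(ALIGNMENT - 1);
    }

    public static void main(String[] args) {
        long requested = 5;
        long reserved = align(requested); // 8 bytes are carved out of the chunk
        // If the manager reports `requested` instead of `reserved`, allocator
        // accounting undercounts chunk usage by the difference.
        System.out.println(reserved - requested); // 3
    }
}
```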

Comment on lines +354 to +366
/**
 * Closes this factory, freeing all cached chunks. Active chunks with outstanding
 * sub-allocations will be freed when their last ArrowBuf is released.
 */
public synchronized void close() {
    while (!freeChunks.isEmpty()) {
        freeChunks.poll().destroy();
    }
    if (activeChunk != null && activeChunk.subAllocCount.get() == 0) {
        activeChunk.destroy();
    }
    activeChunk = null;
}

Copilot AI Apr 11, 2026


ChunkedFactory allocates and caches native chunks (Unsafe.allocateMemory) and provides a close() method to free cached chunks, but no call sites invoke ChunkedFactory.close() (and it is not an @Override). If Arrow/RootAllocator does not automatically close the factory, this will retain native memory even after BufferAllocator.close(). Please wire factory closure into the owning component lifecycle (e.g., keep a reference and close it alongside the allocator) or implement the appropriate closeable interface expected by Arrow so it is closed automatically.
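One way to wire this up, sketched with stand-in classes (OwningComponent, the tracking list, and the close ordering are all hypothetical): the owner retains the factory and closes it after the allocator, so buffers are released before their backing chunks are freed:

```java
import java.util.ArrayList;
import java.util.List;

public class LifecycleSketch {
    // Records the order in which resources are closed, for illustration.
    static final List<String> closed = new ArrayList<>();

    /** Stand-in for the chunk-caching factory holding native memory. */
    static class ChunkFactory implements AutoCloseable {
        @Override public void close() { closed.add("factory"); }
    }

    /** Stand-in for the BufferAllocator built on top of the factory. */
    static class Allocator implements AutoCloseable {
        @Override public void close() { closed.add("allocator"); }
    }

    /** The owning component keeps both references and closes them together. */
    static class OwningComponent implements AutoCloseable {
        private final ChunkFactory factory = new ChunkFactory();
        private final Allocator allocator = new Allocator();

        @Override
        public void close() {
            allocator.close(); // release outstanding buffers first
            factory.close();   // then free the cached native chunks
        }
    }

    public static void main(String[] args) {
        try (OwningComponent component = new OwningComponent()) {
            // use the allocator...
        }
        System.out.println(closed); // [allocator, factory]
    }
}
```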

* @param chunkSize maximum size of each chunk (bytes). Allocations >= this go direct.
* @param maxFreeChunks maximum number of empty chunks to keep cached for reuse.
*/
public ChunkedFactory(long chunkSize, int maxFreeChunks) {

Copilot AI Apr 11, 2026


ChunkedFactory(long chunkSize, int maxFreeChunks) does not validate its inputs. Passing chunkSize <= 0 or maxFreeChunks < 0 can lead to Unsafe.allocateMemory errors or a factory that never recycles/frees chunks as intended. Please add argument validation (and fail fast with a clear exception).

Suggested change:

public ChunkedFactory(long chunkSize, int maxFreeChunks) {
    if (chunkSize <= 0) {
        throw new IllegalArgumentException(
                "chunkSize must be > 0, but was " + chunkSize);
    }
    if (maxFreeChunks < 0) {
        throw new IllegalArgumentException(
                "maxFreeChunks must be >= 0, but was " + maxFreeChunks);
    }

}

private static long validateAndCalculateChunkSize(long pageSize, int maxOrder) {
if (maxOrder > 14) {

Copilot AI Apr 11, 2026


validateAndCalculateChunkSize rejects maxOrder > 14 but does not reject negative values, even though the error message says the expected range is 0-14. This can silently produce an invalid chunkSize when -Dorg.apache.memory.allocator.maxOrder is negative. Please validate maxOrder >= 0 as well.

Suggested change
if (maxOrder > 14) {
if (maxOrder < 0 || maxOrder > 14) {

Comment on lines +27 to +32
* A custom rounding policy that reduces Arrow's chunk size from 16MB to 4MB to align with Netty
* 4.1+ memory allocation behavior.
*
* <p>Arrow's default maxOrder=11 (16MB chunks) can cause memory inefficiency when used with Netty's
* maxOrder=9 (4MB chunks). This class patches the default by using maxOrder=9.
*

Copilot AI Apr 11, 2026


The JavaDoc for FlussRoundingPolicy is now misleading in the context of this PR: it describes aligning Arrow to Netty’s chunking behavior and “reducing Arrow’s chunk size”, but the PR is explicitly replacing Netty pooling with ChunkedAllocationManager. Please update the documentation to reflect the new allocator strategy (or, if this rounding policy is still needed, explain why it’s still relevant without Netty).

Suggested change
* A custom rounding policy that reduces Arrow's chunk size from 16MB to 4MB to align with Netty
* 4.1+ memory allocation behavior.
*
* <p>Arrow's default maxOrder=11 (16MB chunks) can cause memory inefficiency when used with Netty's
* maxOrder=9 (4MB chunks). This class patches the default by using maxOrder=9.
*
* A custom rounding policy that uses Fluss's configured Arrow chunk size instead of relying on
* Arrow's historical default chunk sizing.
*
* <p>By default, Arrow uses {@code maxOrder=11}, which yields 16MB chunks for the default page
* size. Fluss defaults {@code org.apache.memory.allocator.maxOrder} to {@code 9}, which yields
* 4MB chunks and keeps rounding behavior aligned with the chunk size used by Fluss's allocation
* strategy.
*
* <p>This policy is still needed without Netty because requests smaller than the configured chunk
* size are rounded to the next power of two, while larger requests are left unchanged. Using the
* configured chunk size here avoids Arrow rounding small allocations as if the chunk were larger
* than the allocator actually uses.
*

Comment on lines +18 to +36
package org.apache.fluss.shaded.arrow.org.apache.arrow.memory;

import org.apache.fluss.record.FlussRoundingPolicy;

import static org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.configBuilder;

/** Util to create UNSAFE_ALLOCATOR. */
public class AllocatorUtil {

    private AllocatorUtil() {}

    public static BufferAllocator createBufferAllocator(
            AllocationManager.Factory allocationManagerFactory) {
        return new RootAllocator(
                configBuilder()
                        .listener(AllocationListener.NOOP)
                        .maxAllocation(Long.MAX_VALUE)
                        .roundingPolicy(FlussRoundingPolicy.DEFAULT_ROUNDING_POLICY)
                        .allocationManagerFactory(allocationManagerFactory)

Copilot AI Apr 11, 2026


AllocatorUtil is placed under the shaded Arrow package but imports org.apache.fluss.record.FlussRoundingPolicy. This introduces a dependency from shaded Arrow code back into Fluss packages (unlike the other shaded Arrow sources under org/apache/fluss/shaded/arrow/...), which can undermine the purpose of shading and complicate dependency isolation. Consider moving this utility to a non-shaded Fluss package, or keep it in the shaded package but avoid referencing Fluss classes directly (e.g., accept a RoundingPolicy/config as a parameter).

Comment on lines 167 to 174
LogRecordReadContext readContext =
        LogRecordReadContext.createReadContext(
                tableInfo,
                false,
                null,
                schemaGetter,
                new ChunkedAllocationManager.ChunkedFactory());
LogRecords records = MemoryLogRecords.pointToByteBuffer(recordsBuffer);

Copilot AI Apr 11, 2026


In parseLimitScanResponse, the newly created LogRecordReadContext is never closed. Since LogRecordReadContext owns Arrow resources (VectorSchemaRoots + BufferAllocator), this will leak direct memory and native buffers. Please wrap the LogRecordReadContext in a try-with-resources (or otherwise ensure it’s closed) around the iteration over batches/records.
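A minimal sketch of the suggested fix, using a stand-in ReadContext class rather than the real LogRecordReadContext:

```java
public class ReadContextSketch {
    /** Stand-in for a context that owns Arrow resources. */
    static class ReadContext implements AutoCloseable {
        boolean closed;
        @Override public void close() { closed = true; }
    }

    /** The fix: scope the context with try-with-resources so it is always closed. */
    static ReadContext parseResponse() {
        ReadContext ctx = new ReadContext();
        try (ReadContext scoped = ctx) {
            // iterate over batches/records here; results are materialized
            // before the context goes out of scope
        }
        return ctx;
    }

    public static void main(String[] args) {
        // The context is closed even though parsing returned normally.
        System.out.println(parseResponse().closed); // true
    }
}
```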

Comment on lines +127 to +134
ChunkedAllocationManager.ChunkedFactory chunkedFactory =
        new ChunkedAllocationManager.ChunkedFactory();
this.readContext =
        LogRecordReadContext.createReadContext(
                tableInfo, false, projection, schemaGetter, chunkedFactory);
this.remoteReadContext =
        LogRecordReadContext.createReadContext(
                tableInfo, true, projection, schemaGetter, chunkedFactory);

Copilot AI Apr 11, 2026


LogFetcher creates a ChunkedFactory instance and shares it across two LogRecordReadContexts, but it is not retained as a field and never closed. If ChunkedFactory retains native chunks (as its close() method suggests), those chunks may remain allocated after readContext/remoteReadContext are closed. Consider storing the factory as a member and closing it in LogFetcher.close() (or ensure allocator shutdown closes the factory automatically).

Comment on lines +114 to +123
public class ChunkedAllocationManager extends AllocationManager {

    /** 8-byte alignment for all sub-allocations within a chunk. */
    private static final long ALIGNMENT = 8;

    /** Default chunk size: 4MB (matches Netty 4.1+ maxOrder=9). */
    private static final long DEFAULT_CHUNK_SIZE = 4L * 1024 * 1024;

    /** Default maximum number of empty chunks to keep in the free-list. */
    private static final int DEFAULT_MAX_FREE_CHUNKS = 3;

Copilot AI Apr 11, 2026


This PR is titled as a [client] change, but it introduces new allocator infrastructure in fluss-common (e.g., ChunkedAllocationManager, FlussRoundingPolicy, AllocatorUtil) in addition to the client wiring. Please consider adjusting the PR title/component to reflect the broader scope, or split common vs client wiring if that matches project norms.

Comment on lines +248 to +258
/**
 * Factory that creates {@link ChunkedAllocationManager} instances.
 *
 * <p>Small allocations (< chunkSize) are packed into the current active chunk via bump-pointer.
 * When the active chunk is full, a recycled or freshly-allocated chunk is used. Large
 * allocations (>= chunkSize) get their own dedicated memory region.
 *
 * <p>This factory is thread-safe: {@link #create} and {@link #onChunkDrained} are {@code
 * synchronized}.
 */
public static class ChunkedFactory implements AllocationManager.Factory {

Copilot AI Apr 11, 2026


ChunkedAllocationManager is a new, complex native-memory allocator (chunk pooling, ref-counting, concurrency). There are existing tests under fluss-common/src/test/java/org/apache/fluss/compression/* but no tests exercising this allocator’s core behaviors (chunk reuse, drain/recycle, direct-allocation path, and concurrent release race described in onChunkDrained). Adding focused unit tests here would help prevent regressions and validate the OOM fix scenario from #3025.
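A plain-Java sketch of what such a test could assert, using a minimal stand-in pool (the Pool class here is hypothetical and far simpler than the real ChunkedFactory, but it exercises the same reuse and bounded-free-list behaviors):

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class ChunkPoolTest {
    /** Minimal stand-in: a bounded free-list of reusable chunks. */
    static class Pool {
        final int maxFree;
        final Deque<long[]> free = new ArrayDeque<>();

        Pool(int maxFree) { this.maxFree = maxFree; }

        /** Hand out a recycled chunk if available, otherwise allocate a new one. */
        long[] obtain() { return free.isEmpty() ? new long[512] : free.poll(); }

        /** Cache a drained chunk for reuse, dropping it if the free-list is full. */
        void recycle(long[] chunk) {
            if (free.size() < maxFree) {
                free.offer(chunk);
            }
        }
    }

    public static void main(String[] args) {
        Pool pool = new Pool(1);
        long[] a = pool.obtain();
        pool.recycle(a);
        // The drained chunk is handed back out instead of allocating a new one.
        System.out.println(pool.obtain() == a); // true
    }
}
```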

Member


+1 please add UT for this class

Member

@wuchong wuchong left a comment


I remain concerned about modifying such a low-level memory allocator, especially given its complexity and the lack of unit test coverage. Could we investigate whether existing configuration options in the current allocator could mitigate this issue? Additionally, have we evaluated Arrow’s UnsafeAllocationManager and NettyAllocationManager as alternatives?

Furthermore, please note that KvManager still initializes RootAllocator with Long.MAX_VALUE.

* <p>For allocations >= chunkSize, a dedicated memory region is allocated directly (no bump
* pointer), behaving identically to {@link UnsafeAllocationManager}.
*/
public class ChunkedAllocationManager extends AllocationManager {
Member


move this class to package org.apache.fluss.row.arrow.memory, along with FlussRoundingPolicy and AllocatorUtil


* }</pre>
*
* <p>For allocations >= chunkSize, a dedicated memory region is allocated directly (no bump
* pointer), behaving identically to {@link UnsafeAllocationManager}.
Member


{@link UnsafeAllocationManager} -> {@code UnsafeAllocationManager}

public static class ChunkedFactory implements AllocationManager.Factory {

    private static final ArrowBuf EMPTY =
            new ArrowBuf(ReferenceManager.NO_OP, null, 0, MemoryUtil.UNSAFE.allocateMemory(0));
Member


UNSAFE.allocateMemory(0) may return a non-zero address on some platforms (implementation-defined). This allocation is never freed, since it's a static field with a NO_OP reference manager. While the size is 0, it is technically a native memory leak. Consider following the approach of NettyAllocationManager#EMPTY_BUFFER instead.

selectedFields,
false,
schemaGetter,
new ChunkedAllocationManager.ChunkedFactory());
Member


This creates a new ChunkedAllocationManager.ChunkedFactory() per call, and thus a new chunk pool per call. By contrast, the default allocation manager is shared across the JVM, so the chunk pool should presumably be shared as well.
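If a JVM-wide pool is indeed the goal, one conventional pattern is an initialization-on-demand holder. This sketch uses a hypothetical stand-in ChunkedFactory, and leaves open whether a shared pool is compatible with per-component close() semantics:

```java
public class SharedFactorySketch {
    /** Stand-in for the real chunk-pooling factory. */
    static class ChunkedFactory { }

    // Initialization-on-demand holder: the single instance is created lazily
    // and safely on first use, without explicit synchronization.
    private static final class Holder {
        static final ChunkedFactory INSTANCE = new ChunkedFactory();
    }

    static ChunkedFactory sharedFactory() {
        return Holder.INSTANCE;
    }

    public static void main(String[] args) {
        // Every call site reuses the same pool instead of creating one per context.
        System.out.println(sharedFactory() == sharedFactory()); // true
    }
}
```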
