diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java index 7cf802a91a8cf..a004873c62a87 100644 --- a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java @@ -18,6 +18,7 @@ package org.apache.flink.core.memory; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.util.Preconditions; import java.io.EOFException; @@ -30,6 +31,13 @@ /** A simple and efficient serializer for the {@link java.io.DataOutput} interface. */ public class DataOutputSerializer implements DataOutputView, MemorySegmentWritable { + /** + * Maximum array length the JVM can allocate. Some VMs reserve a few header bytes, so we cap + * below {@link Integer#MAX_VALUE} for safety. Matches the bound used by {@code + * java.util.ArrayList}. + */ + @VisibleForTesting static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; + private byte[] buffer; private int position; @@ -333,26 +341,52 @@ private int getUTFBytesSize(int c) { } } + /** + * Computes the new buffer length for a {@link #resize(int)} call. + * + *

Uses {@code long} arithmetic so that doubling does not silently overflow once the current + * buffer length crosses {@code Integer.MAX_VALUE / 2}. When doubling would exceed {@link + * #MAX_ARRAY_SIZE}, the buffer jumps directly to the cap rather than growing by {@code + * minCapacityAdd} bytes at a time — the latter would degrade every subsequent resize into a + * full copy of a ~1–2 GB buffer. + * + * @throws IOException if the required size exceeds {@link #MAX_ARRAY_SIZE}. + */ + @VisibleForTesting + static int computeNewBufferLength(int currentLength, int minCapacityAdd) throws IOException { + long requiredLen = (long) currentLength + minCapacityAdd; + if (requiredLen > MAX_ARRAY_SIZE) { + throw new IOException( + "Serialization failed because the record length (" + + requiredLen + + " bytes) would exceed the maximum Java array size (" + + MAX_ARRAY_SIZE + + " bytes)."); + } + long doubledLen = (long) currentLength * 2L; + if (doubledLen > MAX_ARRAY_SIZE) { + return MAX_ARRAY_SIZE; + } + return (int) Math.max(doubledLen, requiredLen); + } + private void resize(int minCapacityAdd) throws IOException { - int newLen = Math.max(this.buffer.length * 2, this.buffer.length + minCapacityAdd); + int newLen = computeNewBufferLength(this.buffer.length, minCapacityAdd); byte[] nb; try { nb = new byte[newLen]; - } catch (NegativeArraySizeException e) { - throw new IOException( - "Serialization failed because the record length would exceed 2GB (max addressable array size in Java)."); } catch (OutOfMemoryError e) { // this was too large to allocate, try the smaller size (if possible) - if (newLen > this.buffer.length + minCapacityAdd) { - newLen = this.buffer.length + minCapacityAdd; + int minLen = this.buffer.length + minCapacityAdd; + if (newLen > minLen) { try { - nb = new byte[newLen]; + nb = new byte[minLen]; } catch (OutOfMemoryError ee) { // still not possible. 
give an informative exception message that reports the // size throw new IOException( "Failed to serialize element. Serialized size (> " - + newLen + + minLen + " bytes) exceeds JVM heap space", ee); } diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/DataInputOutputSerializerTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/DataInputOutputSerializerTest.java index c3389ad899344..3dc0a8cce734b 100644 --- a/flink-core/src/test/java/org/apache/flink/core/memory/DataInputOutputSerializerTest.java +++ b/flink-core/src/test/java/org/apache/flink/core/memory/DataInputOutputSerializerTest.java @@ -31,7 +31,10 @@ import java.util.ArrayDeque; import java.util.Random; +import static org.apache.flink.core.memory.DataOutputSerializer.MAX_ARRAY_SIZE; +import static org.apache.flink.core.memory.DataOutputSerializer.computeNewBufferLength; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for the combination of {@link DataOutputSerializer} and {@link DataInputDeserializer}. */ class DataInputOutputSerializerTest { @@ -139,4 +142,32 @@ void testUTFWriteRead() throws IOException { String actual = deserializer.readUTF(); assertThat(actual).isEqualTo(expected); } + + @Test + void testComputeNewBufferLengthNormalDoubling() throws IOException { + assertThat(computeNewBufferLength(1024, 1)).isEqualTo(2048); + } + + @Test + void testComputeNewBufferLengthMinCapacityDominates() throws IOException { + assertThat(computeNewBufferLength(1000, 5000)).isEqualTo(6000); + } + + @Test + void testComputeNewBufferLengthJumpsToCapWhenDoublingOverflows() throws IOException { + // currentLength * 2 overflows int and would otherwise produce a linear-step resize loop. 
+ assertThat(computeNewBufferLength(1_500_000_000, 100)).isEqualTo(MAX_ARRAY_SIZE); + } + + @Test + void testComputeNewBufferLengthExactlyAtCap() throws IOException { + assertThat(computeNewBufferLength(MAX_ARRAY_SIZE - 100, 100)).isEqualTo(MAX_ARRAY_SIZE); + } + + @Test + void testComputeNewBufferLengthThrowsWhenRequiredExceedsCap() { + assertThatThrownBy(() -> computeNewBufferLength(MAX_ARRAY_SIZE - 10, 100)) + .isInstanceOf(IOException.class) + .hasMessageContaining("would exceed the maximum Java array size"); + } }