Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.core.memory;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.Preconditions;

import java.io.EOFException;
Expand All @@ -30,6 +31,13 @@
/** A simple and efficient serializer for the {@link java.io.DataOutput} interface. */
public class DataOutputSerializer implements DataOutputView, MemorySegmentWritable {

/**
* Maximum array length the JVM can allocate. Some VMs reserve a few header bytes, so we cap
* below {@link Integer#MAX_VALUE} for safety. Matches the bound used by {@code
* java.util.ArrayList}.
*/
@VisibleForTesting static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

private byte[] buffer;

private int position;
Expand Down Expand Up @@ -333,26 +341,52 @@ private int getUTFBytesSize(int c) {
}
}

/**
* Computes the new buffer length for a {@link #resize(int)} call.
*
* <p>Uses {@code long} arithmetic so that doubling does not silently overflow once the current
* buffer length crosses {@code Integer.MAX_VALUE / 2}. When doubling would exceed {@link
* #MAX_ARRAY_SIZE}, the buffer jumps directly to the cap rather than growing by {@code
* minCapacityAdd} bytes at a time — the latter would degrade every subsequent resize into a
* full copy of a ~1–2 GB buffer.
*
* @throws IOException if the required size exceeds {@link #MAX_ARRAY_SIZE}.
*/
@VisibleForTesting
static int computeNewBufferLength(int currentLength, int minCapacityAdd) throws IOException {
long requiredLen = (long) currentLength + minCapacityAdd;
if (requiredLen > MAX_ARRAY_SIZE) {
throw new IOException(
"Serialization failed because the record length ("
+ requiredLen
+ " bytes) would exceed the maximum Java array size ("
+ MAX_ARRAY_SIZE
+ " bytes).");
}
long doubledLen = (long) currentLength * 2L;
if (doubledLen > MAX_ARRAY_SIZE) {
return MAX_ARRAY_SIZE;
}
return (int) Math.max(doubledLen, requiredLen);
}

private void resize(int minCapacityAdd) throws IOException {
int newLen = Math.max(this.buffer.length * 2, this.buffer.length + minCapacityAdd);
int newLen = computeNewBufferLength(this.buffer.length, minCapacityAdd);
byte[] nb;
try {
nb = new byte[newLen];
} catch (NegativeArraySizeException e) {
throw new IOException(
"Serialization failed because the record length would exceed 2GB (max addressable array size in Java).");
} catch (OutOfMemoryError e) {
// this was too large to allocate, try the smaller size (if possible)
if (newLen > this.buffer.length + minCapacityAdd) {
newLen = this.buffer.length + minCapacityAdd;
int minLen = this.buffer.length + minCapacityAdd;
if (newLen > minLen) {
try {
nb = new byte[newLen];
nb = new byte[minLen];
} catch (OutOfMemoryError ee) {
// still not possible. give an informative exception message that reports the
// size
throw new IOException(
"Failed to serialize element. Serialized size (> "
+ newLen
+ minLen
+ " bytes) exceeds JVM heap space",
ee);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@
import java.util.ArrayDeque;
import java.util.Random;

import static org.apache.flink.core.memory.DataOutputSerializer.MAX_ARRAY_SIZE;
import static org.apache.flink.core.memory.DataOutputSerializer.computeNewBufferLength;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for the combination of {@link DataOutputSerializer} and {@link DataInputDeserializer}. */
class DataInputOutputSerializerTest {
Expand Down Expand Up @@ -139,4 +142,32 @@ void testUTFWriteRead() throws IOException {
String actual = deserializer.readUTF();
assertThat(actual).isEqualTo(expected);
}

@Test
void testComputeNewBufferLengthNormalDoubling() throws IOException {
assertThat(computeNewBufferLength(1024, 1)).isEqualTo(2048);
}

@Test
void testComputeNewBufferLengthMinCapacityDominates() throws IOException {
assertThat(computeNewBufferLength(1000, 5000)).isEqualTo(6000);
}

@Test
void testComputeNewBufferLengthJumpsToCapWhenDoublingOverflows() throws IOException {
// currentLength * 2 overflows int and would otherwise produce a linear-step resize loop.
assertThat(computeNewBufferLength(1_500_000_000, 100)).isEqualTo(MAX_ARRAY_SIZE);
}

@Test
void testComputeNewBufferLengthExactlyAtCap() throws IOException {
assertThat(computeNewBufferLength(MAX_ARRAY_SIZE - 100, 50)).isEqualTo(MAX_ARRAY_SIZE);
}

@Test
void testComputeNewBufferLengthThrowsWhenRequiredExceedsCap() {
assertThatThrownBy(() -> computeNewBufferLength(MAX_ARRAY_SIZE - 10, 100))
.isInstanceOf(IOException.class)
.hasMessageContaining("would exceed the maximum Java array size");
}
}