From 0658c3ff2bf3a05934d1fba60842bb777e6c9a6e Mon Sep 17 00:00:00 2001 From: Prateek Gaur Date: Sat, 7 Mar 2026 17:01:51 +0000 Subject: [PATCH 01/24] ALP: Add AlpConstants with encoding tables and format constants Add the foundational constants class for ALP encoding: - 7-byte page header matching C++ AlpHeader wire format - Float/double power-of-ten lookup tables - Sampler constants (vector sizes, combination limits) - Encoding limits and magic numbers for fast rounding - Per-vector metadata sizes (AlpInfo, ForInfo) - integerPow10() helper and validateVectorSize() --- .../column/values/alp/AlpConstants.java | 137 ++++++++++++++++++ .../column/values/alp/AlpConstantsTest.java | 103 +++++++++++++ 2 files changed, 240 insertions(+) create mode 100644 parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpConstants.java create mode 100644 parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpConstantsTest.java diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpConstants.java b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpConstants.java new file mode 100644 index 0000000000..6e14518aa0 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpConstants.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.alp; + +import org.apache.parquet.Preconditions; + +/** + * Constants for the ALP (Adaptive Lossless floating-Point) encoding. + * + *
<p>
ALP encoding converts floating-point values to integers using decimal scaling, + * then applies Frame of Reference encoding and bit-packing. + * Values that cannot be losslessly converted are stored as exceptions. + * + *
<p>
Based on the paper: "ALP: Adaptive Lossless floating-Point Compression" (SIGMOD 2024) + * + * @see ALP Paper + */ +public final class AlpConstants { + + private AlpConstants() { + // Utility class + } + + // ========== Page header (7 bytes, matching C++ AlpHeader) ========== + // [compression_mode(1)][integer_encoding(1)][log_vector_size(1)][num_elements(4)] + public static final int HEADER_SIZE = 7; + public static final int COMPRESSION_MODE_ALP = 0; + public static final int INTEGER_ENCODING_FOR = 0; + + // ========== Vector sizing ========== + public static final int DEFAULT_VECTOR_SIZE = 1024; + public static final int DEFAULT_VECTOR_SIZE_LOG = 10; + + // Capped at 15 (vectorSize=32768) because num_exceptions is uint16, + // so vectorSize must not exceed 65535 to avoid overflow when all values are exceptions. + static final int MAX_LOG_VECTOR_SIZE = 15; + static final int MIN_LOG_VECTOR_SIZE = 3; + + // ========== Sampler constants (matching C++ AlpConstants) ========== + static final int SAMPLER_VECTOR_SIZE = 4096; + static final int SAMPLER_ROWGROUP_SIZE = 122880; + static final int SAMPLER_SAMPLES_PER_VECTOR = 256; + static final int SAMPLER_SAMPLE_VECTORS_PER_ROWGROUP = 8; + static final int MAX_COMBINATIONS = 5; + static final int SAMPLING_EARLY_EXIT_THRESHOLD = 4; + + // ========== Float-specific ========== + static final int FLOAT_MAX_EXPONENT = 10; + static final float MAGIC_FLOAT = 12_582_912.0f; // 2^22 + 2^23 + static final float FLOAT_ENCODING_UPPER_LIMIT = 2147483520.0f; + static final float FLOAT_ENCODING_LOWER_LIMIT = -2147483520.0f; + static final int FLOAT_NEGATIVE_ZERO_BITS = 0x80000000; + + static final float[] FLOAT_POW10 = {1e0f, 1e1f, 1e2f, 1e3f, 1e4f, 1e5f, 1e6f, 1e7f, 1e8f, 1e9f, 1e10f}; + + // ========== Double-specific ========== + static final int DOUBLE_MAX_EXPONENT = 18; + static final double MAGIC_DOUBLE = 6_755_399_441_055_744.0; // 2^51 + 2^52 + static final double DOUBLE_ENCODING_UPPER_LIMIT = 9223372036854774784.0; + static 
final double DOUBLE_ENCODING_LOWER_LIMIT = -9223372036854774784.0; + static final long DOUBLE_NEGATIVE_ZERO_BITS = 0x8000000000000000L; + + static final double[] DOUBLE_POW10 = { + 1e0, 1e1, 1e2, 1e3, 1e4, 1e5, 1e6, 1e7, 1e8, 1e9, 1e10, 1e11, 1e12, 1e13, 1e14, 1e15, 1e16, 1e17, 1e18 + }; + + // ========== Per-vector metadata sizes ========== + public static final int ALP_INFO_SIZE = 4; // exponent(1) + factor(1) + num_exceptions(2) + public static final int FLOAT_FOR_INFO_SIZE = 5; // frame_of_reference(4) + bit_width(1) + public static final int DOUBLE_FOR_INFO_SIZE = 9; // frame_of_reference(8) + bit_width(1) + + // ========== Offset and position types ========== + // OffsetType = int (4 bytes), PositionType = short (2 bytes) — matching C++ uint32_t and uint16_t + public static final int OFFSET_SIZE = Integer.BYTES; + public static final int POSITION_SIZE = Short.BYTES; + + /** Returns 10^power as a long, for power in [0, 18]. */ + static long integerPow10(int power) { + Preconditions.checkArgument(power >= 0 && power <= 18, "power must be in [0, 18], got: %s", power); + return INTEGER_POW10[power]; + } + + private static final long[] INTEGER_POW10 = { + 1L, + 10L, + 100L, + 1_000L, + 10_000L, + 100_000L, + 1_000_000L, + 10_000_000L, + 100_000_000L, + 1_000_000_000L, + 10_000_000_000L, + 100_000_000_000L, + 1_000_000_000_000L, + 10_000_000_000_000L, + 100_000_000_000_000L, + 1_000_000_000_000_000L, + 10_000_000_000_000_000L, + 100_000_000_000_000_000L, + 1_000_000_000_000_000_000L, + }; + + /** Validates vector size: must be a power of 2 in [2^MIN_LOG .. 2^MAX_LOG]. 
*/ + public static int validateVectorSize(int vectorSize) { + Preconditions.checkArgument( + vectorSize > 0 && (vectorSize & (vectorSize - 1)) == 0, + "Vector size must be a power of 2, got: %s", + vectorSize); + int logSize = Integer.numberOfTrailingZeros(vectorSize); + Preconditions.checkArgument( + logSize >= MIN_LOG_VECTOR_SIZE && logSize <= MAX_LOG_VECTOR_SIZE, + "Vector size log2 must be between %s and %s, got: %s (vectorSize=%s)", + MIN_LOG_VECTOR_SIZE, + MAX_LOG_VECTOR_SIZE, + logSize, + vectorSize); + return vectorSize; + } +} diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpConstantsTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpConstantsTest.java new file mode 100644 index 0000000000..10c1ea1454 --- /dev/null +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpConstantsTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.column.values.alp; + +import static org.junit.Assert.*; + +import org.junit.Test; + +public class AlpConstantsTest { + + @Test + public void testHeaderSize() { + assertEquals(7, AlpConstants.HEADER_SIZE); + } + + @Test + public void testFloatPow10Table() { + assertEquals(11, AlpConstants.FLOAT_POW10.length); + assertEquals(1.0f, AlpConstants.FLOAT_POW10[0], 0.0f); + assertEquals(10.0f, AlpConstants.FLOAT_POW10[1], 0.0f); + assertEquals(1e10f, AlpConstants.FLOAT_POW10[10], 0.0f); + } + + @Test + public void testDoublePow10Table() { + assertEquals(19, AlpConstants.DOUBLE_POW10.length); + assertEquals(1.0, AlpConstants.DOUBLE_POW10[0], 0.0); + assertEquals(10.0, AlpConstants.DOUBLE_POW10[1], 0.0); + assertEquals(1e18, AlpConstants.DOUBLE_POW10[18], 0.0); + } + + @Test + public void testIntegerPow10() { + assertEquals(1L, AlpConstants.integerPow10(0)); + assertEquals(10L, AlpConstants.integerPow10(1)); + assertEquals(100L, AlpConstants.integerPow10(2)); + assertEquals(1_000_000_000L, AlpConstants.integerPow10(9)); + assertEquals(1_000_000_000_000_000_000L, AlpConstants.integerPow10(18)); + } + + @Test(expected = IllegalArgumentException.class) + public void testIntegerPow10NegativePower() { + AlpConstants.integerPow10(-1); + } + + @Test(expected = IllegalArgumentException.class) + public void testIntegerPow10TooLargePower() { + AlpConstants.integerPow10(19); + } + + @Test + public void testValidateVectorSize() { + assertEquals(8, AlpConstants.validateVectorSize(8)); + assertEquals(1024, AlpConstants.validateVectorSize(1024)); + assertEquals(32768, AlpConstants.validateVectorSize(32768)); + } + + @Test(expected = IllegalArgumentException.class) + public void testValidateVectorSizeNotPowerOf2() { + AlpConstants.validateVectorSize(100); + } + + @Test(expected = IllegalArgumentException.class) + public void testValidateVectorSizeTooSmall() { + AlpConstants.validateVectorSize(4); // 2^2 < MIN_LOG=3 + } + + @Test(expected = 
IllegalArgumentException.class) + public void testValidateVectorSizeTooLarge() { + AlpConstants.validateVectorSize(65536); // 2^16 > MAX_LOG=15 + } + + @Test + public void testEncodingLimits() { + assertTrue(AlpConstants.FLOAT_ENCODING_UPPER_LIMIT > 0); + assertTrue(AlpConstants.FLOAT_ENCODING_LOWER_LIMIT < 0); + assertTrue(AlpConstants.DOUBLE_ENCODING_UPPER_LIMIT > 0); + assertTrue(AlpConstants.DOUBLE_ENCODING_LOWER_LIMIT < 0); + } + + @Test + public void testMetadataSizes() { + assertEquals(4, AlpConstants.ALP_INFO_SIZE); + assertEquals(5, AlpConstants.FLOAT_FOR_INFO_SIZE); + assertEquals(9, AlpConstants.DOUBLE_FOR_INFO_SIZE); + } +} From d656aafc6cf11e9132cc56ab4aa9c6ba2aabb4e1 Mon Sep 17 00:00:00 2001 From: Prateek Gaur Date: Sat, 7 Mar 2026 17:03:34 +0000 Subject: [PATCH 02/24] ALP: Add AlpEncoderDecoder with core encode/decode logic Add the core ALP encoding/decoding primitives: - Per-value float/double encode and decode - Fast rounding via magic number trick (matching C++) - Exception detection (NaN, Inf, -0.0, round-trip failure) - Bit width calculation for int and long - Bit packed size calculation - Best (exponent, factor) parameter search (full and preset-based) --- .../column/values/alp/AlpEncoderDecoder.java | 303 ++++++++++++++++++ .../values/alp/AlpEncoderDecoderTest.java | 151 +++++++++ 2 files changed, 454 insertions(+) create mode 100644 parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpEncoderDecoder.java create mode 100644 parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpEncoderDecoderTest.java diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpEncoderDecoder.java b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpEncoderDecoder.java new file mode 100644 index 0000000000..34d2f091f1 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpEncoderDecoder.java @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.alp; + +import static org.apache.parquet.column.values.alp.AlpConstants.*; + +/** + * Core ALP (Adaptive Lossless floating-Point) encoding and decoding logic. + * + *
<p>
ALP works by converting floating-point values to integers using decimal scaling, + * then applying Frame of Reference encoding and bit-packing. + * Values that cannot be losslessly converted are stored as exceptions. + * + *
<p>
Encoding formula: encoded = round(value * 10^exponent / 10^factor) + *
<p>
Decoding formula: value = encoded / 10^exponent * 10^factor + * + *
<p>
Exception conditions: + *
<ul>
+ *   <li>NaN or positive/negative infinity</li>
+ *   <li>negative zero ({@code -0.0f} / {@code -0.0})</li>
+ *   <li>scaled values outside the target integer range</li>
+ *   <li>values whose encode/decode round trip does not restore the original bits</li>
+ * </ul>
+ */ +final class AlpEncoderDecoder { + + private AlpEncoderDecoder() { + // Utility class + } + + // ========== Float multiplier ========== + + static float getFloatMultiplier(int exponent, int factor) { + float multiplier = FLOAT_POW10[exponent]; + if (factor > 0) { + multiplier /= FLOAT_POW10[factor]; + } + return multiplier; + } + + // ========== Double multiplier ========== + + static double getDoubleMultiplier(int exponent, int factor) { + double multiplier = DOUBLE_POW10[exponent]; + if (factor > 0) { + multiplier /= DOUBLE_POW10[factor]; + } + return multiplier; + } + + // ========== Float exception detection ========== + + /** NaN, Inf, and -0.0 can never be encoded regardless of exponent/factor. */ + static boolean isFloatException(float value) { + if (Float.isNaN(value)) { + return true; + } + if (Float.isInfinite(value)) { + return true; + } + return Float.floatToRawIntBits(value) == FLOAT_NEGATIVE_ZERO_BITS; + } + + /** Check round-trip: encode then decode, and see if we get the same bits back. 
*/ + static boolean isFloatException(float value, int exponent, int factor) { + if (isFloatException(value)) { + return true; + } + float multiplier = getFloatMultiplier(exponent, factor); + float scaled = value * multiplier; + if (scaled > Integer.MAX_VALUE || scaled < Integer.MIN_VALUE) { + return true; + } + int encoded = encodeFloat(value, exponent, factor); + float decoded = decodeFloat(encoded, exponent, factor); + return Float.floatToRawIntBits(value) != Float.floatToRawIntBits(decoded); + } + + // ========== Float encode/decode ========== + + /** Encode: round(value * 10^exponent / 10^factor) */ + static int encodeFloat(float value, int exponent, int factor) { + return fastRoundFloat(value * getFloatMultiplier(exponent, factor)); + } + + /** Decode: encoded / 10^exponent * 10^factor */ + static float decodeFloat(int encoded, int exponent, int factor) { + return encoded / getFloatMultiplier(exponent, factor); + } + + // Uses the 2^22+2^23 magic-number trick to round without branching on the FPU. 
+ static int fastRoundFloat(float value) { + if (value >= 0) { + return (int) ((value + MAGIC_FLOAT) - MAGIC_FLOAT); + } else { + return (int) ((value - MAGIC_FLOAT) + MAGIC_FLOAT); + } + } + + // ========== Double exception detection ========== + + static boolean isDoubleException(double value) { + if (Double.isNaN(value)) { + return true; + } + if (Double.isInfinite(value)) { + return true; + } + return Double.doubleToRawLongBits(value) == DOUBLE_NEGATIVE_ZERO_BITS; + } + + static boolean isDoubleException(double value, int exponent, int factor) { + if (isDoubleException(value)) { + return true; + } + double multiplier = getDoubleMultiplier(exponent, factor); + double scaled = value * multiplier; + if (scaled > Long.MAX_VALUE || scaled < Long.MIN_VALUE) { + return true; + } + long encoded = encodeDouble(value, exponent, factor); + double decoded = decodeDouble(encoded, exponent, factor); + return Double.doubleToRawLongBits(value) != Double.doubleToRawLongBits(decoded); + } + + // ========== Double encode/decode ========== + + static long encodeDouble(double value, int exponent, int factor) { + return fastRoundDouble(value * getDoubleMultiplier(exponent, factor)); + } + + static double decodeDouble(long encoded, int exponent, int factor) { + return encoded / getDoubleMultiplier(exponent, factor); + } + + // Same trick but with 2^51+2^52 for double precision. + static long fastRoundDouble(double value) { + if (value >= 0) { + return (long) ((value + MAGIC_DOUBLE) - MAGIC_DOUBLE); + } else { + return (long) ((value - MAGIC_DOUBLE) + MAGIC_DOUBLE); + } + } + + // ========== Bit width ========== + + /** Number of bits needed to represent maxDelta as an unsigned value. 
*/ + static int bitWidthForInt(int maxDelta) { + if (maxDelta == 0) { + return 0; + } + return Integer.SIZE - Integer.numberOfLeadingZeros(maxDelta); + } + + static int bitWidthForLong(long maxDelta) { + if (maxDelta == 0) { + return 0; + } + return Long.SIZE - Long.numberOfLeadingZeros(maxDelta); + } + + /** Packed data size in bytes: ceil(numElements * bitWidth / 8). */ + static int bitPackedSize(int numElements, int bitWidth) { + return (numElements * bitWidth + 7) / 8; + } + + // ========== Encoding params ========== + + static class EncodingParams { + final int exponent; + final int factor; + final int numExceptions; + + EncodingParams(int exponent, int factor, int numExceptions) { + this.exponent = exponent; + this.factor = factor; + this.numExceptions = numExceptions; + } + } + + /** Try all (exponent, factor) combos and pick the one with fewest exceptions. */ + static EncodingParams findBestFloatParams(float[] values, int offset, int length) { + int bestExponent = 0; + int bestFactor = 0; + int bestExceptions = length; + + for (int e = 0; e <= FLOAT_MAX_EXPONENT; e++) { + for (int f = 0; f <= e; f++) { + int exceptions = 0; + for (int i = 0; i < length; i++) { + if (isFloatException(values[offset + i], e, f)) { + exceptions++; + } + } + if (exceptions < bestExceptions) { + bestExponent = e; + bestFactor = f; + bestExceptions = exceptions; + if (bestExceptions == 0) { + return new EncodingParams(bestExponent, bestFactor, bestExceptions); + } + } + } + } + return new EncodingParams(bestExponent, bestFactor, bestExceptions); + } + + /** Same as findBestFloatParams but only tries the cached preset combos. 
*/ + static EncodingParams findBestFloatParamsWithPresets(float[] values, int offset, int length, int[][] presets) { + int bestExponent = presets[0][0]; + int bestFactor = presets[0][1]; + int bestExceptions = length; + + for (int[] preset : presets) { + int e = preset[0]; + int f = preset[1]; + int exceptions = 0; + for (int i = 0; i < length; i++) { + if (isFloatException(values[offset + i], e, f)) { + exceptions++; + } + } + if (exceptions < bestExceptions) { + bestExponent = e; + bestFactor = f; + bestExceptions = exceptions; + if (bestExceptions == 0) { + return new EncodingParams(bestExponent, bestFactor, bestExceptions); + } + } + } + return new EncodingParams(bestExponent, bestFactor, bestExceptions); + } + + static EncodingParams findBestDoubleParams(double[] values, int offset, int length) { + int bestExponent = 0; + int bestFactor = 0; + int bestExceptions = length; + + for (int e = 0; e <= DOUBLE_MAX_EXPONENT; e++) { + for (int f = 0; f <= e; f++) { + int exceptions = 0; + for (int i = 0; i < length; i++) { + if (isDoubleException(values[offset + i], e, f)) { + exceptions++; + } + } + if (exceptions < bestExceptions) { + bestExponent = e; + bestFactor = f; + bestExceptions = exceptions; + if (bestExceptions == 0) { + return new EncodingParams(bestExponent, bestFactor, bestExceptions); + } + } + } + } + return new EncodingParams(bestExponent, bestFactor, bestExceptions); + } + + static EncodingParams findBestDoubleParamsWithPresets(double[] values, int offset, int length, int[][] presets) { + int bestExponent = presets[0][0]; + int bestFactor = presets[0][1]; + int bestExceptions = length; + + for (int[] preset : presets) { + int e = preset[0]; + int f = preset[1]; + int exceptions = 0; + for (int i = 0; i < length; i++) { + if (isDoubleException(values[offset + i], e, f)) { + exceptions++; + } + } + if (exceptions < bestExceptions) { + bestExponent = e; + bestFactor = f; + bestExceptions = exceptions; + if (bestExceptions == 0) { + return new 
EncodingParams(bestExponent, bestFactor, bestExceptions); + } + } + } + return new EncodingParams(bestExponent, bestFactor, bestExceptions); + } +} diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpEncoderDecoderTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpEncoderDecoderTest.java new file mode 100644 index 0000000000..2c2279fdd9 --- /dev/null +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpEncoderDecoderTest.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.column.values.alp; + +import static org.junit.Assert.*; + +import org.junit.Test; + +public class AlpEncoderDecoderTest { + + @Test + public void testFloatRoundTrip() { + float[] testValues = {0.0f, 1.0f, -1.0f, 3.14159f, 100.5f, 0.001f, 1234567.0f}; + for (float value : testValues) { + for (int e = 0; e <= AlpConstants.FLOAT_MAX_EXPONENT; e++) { + for (int f = 0; f <= e; f++) { + if (!AlpEncoderDecoder.isFloatException(value, e, f)) { + int encoded = AlpEncoderDecoder.encodeFloat(value, e, f); + float decoded = AlpEncoderDecoder.decodeFloat(encoded, e, f); + assertEquals(Float.floatToRawIntBits(value), Float.floatToRawIntBits(decoded)); + } + } + } + } + } + + @Test + public void testFloatExceptionDetection() { + assertTrue(AlpEncoderDecoder.isFloatException(Float.NaN)); + assertTrue(AlpEncoderDecoder.isFloatException(Float.POSITIVE_INFINITY)); + assertTrue(AlpEncoderDecoder.isFloatException(Float.NEGATIVE_INFINITY)); + assertTrue(AlpEncoderDecoder.isFloatException(-0.0f)); + assertFalse(AlpEncoderDecoder.isFloatException(1.0f)); + assertFalse(AlpEncoderDecoder.isFloatException(0.0f)); + } + + @Test + public void testFloatEncoding() { + assertEquals(123, AlpEncoderDecoder.encodeFloat(1.23f, 2, 0)); + assertEquals(123, AlpEncoderDecoder.encodeFloat(12.3f, 2, 1)); + assertEquals(0, AlpEncoderDecoder.encodeFloat(0.0f, 5, 0)); + } + + @Test + public void testFastRoundFloat() { + assertEquals(5, AlpEncoderDecoder.fastRoundFloat(5.4f)); + assertEquals(6, AlpEncoderDecoder.fastRoundFloat(5.5f)); + assertEquals(-5, AlpEncoderDecoder.fastRoundFloat(-5.4f)); + assertEquals(-6, AlpEncoderDecoder.fastRoundFloat(-5.5f)); + assertEquals(0, AlpEncoderDecoder.fastRoundFloat(0.0f)); + } + + @Test + public void testDoubleRoundTrip() { + double[] testValues = {0.0, 1.0, -1.0, 3.14159265358979, 100.5, 0.001}; + for (double value : testValues) { + for (int e = 0; e <= Math.min(AlpConstants.DOUBLE_MAX_EXPONENT, 10); e++) { + for (int f = 0; f <= e; f++) 
{ + if (!AlpEncoderDecoder.isDoubleException(value, e, f)) { + long encoded = AlpEncoderDecoder.encodeDouble(value, e, f); + double decoded = AlpEncoderDecoder.decodeDouble(encoded, e, f); + assertEquals(Double.doubleToRawLongBits(value), Double.doubleToRawLongBits(decoded)); + } + } + } + } + } + + @Test + public void testDoubleExceptionDetection() { + assertTrue(AlpEncoderDecoder.isDoubleException(Double.NaN)); + assertTrue(AlpEncoderDecoder.isDoubleException(Double.POSITIVE_INFINITY)); + assertTrue(AlpEncoderDecoder.isDoubleException(Double.NEGATIVE_INFINITY)); + assertTrue(AlpEncoderDecoder.isDoubleException(-0.0)); + assertFalse(AlpEncoderDecoder.isDoubleException(1.0)); + assertFalse(AlpEncoderDecoder.isDoubleException(0.0)); + } + + @Test + public void testBitWidthForInt() { + assertEquals(0, AlpEncoderDecoder.bitWidthForInt(0)); + assertEquals(1, AlpEncoderDecoder.bitWidthForInt(1)); + assertEquals(8, AlpEncoderDecoder.bitWidthForInt(255)); + assertEquals(9, AlpEncoderDecoder.bitWidthForInt(256)); + assertEquals(31, AlpEncoderDecoder.bitWidthForInt(Integer.MAX_VALUE)); + } + + @Test + public void testBitWidthForLong() { + assertEquals(0, AlpEncoderDecoder.bitWidthForLong(0L)); + assertEquals(1, AlpEncoderDecoder.bitWidthForLong(1L)); + assertEquals(63, AlpEncoderDecoder.bitWidthForLong(Long.MAX_VALUE)); + } + + @Test + public void testBitPackedSize() { + assertEquals(0, AlpEncoderDecoder.bitPackedSize(1024, 0)); + assertEquals(128, AlpEncoderDecoder.bitPackedSize(1024, 1)); + assertEquals(1024, AlpEncoderDecoder.bitPackedSize(1024, 8)); + assertEquals(1, AlpEncoderDecoder.bitPackedSize(3, 2)); // ceil(6/8)=1 + } + + @Test + public void testFindBestFloatParams() { + float[] values = {1.23f, 4.56f, 7.89f, 10.11f, 12.13f}; + AlpEncoderDecoder.EncodingParams params = AlpEncoderDecoder.findBestFloatParams(values, 0, values.length); + assertNotNull(params); + assertEquals(0, params.numExceptions); + } + + @Test + public void testFindBestFloatParamsAllExceptions() 
{ + float[] values = {Float.NaN, Float.NaN, Float.NaN}; + AlpEncoderDecoder.EncodingParams params = AlpEncoderDecoder.findBestFloatParams(values, 0, values.length); + assertEquals(values.length, params.numExceptions); + } + + @Test + public void testFindBestDoubleParams() { + double[] values = {1.23, 4.56, 7.89, 10.11, 12.13}; + AlpEncoderDecoder.EncodingParams params = AlpEncoderDecoder.findBestDoubleParams(values, 0, values.length); + assertNotNull(params); + assertEquals(0, params.numExceptions); + } + + @Test + public void testFindBestParamsWithPresets() { + float[] values = {1.23f, 4.56f, 7.89f}; + AlpEncoderDecoder.EncodingParams fullResult = AlpEncoderDecoder.findBestFloatParams(values, 0, values.length); + int[][] presets = {{fullResult.exponent, fullResult.factor}, {0, 0}, {1, 0}}; + AlpEncoderDecoder.EncodingParams presetResult = + AlpEncoderDecoder.findBestFloatParamsWithPresets(values, 0, values.length, presets); + assertTrue(presetResult.numExceptions <= fullResult.numExceptions); + } +} From 388d2a929987764575dd74d9c2f050f5c65b3b8f Mon Sep 17 00:00:00 2001 From: Prateek Gaur Date: Sat, 7 Mar 2026 17:48:26 +0000 Subject: [PATCH 03/24] ALP: Add AlpCompression for single-vector compress/decompress Implements the core ALP compression pipeline for individual vectors: encode floats/doubles to integers, apply Frame of Reference, bit-pack. Decompression reverses this with exception patching. Includes FloatCompressedVector/DoubleCompressedVector with store/load serialization matching the C++ wire format, and AlpEncodingPreset for preset-based encoding parameter selection. 
--- .../column/values/alp/AlpCompression.java | 493 ++++++++++++++++++ .../column/values/alp/AlpCompressionTest.java | 381 ++++++++++++++ 2 files changed, 874 insertions(+) create mode 100644 parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpCompression.java create mode 100644 parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpCompressionTest.java diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpCompression.java b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpCompression.java new file mode 100644 index 0000000000..7baf60abef --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpCompression.java @@ -0,0 +1,493 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.column.values.alp; + +import static org.apache.parquet.column.values.alp.AlpConstants.*; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; +import org.apache.parquet.column.values.bitpacking.BytePacker; +import org.apache.parquet.column.values.bitpacking.BytePackerForLong; +import org.apache.parquet.column.values.bitpacking.Packer; + +/** + * ALP compression and decompression for single vectors of floating-point values. + * + *
<p>
Compression pipeline: find best (exponent, factor) → encode to integers → + * Frame of Reference → bit-pack. Decompression reverses this. + * + *
<p>
Mirrors C++ {@code AlpCompression} with overloaded static methods for float and double. + */ +final class AlpCompression { + + private AlpCompression() {} + + // ========== AlpEncodingPreset ========== + + /** Preset containing candidate (exponent, factor) combinations from sampling. */ + static final class AlpEncodingPreset { + final int[][] combinations; // each element is {exponent, factor} + + AlpEncodingPreset(int[][] combinations) { + this.combinations = combinations; + } + } + + // ========== FloatCompressedVector ========== + + /** + * A compressed ALP vector for float data. + * + *
<p>
Wire format (little-endian): + * [AlpInfo(4B)][ForInfo(5B)][PackedValues][ExcPositions][ExcValues] + */ + static final class FloatCompressedVector { + int exponent; + int factor; + int numExceptions; + int frameOfReference; + int bitWidth; + int numElements; + byte[] packedValues; + short[] exceptionPositions; + float[] exceptionValues; + + int storedSize() { + return ALP_INFO_SIZE + FLOAT_FOR_INFO_SIZE + dataStoredSize(); + } + + int dataStoredSize() { + return AlpEncoderDecoder.bitPackedSize(numElements, bitWidth) + + numExceptions * (POSITION_SIZE + Float.BYTES); + } + + void store(byte[] output, int offset) { + ByteBuffer buf = ByteBuffer.wrap(output, offset, storedSize()).order(ByteOrder.LITTLE_ENDIAN); + buf.put((byte) exponent); + buf.put((byte) factor); + buf.putShort((short) numExceptions); + buf.putInt(frameOfReference); + buf.put((byte) bitWidth); + storeDataTo(buf); + } + + void storeDataOnly(byte[] output, int offset) { + ByteBuffer buf = + ByteBuffer.wrap(output, offset, dataStoredSize()).order(ByteOrder.LITTLE_ENDIAN); + storeDataTo(buf); + } + + private void storeDataTo(ByteBuffer buf) { + int bps = AlpEncoderDecoder.bitPackedSize(numElements, bitWidth); + buf.put(packedValues, 0, bps); + for (int i = 0; i < numExceptions; i++) { + buf.putShort(exceptionPositions[i]); + } + for (int i = 0; i < numExceptions; i++) { + buf.putFloat(exceptionValues[i]); + } + } + + static FloatCompressedVector load(byte[] input, int offset, int numElements) { + ByteBuffer buf = + ByteBuffer.wrap(input, offset, input.length - offset).order(ByteOrder.LITTLE_ENDIAN); + FloatCompressedVector v = new FloatCompressedVector(); + v.numElements = numElements; + v.exponent = buf.get() & 0xFF; + v.factor = buf.get() & 0xFF; + v.numExceptions = buf.getShort() & 0xFFFF; + v.frameOfReference = buf.getInt(); + v.bitWidth = buf.get() & 0xFF; + int bps = AlpEncoderDecoder.bitPackedSize(numElements, v.bitWidth); + v.packedValues = new byte[bps]; + buf.get(v.packedValues); + 
v.exceptionPositions = new short[v.numExceptions]; + for (int i = 0; i < v.numExceptions; i++) { + v.exceptionPositions[i] = buf.getShort(); + } + v.exceptionValues = new float[v.numExceptions]; + for (int i = 0; i < v.numExceptions; i++) { + v.exceptionValues[i] = buf.getFloat(); + } + return v; + } + } + + // ========== DoubleCompressedVector ========== + + /** + * A compressed ALP vector for double data. + * + *

Wire format (little-endian): + * [AlpInfo(4B)][ForInfo(9B)][PackedValues][ExcPositions][ExcValues] + */ + static final class DoubleCompressedVector { + int exponent; + int factor; + int numExceptions; + long frameOfReference; + int bitWidth; + int numElements; + byte[] packedValues; + short[] exceptionPositions; + double[] exceptionValues; + + int storedSize() { + return ALP_INFO_SIZE + DOUBLE_FOR_INFO_SIZE + dataStoredSize(); + } + + int dataStoredSize() { + return AlpEncoderDecoder.bitPackedSize(numElements, bitWidth) + + numExceptions * (POSITION_SIZE + Double.BYTES); + } + + void store(byte[] output, int offset) { + ByteBuffer buf = ByteBuffer.wrap(output, offset, storedSize()).order(ByteOrder.LITTLE_ENDIAN); + buf.put((byte) exponent); + buf.put((byte) factor); + buf.putShort((short) numExceptions); + buf.putLong(frameOfReference); + buf.put((byte) bitWidth); + storeDataTo(buf); + } + + void storeDataOnly(byte[] output, int offset) { + ByteBuffer buf = + ByteBuffer.wrap(output, offset, dataStoredSize()).order(ByteOrder.LITTLE_ENDIAN); + storeDataTo(buf); + } + + private void storeDataTo(ByteBuffer buf) { + int bps = AlpEncoderDecoder.bitPackedSize(numElements, bitWidth); + buf.put(packedValues, 0, bps); + for (int i = 0; i < numExceptions; i++) { + buf.putShort(exceptionPositions[i]); + } + for (int i = 0; i < numExceptions; i++) { + buf.putDouble(exceptionValues[i]); + } + } + + static DoubleCompressedVector load(byte[] input, int offset, int numElements) { + ByteBuffer buf = + ByteBuffer.wrap(input, offset, input.length - offset).order(ByteOrder.LITTLE_ENDIAN); + DoubleCompressedVector v = new DoubleCompressedVector(); + v.numElements = numElements; + v.exponent = buf.get() & 0xFF; + v.factor = buf.get() & 0xFF; + v.numExceptions = buf.getShort() & 0xFFFF; + v.frameOfReference = buf.getLong(); + v.bitWidth = buf.get() & 0xFF; + int bps = AlpEncoderDecoder.bitPackedSize(numElements, v.bitWidth); + v.packedValues = new byte[bps]; + buf.get(v.packedValues); + 
v.exceptionPositions = new short[v.numExceptions]; + for (int i = 0; i < v.numExceptions; i++) { + v.exceptionPositions[i] = buf.getShort(); + } + v.exceptionValues = new double[v.numExceptions]; + for (int i = 0; i < v.numExceptions; i++) { + v.exceptionValues[i] = buf.getDouble(); + } + return v; + } + } + + // ========== Compress float ========== + + static FloatCompressedVector compressFloatVector( + float[] input, int count, AlpEncodingPreset preset) { + if (count == 0) { + FloatCompressedVector r = new FloatCompressedVector(); + r.packedValues = new byte[0]; + r.exceptionPositions = new short[0]; + r.exceptionValues = new float[0]; + return r; + } + + // 1. Find best (exponent, factor) from preset + AlpEncoderDecoder.EncodingParams params = + AlpEncoderDecoder.findBestFloatParamsWithPresets(input, 0, count, preset.combinations); + int exponent = params.exponent; + int factor = params.factor; + + // 2. Encode all values to integers + int[] encoded = new int[count]; + for (int i = 0; i < count; i++) { + encoded[i] = AlpEncoderDecoder.encodeFloat(input[i], exponent, factor); + } + + // 3. Detect exceptions via bitwise round-trip check + int numExceptions = 0; + short[] excPositions = new short[count]; + for (int i = 0; i < count; i++) { + float decoded = AlpEncoderDecoder.decodeFloat(encoded[i], exponent, factor); + if (Float.floatToRawIntBits(decoded) != Float.floatToRawIntBits(input[i])) { + excPositions[numExceptions++] = (short) i; + } + } + + // 4. Find first non-exception value as placeholder (0 if all are exceptions) + int placeholder = 0; + int excIdx = 0; + for (int i = 0; i < count; i++) { + if (excIdx < numExceptions && (excPositions[excIdx] & 0xFFFF) == i) { + excIdx++; + } else { + placeholder = encoded[i]; + break; + } + } + + // 5. 
Replace exceptions with placeholder, collect original values + float[] excValues = new float[numExceptions]; + for (int i = 0; i < numExceptions; i++) { + int pos = excPositions[i] & 0xFFFF; + excValues[i] = input[pos]; + encoded[pos] = placeholder; + } + + // 6. FOR encoding + int min = encoded[0]; + int max = encoded[0]; + for (int i = 1; i < count; i++) { + if (encoded[i] < min) min = encoded[i]; + if (encoded[i] > max) max = encoded[i]; + } + for (int i = 0; i < count; i++) { + encoded[i] -= min; + } + int maxDelta = max - min; + + // 7. Bit packing + int bitWidth = AlpEncoderDecoder.bitWidthForInt(maxDelta); + int bps = AlpEncoderDecoder.bitPackedSize(count, bitWidth); + byte[] packedValues = new byte[bps]; + if (bitWidth > 0) { + packInts(encoded, count, bitWidth, packedValues); + } + + // Build result + FloatCompressedVector result = new FloatCompressedVector(); + result.exponent = exponent; + result.factor = factor; + result.numExceptions = numExceptions; + result.frameOfReference = min; + result.bitWidth = bitWidth; + result.numElements = count; + result.packedValues = packedValues; + result.exceptionPositions = Arrays.copyOf(excPositions, numExceptions); + result.exceptionValues = excValues; + return result; + } + + // ========== Decompress float ========== + + static void decompressFloatVector(FloatCompressedVector v, float[] output) { + // 1. Unpack integers + int[] encoded = new int[v.numElements]; + if (v.bitWidth > 0) { + unpackInts(v.packedValues, v.numElements, v.bitWidth, encoded); + } + + // 2. Fused unFOR + decode + for (int i = 0; i < v.numElements; i++) { + int unfored = encoded[i] + v.frameOfReference; + output[i] = AlpEncoderDecoder.decodeFloat(unfored, v.exponent, v.factor); + } + + // 3. 
Patch exceptions + for (int i = 0; i < v.numExceptions; i++) { + output[v.exceptionPositions[i] & 0xFFFF] = v.exceptionValues[i]; + } + } + + // ========== Compress double ========== + + static DoubleCompressedVector compressDoubleVector( + double[] input, int count, AlpEncodingPreset preset) { + if (count == 0) { + DoubleCompressedVector r = new DoubleCompressedVector(); + r.packedValues = new byte[0]; + r.exceptionPositions = new short[0]; + r.exceptionValues = new double[0]; + return r; + } + + AlpEncoderDecoder.EncodingParams params = + AlpEncoderDecoder.findBestDoubleParamsWithPresets(input, 0, count, preset.combinations); + int exponent = params.exponent; + int factor = params.factor; + + long[] encoded = new long[count]; + for (int i = 0; i < count; i++) { + encoded[i] = AlpEncoderDecoder.encodeDouble(input[i], exponent, factor); + } + + int numExceptions = 0; + short[] excPositions = new short[count]; + for (int i = 0; i < count; i++) { + double decoded = AlpEncoderDecoder.decodeDouble(encoded[i], exponent, factor); + if (Double.doubleToRawLongBits(decoded) != Double.doubleToRawLongBits(input[i])) { + excPositions[numExceptions++] = (short) i; + } + } + + long placeholder = 0; + int excIdx = 0; + for (int i = 0; i < count; i++) { + if (excIdx < numExceptions && (excPositions[excIdx] & 0xFFFF) == i) { + excIdx++; + } else { + placeholder = encoded[i]; + break; + } + } + + double[] excValues = new double[numExceptions]; + for (int i = 0; i < numExceptions; i++) { + int pos = excPositions[i] & 0xFFFF; + excValues[i] = input[pos]; + encoded[pos] = placeholder; + } + + long min = encoded[0]; + long max = encoded[0]; + for (int i = 1; i < count; i++) { + if (encoded[i] < min) min = encoded[i]; + if (encoded[i] > max) max = encoded[i]; + } + for (int i = 0; i < count; i++) { + encoded[i] -= min; + } + long maxDelta = max - min; + + int bitWidth = AlpEncoderDecoder.bitWidthForLong(maxDelta); + int bps = AlpEncoderDecoder.bitPackedSize(count, bitWidth); + byte[] 
packedValues = new byte[bps]; + if (bitWidth > 0) { + packLongs(encoded, count, bitWidth, packedValues); + } + + DoubleCompressedVector result = new DoubleCompressedVector(); + result.exponent = exponent; + result.factor = factor; + result.numExceptions = numExceptions; + result.frameOfReference = min; + result.bitWidth = bitWidth; + result.numElements = count; + result.packedValues = packedValues; + result.exceptionPositions = Arrays.copyOf(excPositions, numExceptions); + result.exceptionValues = excValues; + return result; + } + + // ========== Decompress double ========== + + static void decompressDoubleVector(DoubleCompressedVector v, double[] output) { + long[] encoded = new long[v.numElements]; + if (v.bitWidth > 0) { + unpackLongs(v.packedValues, v.numElements, v.bitWidth, encoded); + } + + for (int i = 0; i < v.numElements; i++) { + long unfored = encoded[i] + v.frameOfReference; + output[i] = AlpEncoderDecoder.decodeDouble(unfored, v.exponent, v.factor); + } + + for (int i = 0; i < v.numExceptions; i++) { + output[v.exceptionPositions[i] & 0xFFFF] = v.exceptionValues[i]; + } + } + + // ========== Bit packing helpers ========== + + @SuppressWarnings("deprecation") + static void packInts(int[] values, int count, int bitWidth, byte[] output) { + BytePacker packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth); + int fullGroups = count / 8; + for (int g = 0; g < fullGroups; g++) { + packer.pack8Values(values, g * 8, output, g * bitWidth); + } + int remaining = count % 8; + if (remaining > 0) { + int[] padded = new int[8]; + System.arraycopy(values, fullGroups * 8, padded, 0, remaining); + byte[] tmp = new byte[bitWidth]; + packer.pack8Values(padded, 0, tmp, 0); + int tailBytes = AlpEncoderDecoder.bitPackedSize(count, bitWidth) - fullGroups * bitWidth; + System.arraycopy(tmp, 0, output, fullGroups * bitWidth, tailBytes); + } + } + + @SuppressWarnings("deprecation") + static void unpackInts(byte[] packed, int count, int bitWidth, int[] output) { + BytePacker 
packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth); + int fullGroups = count / 8; + for (int g = 0; g < fullGroups; g++) { + packer.unpack8Values(packed, g * bitWidth, output, g * 8); + } + int remaining = count % 8; + if (remaining > 0) { + byte[] tmp = new byte[bitWidth]; + int available = packed.length - fullGroups * bitWidth; + System.arraycopy(packed, fullGroups * bitWidth, tmp, 0, Math.min(available, bitWidth)); + int[] padded = new int[8]; + packer.unpack8Values(tmp, 0, padded, 0); + System.arraycopy(padded, 0, output, fullGroups * 8, remaining); + } + } + + @SuppressWarnings("deprecation") + static void packLongs(long[] values, int count, int bitWidth, byte[] output) { + BytePackerForLong packer = Packer.LITTLE_ENDIAN.newBytePackerForLong(bitWidth); + int fullGroups = count / 8; + for (int g = 0; g < fullGroups; g++) { + packer.pack8Values(values, g * 8, output, g * bitWidth); + } + int remaining = count % 8; + if (remaining > 0) { + long[] padded = new long[8]; + System.arraycopy(values, fullGroups * 8, padded, 0, remaining); + byte[] tmp = new byte[bitWidth]; + packer.pack8Values(padded, 0, tmp, 0); + int tailBytes = AlpEncoderDecoder.bitPackedSize(count, bitWidth) - fullGroups * bitWidth; + System.arraycopy(tmp, 0, output, fullGroups * bitWidth, tailBytes); + } + } + + @SuppressWarnings("deprecation") + static void unpackLongs(byte[] packed, int count, int bitWidth, long[] output) { + BytePackerForLong packer = Packer.LITTLE_ENDIAN.newBytePackerForLong(bitWidth); + int fullGroups = count / 8; + for (int g = 0; g < fullGroups; g++) { + packer.unpack8Values(packed, g * bitWidth, output, g * 8); + } + int remaining = count % 8; + if (remaining > 0) { + byte[] tmp = new byte[bitWidth]; + int available = packed.length - fullGroups * bitWidth; + System.arraycopy(packed, fullGroups * bitWidth, tmp, 0, Math.min(available, bitWidth)); + long[] padded = new long[8]; + packer.unpack8Values(tmp, 0, padded, 0); + System.arraycopy(padded, 0, output, fullGroups * 8, 
remaining); + } + } +} diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpCompressionTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpCompressionTest.java new file mode 100644 index 0000000000..5cd00d6e41 --- /dev/null +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpCompressionTest.java @@ -0,0 +1,381 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.column.values.alp; + +import static org.junit.Assert.*; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import org.junit.Test; + +public class AlpCompressionTest { + + // ========== Helpers ========== + + private static AlpCompression.AlpEncodingPreset allFloatCombos() { + List combos = new ArrayList<>(); + for (int e = 0; e <= AlpConstants.FLOAT_MAX_EXPONENT; e++) { + for (int f = 0; f <= e; f++) { + combos.add(new int[] {e, f}); + } + } + return new AlpCompression.AlpEncodingPreset(combos.toArray(new int[0][])); + } + + private static AlpCompression.AlpEncodingPreset allDoubleCombos() { + List combos = new ArrayList<>(); + for (int e = 0; e <= AlpConstants.DOUBLE_MAX_EXPONENT; e++) { + for (int f = 0; f <= e; f++) { + combos.add(new int[] {e, f}); + } + } + return new AlpCompression.AlpEncodingPreset(combos.toArray(new int[0][])); + } + + private static void assertFloatRoundTrip(float[] input) { + AlpCompression.AlpEncodingPreset preset = allFloatCombos(); + AlpCompression.FloatCompressedVector cv = + AlpCompression.compressFloatVector(input, input.length, preset); + float[] output = new float[input.length]; + AlpCompression.decompressFloatVector(cv, output); + for (int i = 0; i < input.length; i++) { + assertEquals( + "Mismatch at index " + i, + Float.floatToRawIntBits(input[i]), + Float.floatToRawIntBits(output[i])); + } + } + + private static void assertDoubleRoundTrip(double[] input) { + AlpCompression.AlpEncodingPreset preset = allDoubleCombos(); + AlpCompression.DoubleCompressedVector cv = + AlpCompression.compressDoubleVector(input, input.length, preset); + double[] output = new double[input.length]; + AlpCompression.decompressDoubleVector(cv, output); + for (int i = 0; i < input.length; i++) { + assertEquals( + "Mismatch at index " + i, + Double.doubleToRawLongBits(input[i]), + Double.doubleToRawLongBits(output[i])); + } + } + + // ========== Float compress/decompress ========== + + @Test 
+ public void testFloatConstantValues() { + float[] input = new float[100]; + for (int i = 0; i < 100; i++) { + input[i] = 3.14f; + } + assertFloatRoundTrip(input); + + // Constant → bitWidth should be 0 + AlpCompression.FloatCompressedVector cv = + AlpCompression.compressFloatVector(input, input.length, allFloatCombos()); + assertEquals(0, cv.bitWidth); + } + + @Test + public void testFloatDecimalValues() { + float[] input = new float[100]; + for (int i = 0; i < 100; i++) { + input[i] = i * 0.1f; + } + assertFloatRoundTrip(input); + } + + @Test + public void testFloatIntegerValues() { + float[] input = new float[100]; + for (int i = 0; i < 100; i++) { + input[i] = i; + } + assertFloatRoundTrip(input); + } + + @Test + public void testFloatRandomValues() { + Random rng = new Random(42); + float[] input = new float[200]; + for (int i = 0; i < 200; i++) { + input[i] = Math.round(rng.nextFloat() * 10000) / 100.0f; + } + assertFloatRoundTrip(input); + } + + @Test + public void testFloatSpecialValues() { + float[] input = {1.0f, Float.NaN, 2.0f, Float.POSITIVE_INFINITY, 3.0f, + Float.NEGATIVE_INFINITY, 4.0f, -0.0f, 5.0f}; + assertFloatRoundTrip(input); + + AlpCompression.FloatCompressedVector cv = + AlpCompression.compressFloatVector(input, input.length, allFloatCombos()); + // NaN, +Inf, -Inf, -0.0 should be exceptions + assertTrue(cv.numExceptions >= 4); + } + + @Test + public void testFloatSingleElement() { + assertFloatRoundTrip(new float[] {42.5f}); + } + + @Test + public void testFloatEmptyVector() { + AlpCompression.FloatCompressedVector cv = + AlpCompression.compressFloatVector(new float[0], 0, allFloatCombos()); + assertEquals(0, cv.numElements); + assertEquals(0, cv.numExceptions); + } + + @Test + public void testFloatAllExceptions() { + float[] input = new float[16]; + for (int i = 0; i < 16; i++) { + input[i] = Float.NaN; + } + assertFloatRoundTrip(input); + + AlpCompression.FloatCompressedVector cv = + AlpCompression.compressFloatVector(input, input.length, 
allFloatCombos()); + assertEquals(16, cv.numExceptions); + assertEquals(0, cv.bitWidth); + } + + @Test + public void testFloatExactVectorSize() { + float[] input = new float[AlpConstants.DEFAULT_VECTOR_SIZE]; + for (int i = 0; i < input.length; i++) { + input[i] = i * 0.01f; + } + assertFloatRoundTrip(input); + } + + @Test + public void testFloatNonMultipleOf8() { + // 13 elements — tests tail handling in pack/unpack + float[] input = new float[13]; + for (int i = 0; i < 13; i++) { + input[i] = i * 1.5f; + } + assertFloatRoundTrip(input); + } + + // ========== Float store/load ========== + + @Test + public void testFloatStoreLoadRoundTrip() { + float[] input = new float[50]; + for (int i = 0; i < 50; i++) { + input[i] = i * 0.3f; + } + + AlpCompression.FloatCompressedVector cv = + AlpCompression.compressFloatVector(input, input.length, allFloatCombos()); + + byte[] buf = new byte[cv.storedSize()]; + cv.store(buf, 0); + + AlpCompression.FloatCompressedVector loaded = + AlpCompression.FloatCompressedVector.load(buf, 0, input.length); + + float[] output = new float[input.length]; + AlpCompression.decompressFloatVector(loaded, output); + + for (int i = 0; i < input.length; i++) { + assertEquals( + "Mismatch at index " + i, + Float.floatToRawIntBits(input[i]), + Float.floatToRawIntBits(output[i])); + } + } + + @Test + public void testFloatStoreLoadMetadata() { + float[] input = {1.1f, 2.2f, 3.3f, Float.NaN, 5.5f}; + AlpCompression.FloatCompressedVector cv = + AlpCompression.compressFloatVector(input, input.length, allFloatCombos()); + + byte[] buf = new byte[cv.storedSize()]; + cv.store(buf, 0); + + AlpCompression.FloatCompressedVector loaded = + AlpCompression.FloatCompressedVector.load(buf, 0, input.length); + + assertEquals(cv.exponent, loaded.exponent); + assertEquals(cv.factor, loaded.factor); + assertEquals(cv.numExceptions, loaded.numExceptions); + assertEquals(cv.frameOfReference, loaded.frameOfReference); + assertEquals(cv.bitWidth, loaded.bitWidth); + 
assertEquals(cv.numElements, loaded.numElements); + } + + // ========== Double compress/decompress ========== + + @Test + public void testDoubleConstantValues() { + double[] input = new double[100]; + for (int i = 0; i < 100; i++) { + input[i] = 3.14; + } + assertDoubleRoundTrip(input); + + AlpCompression.DoubleCompressedVector cv = + AlpCompression.compressDoubleVector(input, input.length, allDoubleCombos()); + assertEquals(0, cv.bitWidth); + } + + @Test + public void testDoubleDecimalValues() { + double[] input = new double[100]; + for (int i = 0; i < 100; i++) { + input[i] = i * 0.01; + } + assertDoubleRoundTrip(input); + } + + @Test + public void testDoubleRandomValues() { + Random rng = new Random(42); + double[] input = new double[200]; + for (int i = 0; i < 200; i++) { + input[i] = Math.round(rng.nextDouble() * 10000) / 100.0; + } + assertDoubleRoundTrip(input); + } + + @Test + public void testDoubleSpecialValues() { + double[] input = {1.0, Double.NaN, 2.0, Double.POSITIVE_INFINITY, 3.0, + Double.NEGATIVE_INFINITY, 4.0, -0.0, 5.0}; + assertDoubleRoundTrip(input); + + AlpCompression.DoubleCompressedVector cv = + AlpCompression.compressDoubleVector(input, input.length, allDoubleCombos()); + assertTrue(cv.numExceptions >= 4); + } + + @Test + public void testDoubleSingleElement() { + assertDoubleRoundTrip(new double[] {42.5}); + } + + @Test + public void testDoubleAllExceptions() { + double[] input = new double[16]; + for (int i = 0; i < 16; i++) { + input[i] = Double.NaN; + } + assertDoubleRoundTrip(input); + + AlpCompression.DoubleCompressedVector cv = + AlpCompression.compressDoubleVector(input, input.length, allDoubleCombos()); + assertEquals(16, cv.numExceptions); + } + + @Test + public void testDoubleNonMultipleOf8() { + double[] input = new double[13]; + for (int i = 0; i < 13; i++) { + input[i] = i * 1.5; + } + assertDoubleRoundTrip(input); + } + + // ========== Double store/load ========== + + @Test + public void testDoubleStoreLoadRoundTrip() { + 
double[] input = new double[50]; + for (int i = 0; i < 50; i++) { + input[i] = i * 0.3; + } + + AlpCompression.DoubleCompressedVector cv = + AlpCompression.compressDoubleVector(input, input.length, allDoubleCombos()); + + byte[] buf = new byte[cv.storedSize()]; + cv.store(buf, 0); + + AlpCompression.DoubleCompressedVector loaded = + AlpCompression.DoubleCompressedVector.load(buf, 0, input.length); + + double[] output = new double[input.length]; + AlpCompression.decompressDoubleVector(loaded, output); + + for (int i = 0; i < input.length; i++) { + assertEquals( + "Mismatch at index " + i, + Double.doubleToRawLongBits(input[i]), + Double.doubleToRawLongBits(output[i])); + } + } + + // ========== Bit packing helpers ========== + + @Test + public void testPackUnpackInts() { + int[] values = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}; + int bitWidth = 4; + byte[] packed = new byte[AlpEncoderDecoder.bitPackedSize(values.length, bitWidth)]; + AlpCompression.packInts(values, values.length, bitWidth, packed); + + int[] unpacked = new int[values.length]; + AlpCompression.unpackInts(packed, values.length, bitWidth, unpacked); + assertArrayEquals(values, unpacked); + } + + @Test + public void testPackUnpackIntsNonMultipleOf8() { + int[] values = {5, 10, 15, 20, 25}; + int bitWidth = 5; + byte[] packed = new byte[AlpEncoderDecoder.bitPackedSize(values.length, bitWidth)]; + AlpCompression.packInts(values, values.length, bitWidth, packed); + + int[] unpacked = new int[values.length]; + AlpCompression.unpackInts(packed, values.length, bitWidth, unpacked); + assertArrayEquals(values, unpacked); + } + + @Test + public void testPackUnpackLongs() { + long[] values = {0, 1, 2, 3, 4, 5, 6, 7, 100, 200, 300, 400, 500, 600, 700, 800}; + int bitWidth = 10; + byte[] packed = new byte[AlpEncoderDecoder.bitPackedSize(values.length, bitWidth)]; + AlpCompression.packLongs(values, values.length, bitWidth, packed); + + long[] unpacked = new long[values.length]; + 
AlpCompression.unpackLongs(packed, values.length, bitWidth, unpacked); + assertArrayEquals(values, unpacked); + } + + @Test + public void testPackUnpackLongsNonMultipleOf8() { + long[] values = {10, 20, 30}; + int bitWidth = 6; + byte[] packed = new byte[AlpEncoderDecoder.bitPackedSize(values.length, bitWidth)]; + AlpCompression.packLongs(values, values.length, bitWidth, packed); + + long[] unpacked = new long[values.length]; + AlpCompression.unpackLongs(packed, values.length, bitWidth, unpacked); + assertArrayEquals(values, unpacked); + } +} From 9bb8a8622590b60cce1cce21ab5a6e48b6040608 Mon Sep 17 00:00:00 2001 From: Prateek Gaur Date: Sat, 7 Mar 2026 18:03:04 +0000 Subject: [PATCH 04/24] ALP: Add AlpSampler for sampling-based encoding preset generation Implements FloatSampler and DoubleSampler that collect representative samples from input data and generate AlpEncodingPreset with the best (exponent, factor) combinations. Matches C++ AlpSampler behavior: equidistant vector sampling, compressed-size estimation, and top-k combination selection. --- .../parquet/column/values/alp/AlpSampler.java | 349 ++++++++++++++++++ .../column/values/alp/AlpSamplerTest.java | 195 ++++++++++ 2 files changed, 544 insertions(+) create mode 100644 parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpSampler.java create mode 100644 parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpSamplerTest.java diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpSampler.java b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpSampler.java new file mode 100644 index 0000000000..20a9f3ac67 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpSampler.java @@ -0,0 +1,349 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.alp; + +import static org.apache.parquet.column.values.alp.AlpConstants.*; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * ALP sampler that collects representative samples and generates encoding presets. + * + *

Usage: call {@code addSample()} to feed data, then {@code finalize()} to get the preset. + * Mirrors C++ {@code AlpSampler} with separate inner classes for float and double. + */ +final class AlpSampler { + + private AlpSampler() {} + + // ========== FloatSampler ========== + + static final class FloatSampler { + private final long sampleVectorSize = SAMPLER_VECTOR_SIZE; + private final long rowgroupSize = SAMPLER_ROWGROUP_SIZE; + private final long samplesPerVector = SAMPLER_SAMPLES_PER_VECTOR; + private final long sampleVectorsPerRowgroup = SAMPLER_SAMPLE_VECTORS_PER_ROWGROUP; + private final long rowgroupSampleJump; + + private long vectorsSampledCount; + private long totalValuesCount; + private long vectorsCount; + private final List rowgroupSample = new ArrayList<>(); + + FloatSampler() { + rowgroupSampleJump = (rowgroupSize / sampleVectorsPerRowgroup) / sampleVectorSize; + } + + /** Add a sample of arbitrary size (split into vectors internally). */ + void addSample(float[] data, int count) { + for (int i = 0; i < count; i += (int) sampleVectorSize) { + int elements = (int) Math.min(count - i, sampleVectorSize); + addSampleVector(data, i, elements); + } + } + + private void addSampleVector(float[] data, int offset, int length) { + boolean mustSkip = mustSkipSamplingFromCurrentVector( + vectorsCount, vectorsSampledCount, length); + vectorsCount++; + totalValuesCount += length; + if (mustSkip) { + return; + } + + int numLookup = (int) Math.min(length, DEFAULT_VECTOR_SIZE); + int increment = (int) Math.max(1, (int) Math.ceil((double) numLookup / samplesPerVector)); + + // Take equidistant subsample + List sample = new ArrayList<>(); + for (int i = 0; i < numLookup; i += increment) { + sample.add(data[offset + i]); + } + + float[] sampleArray = new float[sample.size()]; + for (int i = 0; i < sample.size(); i++) { + sampleArray[i] = sample.get(i); + } + + rowgroupSample.add(sampleArray); + vectorsSampledCount++; + } + + private boolean 
mustSkipSamplingFromCurrentVector( + long vectorsCount, long vectorsSampledCount, int currentVectorSize) { + if ((vectorsCount % rowgroupSampleJump) != 0) { + return true; + } + return currentVectorSize < SAMPLER_SAMPLES_PER_VECTOR && vectorsSampledCount != 0; + } + + /** Finalize sampling and return the encoding preset. */ + AlpCompression.AlpEncodingPreset finalizeSampling() { + return createFloatEncodingPreset(rowgroupSample); + } + } + + // ========== DoubleSampler ========== + + static final class DoubleSampler { + private final long sampleVectorSize = SAMPLER_VECTOR_SIZE; + private final long rowgroupSize = SAMPLER_ROWGROUP_SIZE; + private final long samplesPerVector = SAMPLER_SAMPLES_PER_VECTOR; + private final long sampleVectorsPerRowgroup = SAMPLER_SAMPLE_VECTORS_PER_ROWGROUP; + private final long rowgroupSampleJump; + + private long vectorsSampledCount; + private long totalValuesCount; + private long vectorsCount; + private final List rowgroupSample = new ArrayList<>(); + + DoubleSampler() { + rowgroupSampleJump = (rowgroupSize / sampleVectorsPerRowgroup) / sampleVectorSize; + } + + void addSample(double[] data, int count) { + for (int i = 0; i < count; i += (int) sampleVectorSize) { + int elements = (int) Math.min(count - i, sampleVectorSize); + addSampleVector(data, i, elements); + } + } + + private void addSampleVector(double[] data, int offset, int length) { + boolean mustSkip = mustSkipSamplingFromCurrentVector( + vectorsCount, vectorsSampledCount, length); + vectorsCount++; + totalValuesCount += length; + if (mustSkip) { + return; + } + + int numLookup = (int) Math.min(length, DEFAULT_VECTOR_SIZE); + int increment = (int) Math.max(1, (int) Math.ceil((double) numLookup / samplesPerVector)); + + List sample = new ArrayList<>(); + for (int i = 0; i < numLookup; i += increment) { + sample.add(data[offset + i]); + } + + double[] sampleArray = new double[sample.size()]; + for (int i = 0; i < sample.size(); i++) { + sampleArray[i] = sample.get(i); + } + + 
rowgroupSample.add(sampleArray); + vectorsSampledCount++; + } + + private boolean mustSkipSamplingFromCurrentVector( + long vectorsCount, long vectorsSampledCount, int currentVectorSize) { + if ((vectorsCount % rowgroupSampleJump) != 0) { + return true; + } + return currentVectorSize < SAMPLER_SAMPLES_PER_VECTOR && vectorsSampledCount != 0; + } + + AlpCompression.AlpEncodingPreset finalizeSampling() { + return createDoubleEncodingPreset(rowgroupSample); + } + } + + // ========== CreateEncodingPreset (float) ========== + + /** + * Estimate compressed size in bits for a given (exponent, factor) on sample data. + * Returns -1 if the combination yields almost all exceptions (< 2 non-exceptions). + */ + private static long estimateFloatCompressedSize( + float[] sample, int exponent, int factor, boolean penalizeExceptions) { + int minEncoded = Integer.MAX_VALUE; + int maxEncoded = Integer.MIN_VALUE; + int numExceptions = 0; + int numNonExceptions = 0; + + for (float value : sample) { + int encoded = AlpEncoderDecoder.encodeFloat(value, exponent, factor); + float decoded = AlpEncoderDecoder.decodeFloat(encoded, exponent, factor); + if (Float.floatToRawIntBits(decoded) == Float.floatToRawIntBits(value)) { + numNonExceptions++; + if (encoded < minEncoded) minEncoded = encoded; + if (encoded > maxEncoded) maxEncoded = encoded; + } else { + numExceptions++; + } + } + + if (penalizeExceptions && numNonExceptions < 2) { + return -1; + } + + long delta; + if (numNonExceptions >= 2) { + // Unsigned difference + delta = Integer.toUnsignedLong(maxEncoded) - Integer.toUnsignedLong(minEncoded); + } else { + delta = 0; + } + int bitsPerValue = (delta == 0) ? 
0 : (64 - Long.numberOfLeadingZeros(delta)); + long estimatedSize = (long) sample.length * bitsPerValue; + estimatedSize += (long) numExceptions * (32 + POSITION_SIZE * 8); + return estimatedSize; + } + + static AlpCompression.AlpEncodingPreset createFloatEncodingPreset( + List vectorsSampled) { + // For each sampled vector, find the best (e,f) combo by estimated compressed size. + // Count how many times each best combo appears across all sampled vectors. + Map bestCombosCount = new HashMap<>(); // key = e<<8|f, value = [count] + + for (float[] sample : vectorsSampled) { + long bestSize = Long.MAX_VALUE; + int bestE = FLOAT_MAX_EXPONENT; + int bestF = FLOAT_MAX_EXPONENT; + + for (int e = 0; e <= FLOAT_MAX_EXPONENT; e++) { + for (int f = 0; f <= e; f++) { + long size = estimateFloatCompressedSize(sample, e, f, true); + if (size < 0) continue; + if (size < bestSize + || (size == bestSize && e > bestE) + || (size == bestSize && e == bestE && f > bestF)) { + bestSize = size; + bestE = e; + bestF = f; + } + } + } + long key = ((long) bestE << 8) | bestF; + bestCombosCount.computeIfAbsent(key, k -> new int[1])[0]++; + } + + // Sort by appearance count (descending), then by exponent/factor (descending) + List> sorted = new ArrayList<>(bestCombosCount.entrySet()); + sorted.sort((a, b) -> { + int cmpCount = Integer.compare(b.getValue()[0], a.getValue()[0]); + if (cmpCount != 0) return cmpCount; + int eA = (int) (a.getKey() >> 8); + int fA = (int) (a.getKey() & 0xFF); + int eB = (int) (b.getKey() >> 8); + int fB = (int) (b.getKey() & 0xFF); + if (eA != eB) return Integer.compare(eB, eA); + return Integer.compare(fB, fA); + }); + + int k = Math.min(MAX_COMBINATIONS, sorted.size()); + int[][] combinations = new int[k][2]; + for (int i = 0; i < k; i++) { + long key = sorted.get(i).getKey(); + combinations[i][0] = (int) (key >> 8); + combinations[i][1] = (int) (key & 0xFF); + } + return new AlpCompression.AlpEncodingPreset(combinations); + } + + // ========== 
CreateEncodingPreset (double) ========== + + private static long estimateDoubleCompressedSize( + double[] sample, int exponent, int factor, boolean penalizeExceptions) { + long minEncoded = Long.MAX_VALUE; + long maxEncoded = Long.MIN_VALUE; + int numExceptions = 0; + int numNonExceptions = 0; + + for (double value : sample) { + long encoded = AlpEncoderDecoder.encodeDouble(value, exponent, factor); + double decoded = AlpEncoderDecoder.decodeDouble(encoded, exponent, factor); + if (Double.doubleToRawLongBits(decoded) == Double.doubleToRawLongBits(value)) { + numNonExceptions++; + if (encoded < minEncoded) minEncoded = encoded; + if (encoded > maxEncoded) maxEncoded = encoded; + } else { + numExceptions++; + } + } + + if (penalizeExceptions && numNonExceptions < 2) { + return -1; + } + + // For bit width: unsigned difference. Use Long.compareUnsigned logic. + int bitsPerValue; + if (numNonExceptions < 2) { + bitsPerValue = 0; + } else { + // Unsigned subtraction: maxEncoded - minEncoded as unsigned + long delta = maxEncoded - minEncoded; + bitsPerValue = (delta == 0) ? 
0 : (64 - Long.numberOfLeadingZeros(delta)); + } + long estimatedSize = (long) sample.length * bitsPerValue; + estimatedSize += (long) numExceptions * (64 + POSITION_SIZE * 8); + return estimatedSize; + } + + static AlpCompression.AlpEncodingPreset createDoubleEncodingPreset( + List vectorsSampled) { + Map bestCombosCount = new HashMap<>(); + + for (double[] sample : vectorsSampled) { + long bestSize = Long.MAX_VALUE; + int bestE = DOUBLE_MAX_EXPONENT; + int bestF = DOUBLE_MAX_EXPONENT; + + for (int e = 0; e <= DOUBLE_MAX_EXPONENT; e++) { + for (int f = 0; f <= e; f++) { + long size = estimateDoubleCompressedSize(sample, e, f, true); + if (size < 0) continue; + if (size < bestSize + || (size == bestSize && e > bestE) + || (size == bestSize && e == bestE && f > bestF)) { + bestSize = size; + bestE = e; + bestF = f; + } + } + } + long key = ((long) bestE << 8) | bestF; + bestCombosCount.computeIfAbsent(key, k -> new int[1])[0]++; + } + + List> sorted = new ArrayList<>(bestCombosCount.entrySet()); + sorted.sort((a, b) -> { + int cmpCount = Integer.compare(b.getValue()[0], a.getValue()[0]); + if (cmpCount != 0) return cmpCount; + int eA = (int) (a.getKey() >> 8); + int fA = (int) (a.getKey() & 0xFF); + int eB = (int) (b.getKey() >> 8); + int fB = (int) (b.getKey() & 0xFF); + if (eA != eB) return Integer.compare(eB, eA); + return Integer.compare(fB, fA); + }); + + int k = Math.min(MAX_COMBINATIONS, sorted.size()); + int[][] combinations = new int[k][2]; + for (int i = 0; i < k; i++) { + long key = sorted.get(i).getKey(); + combinations[i][0] = (int) (key >> 8); + combinations[i][1] = (int) (key & 0xFF); + } + return new AlpCompression.AlpEncodingPreset(combinations); + } +} diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpSamplerTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpSamplerTest.java new file mode 100644 index 0000000000..1073144a61 --- /dev/null +++ 
b/parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpSamplerTest.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.alp; + +import static org.junit.Assert.*; + +import java.util.Random; +import org.junit.Test; + +public class AlpSamplerTest { + + // ========== Float sampler ========== + + @Test + public void testFloatDecimalData() { + // 2-decimal-place data should pick exponent=2, factor=0 + float[] data = new float[5000]; + for (int i = 0; i < data.length; i++) { + data[i] = i * 0.01f; + } + AlpSampler.FloatSampler sampler = new AlpSampler.FloatSampler(); + sampler.addSample(data, data.length); + AlpCompression.AlpEncodingPreset preset = sampler.finalizeSampling(); + + assertNotNull(preset.combinations); + assertTrue(preset.combinations.length >= 1); + assertTrue(preset.combinations.length <= AlpConstants.MAX_COMBINATIONS); + } + + @Test + public void testFloatIntegerData() { + float[] data = new float[5000]; + for (int i = 0; i < data.length; i++) { + data[i] = i; + } + AlpSampler.FloatSampler sampler = new AlpSampler.FloatSampler(); + sampler.addSample(data, data.length); + AlpCompression.AlpEncodingPreset preset = 
sampler.finalizeSampling(); + + assertNotNull(preset.combinations); + assertTrue(preset.combinations.length >= 1); + // For integer data, any (e,e) combo works since multiplier = 10^e/10^e = 1. + // Tiebreaker prefers bigger exponents, matching C++ behavior. + int bestE = preset.combinations[0][0]; + int bestF = preset.combinations[0][1]; + assertEquals("Integer data: exponent should equal factor", bestE, bestF); + } + + @Test + public void testFloatPresetProducesValidRoundTrip() { + float[] data = new float[5000]; + Random rng = new Random(42); + for (int i = 0; i < data.length; i++) { + data[i] = Math.round(rng.nextFloat() * 10000) / 100.0f; + } + + AlpSampler.FloatSampler sampler = new AlpSampler.FloatSampler(); + sampler.addSample(data, data.length); + AlpCompression.AlpEncodingPreset preset = sampler.finalizeSampling(); + + // Compress and decompress a vector using the preset + int vectorSize = Math.min(1024, data.length); + float[] vector = new float[vectorSize]; + System.arraycopy(data, 0, vector, 0, vectorSize); + + AlpCompression.FloatCompressedVector cv = + AlpCompression.compressFloatVector(vector, vectorSize, preset); + float[] output = new float[vectorSize]; + AlpCompression.decompressFloatVector(cv, output); + + for (int i = 0; i < vectorSize; i++) { + assertEquals( + "Mismatch at " + i, + Float.floatToRawIntBits(vector[i]), + Float.floatToRawIntBits(output[i])); + } + } + + @Test + public void testFloatSmallDataset() { + // Fewer values than SAMPLER_VECTOR_SIZE + float[] data = {1.1f, 2.2f, 3.3f, 4.4f, 5.5f}; + AlpSampler.FloatSampler sampler = new AlpSampler.FloatSampler(); + sampler.addSample(data, data.length); + AlpCompression.AlpEncodingPreset preset = sampler.finalizeSampling(); + + assertNotNull(preset.combinations); + assertTrue(preset.combinations.length >= 1); + } + + @Test + public void testFloatMultipleSamples() { + AlpSampler.FloatSampler sampler = new AlpSampler.FloatSampler(); + for (int batch = 0; batch < 10; batch++) { + float[] data 
= new float[1000]; + for (int i = 0; i < 1000; i++) { + data[i] = (batch * 1000 + i) * 0.1f; + } + sampler.addSample(data, data.length); + } + AlpCompression.AlpEncodingPreset preset = sampler.finalizeSampling(); + assertNotNull(preset.combinations); + assertTrue(preset.combinations.length >= 1); + } + + // ========== Double sampler ========== + + @Test + public void testDoubleDecimalData() { + double[] data = new double[5000]; + for (int i = 0; i < data.length; i++) { + data[i] = i * 0.01; + } + AlpSampler.DoubleSampler sampler = new AlpSampler.DoubleSampler(); + sampler.addSample(data, data.length); + AlpCompression.AlpEncodingPreset preset = sampler.finalizeSampling(); + + assertNotNull(preset.combinations); + assertTrue(preset.combinations.length >= 1); + assertTrue(preset.combinations.length <= AlpConstants.MAX_COMBINATIONS); + } + + @Test + public void testDoubleIntegerData() { + double[] data = new double[5000]; + for (int i = 0; i < data.length; i++) { + data[i] = i; + } + AlpSampler.DoubleSampler sampler = new AlpSampler.DoubleSampler(); + sampler.addSample(data, data.length); + AlpCompression.AlpEncodingPreset preset = sampler.finalizeSampling(); + + assertNotNull(preset.combinations); + assertTrue(preset.combinations.length >= 1); + int bestE = preset.combinations[0][0]; + int bestF = preset.combinations[0][1]; + assertEquals("Integer data: exponent should equal factor", bestE, bestF); + } + + @Test + public void testDoublePresetProducesValidRoundTrip() { + double[] data = new double[5000]; + Random rng = new Random(42); + for (int i = 0; i < data.length; i++) { + data[i] = Math.round(rng.nextDouble() * 10000) / 100.0; + } + + AlpSampler.DoubleSampler sampler = new AlpSampler.DoubleSampler(); + sampler.addSample(data, data.length); + AlpCompression.AlpEncodingPreset preset = sampler.finalizeSampling(); + + int vectorSize = Math.min(1024, data.length); + double[] vector = new double[vectorSize]; + System.arraycopy(data, 0, vector, 0, vectorSize); + + 
AlpCompression.DoubleCompressedVector cv = + AlpCompression.compressDoubleVector(vector, vectorSize, preset); + double[] output = new double[vectorSize]; + AlpCompression.decompressDoubleVector(cv, output); + + for (int i = 0; i < vectorSize; i++) { + assertEquals( + "Mismatch at " + i, + Double.doubleToRawLongBits(vector[i]), + Double.doubleToRawLongBits(output[i])); + } + } + + @Test + public void testDoubleSmallDataset() { + double[] data = {1.1, 2.2, 3.3, 4.4, 5.5}; + AlpSampler.DoubleSampler sampler = new AlpSampler.DoubleSampler(); + sampler.addSample(data, data.length); + AlpCompression.AlpEncodingPreset preset = sampler.finalizeSampling(); + + assertNotNull(preset.combinations); + assertTrue(preset.combinations.length >= 1); + } +} From 60607edd92985001d51a32e4e56d69154a0f22ef Mon Sep 17 00:00:00 2001 From: Prateek Gaur Date: Sat, 7 Mar 2026 18:36:24 +0000 Subject: [PATCH 05/24] ALP: Add AlpWrapper for page-level encode/decode with 7-byte header Public API for encoding/decoding full ALP pages. Layout: [Header(7B)][Offsets...][Vector0][Vector1]... Header: [compression_mode(1B)][integer_encoding(1B)][log_vector_size(1B)][num_elements(4B)] Supports sampling presets, max size estimation, and multi-vector pages. 
--- .../parquet/column/values/alp/AlpWrapper.java | 333 ++++++++++++++++++ .../column/values/alp/AlpWrapperTest.java | 233 ++++++++++++ 2 files changed, 566 insertions(+) create mode 100644 parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpWrapper.java create mode 100644 parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpWrapperTest.java diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpWrapper.java b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpWrapper.java new file mode 100644 index 0000000000..d66861e017 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpWrapper.java @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.alp; + +import static org.apache.parquet.column.values.alp.AlpConstants.*; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.ArrayList; +import java.util.List; +import org.apache.parquet.Preconditions; + +/** + * Top-level API for ALP page-level encoding and decoding. + * + *

Page layout (offset-based interleaved, matching C++ AlpWrapper): + *

+ * [Header(7B)][Offset0..OffsetN-1][Vector0][Vector1]...[VectorN-1]
+ * 
+ * where each Vector = [AlpInfo(4B)][ForInfo(5B/9B)][PackedValues][ExcPositions][ExcValues] + * + *

Header format (7 bytes, little-endian): + *

+ * [compression_mode(1B)][integer_encoding(1B)][log_vector_size(1B)][num_elements(4B LE)]
+ * 
+ */ +public final class AlpWrapper { + + private AlpWrapper() {} + + // ========== Sampling presets ========== + + /** Create a sampling-based encoding preset for float data. */ + public static AlpCompression.AlpEncodingPreset createFloatSamplingPreset( + float[] data, int count) { + AlpSampler.FloatSampler sampler = new AlpSampler.FloatSampler(); + sampler.addSample(data, count); + return sampler.finalizeSampling(); + } + + /** Create a sampling-based encoding preset for double data. */ + public static AlpCompression.AlpEncodingPreset createDoubleSamplingPreset( + double[] data, int count) { + AlpSampler.DoubleSampler sampler = new AlpSampler.DoubleSampler(); + sampler.addSample(data, count); + return sampler.finalizeSampling(); + } + + // ========== Encode floats ========== + + /** + * Encode float data into ALP compressed page format. + * + * @param input the float values to encode + * @param count number of values + * @param output output byte array (must be at least maxCompressedSizeFloat(count) bytes) + * @param preset the encoding preset from sampling + * @return the number of compressed bytes written + */ + public static int encodeFloats( + float[] input, int count, byte[] output, AlpCompression.AlpEncodingPreset preset) { + Preconditions.checkArgument(count >= 0, "count must be non-negative, got: %s", count); + if (count == 0) { + writeHeader(output, 0, COMPRESSION_MODE_ALP, INTEGER_ENCODING_FOR, + DEFAULT_VECTOR_SIZE_LOG, 0); + return HEADER_SIZE; + } + + int vectorSize = DEFAULT_VECTOR_SIZE; + int numVectors = (count + vectorSize - 1) / vectorSize; + + // Phase 1: Compress all vectors + List vectors = new ArrayList<>(numVectors); + for (int i = 0; i < numVectors; i++) { + int offset = i * vectorSize; + int elementsInVector = Math.min(vectorSize, count - offset); + float[] vectorInput = new float[elementsInVector]; + System.arraycopy(input, offset, vectorInput, 0, elementsInVector); + vectors.add(AlpCompression.compressFloatVector(vectorInput, 
elementsInVector, preset)); + } + + // Phase 2: Calculate offsets + int offsetsSectionSize = numVectors * OFFSET_SIZE; + int[] vectorOffsets = new int[numVectors]; + int currentOffset = offsetsSectionSize; + for (int i = 0; i < numVectors; i++) { + vectorOffsets[i] = currentOffset; + currentOffset += ALP_INFO_SIZE + FLOAT_FOR_INFO_SIZE + vectors.get(i).dataStoredSize(); + } + int bodySize = currentOffset; + int totalSize = HEADER_SIZE + bodySize; + + // Phase 3: Write header + writeHeader(output, 0, COMPRESSION_MODE_ALP, INTEGER_ENCODING_FOR, + DEFAULT_VECTOR_SIZE_LOG, count); + + // Phase 4: Write offsets + ByteBuffer buf = ByteBuffer.wrap(output, HEADER_SIZE, bodySize).order(ByteOrder.LITTLE_ENDIAN); + for (int offset : vectorOffsets) { + buf.putInt(offset); + } + + // Phase 5: Write interleaved vectors + for (int i = 0; i < numVectors; i++) { + AlpCompression.FloatCompressedVector v = vectors.get(i); + int pos = HEADER_SIZE + vectorOffsets[i]; + v.store(output, pos); + } + + return totalSize; + } + + // ========== Encode doubles ========== + + public static int encodeDoubles( + double[] input, int count, byte[] output, AlpCompression.AlpEncodingPreset preset) { + Preconditions.checkArgument(count >= 0, "count must be non-negative, got: %s", count); + if (count == 0) { + writeHeader(output, 0, COMPRESSION_MODE_ALP, INTEGER_ENCODING_FOR, + DEFAULT_VECTOR_SIZE_LOG, 0); + return HEADER_SIZE; + } + + int vectorSize = DEFAULT_VECTOR_SIZE; + int numVectors = (count + vectorSize - 1) / vectorSize; + + List vectors = new ArrayList<>(numVectors); + for (int i = 0; i < numVectors; i++) { + int offset = i * vectorSize; + int elementsInVector = Math.min(vectorSize, count - offset); + double[] vectorInput = new double[elementsInVector]; + System.arraycopy(input, offset, vectorInput, 0, elementsInVector); + vectors.add( + AlpCompression.compressDoubleVector(vectorInput, elementsInVector, preset)); + } + + int offsetsSectionSize = numVectors * OFFSET_SIZE; + int[] vectorOffsets 
= new int[numVectors]; + int currentOffset = offsetsSectionSize; + for (int i = 0; i < numVectors; i++) { + vectorOffsets[i] = currentOffset; + currentOffset += + ALP_INFO_SIZE + DOUBLE_FOR_INFO_SIZE + vectors.get(i).dataStoredSize(); + } + int bodySize = currentOffset; + int totalSize = HEADER_SIZE + bodySize; + + writeHeader(output, 0, COMPRESSION_MODE_ALP, INTEGER_ENCODING_FOR, + DEFAULT_VECTOR_SIZE_LOG, count); + + ByteBuffer buf = ByteBuffer.wrap(output, HEADER_SIZE, bodySize).order(ByteOrder.LITTLE_ENDIAN); + for (int offset : vectorOffsets) { + buf.putInt(offset); + } + + for (int i = 0; i < numVectors; i++) { + AlpCompression.DoubleCompressedVector v = vectors.get(i); + int pos = HEADER_SIZE + vectorOffsets[i]; + v.store(output, pos); + } + + return totalSize; + } + + // ========== Decode floats ========== + + /** + * Decode ALP compressed page to float values. + * + * @param compressed the compressed page bytes + * @param compSize number of compressed bytes + * @param output output float array (must hold numElements values) + * @param numElements number of elements to decode + */ + public static void decodeFloats( + byte[] compressed, int compSize, float[] output, int numElements) { + Preconditions.checkArgument(compSize >= HEADER_SIZE, + "compressed size too small for header: %s", compSize); + + ByteBuffer header = ByteBuffer.wrap(compressed, 0, HEADER_SIZE).order(ByteOrder.LITTLE_ENDIAN); + int compressionMode = header.get() & 0xFF; + int integerEncoding = header.get() & 0xFF; + int logVectorSize = header.get() & 0xFF; + int storedNumElements = header.getInt(); + + Preconditions.checkArgument(compressionMode == COMPRESSION_MODE_ALP, + "unsupported compression mode: %s", compressionMode); + Preconditions.checkArgument(integerEncoding == INTEGER_ENCODING_FOR, + "unsupported integer encoding: %s", integerEncoding); + + int vectorSize = 1 << logVectorSize; + int numVectors = (storedNumElements + vectorSize - 1) / vectorSize; + + if (numVectors == 0) return; 
+ + // Read offsets + ByteBuffer body = ByteBuffer.wrap(compressed, HEADER_SIZE, compSize - HEADER_SIZE) + .order(ByteOrder.LITTLE_ENDIAN); + int[] vectorOffsets = new int[numVectors]; + for (int i = 0; i < numVectors; i++) { + vectorOffsets[i] = body.getInt(); + } + + // Decode each vector + int outputOffset = 0; + for (int vi = 0; vi < numVectors; vi++) { + int elementsInVector; + if (vi < storedNumElements / vectorSize) { + elementsInVector = vectorSize; + } else { + elementsInVector = storedNumElements % vectorSize; + if (elementsInVector == 0) elementsInVector = vectorSize; + } + + int vectorPos = HEADER_SIZE + vectorOffsets[vi]; + AlpCompression.FloatCompressedVector cv = + AlpCompression.FloatCompressedVector.load(compressed, vectorPos, elementsInVector); + + float[] vectorOutput = new float[elementsInVector]; + AlpCompression.decompressFloatVector(cv, vectorOutput); + System.arraycopy(vectorOutput, 0, output, outputOffset, + Math.min(elementsInVector, numElements - outputOffset)); + outputOffset += elementsInVector; + } + } + + // ========== Decode doubles ========== + + public static void decodeDoubles( + byte[] compressed, int compSize, double[] output, int numElements) { + Preconditions.checkArgument(compSize >= HEADER_SIZE, + "compressed size too small for header: %s", compSize); + + ByteBuffer header = ByteBuffer.wrap(compressed, 0, HEADER_SIZE).order(ByteOrder.LITTLE_ENDIAN); + int compressionMode = header.get() & 0xFF; + int integerEncoding = header.get() & 0xFF; + int logVectorSize = header.get() & 0xFF; + int storedNumElements = header.getInt(); + + Preconditions.checkArgument(compressionMode == COMPRESSION_MODE_ALP, + "unsupported compression mode: %s", compressionMode); + Preconditions.checkArgument(integerEncoding == INTEGER_ENCODING_FOR, + "unsupported integer encoding: %s", integerEncoding); + + int vectorSize = 1 << logVectorSize; + int numVectors = (storedNumElements + vectorSize - 1) / vectorSize; + + if (numVectors == 0) return; + + 
ByteBuffer body = ByteBuffer.wrap(compressed, HEADER_SIZE, compSize - HEADER_SIZE) + .order(ByteOrder.LITTLE_ENDIAN); + int[] vectorOffsets = new int[numVectors]; + for (int i = 0; i < numVectors; i++) { + vectorOffsets[i] = body.getInt(); + } + + int outputOffset = 0; + for (int vi = 0; vi < numVectors; vi++) { + int elementsInVector; + if (vi < storedNumElements / vectorSize) { + elementsInVector = vectorSize; + } else { + elementsInVector = storedNumElements % vectorSize; + if (elementsInVector == 0) elementsInVector = vectorSize; + } + + int vectorPos = HEADER_SIZE + vectorOffsets[vi]; + AlpCompression.DoubleCompressedVector cv = + AlpCompression.DoubleCompressedVector.load(compressed, vectorPos, elementsInVector); + + double[] vectorOutput = new double[elementsInVector]; + AlpCompression.decompressDoubleVector(cv, vectorOutput); + System.arraycopy(vectorOutput, 0, output, outputOffset, + Math.min(elementsInVector, numElements - outputOffset)); + outputOffset += elementsInVector; + } + } + + // ========== Max compressed size ========== + + /** Maximum compressed size for float data of given element count. */ + public static long maxCompressedSizeFloat(int numElements) { + long size = HEADER_SIZE; + long numVectors = (numElements + DEFAULT_VECTOR_SIZE - 1) / DEFAULT_VECTOR_SIZE; + size += numVectors * OFFSET_SIZE; + size += numVectors * (ALP_INFO_SIZE + FLOAT_FOR_INFO_SIZE); + // Worst case: all values bit-packed at full width + all exceptions + size += (long) numElements * Float.BYTES; // packed values worst case + size += (long) numElements * Float.BYTES; // exception values + size += (long) numElements * POSITION_SIZE; // exception positions + return size; + } + + /** Maximum compressed size for double data of given element count. 
*/ + public static long maxCompressedSizeDouble(int numElements) { + long size = HEADER_SIZE; + long numVectors = (numElements + DEFAULT_VECTOR_SIZE - 1) / DEFAULT_VECTOR_SIZE; + size += numVectors * OFFSET_SIZE; + size += numVectors * (ALP_INFO_SIZE + DOUBLE_FOR_INFO_SIZE); + size += (long) numElements * Double.BYTES; + size += (long) numElements * Double.BYTES; + size += (long) numElements * POSITION_SIZE; + return size; + } + + // ========== Header helpers ========== + + private static void writeHeader( + byte[] output, int offset, int compressionMode, int integerEncoding, + int logVectorSize, int numElements) { + ByteBuffer buf = ByteBuffer.wrap(output, offset, HEADER_SIZE).order(ByteOrder.LITTLE_ENDIAN); + buf.put((byte) compressionMode); + buf.put((byte) integerEncoding); + buf.put((byte) logVectorSize); + buf.putInt(numElements); + } +} diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpWrapperTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpWrapperTest.java new file mode 100644 index 0000000000..d3132c8df7 --- /dev/null +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpWrapperTest.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.alp; + +import static org.junit.Assert.*; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Random; +import org.junit.Test; + +public class AlpWrapperTest { + + // ========== Float round-trip ========== + + private static void assertFloatPageRoundTrip(float[] input) { + AlpCompression.AlpEncodingPreset preset = + AlpWrapper.createFloatSamplingPreset(input, input.length); + byte[] compressed = new byte[(int) AlpWrapper.maxCompressedSizeFloat(input.length)]; + int compSize = AlpWrapper.encodeFloats(input, input.length, compressed, preset); + assertTrue(compSize > 0); + assertTrue(compSize <= compressed.length); + + float[] output = new float[input.length]; + AlpWrapper.decodeFloats(compressed, compSize, output, input.length); + + for (int i = 0; i < input.length; i++) { + assertEquals( + "Mismatch at index " + i, + Float.floatToRawIntBits(input[i]), + Float.floatToRawIntBits(output[i])); + } + } + + @Test + public void testFloatSingleVector() { + float[] input = new float[100]; + for (int i = 0; i < 100; i++) { + input[i] = i * 0.1f; + } + assertFloatPageRoundTrip(input); + } + + @Test + public void testFloatMultipleVectors() { + // 2500 values = 2 full vectors (1024) + 1 partial (452) + float[] input = new float[2500]; + for (int i = 0; i < 2500; i++) { + input[i] = i * 0.01f; + } + assertFloatPageRoundTrip(input); + } + + @Test + public void testFloatExactVectorSize() { + float[] input = new float[AlpConstants.DEFAULT_VECTOR_SIZE]; + for (int i = 0; i < input.length; i++) { + input[i] = i * 0.5f; + } + assertFloatPageRoundTrip(input); + } + + @Test + public void testFloatExactTwoVectors() { + float[] input = new float[2 * AlpConstants.DEFAULT_VECTOR_SIZE]; + for (int i = 0; i < input.length; i++) { + input[i] = (i % 100) * 0.3f; + } + assertFloatPageRoundTrip(input); + } + + @Test + 
public void testFloatSpecialValues() { + float[] input = new float[20]; + for (int i = 0; i < 20; i++) { + input[i] = i * 1.5f; + } + input[3] = Float.NaN; + input[7] = Float.POSITIVE_INFINITY; + input[11] = Float.NEGATIVE_INFINITY; + input[15] = -0.0f; + assertFloatPageRoundTrip(input); + } + + @Test + public void testFloatEmptyInput() { + byte[] compressed = new byte[AlpConstants.HEADER_SIZE]; + int compSize = AlpWrapper.encodeFloats( + new float[0], 0, compressed, + new AlpCompression.AlpEncodingPreset(new int[][] {{0, 0}})); + assertEquals(AlpConstants.HEADER_SIZE, compSize); + } + + @Test + public void testFloatRandomLargeDataset() { + Random rng = new Random(42); + float[] input = new float[5000]; + for (int i = 0; i < 5000; i++) { + input[i] = Math.round(rng.nextFloat() * 10000) / 100.0f; + } + assertFloatPageRoundTrip(input); + } + + // ========== Double round-trip ========== + + private static void assertDoublePageRoundTrip(double[] input) { + AlpCompression.AlpEncodingPreset preset = + AlpWrapper.createDoubleSamplingPreset(input, input.length); + byte[] compressed = new byte[(int) AlpWrapper.maxCompressedSizeDouble(input.length)]; + int compSize = AlpWrapper.encodeDoubles(input, input.length, compressed, preset); + assertTrue(compSize > 0); + + double[] output = new double[input.length]; + AlpWrapper.decodeDoubles(compressed, compSize, output, input.length); + + for (int i = 0; i < input.length; i++) { + assertEquals( + "Mismatch at index " + i, + Double.doubleToRawLongBits(input[i]), + Double.doubleToRawLongBits(output[i])); + } + } + + @Test + public void testDoubleSingleVector() { + double[] input = new double[100]; + for (int i = 0; i < 100; i++) { + input[i] = i * 0.1; + } + assertDoublePageRoundTrip(input); + } + + @Test + public void testDoubleMultipleVectors() { + double[] input = new double[2500]; + for (int i = 0; i < 2500; i++) { + input[i] = i * 0.01; + } + assertDoublePageRoundTrip(input); + } + + @Test + public void testDoubleSpecialValues() 
{ + double[] input = new double[20]; + for (int i = 0; i < 20; i++) { + input[i] = i * 1.5; + } + input[3] = Double.NaN; + input[7] = Double.POSITIVE_INFINITY; + input[11] = Double.NEGATIVE_INFINITY; + input[15] = -0.0; + assertDoublePageRoundTrip(input); + } + + @Test + public void testDoubleRandomLargeDataset() { + Random rng = new Random(42); + double[] input = new double[5000]; + for (int i = 0; i < 5000; i++) { + input[i] = Math.round(rng.nextDouble() * 10000) / 100.0; + } + assertDoublePageRoundTrip(input); + } + + // ========== Wire format verification ========== + + @Test + public void testHeaderFormat() { + float[] input = {1.0f, 2.0f, 3.0f}; + AlpCompression.AlpEncodingPreset preset = + AlpWrapper.createFloatSamplingPreset(input, input.length); + byte[] compressed = new byte[(int) AlpWrapper.maxCompressedSizeFloat(input.length)]; + int compSize = AlpWrapper.encodeFloats(input, input.length, compressed, preset); + + // Verify 7-byte header + ByteBuffer header = + ByteBuffer.wrap(compressed, 0, AlpConstants.HEADER_SIZE).order(ByteOrder.LITTLE_ENDIAN); + assertEquals(AlpConstants.COMPRESSION_MODE_ALP, header.get() & 0xFF); + assertEquals(AlpConstants.INTEGER_ENCODING_FOR, header.get() & 0xFF); + assertEquals(AlpConstants.DEFAULT_VECTOR_SIZE_LOG, header.get() & 0xFF); + assertEquals(3, header.getInt()); // num_elements + } + + @Test + public void testOffsetLayout() { + // 2048 elements = 2 vectors + float[] input = new float[2048]; + for (int i = 0; i < 2048; i++) { + input[i] = i * 0.5f; + } + AlpCompression.AlpEncodingPreset preset = + AlpWrapper.createFloatSamplingPreset(input, input.length); + byte[] compressed = new byte[(int) AlpWrapper.maxCompressedSizeFloat(input.length)]; + AlpWrapper.encodeFloats(input, input.length, compressed, preset); + + // After header (7B), offsets section should have 2 int offsets (8 bytes) + ByteBuffer body = ByteBuffer.wrap(compressed, AlpConstants.HEADER_SIZE, + compressed.length - 
AlpConstants.HEADER_SIZE).order(ByteOrder.LITTLE_ENDIAN); + int offset0 = body.getInt(); + int offset1 = body.getInt(); + + // First vector starts right after offsets (2 * 4 = 8) + assertEquals(8, offset0); + // Second vector starts after first vector's data + assertTrue(offset1 > offset0); + } + + // ========== Max compressed size ========== + + @Test + public void testMaxCompressedSize() { + assertTrue(AlpWrapper.maxCompressedSizeFloat(0) >= AlpConstants.HEADER_SIZE); + assertTrue(AlpWrapper.maxCompressedSizeFloat(1024) > AlpConstants.HEADER_SIZE); + assertTrue(AlpWrapper.maxCompressedSizeDouble(0) >= AlpConstants.HEADER_SIZE); + assertTrue(AlpWrapper.maxCompressedSizeDouble(1024) > AlpConstants.HEADER_SIZE); + } +} From a2c3992c3c209e7663adee9840bc09729f2c70dc Mon Sep 17 00:00:00 2001 From: Prateek Gaur Date: Sat, 7 Mar 2026 18:57:40 +0000 Subject: [PATCH 06/24] ALP: Add incremental AlpValuesWriter for float and double columns Extends ValuesWriter with FloatAlpValuesWriter and DoubleAlpValuesWriter. Buffers values into 1024-element vectors, compresses each when full, and assembles the ALP page format on getBytes(). Sampling is done on the first batch of values to generate the encoding preset. Uses Encoding.PLAIN as placeholder until ALP is added to parquet-format. 
--- .../column/values/alp/AlpValuesWriter.java | 363 ++++++++++++++++++ .../values/alp/AlpValuesWriterTest.java | 278 ++++++++++++++ 2 files changed, 641 insertions(+) create mode 100644 parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpValuesWriter.java create mode 100644 parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpValuesWriterTest.java diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpValuesWriter.java new file mode 100644 index 0000000000..e4e57a4d09 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpValuesWriter.java @@ -0,0 +1,363 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.alp; + +import static org.apache.parquet.column.values.alp.AlpConstants.*; + +import java.util.ArrayList; +import java.util.List; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.values.ValuesWriter; + +/** + * Incremental ALP values writer for float and double columns. + * + *

Buffers values into fixed-size vectors (default 1024). When a vector is full, + * it is compressed via {@link AlpCompression} and stored. On {@link #getBytes()}, + * assembles the ALP page: [Header(7B)][Offsets...][Vector0][Vector1]... + * + *

Sampling: the first vector's data is used to create an encoding preset via + * {@link AlpSampler}. The preset is cached for subsequent vectors. + */ +public abstract class AlpValuesWriter extends ValuesWriter { + + protected final int vectorSize; + protected int bufferedCount; // values in current partial vector + protected int totalCount; // total values written + protected final List encodedVectors = new ArrayList<>(); + protected final List encodedVectorSizes = new ArrayList<>(); + protected AlpCompression.AlpEncodingPreset preset; + protected boolean presetReady; + + protected AlpValuesWriter(int vectorSize) { + this.vectorSize = AlpConstants.validateVectorSize(vectorSize); + } + + protected AlpValuesWriter() { + this(DEFAULT_VECTOR_SIZE); + } + + // TODO: Replace with Encoding.ALP once ALP is added to parquet-format Thrift spec. + @Override + public Encoding getEncoding() { + return Encoding.PLAIN; + } + + @Override + public void reset() { + bufferedCount = 0; + totalCount = 0; + encodedVectors.clear(); + encodedVectorSizes.clear(); + preset = null; + presetReady = false; + resetVectorBuffer(); + } + + protected abstract void resetVectorBuffer(); + + @Override + public void close() { + reset(); + } + + // ========== FloatAlpValuesWriter ========== + + public static class FloatAlpValuesWriter extends AlpValuesWriter { + private float[] vectorBuffer; + private float[] samplerBuffer; + private int samplerCount; + + public FloatAlpValuesWriter(int vectorSize) { + super(vectorSize); + this.vectorBuffer = new float[this.vectorSize]; + this.samplerBuffer = new float[SAMPLER_ROWGROUP_SIZE]; + } + + public FloatAlpValuesWriter() { + this(DEFAULT_VECTOR_SIZE); + } + + @Override + public void writeFloat(float v) { + // Collect for sampling if preset not ready + if (!presetReady && samplerCount < samplerBuffer.length) { + samplerBuffer[samplerCount++] = v; + } + + vectorBuffer[bufferedCount++] = v; + totalCount++; + + if (bufferedCount == vectorSize) { + ensurePreset(); + 
flushVector();
+      }
+    }
+
+    /**
+     * Builds the encoding preset from the values sampled so far; runs once.
+     * The sampling buffer is released afterwards since it is no longer needed.
+     */
+    private void ensurePreset() {
+      if (!presetReady) {
+        AlpSampler.FloatSampler sampler = new AlpSampler.FloatSampler();
+        sampler.addSample(samplerBuffer, samplerCount);
+        preset = sampler.finalizeSampling();
+        presetReady = true;
+        samplerBuffer = null; // free sampling buffer
+      }
+    }
+
+    /**
+     * Compresses the current (possibly partial) vector buffer and appends the
+     * encoded bytes to {@code encodedVectors}; resets the buffered count to 0.
+     */
+    private void flushVector() {
+      AlpCompression.FloatCompressedVector cv =
+          AlpCompression.compressFloatVector(vectorBuffer, bufferedCount, preset);
+      int size = cv.storedSize();
+      byte[] encoded = new byte[size];
+      cv.store(encoded, 0);
+      encodedVectors.add(encoded);
+      encodedVectorSizes.add(size);
+      bufferedCount = 0;
+    }
+
+    /**
+     * Assembles the ALP page: [Header(7B)][Offsets...][Vector0][Vector1]...
+     * Flushes any partial vector first; an empty writer emits a header-only page.
+     * Offsets are relative to the start of the body (just after the header).
+     */
+    @Override
+    public BytesInput getBytes() {
+      // Flush any partial vector
+      if (bufferedCount > 0) {
+        ensurePreset();
+        flushVector();
+      }
+
+      if (totalCount == 0) {
+        byte[] header = new byte[HEADER_SIZE];
+        writeAlpHeader(header, vectorSize, 0);
+        return BytesInput.from(header);
+      }
+
+      // Calculate layout
+      int numVectors = encodedVectors.size();
+      int offsetsSectionSize = numVectors * OFFSET_SIZE;
+      int[] offsets = new int[numVectors];
+      int currentOffset = offsetsSectionSize;
+      for (int i = 0; i < numVectors; i++) {
+        offsets[i] = currentOffset;
+        currentOffset += encodedVectorSizes.get(i);
+      }
+
+      // Assemble page
+      int bodySize = currentOffset;
+      byte[] page = new byte[HEADER_SIZE + bodySize];
+      writeAlpHeader(page, vectorSize, totalCount);
+
+      // Write offsets
+      int pos = HEADER_SIZE;
+      for (int offset : offsets) {
+        writeLittleEndianInt(page, pos, offset);
+        pos += OFFSET_SIZE;
+      }
+
+      // Write vectors
+      for (byte[] vec : encodedVectors) {
+        System.arraycopy(vec, 0, page, pos, vec.length);
+        pos += vec.length;
+      }
+
+      return BytesInput.from(page);
+    }
+
+    /** Current page size if flushed now: header + per-vector (offset + data) + raw partial buffer. */
+    @Override
+    public long getBufferedSize() {
+      long size = HEADER_SIZE;
+      for (int s : encodedVectorSizes) {
+        size += OFFSET_SIZE + s;
+      }
+      size += (long) bufferedCount * Float.BYTES;
+      return size;
+    }
+
+    @Override
+    public long getAllocatedSize() {
+      long size = 
(vectorBuffer != null ? (long) vectorBuffer.length * Float.BYTES : 0); + if (samplerBuffer != null) { + size += (long) samplerBuffer.length * Float.BYTES; + } + for (byte[] vec : encodedVectors) { + size += vec.length; + } + return size; + } + + @Override + public String memUsageString(String prefix) { + return String.format( + "%s ALPFloatWriter: %d values, %d vectors, %d bytes allocated", + prefix, totalCount, encodedVectors.size(), getAllocatedSize()); + } + + @Override + protected void resetVectorBuffer() { + vectorBuffer = new float[vectorSize]; + samplerBuffer = new float[SAMPLER_ROWGROUP_SIZE]; + samplerCount = 0; + } + } + + // ========== DoubleAlpValuesWriter ========== + + public static class DoubleAlpValuesWriter extends AlpValuesWriter { + private double[] vectorBuffer; + private double[] samplerBuffer; + private int samplerCount; + + public DoubleAlpValuesWriter(int vectorSize) { + super(vectorSize); + this.vectorBuffer = new double[this.vectorSize]; + this.samplerBuffer = new double[SAMPLER_ROWGROUP_SIZE]; + } + + public DoubleAlpValuesWriter() { + this(DEFAULT_VECTOR_SIZE); + } + + @Override + public void writeDouble(double v) { + if (!presetReady && samplerCount < samplerBuffer.length) { + samplerBuffer[samplerCount++] = v; + } + + vectorBuffer[bufferedCount++] = v; + totalCount++; + + if (bufferedCount == vectorSize) { + ensurePreset(); + flushVector(); + } + } + + private void ensurePreset() { + if (!presetReady) { + AlpSampler.DoubleSampler sampler = new AlpSampler.DoubleSampler(); + sampler.addSample(samplerBuffer, samplerCount); + preset = sampler.finalizeSampling(); + presetReady = true; + samplerBuffer = null; + } + } + + private void flushVector() { + AlpCompression.DoubleCompressedVector cv = + AlpCompression.compressDoubleVector(vectorBuffer, bufferedCount, preset); + int size = cv.storedSize(); + byte[] encoded = new byte[size]; + cv.store(encoded, 0); + encodedVectors.add(encoded); + encodedVectorSizes.add(size); + bufferedCount = 0; + } 
+
+    /**
+     * Assembles the ALP page: [Header(7B)][Offsets...][Vector0][Vector1]...
+     * Mirrors the float writer's layout logic; offsets are relative to the body
+     * (the first byte after the 7-byte header).
+     */
+    @Override
+    public BytesInput getBytes() {
+      // Flush any partial vector before assembling the page
+      if (bufferedCount > 0) {
+        ensurePreset();
+        flushVector();
+      }
+
+      // Empty writer: emit a header-only page with numElements = 0
+      if (totalCount == 0) {
+        byte[] header = new byte[HEADER_SIZE];
+        writeAlpHeader(header, vectorSize, 0);
+        return BytesInput.from(header);
+      }
+
+      // Compute each vector's offset within the body (offsets section comes first)
+      int numVectors = encodedVectors.size();
+      int offsetsSectionSize = numVectors * OFFSET_SIZE;
+      int[] offsets = new int[numVectors];
+      int currentOffset = offsetsSectionSize;
+      for (int i = 0; i < numVectors; i++) {
+        offsets[i] = currentOffset;
+        currentOffset += encodedVectorSizes.get(i);
+      }
+
+      int bodySize = currentOffset;
+      byte[] page = new byte[HEADER_SIZE + bodySize];
+      writeAlpHeader(page, vectorSize, totalCount);
+
+      // Offsets section (little-endian ints), then the encoded vectors back to back
+      int pos = HEADER_SIZE;
+      for (int offset : offsets) {
+        writeLittleEndianInt(page, pos, offset);
+        pos += OFFSET_SIZE;
+      }
+
+      for (byte[] vec : encodedVectors) {
+        System.arraycopy(vec, 0, page, pos, vec.length);
+        pos += vec.length;
+      }
+
+      return BytesInput.from(page);
+    }
+
+    /** Current page size if flushed now: header + per-vector (offset + data) + raw partial buffer. */
+    @Override
+    public long getBufferedSize() {
+      long size = HEADER_SIZE;
+      for (int s : encodedVectorSizes) {
+        size += OFFSET_SIZE + s;
+      }
+      size += (long) bufferedCount * Double.BYTES;
+      return size;
+    }
+
+    @Override
+    public long getAllocatedSize() {
+      long size = (vectorBuffer != null ? 
(long) vectorBuffer.length * Double.BYTES : 0); + if (samplerBuffer != null) { + size += (long) samplerBuffer.length * Double.BYTES; + } + for (byte[] vec : encodedVectors) { + size += vec.length; + } + return size; + } + + @Override + public String memUsageString(String prefix) { + return String.format( + "%s ALPDoubleWriter: %d values, %d vectors, %d bytes allocated", + prefix, totalCount, encodedVectors.size(), getAllocatedSize()); + } + + @Override + protected void resetVectorBuffer() { + vectorBuffer = new double[vectorSize]; + samplerBuffer = new double[SAMPLER_ROWGROUP_SIZE]; + samplerCount = 0; + } + } + + // ========== Header helpers ========== + + static void writeAlpHeader(byte[] output, int vectorSize, int numElements) { + int logVs = Integer.numberOfTrailingZeros(vectorSize); + output[0] = (byte) COMPRESSION_MODE_ALP; + output[1] = (byte) INTEGER_ENCODING_FOR; + output[2] = (byte) logVs; + writeLittleEndianInt(output, 3, numElements); + } + + static void writeLittleEndianInt(byte[] output, int pos, int value) { + output[pos] = (byte) value; + output[pos + 1] = (byte) (value >> 8); + output[pos + 2] = (byte) (value >> 16); + output[pos + 3] = (byte) (value >> 24); + } +} diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpValuesWriterTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpValuesWriterTest.java new file mode 100644 index 0000000000..e8f8712dd1 --- /dev/null +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpValuesWriterTest.java @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.alp; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.Random; +import org.apache.parquet.bytes.BytesInput; +import org.junit.Test; + +public class AlpValuesWriterTest { + + // ========== Float writer ========== + + @Test + public void testFloatWriterRoundTrip() throws IOException { + float[] values = new float[100]; + for (int i = 0; i < 100; i++) { + values[i] = i * 0.1f; + } + + AlpValuesWriter.FloatAlpValuesWriter writer = new AlpValuesWriter.FloatAlpValuesWriter(); + for (float v : values) { + writer.writeFloat(v); + } + + BytesInput bytes = writer.getBytes(); + byte[] compressed = bytes.toByteArray(); + assertTrue(compressed.length >= AlpConstants.HEADER_SIZE); + + float[] output = new float[values.length]; + AlpWrapper.decodeFloats(compressed, compressed.length, output, values.length); + + for (int i = 0; i < values.length; i++) { + assertEquals( + "Mismatch at " + i, + Float.floatToRawIntBits(values[i]), + Float.floatToRawIntBits(output[i])); + } + } + + @Test + public void testFloatWriterMultipleVectors() throws IOException { + // 2500 values = 2 full vectors + 1 partial + float[] values = new float[2500]; + Random rng = new Random(42); + for (int i = 0; i < 2500; i++) { + values[i] = Math.round(rng.nextFloat() * 10000) / 100.0f; + } + + AlpValuesWriter.FloatAlpValuesWriter writer = new AlpValuesWriter.FloatAlpValuesWriter(); + for (float v : values) { + writer.writeFloat(v); + } + + byte[] compressed = writer.getBytes().toByteArray(); + + float[] output = 
new float[values.length]; + AlpWrapper.decodeFloats(compressed, compressed.length, output, values.length); + + for (int i = 0; i < values.length; i++) { + assertEquals( + "Mismatch at " + i, + Float.floatToRawIntBits(values[i]), + Float.floatToRawIntBits(output[i])); + } + } + + @Test + public void testFloatWriterExactVectorSize() throws IOException { + float[] values = new float[AlpConstants.DEFAULT_VECTOR_SIZE]; + for (int i = 0; i < values.length; i++) { + values[i] = i * 0.5f; + } + + AlpValuesWriter.FloatAlpValuesWriter writer = new AlpValuesWriter.FloatAlpValuesWriter(); + for (float v : values) { + writer.writeFloat(v); + } + + byte[] compressed = writer.getBytes().toByteArray(); + + float[] output = new float[values.length]; + AlpWrapper.decodeFloats(compressed, compressed.length, output, values.length); + + for (int i = 0; i < values.length; i++) { + assertEquals( + Float.floatToRawIntBits(values[i]), + Float.floatToRawIntBits(output[i])); + } + } + + @Test + public void testFloatWriterSpecialValues() throws IOException { + float[] values = {1.0f, Float.NaN, 2.0f, Float.POSITIVE_INFINITY, + 3.0f, Float.NEGATIVE_INFINITY, 4.0f, -0.0f, 5.0f}; + + AlpValuesWriter.FloatAlpValuesWriter writer = new AlpValuesWriter.FloatAlpValuesWriter(); + for (float v : values) { + writer.writeFloat(v); + } + + byte[] compressed = writer.getBytes().toByteArray(); + + float[] output = new float[values.length]; + AlpWrapper.decodeFloats(compressed, compressed.length, output, values.length); + + for (int i = 0; i < values.length; i++) { + assertEquals( + "Mismatch at " + i, + Float.floatToRawIntBits(values[i]), + Float.floatToRawIntBits(output[i])); + } + } + + @Test + public void testFloatWriterEmpty() throws IOException { + AlpValuesWriter.FloatAlpValuesWriter writer = new AlpValuesWriter.FloatAlpValuesWriter(); + byte[] compressed = writer.getBytes().toByteArray(); + assertEquals(AlpConstants.HEADER_SIZE, compressed.length); + } + + @Test + public void testFloatWriterReset() 
throws IOException { + AlpValuesWriter.FloatAlpValuesWriter writer = new AlpValuesWriter.FloatAlpValuesWriter(); + for (int i = 0; i < 100; i++) { + writer.writeFloat(i * 0.1f); + } + + byte[] first = writer.getBytes().toByteArray(); + assertTrue(first.length > AlpConstants.HEADER_SIZE); + + writer.reset(); + + // Write different data + for (int i = 0; i < 50; i++) { + writer.writeFloat(i * 2.0f); + } + + byte[] second = writer.getBytes().toByteArray(); + assertTrue(second.length > AlpConstants.HEADER_SIZE); + + // Verify second batch round-trips correctly + float[] output = new float[50]; + AlpWrapper.decodeFloats(second, second.length, output, 50); + for (int i = 0; i < 50; i++) { + assertEquals( + Float.floatToRawIntBits(i * 2.0f), + Float.floatToRawIntBits(output[i])); + } + } + + // ========== Double writer ========== + + @Test + public void testDoubleWriterRoundTrip() throws IOException { + double[] values = new double[100]; + for (int i = 0; i < 100; i++) { + values[i] = i * 0.1; + } + + AlpValuesWriter.DoubleAlpValuesWriter writer = new AlpValuesWriter.DoubleAlpValuesWriter(); + for (double v : values) { + writer.writeDouble(v); + } + + byte[] compressed = writer.getBytes().toByteArray(); + + double[] output = new double[values.length]; + AlpWrapper.decodeDoubles(compressed, compressed.length, output, values.length); + + for (int i = 0; i < values.length; i++) { + assertEquals( + "Mismatch at " + i, + Double.doubleToRawLongBits(values[i]), + Double.doubleToRawLongBits(output[i])); + } + } + + @Test + public void testDoubleWriterMultipleVectors() throws IOException { + double[] values = new double[2500]; + Random rng = new Random(42); + for (int i = 0; i < 2500; i++) { + values[i] = Math.round(rng.nextDouble() * 10000) / 100.0; + } + + AlpValuesWriter.DoubleAlpValuesWriter writer = new AlpValuesWriter.DoubleAlpValuesWriter(); + for (double v : values) { + writer.writeDouble(v); + } + + byte[] compressed = writer.getBytes().toByteArray(); + + double[] output 
= new double[values.length]; + AlpWrapper.decodeDoubles(compressed, compressed.length, output, values.length); + + for (int i = 0; i < values.length; i++) { + assertEquals( + "Mismatch at " + i, + Double.doubleToRawLongBits(values[i]), + Double.doubleToRawLongBits(output[i])); + } + } + + @Test + public void testDoubleWriterSpecialValues() throws IOException { + double[] values = {1.0, Double.NaN, 2.0, Double.POSITIVE_INFINITY, + 3.0, Double.NEGATIVE_INFINITY, 4.0, -0.0, 5.0}; + + AlpValuesWriter.DoubleAlpValuesWriter writer = new AlpValuesWriter.DoubleAlpValuesWriter(); + for (double v : values) { + writer.writeDouble(v); + } + + byte[] compressed = writer.getBytes().toByteArray(); + + double[] output = new double[values.length]; + AlpWrapper.decodeDoubles(compressed, compressed.length, output, values.length); + + for (int i = 0; i < values.length; i++) { + assertEquals( + "Mismatch at " + i, + Double.doubleToRawLongBits(values[i]), + Double.doubleToRawLongBits(output[i])); + } + } + + @Test + public void testDoubleWriterEmpty() throws IOException { + AlpValuesWriter.DoubleAlpValuesWriter writer = new AlpValuesWriter.DoubleAlpValuesWriter(); + byte[] compressed = writer.getBytes().toByteArray(); + assertEquals(AlpConstants.HEADER_SIZE, compressed.length); + } + + // ========== Buffered size / allocated size ========== + + @Test + public void testBufferedSizeGrowsWithValues() { + AlpValuesWriter.FloatAlpValuesWriter writer = new AlpValuesWriter.FloatAlpValuesWriter(); + long initial = writer.getBufferedSize(); + for (int i = 0; i < 10; i++) { + writer.writeFloat(i * 0.1f); + } + assertTrue(writer.getBufferedSize() > initial); + } + + @Test + public void testAllocatedSizeNonNegative() { + AlpValuesWriter.FloatAlpValuesWriter writer = new AlpValuesWriter.FloatAlpValuesWriter(); + assertTrue(writer.getAllocatedSize() > 0); + } + + @Test + public void testMemUsageString() { + AlpValuesWriter.FloatAlpValuesWriter writer = new AlpValuesWriter.FloatAlpValuesWriter(); + 
String usage = writer.memUsageString("TEST"); + assertTrue(usage.startsWith("TEST")); + assertTrue(usage.contains("ALP")); + } +} From 27508d93bf59e23300f78172e753abdd41436ecc Mon Sep 17 00:00:00 2001 From: Prateek Gaur Date: Sat, 7 Mar 2026 19:00:54 +0000 Subject: [PATCH 07/24] ALP: Add lazy AlpValuesReader for float and double columns Implements AlpValuesReader (abstract), AlpValuesReaderForFloat, and AlpValuesReaderForDouble. Uses lazy per-vector decoding: initFromPage reads only the header and offset array, vectors are decoded on first access. skip() is O(1) with no decoding. --- .../column/values/alp/AlpValuesReader.java | 117 +++++++++ .../values/alp/AlpValuesReaderForDouble.java | 54 ++++ .../values/alp/AlpValuesReaderForFloat.java | 54 ++++ .../values/alp/AlpValuesReaderTest.java | 241 ++++++++++++++++++ 4 files changed, 466 insertions(+) create mode 100644 parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpValuesReader.java create mode 100644 parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpValuesReaderForDouble.java create mode 100644 parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpValuesReaderForFloat.java create mode 100644 parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpValuesReaderTest.java diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpValuesReader.java new file mode 100644 index 0000000000..dc8c04c960 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpValuesReader.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.alp; + +import static org.apache.parquet.column.values.alp.AlpConstants.*; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.column.values.ValuesReader; +import org.apache.parquet.io.ParquetDecodingException; + +/** + * Abstract base class for ALP values readers with lazy per-vector decoding. + * + *

On {@link #initFromPage}, reads the 7-byte header and offset array but does NOT + * decode any vectors. Vectors are decoded on demand when values are accessed. + * {@link #skip()} is O(1) — it just advances the index. + */ +abstract class AlpValuesReader extends ValuesReader { + + protected int vectorSize; + protected int totalCount; + protected int numVectors; + protected int currentIndex; + protected int decodedVectorIndex = -1; + protected int[] vectorOffsets; + protected byte[] rawData; // all data after header + + @Override + public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException { + int available = (int) stream.available(); + if (available < HEADER_SIZE) { + throw new ParquetDecodingException( + "ALP page too small for header: " + available + " bytes"); + } + + // Read header + byte[] headerBytes = new byte[HEADER_SIZE]; + stream.read(headerBytes); + ByteBuffer header = ByteBuffer.wrap(headerBytes).order(ByteOrder.LITTLE_ENDIAN); + + int compressionMode = header.get() & 0xFF; + int integerEncoding = header.get() & 0xFF; + int logVectorSize = header.get() & 0xFF; + totalCount = header.getInt(); + + if (compressionMode != COMPRESSION_MODE_ALP) { + throw new ParquetDecodingException( + "Unsupported ALP compression mode: " + compressionMode); + } + if (integerEncoding != INTEGER_ENCODING_FOR) { + throw new ParquetDecodingException( + "Unsupported ALP integer encoding: " + integerEncoding); + } + + vectorSize = 1 << logVectorSize; + numVectors = (totalCount + vectorSize - 1) / vectorSize; + currentIndex = 0; + decodedVectorIndex = -1; + + if (numVectors == 0) { + vectorOffsets = new int[0]; + rawData = new byte[0]; + return; + } + + // Read remaining data (offsets + vectors) + int remaining = (int) stream.available(); + rawData = new byte[remaining]; + stream.read(rawData); + + // Parse offsets from rawData + ByteBuffer body = ByteBuffer.wrap(rawData, 0, numVectors * OFFSET_SIZE) + .order(ByteOrder.LITTLE_ENDIAN); + vectorOffsets 
= new int[numVectors]; + for (int i = 0; i < numVectors; i++) { + vectorOffsets[i] = body.getInt(); + } + } + + @Override + public void skip() { + currentIndex++; + } + + @Override + public void skip(int n) { + currentIndex += n; + } + + /** Number of elements in the given vector (last vector may be partial). */ + protected int elementsInVector(int vectorIdx) { + if (vectorIdx < totalCount / vectorSize) { + return vectorSize; + } + int rem = totalCount % vectorSize; + return (rem == 0) ? vectorSize : rem; + } +} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpValuesReaderForDouble.java b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpValuesReaderForDouble.java new file mode 100644 index 0000000000..69ae6debb5 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpValuesReaderForDouble.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.alp; + +import org.apache.parquet.io.ParquetDecodingException; + +/** + * ALP values reader for double columns with lazy per-vector decoding. 
+ */ +public class AlpValuesReaderForDouble extends AlpValuesReader { + + private double[] decodedBuffer; + + @Override + public double readDouble() { + if (currentIndex >= totalCount) { + throw new ParquetDecodingException("ALP double reader exhausted at index " + currentIndex); + } + int vectorIdx = currentIndex / vectorSize; + int posInVector = currentIndex % vectorSize; + ensureVectorDecoded(vectorIdx); + currentIndex++; + return decodedBuffer[posInVector]; + } + + private void ensureVectorDecoded(int vectorIdx) { + if (vectorIdx == decodedVectorIndex) { + return; + } + int numElements = elementsInVector(vectorIdx); + int dataOffset = vectorOffsets[vectorIdx]; + AlpCompression.DoubleCompressedVector cv = + AlpCompression.DoubleCompressedVector.load(rawData, dataOffset, numElements); + decodedBuffer = new double[numElements]; + AlpCompression.decompressDoubleVector(cv, decodedBuffer); + decodedVectorIndex = vectorIdx; + } +} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpValuesReaderForFloat.java b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpValuesReaderForFloat.java new file mode 100644 index 0000000000..8fd39f3df0 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpValuesReaderForFloat.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.alp; + +import org.apache.parquet.io.ParquetDecodingException; + +/** + * ALP values reader for float columns with lazy per-vector decoding. + */ +public class AlpValuesReaderForFloat extends AlpValuesReader { + + private float[] decodedBuffer; + + @Override + public float readFloat() { + if (currentIndex >= totalCount) { + throw new ParquetDecodingException("ALP float reader exhausted at index " + currentIndex); + } + int vectorIdx = currentIndex / vectorSize; + int posInVector = currentIndex % vectorSize; + ensureVectorDecoded(vectorIdx); + currentIndex++; + return decodedBuffer[posInVector]; + } + + private void ensureVectorDecoded(int vectorIdx) { + if (vectorIdx == decodedVectorIndex) { + return; + } + int numElements = elementsInVector(vectorIdx); + int dataOffset = vectorOffsets[vectorIdx]; + AlpCompression.FloatCompressedVector cv = + AlpCompression.FloatCompressedVector.load(rawData, dataOffset, numElements); + decodedBuffer = new float[numElements]; + AlpCompression.decompressFloatVector(cv, decodedBuffer); + decodedVectorIndex = vectorIdx; + } +} diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpValuesReaderTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpValuesReaderTest.java new file mode 100644 index 0000000000..51d81e8a42 --- /dev/null +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpValuesReaderTest.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.alp; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Random; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.BytesInput; +import org.junit.Test; + +public class AlpValuesReaderTest { + + private static ByteBufferInputStream toInputStream(byte[] data) { + return ByteBufferInputStream.wrap(ByteBuffer.wrap(data)); + } + + // ========== Float writer → reader round-trip ========== + + private static void assertFloatWriterReaderRoundTrip(float[] values) throws IOException { + // Write + AlpValuesWriter.FloatAlpValuesWriter writer = new AlpValuesWriter.FloatAlpValuesWriter(); + for (float v : values) { + writer.writeFloat(v); + } + byte[] compressed = writer.getBytes().toByteArray(); + + // Read + AlpValuesReaderForFloat reader = new AlpValuesReaderForFloat(); + reader.initFromPage(values.length, toInputStream(compressed)); + + for (int i = 0; i < values.length; i++) { + float actual = reader.readFloat(); + assertEquals( + "Mismatch at " + i, + Float.floatToRawIntBits(values[i]), + Float.floatToRawIntBits(actual)); + } + } + + @Test + public void testFloatReaderSingleVector() throws 
IOException { + float[] values = new float[100]; + for (int i = 0; i < 100; i++) { + values[i] = i * 0.1f; + } + assertFloatWriterReaderRoundTrip(values); + } + + @Test + public void testFloatReaderMultipleVectors() throws IOException { + float[] values = new float[2500]; + Random rng = new Random(42); + for (int i = 0; i < 2500; i++) { + values[i] = Math.round(rng.nextFloat() * 10000) / 100.0f; + } + assertFloatWriterReaderRoundTrip(values); + } + + @Test + public void testFloatReaderExactVectorSize() throws IOException { + float[] values = new float[AlpConstants.DEFAULT_VECTOR_SIZE]; + for (int i = 0; i < values.length; i++) { + values[i] = i * 0.5f; + } + assertFloatWriterReaderRoundTrip(values); + } + + @Test + public void testFloatReaderSpecialValues() throws IOException { + float[] values = {1.0f, Float.NaN, 2.0f, Float.POSITIVE_INFINITY, + 3.0f, Float.NEGATIVE_INFINITY, 4.0f, -0.0f, 5.0f}; + assertFloatWriterReaderRoundTrip(values); + } + + // ========== Float skip ========== + + @Test + public void testFloatReaderSkip() throws IOException { + float[] values = new float[50]; + for (int i = 0; i < 50; i++) { + values[i] = i * 0.3f; + } + + AlpValuesWriter.FloatAlpValuesWriter writer = new AlpValuesWriter.FloatAlpValuesWriter(); + for (float v : values) { + writer.writeFloat(v); + } + byte[] compressed = writer.getBytes().toByteArray(); + + AlpValuesReaderForFloat reader = new AlpValuesReaderForFloat(); + reader.initFromPage(values.length, toInputStream(compressed)); + + // Skip first 10, read next 5 + reader.skip(10); + for (int i = 10; i < 15; i++) { + assertEquals( + Float.floatToRawIntBits(values[i]), + Float.floatToRawIntBits(reader.readFloat())); + } + + // Skip 20 more, read next + reader.skip(20); + assertEquals( + Float.floatToRawIntBits(values[35]), + Float.floatToRawIntBits(reader.readFloat())); + } + + @Test + public void testFloatReaderSkipAcrossVectors() throws IOException { + float[] values = new float[2500]; + for (int i = 0; i < 2500; i++) { + 
values[i] = i * 0.01f; + } + + AlpValuesWriter.FloatAlpValuesWriter writer = new AlpValuesWriter.FloatAlpValuesWriter(); + for (float v : values) { + writer.writeFloat(v); + } + byte[] compressed = writer.getBytes().toByteArray(); + + AlpValuesReaderForFloat reader = new AlpValuesReaderForFloat(); + reader.initFromPage(values.length, toInputStream(compressed)); + + // Skip into second vector + reader.skip(1500); + assertEquals( + Float.floatToRawIntBits(values[1500]), + Float.floatToRawIntBits(reader.readFloat())); + } + + // ========== Double writer → reader round-trip ========== + + private static void assertDoubleWriterReaderRoundTrip(double[] values) throws IOException { + AlpValuesWriter.DoubleAlpValuesWriter writer = new AlpValuesWriter.DoubleAlpValuesWriter(); + for (double v : values) { + writer.writeDouble(v); + } + byte[] compressed = writer.getBytes().toByteArray(); + + AlpValuesReaderForDouble reader = new AlpValuesReaderForDouble(); + reader.initFromPage(values.length, toInputStream(compressed)); + + for (int i = 0; i < values.length; i++) { + double actual = reader.readDouble(); + assertEquals( + "Mismatch at " + i, + Double.doubleToRawLongBits(values[i]), + Double.doubleToRawLongBits(actual)); + } + } + + @Test + public void testDoubleReaderSingleVector() throws IOException { + double[] values = new double[100]; + for (int i = 0; i < 100; i++) { + values[i] = i * 0.1; + } + assertDoubleWriterReaderRoundTrip(values); + } + + @Test + public void testDoubleReaderMultipleVectors() throws IOException { + double[] values = new double[2500]; + Random rng = new Random(42); + for (int i = 0; i < 2500; i++) { + values[i] = Math.round(rng.nextDouble() * 10000) / 100.0; + } + assertDoubleWriterReaderRoundTrip(values); + } + + @Test + public void testDoubleReaderSpecialValues() throws IOException { + double[] values = {1.0, Double.NaN, 2.0, Double.POSITIVE_INFINITY, + 3.0, Double.NEGATIVE_INFINITY, 4.0, -0.0, 5.0}; + assertDoubleWriterReaderRoundTrip(values); + } 
+ + @Test + public void testDoubleReaderSkip() throws IOException { + double[] values = new double[50]; + for (int i = 0; i < 50; i++) { + values[i] = i * 0.3; + } + + AlpValuesWriter.DoubleAlpValuesWriter writer = new AlpValuesWriter.DoubleAlpValuesWriter(); + for (double v : values) { + writer.writeDouble(v); + } + byte[] compressed = writer.getBytes().toByteArray(); + + AlpValuesReaderForDouble reader = new AlpValuesReaderForDouble(); + reader.initFromPage(values.length, toInputStream(compressed)); + + reader.skip(10); + for (int i = 10; i < 15; i++) { + assertEquals( + Double.doubleToRawLongBits(values[i]), + Double.doubleToRawLongBits(reader.readDouble())); + } + } + + // ========== Partial last vector ========== + + @Test + public void testFloatReaderPartialLastVector() throws IOException { + // 1030 = 1024 + 6 → 2 vectors, last has 6 elements + float[] values = new float[1030]; + for (int i = 0; i < 1030; i++) { + values[i] = i * 0.1f; + } + assertFloatWriterReaderRoundTrip(values); + } + + @Test + public void testDoubleReaderPartialLastVector() throws IOException { + double[] values = new double[1030]; + for (int i = 0; i < 1030; i++) { + values[i] = i * 0.1; + } + assertDoubleWriterReaderRoundTrip(values); + } +} From 1036d0954bf8b402dde668fb85bd117e3f19417c Mon Sep 17 00:00:00 2001 From: Prateek Gaur Date: Sat, 7 Mar 2026 19:21:25 +0000 Subject: [PATCH 08/24] ALP: Add cross-implementation tests and fix encode/decode to match C++ Add AlpCrossImplTest with 7 test cases that decode C++ reference blobs and verify bit-identical output. Reference blobs were generated by the C++ Arrow ALP implementation via generate_reference_blobs.cc. Fix encode/decode math to use two-step multiplication matching C++: - Encode: value * 10^exponent * 10^(-factor) - Decode: encoded * 10^factor * 10^(-exponent) The previous single-operation approach (value / (10^e / 10^f)) produced 1-ULP differences due to different intermediate floating-point rounding. 
--- .../column/values/alp/AlpConstants.java | 12 + .../column/values/alp/AlpEncoderDecoder.java | 52 ++- .../column/values/alp/AlpCrossImplTest.java | 303 ++++++++++++++++++ 3 files changed, 333 insertions(+), 34 deletions(-) create mode 100644 parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpCrossImplTest.java diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpConstants.java b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpConstants.java index 6e14518aa0..a4420bbecd 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpConstants.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpConstants.java @@ -69,6 +69,12 @@ private AlpConstants() { static final float[] FLOAT_POW10 = {1e0f, 1e1f, 1e2f, 1e3f, 1e4f, 1e5f, 1e6f, 1e7f, 1e8f, 1e9f, 1e10f}; + // Negative powers of 10 as float, matching C++ PowerOfTenFloat(-power). + // Used in the two-step encode/decode to match C++ floating-point rounding behavior. + static final float[] FLOAT_POW10_NEGATIVE = { + 1e0f, 1e-1f, 1e-2f, 1e-3f, 1e-4f, 1e-5f, 1e-6f, 1e-7f, 1e-8f, 1e-9f, 1e-10f + }; + // ========== Double-specific ========== static final int DOUBLE_MAX_EXPONENT = 18; static final double MAGIC_DOUBLE = 6_755_399_441_055_744.0; // 2^51 + 2^52 @@ -80,6 +86,12 @@ private AlpConstants() { 1e0, 1e1, 1e2, 1e3, 1e4, 1e5, 1e6, 1e7, 1e8, 1e9, 1e10, 1e11, 1e12, 1e13, 1e14, 1e15, 1e16, 1e17, 1e18 }; + // Negative powers of 10 as double, matching C++ PowerOfTenDouble(-power). 
+ static final double[] DOUBLE_POW10_NEGATIVE = { + 1e0, 1e-1, 1e-2, 1e-3, 1e-4, 1e-5, 1e-6, 1e-7, 1e-8, 1e-9, 1e-10, + 1e-11, 1e-12, 1e-13, 1e-14, 1e-15, 1e-16, 1e-17, 1e-18 + }; + // ========== Per-vector metadata sizes ========== public static final int ALP_INFO_SIZE = 4; // exponent(1) + factor(1) + num_exceptions(2) public static final int FLOAT_FOR_INFO_SIZE = 5; // frame_of_reference(4) + bit_width(1) diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpEncoderDecoder.java b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpEncoderDecoder.java index 34d2f091f1..da489ec35e 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpEncoderDecoder.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpEncoderDecoder.java @@ -27,8 +27,8 @@ * then applying Frame of Reference encoding and bit-packing. * Values that cannot be losslessly converted are stored as exceptions. * - *

Encoding formula: encoded = round(value * 10^exponent / 10^factor) - *

Decoding formula: value = encoded / 10^exponent * 10^factor + *

Encoding formula: encoded = round(value * 10^exponent * 10^(-factor)) + *

Decoding formula: value = encoded * 10^factor * 10^(-exponent) * *

Exception conditions: *