From ae5618eb230071ff99e14ef860f29b7557a420ef Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 21 May 2026 09:28:00 -0600 Subject: [PATCH] chore: remove dead useDecimal128 plumbing The internal `spark.comet.use.decimal128` conf had three write-only callers and zero readers. The `useDecimal128` boolean threaded through the entire `CometVector` hierarchy was always `true` in production (both `NativeUtil` call sites hardcoded `true`), so the `false` branch of `CometVector.getDecimal()` was unreachable. The Rust `SparkParquetOptions.use_decimal_128` field was declared, defaulted to `false`, and never read. Removed: - `CometConf.COMET_USE_DECIMAL_128` and its three `setConfString` writers - `useDecimal128` field/parameter across `CometVector` and all subclasses - The dead branch in `CometVector.getDecimal()` - `use_decimal_128` from `SparkParquetOptions` - The `Seq(true, false).foreach` sweep in `ParquetReadSuite "decimals"` --- native/core/src/execution/columnar_to_row.rs | 14 +++---- native/core/src/parquet/parquet_support.rs | 4 -- .../comet/vector/CometDecodedVector.java | 9 ++--- .../comet/vector/CometDelegateVector.java | 10 ++--- .../comet/vector/CometDictionaryVector.java | 14 +++---- .../apache/comet/vector/CometListVector.java | 9 ++--- .../apache/comet/vector/CometMapVector.java | 9 ++--- .../apache/comet/vector/CometPlainVector.java | 15 ++++--- .../comet/vector/CometSelectionVector.java | 6 +-- .../comet/vector/CometStructVector.java | 9 ++--- .../org/apache/comet/vector/CometVector.java | 33 ++++++---------- .../scala/org/apache/comet/CometConf.scala | 10 ----- .../CometBatchKernelCodegenInput.scala | 3 +- .../apache/comet/rules/CometExecRule.scala | 3 -- .../org/apache/comet/vector/NativeUtil.scala | 6 +-- .../shuffle/CometShuffleExchangeExec.scala | 3 -- .../apache/spark/sql/comet/operators.scala | 5 --- .../comet/parquet/TestColumnReader.java | 6 +-- .../exec/CometColumnarShuffleSuite.scala | 2 +- .../comet/parquet/ParquetReadSuite.scala | 39 
+++++++++---------- 20 files changed, 75 insertions(+), 134 deletions(-) diff --git a/native/core/src/execution/columnar_to_row.rs b/native/core/src/execution/columnar_to_row.rs index 9a3616bef7..14e115cba0 100644 --- a/native/core/src/execution/columnar_to_row.rs +++ b/native/core/src/execution/columnar_to_row.rs @@ -1052,10 +1052,10 @@ impl ColumnarToRowContext { }) } (DataType::Int32, DataType::Decimal128(precision, scale)) => { - // Parquet stores small-precision decimals as Int32 for efficiency. - // When COMET_USE_DECIMAL_128 is false, BatchReader produces these types. - // The Int32 value is already scaled (e.g., -1 means -0.01 for scale 2). - // We need to reinterpret (not cast) to Decimal128 preserving the value. + // Parquet stores small-precision decimals as Int32 for efficiency, and the + // reader may surface them as the physical Int32 type. The value is already + // scaled (e.g., -1 means -0.01 for scale 2). Reinterpret (not cast) to + // Decimal128 preserving the value. let int_array = array.as_any().downcast_ref::<Int32Array>().ok_or_else(|| { CometError::Internal("Failed to downcast to Int32Array".to_string()) })?; @@ -2581,8 +2581,7 @@ mod tests { #[test] fn test_convert_int32_to_decimal128() { // Test that Int32 arrays are correctly cast to Decimal128 when schema expects Decimal128. - // This can happen when COMET_USE_DECIMAL_128 is false and the parquet reader produces - // Int32 for small-precision decimals. + // This can happen when the parquet reader surfaces small-precision decimals as Int32. // Create an Int32 array representing decimals: [-1, -2, -3] which at scale 2 means // [-0.01, -0.02, -0.03] @@ -2619,8 +2618,7 @@ mod tests { #[test] fn test_convert_int64_to_decimal128() { // Test that Int64 arrays are correctly cast to Decimal128 when schema expects Decimal128. - // This can happen when COMET_USE_DECIMAL_128 is false and the parquet reader produces - // Int64 for medium-precision decimals. 
+ // This can happen when the parquet reader surfaces medium-precision decimals as Int64. // Create an Int64 array representing decimals let int_array: ArrayRef = Arc::new(Int64Array::from(vec![-100i64, -200, -300])); diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index 0e0e7c2a6e..06abe5b539 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -74,8 +74,6 @@ pub struct SparkParquetOptions { pub allow_incompat: bool, /// Support casting unsigned ints to signed ints (used by Parquet SchemaAdapter) pub allow_cast_unsigned_ints: bool, - /// Whether to always represent decimals using 128 bits. If false, the native reader may represent decimals using 32 or 64 bits, depending on the precision. - pub use_decimal_128: bool, /// Whether to read dates/timestamps that were written in the legacy hybrid Julian + Gregorian calendar as it is. If false, throw exceptions instead. If the spark type is TimestampNTZ, this should be true. 
pub use_legacy_date_timestamp_or_ntz: bool, // Whether schema field names are case sensitive @@ -105,7 +103,6 @@ impl SparkParquetOptions { timezone: timezone.to_string(), allow_incompat, allow_cast_unsigned_ints: false, - use_decimal_128: false, use_legacy_date_timestamp_or_ntz: false, case_sensitive: false, return_null_struct_if_all_fields_missing: true, @@ -121,7 +118,6 @@ impl SparkParquetOptions { timezone: "".to_string(), allow_incompat, allow_cast_unsigned_ints: false, - use_decimal_128: false, use_legacy_date_timestamp_or_ntz: false, case_sensitive: false, return_null_struct_if_all_fields_missing: true, diff --git a/spark/src/main/java/org/apache/comet/vector/CometDecodedVector.java b/spark/src/main/java/org/apache/comet/vector/CometDecodedVector.java index f699134f85..a37683df2c 100644 --- a/spark/src/main/java/org/apache/comet/vector/CometDecodedVector.java +++ b/spark/src/main/java/org/apache/comet/vector/CometDecodedVector.java @@ -40,13 +40,12 @@ public abstract class CometDecodedVector extends CometVector { private byte validityByteCache; protected boolean isUuid; - protected CometDecodedVector(ValueVector vector, Field valueField, boolean useDecimal128) { - this(vector, valueField, useDecimal128, false); + protected CometDecodedVector(ValueVector vector, Field valueField) { + this(vector, valueField, false); } - protected CometDecodedVector( - ValueVector vector, Field valueField, boolean useDecimal128, boolean isUuid) { - super(Utils.fromArrowField(valueField), useDecimal128); + protected CometDecodedVector(ValueVector vector, Field valueField, boolean isUuid) { + super(Utils.fromArrowField(valueField)); this.valueVector = vector; this.numNulls = valueVector.getNullCount(); this.numValues = valueVector.getValueCount(); diff --git a/spark/src/main/java/org/apache/comet/vector/CometDelegateVector.java b/spark/src/main/java/org/apache/comet/vector/CometDelegateVector.java index 8874d11b70..287408796f 100644 --- 
a/spark/src/main/java/org/apache/comet/vector/CometDelegateVector.java +++ b/spark/src/main/java/org/apache/comet/vector/CometDelegateVector.java @@ -33,15 +33,11 @@ public class CometDelegateVector extends CometVector { protected CometVector delegate; public CometDelegateVector(DataType dataType) { - this(dataType, null, false); + this(dataType, null); } - public CometDelegateVector(DataType dataType, boolean useDecimal128) { - this(dataType, null, useDecimal128); - } - - public CometDelegateVector(DataType dataType, CometVector delegate, boolean useDecimal128) { - super(dataType, useDecimal128); + public CometDelegateVector(DataType dataType, CometVector delegate) { + super(dataType); if (delegate instanceof CometDelegateVector) { throw new IllegalArgumentException("cannot have nested delegation"); } diff --git a/spark/src/main/java/org/apache/comet/vector/CometDictionaryVector.java b/spark/src/main/java/org/apache/comet/vector/CometDictionaryVector.java index a49255e7c6..15f50b1de9 100644 --- a/spark/src/main/java/org/apache/comet/vector/CometDictionaryVector.java +++ b/spark/src/main/java/org/apache/comet/vector/CometDictionaryVector.java @@ -35,21 +35,17 @@ public class CometDictionaryVector extends CometDecodedVector { private final boolean isAlias; public CometDictionaryVector( - CometPlainVector indices, - CometDictionary values, - DictionaryProvider provider, - boolean useDecimal128) { - this(indices, values, provider, useDecimal128, false, false); + CometPlainVector indices, CometDictionary values, DictionaryProvider provider) { + this(indices, values, provider, false, false); } public CometDictionaryVector( CometPlainVector indices, CometDictionary values, DictionaryProvider provider, - boolean useDecimal128, boolean isAlias, boolean isUuid) { - super(indices.valueVector, values.getValueVector().getField(), useDecimal128, isUuid); + super(indices.valueVector, values.getValueVector().getField(), isUuid); Preconditions.checkArgument( indices.valueVector 
instanceof IntVector, "'indices' should be a IntVector"); this.values = values; @@ -131,11 +127,11 @@ byte[] getBinaryDecimal(int i) { public CometVector slice(int offset, int length) { TransferPair tp = indices.valueVector.getTransferPair(indices.valueVector.getAllocator()); tp.splitAndTransfer(offset, length); - CometPlainVector sliced = new CometPlainVector(tp.getTo(), useDecimal128); + CometPlainVector sliced = new CometPlainVector(tp.getTo()); // Set the alias flag to true so that the sliced vector will not close the dictionary vector. // Otherwise, if the dictionary is closed, the sliced vector will not be able to access the // dictionary. - return new CometDictionaryVector(sliced, values, provider, useDecimal128, true, isUuid); + return new CometDictionaryVector(sliced, values, provider, true, isUuid); } } diff --git a/spark/src/main/java/org/apache/comet/vector/CometListVector.java b/spark/src/main/java/org/apache/comet/vector/CometListVector.java index 93e8e8bf9f..f1e112ec1f 100644 --- a/spark/src/main/java/org/apache/comet/vector/CometListVector.java +++ b/spark/src/main/java/org/apache/comet/vector/CometListVector.java @@ -33,14 +33,13 @@ public class CometListVector extends CometDecodedVector { final ColumnVector dataColumnVector; final DictionaryProvider dictionaryProvider; - public CometListVector( - ValueVector vector, boolean useDecimal128, DictionaryProvider dictionaryProvider) { - super(vector, vector.getField(), useDecimal128); + public CometListVector(ValueVector vector, DictionaryProvider dictionaryProvider) { + super(vector, vector.getField()); this.listVector = ((ListVector) vector); this.dataVector = listVector.getDataVector(); this.dictionaryProvider = dictionaryProvider; - this.dataColumnVector = getVector(dataVector, useDecimal128, dictionaryProvider); + this.dataColumnVector = getVector(dataVector, dictionaryProvider); } @Override @@ -57,6 +56,6 @@ public CometVector slice(int offset, int length) { TransferPair tp = 
this.valueVector.getTransferPair(this.valueVector.getAllocator()); tp.splitAndTransfer(offset, length); - return new CometListVector(tp.getTo(), useDecimal128, dictionaryProvider); + return new CometListVector(tp.getTo(), dictionaryProvider); } } diff --git a/spark/src/main/java/org/apache/comet/vector/CometMapVector.java b/spark/src/main/java/org/apache/comet/vector/CometMapVector.java index c5984a4dcb..4627a4408e 100644 --- a/spark/src/main/java/org/apache/comet/vector/CometMapVector.java +++ b/spark/src/main/java/org/apache/comet/vector/CometMapVector.java @@ -37,16 +37,15 @@ public class CometMapVector extends CometDecodedVector { final ColumnVector keys; final ColumnVector values; - public CometMapVector( - ValueVector vector, boolean useDecimal128, DictionaryProvider dictionaryProvider) { - super(vector, vector.getField(), useDecimal128); + public CometMapVector(ValueVector vector, DictionaryProvider dictionaryProvider) { + super(vector, vector.getField()); this.mapVector = ((MapVector) vector); this.dataVector = mapVector.getDataVector(); this.dictionaryProvider = dictionaryProvider; if (dataVector instanceof StructVector) { - this.dataColumnVector = new CometStructVector(dataVector, useDecimal128, dictionaryProvider); + this.dataColumnVector = new CometStructVector(dataVector, dictionaryProvider); if (dataColumnVector.children.size() != 2) { throw new RuntimeException( @@ -77,6 +76,6 @@ public CometVector slice(int offset, int length) { TransferPair tp = this.valueVector.getTransferPair(this.valueVector.getAllocator()); tp.splitAndTransfer(offset, length); - return new CometMapVector(tp.getTo(), useDecimal128, dictionaryProvider); + return new CometMapVector(tp.getTo(), dictionaryProvider); } } diff --git a/spark/src/main/java/org/apache/comet/vector/CometPlainVector.java b/spark/src/main/java/org/apache/comet/vector/CometPlainVector.java index 2a30be1b1c..d8c4130406 100644 --- a/spark/src/main/java/org/apache/comet/vector/CometPlainVector.java +++ 
b/spark/src/main/java/org/apache/comet/vector/CometPlainVector.java @@ -40,17 +40,16 @@ public class CometPlainVector extends CometDecodedVector { private boolean isReused; - public CometPlainVector(ValueVector vector, boolean useDecimal128) { - this(vector, useDecimal128, false); + public CometPlainVector(ValueVector vector) { + this(vector, false); } - public CometPlainVector(ValueVector vector, boolean useDecimal128, boolean isUuid) { - this(vector, useDecimal128, isUuid, false); + public CometPlainVector(ValueVector vector, boolean isUuid) { + this(vector, isUuid, false); } - public CometPlainVector( - ValueVector vector, boolean useDecimal128, boolean isUuid, boolean isReused) { - super(vector, vector.getField(), useDecimal128, isUuid); + public CometPlainVector(ValueVector vector, boolean isUuid, boolean isReused) { + super(vector, vector.getField(), isUuid); // NullType doesn't have data buffer. if (vector instanceof NullVector) { this.valueBufferAddress = -1; @@ -184,7 +183,7 @@ public CometVector slice(int offset, int length) { TransferPair tp = this.valueVector.getTransferPair(this.valueVector.getAllocator()); tp.splitAndTransfer(offset, length); - return new CometPlainVector(tp.getTo(), useDecimal128); + return new CometPlainVector(tp.getTo()); } private static UUID convertToUuid(byte[] buf) { diff --git a/spark/src/main/java/org/apache/comet/vector/CometSelectionVector.java b/spark/src/main/java/org/apache/comet/vector/CometSelectionVector.java index 3c353976d9..72ce753d08 100644 --- a/spark/src/main/java/org/apache/comet/vector/CometSelectionVector.java +++ b/spark/src/main/java/org/apache/comet/vector/CometSelectionVector.java @@ -71,8 +71,7 @@ public class CometSelectionVector extends CometVector { * @throws IllegalArgumentException if any index is out of bounds */ public CometSelectionVector(CometVector values, int[] indices, int numValues) { - // Use the values vector's datatype, useDecimal128, and dictionary provider - super(values.dataType(), 
values.useDecimal128); + super(values.dataType()); this.values = values; this.selectionIndices = indices; @@ -97,8 +96,7 @@ public CometSelectionVector(CometVector values, int[] indices, int numValues) { } indicesVector.setValueCount(numValues); - this.indices = - CometVector.getVector(indicesVector, values.useDecimal128, values.getDictionaryProvider()); + this.indices = CometVector.getVector(indicesVector, values.getDictionaryProvider()); } /** diff --git a/spark/src/main/java/org/apache/comet/vector/CometStructVector.java b/spark/src/main/java/org/apache/comet/vector/CometStructVector.java index fbf2764919..0e5bc3248a 100644 --- a/spark/src/main/java/org/apache/comet/vector/CometStructVector.java +++ b/spark/src/main/java/org/apache/comet/vector/CometStructVector.java @@ -33,9 +33,8 @@ public class CometStructVector extends CometDecodedVector { final List children; final DictionaryProvider dictionaryProvider; - public CometStructVector( - ValueVector vector, boolean useDecimal128, DictionaryProvider dictionaryProvider) { - super(vector, vector.getField(), useDecimal128); + public CometStructVector(ValueVector vector, DictionaryProvider dictionaryProvider) { + super(vector, vector.getField()); StructVector structVector = ((StructVector) vector); @@ -44,7 +43,7 @@ public CometStructVector( for (int i = 0; i < size; ++i) { ValueVector value = structVector.getVectorById(i); - children.add(getVector(value, useDecimal128, dictionaryProvider)); + children.add(getVector(value, dictionaryProvider)); } this.children = children; this.dictionaryProvider = dictionaryProvider; @@ -60,6 +59,6 @@ public CometVector slice(int offset, int length) { TransferPair tp = this.valueVector.getTransferPair(this.valueVector.getAllocator()); tp.splitAndTransfer(offset, length); - return new CometStructVector(tp.getTo(), useDecimal128, dictionaryProvider); + return new CometStructVector(tp.getTo(), dictionaryProvider); } } diff --git 
a/spark/src/main/java/org/apache/comet/vector/CometVector.java b/spark/src/main/java/org/apache/comet/vector/CometVector.java index f922f2281c..24696dbacb 100644 --- a/spark/src/main/java/org/apache/comet/vector/CometVector.java +++ b/spark/src/main/java/org/apache/comet/vector/CometVector.java @@ -32,7 +32,6 @@ import org.apache.arrow.vector.types.pojo.DictionaryEncoding; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.Decimal; -import org.apache.spark.sql.types.IntegerType; import org.apache.spark.sql.vectorized.ColumnVector; import org.apache.spark.sql.vectorized.ColumnarArray; import org.apache.spark.sql.vectorized.ColumnarMap; @@ -43,7 +42,6 @@ public abstract class CometVector extends ColumnVector { private static final int DECIMAL_BYTE_WIDTH = 16; private final byte[] DECIMAL_BYTES = new byte[DECIMAL_BYTE_WIDTH]; - protected final boolean useDecimal128; private static final long decimalValOffset; @@ -58,9 +56,8 @@ public abstract class CometVector extends ColumnVector { } } - public CometVector(DataType type, boolean useDecimal128) { + public CometVector(DataType type) { super(type); - this.useDecimal128 = useDecimal128; } /** @@ -86,10 +83,8 @@ public boolean isFixedLength() { @Override public Decimal getDecimal(int i, int precision, int scale) { if (isNullAt(i)) return null; - if (!useDecimal128 && precision <= Decimal.MAX_INT_DIGITS() && type instanceof IntegerType) { - return createDecimal(getInt(i), precision, scale); - } else if (precision <= Decimal.MAX_LONG_DIGITS()) { - return createDecimal(useDecimal128 ? getLongDecimal(i) : getLong(i), precision, scale); + if (precision <= Decimal.MAX_LONG_DIGITS()) { + return createDecimal(getLongDecimal(i), precision, scale); } else { byte[] bytes = getBinaryDecimal(i); BigInteger bigInteger = new BigInteger(bytes); @@ -230,37 +225,33 @@ public DictionaryProvider getDictionaryProvider() { * Returns a corresponding `CometVector` implementation based on the given Arrow `ValueVector`. 
* * @param vector Arrow `ValueVector` - * @param useDecimal128 Whether to use Decimal128 for decimal column * @return `CometVector` implementation */ - public static CometVector getVector( - ValueVector vector, boolean useDecimal128, DictionaryProvider dictionaryProvider) { + public static CometVector getVector(ValueVector vector, DictionaryProvider dictionaryProvider) { if (vector instanceof StructVector) { - return new CometStructVector(vector, useDecimal128, dictionaryProvider); + return new CometStructVector(vector, dictionaryProvider); } else if (vector instanceof MapVector) { - return new CometMapVector(vector, useDecimal128, dictionaryProvider); + return new CometMapVector(vector, dictionaryProvider); } else if (vector instanceof ListVector) { - return new CometListVector(vector, useDecimal128, dictionaryProvider); + return new CometListVector(vector, dictionaryProvider); } else { DictionaryEncoding dictionaryEncoding = vector.getField().getDictionary(); - CometPlainVector cometVector = new CometPlainVector(vector, useDecimal128); + CometPlainVector cometVector = new CometPlainVector(vector); if (dictionaryEncoding == null) { return cometVector; } else { Dictionary dictionary = dictionaryProvider.lookup(dictionaryEncoding.getId()); - CometPlainVector dictionaryVector = - new CometPlainVector(dictionary.getVector(), useDecimal128); + CometPlainVector dictionaryVector = new CometPlainVector(dictionary.getVector()); CometDictionary cometDictionary = new CometDictionary(dictionaryVector); - return new CometDictionaryVector( - cometVector, cometDictionary, dictionaryProvider, useDecimal128); + return new CometDictionaryVector(cometVector, cometDictionary, dictionaryProvider); } } } - protected static CometVector getVector(ValueVector vector, boolean useDecimal128) { - return getVector(vector, useDecimal128, null); + protected static CometVector getVector(ValueVector vector) { + return getVector(vector, null); } private UnsupportedOperationException 
notImplementedException() { diff --git a/spark/src/main/scala/org/apache/comet/CometConf.scala b/spark/src/main/scala/org/apache/comet/CometConf.scala index fdd1ae2073..82700a939e 100644 --- a/spark/src/main/scala/org/apache/comet/CometConf.scala +++ b/spark/src/main/scala/org/apache/comet/CometConf.scala @@ -699,16 +699,6 @@ object CometConf extends ShimCometConf { .booleanConf .createWithDefault(false) - val COMET_USE_DECIMAL_128: ConfigEntry[Boolean] = conf("spark.comet.use.decimal128") - .internal() - .category(CATEGORY_EXEC) - .doc("If true, Comet will always use 128 bits to represent a decimal value, regardless of " + - "its precision. If false, Comet will use 32, 64 and 128 bits respectively depending on " + - "the precision. N.B. this is NOT a user-facing config but should be inferred and set by " + - "Comet itself.") - .booleanConf - .createWithDefault(false) - val COMET_USE_LAZY_MATERIALIZATION: ConfigEntry[Boolean] = conf( "spark.comet.use.lazyMaterialization") .internal() diff --git a/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegenInput.scala b/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegenInput.scala index 9a4f4bcc57..79a2af6837 100644 --- a/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegenInput.scala +++ b/spark/src/main/scala/org/apache/comet/codegen/CometBatchKernelCodegenInput.scala @@ -452,8 +452,7 @@ private[codegen] object CometBatchKernelCodegenInput { out: mutable.ArrayBuffer[String]): Unit = spec match { case sc: ScalarColumnSpec => if (wrapsInCometPlainVector(sc.vectorClass)) { - // `useDecimal128 = true` matches Spark's 128-bit decimal storage. 
- out += s"this.$path = new $cometPlainVectorName($source, true);" + out += s"this.$path = new $cometPlainVectorName($source);" } else { out += s"this.$path = (${sc.vectorClass.getName}) $source;" } diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala index c4fad7dde3..65f650f348 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala @@ -177,9 +177,6 @@ case class CometExecRule(session: SparkSession) case s: ShuffleExchangeExec => CometShuffleExchangeExec.shuffleSupported(s) match { case Some(CometNativeShuffle) => - // Switch to use Decimal128 regardless of precision, since Arrow native execution - // doesn't support Decimal32 and Decimal64 yet. - conf.setConfString(CometConf.COMET_USE_DECIMAL_128.key, "true") CometShuffleExchangeExec(s, shuffleType = CometNativeShuffle) case Some(CometColumnarShuffle) => CometShuffleExchangeExec(s, shuffleType = CometColumnarShuffle) diff --git a/spark/src/main/scala/org/apache/comet/vector/NativeUtil.scala b/spark/src/main/scala/org/apache/comet/vector/NativeUtil.scala index 45245121a0..cfb9512e80 100644 --- a/spark/src/main/scala/org/apache/comet/vector/NativeUtil.scala +++ b/spark/src/main/scala/org/apache/comet/vector/NativeUtil.scala @@ -258,11 +258,8 @@ class NativeUtil { val arrowSchema = schemas(i) val arrowArray = arrays(i) - // Native execution should always have 'useDecimal128' set to true since it doesn't support - // other cases. 
arrayVectors += CometVector.getVector( importer.importVector(arrowArray, arrowSchema, dictionaryProvider), - true, dictionaryProvider) } arrayVectors.toSeq @@ -305,8 +302,7 @@ object NativeUtil { def rootAsBatch(arrowRoot: VectorSchemaRoot, provider: DictionaryProvider): ColumnarBatch = { val vectors = (0 until arrowRoot.getFieldVectors.size()).map { i => val vector = arrowRoot.getFieldVectors.get(i) - // Native shuffle always uses decimal128. - CometVector.getVector(vector, true, provider) + CometVector.getVector(vector, provider) } new ColumnarBatch(vectors.toArray, arrowRoot.getRowCount) } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala index d4ee4e4ccf..493c20f8b7 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala @@ -243,9 +243,6 @@ object CometShuffleExchangeExec op: ShuffleExchangeExec): CometNativeExec = { shuffleSupported(op) match { case Some(CometNativeShuffle) if op.children.forall(_.isInstanceOf[CometNativeExec]) => - // Switch to use Decimal128 regardless of precision, since Arrow native execution - // doesn't support Decimal32 and Decimal64 yet. 
- conf.setConfString(CometConf.COMET_USE_DECIMAL_128.key, "true") CometSinkPlaceHolder( nativeOp, op, diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index d71c8e10ba..4da5ad1f33 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -42,7 +42,6 @@ import org.apache.spark.sql.execution.aggregate.{BaseAggregateExec, HashAggregat import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashJoin, ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{ArrayType, BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, ShortType, StringType, StructType, TimestampNTZType, TimestampType} import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.SerializableConfiguration @@ -425,10 +424,6 @@ abstract class CometNativeExec extends CometExec { throw new CometRuntimeException( s"CometNativeExec should not be executed directly without a serialized plan: $this") case Some(serializedPlan) => - // Switch to use Decimal128 regardless of precision, since Arrow native execution - // doesn't support Decimal32 and Decimal64 yet. - SQLConf.get.setConfString(CometConf.COMET_USE_DECIMAL_128.key, "true") - val serializedPlanCopy = serializedPlan // TODO: support native metrics for all operators. 
val nativeMetrics = CometMetricNode.fromCometPlan(this) diff --git a/spark/src/test/java/org/apache/comet/parquet/TestColumnReader.java b/spark/src/test/java/org/apache/comet/parquet/TestColumnReader.java index 2052222ecd..8a09c14a54 100644 --- a/spark/src/test/java/org/apache/comet/parquet/TestColumnReader.java +++ b/spark/src/test/java/org/apache/comet/parquet/TestColumnReader.java @@ -39,15 +39,15 @@ public void testIsFixedLength() { BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); ValueVector vv = new IntVector("v1", allocator); - CometVector vector = new CometPlainVector(vv, false); + CometVector vector = new CometPlainVector(vv); assertTrue(vector.isFixedLength()); vv = new FixedSizeBinaryVector("v2", allocator, 12); - vector = new CometPlainVector(vv, false); + vector = new CometPlainVector(vv); assertTrue(vector.isFixedLength()); vv = new VarBinaryVector("v3", allocator); - vector = new CometPlainVector(vv, false); + vector = new CometPlainVector(vv); assertFalse(vector.isFixedLength()); } } diff --git a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala index 17e73c6053..86c6a6aa4b 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala @@ -526,7 +526,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar } } - test("fix: StreamReader should always set useDecimal128 as true") { + test("fix: StreamReader should read shuffled decimal columns as Decimal128") { Seq(10, 201).foreach { numPartitions => withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "true") { withTempPath { dir => diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala index a88613a9d0..8365229f44 100644 --- 
a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala @@ -49,27 +49,24 @@ abstract class ParquetReadSuite extends CometTestBase { import testImplicits._ testStandardAndLegacyModes("decimals") { - Seq(true, false).foreach { useDecimal128 => - Seq(16, 1024).foreach { batchSize => - withSQLConf( - CometConf.COMET_EXEC_ENABLED.key -> false.toString, - CometConf.COMET_USE_DECIMAL_128.key -> useDecimal128.toString, - CometConf.COMET_BATCH_SIZE.key -> batchSize.toString) { - var combinations = Seq((5, 2), (1, 0), (18, 10), (18, 17), (19, 0), (38, 37)) - // If ANSI mode is on, the combination (1, 1) will cause a runtime error. Otherwise, the - // decimal RDD contains all null values and should be able to read back from Parquet. - - if (!SQLConf.get.ansiEnabled) { - combinations = combinations ++ Seq((1, 1)) - } - for ((precision, scale) <- combinations; useDictionary <- Seq(false, true)) { - withTempPath { dir => - val data = makeDecimalRDD(1000, DecimalType(precision, scale), useDictionary) - data.write.parquet(dir.getCanonicalPath) - readParquetFile(dir.getCanonicalPath) { df => - { - checkAnswer(df, data.collect().toSeq) - } + Seq(16, 1024).foreach { batchSize => + withSQLConf( + CometConf.COMET_EXEC_ENABLED.key -> false.toString, + CometConf.COMET_BATCH_SIZE.key -> batchSize.toString) { + var combinations = Seq((5, 2), (1, 0), (18, 10), (18, 17), (19, 0), (38, 37)) + // If ANSI mode is on, the combination (1, 1) will cause a runtime error. Otherwise, the + // decimal RDD contains all null values and should be able to read back from Parquet. 
+ + if (!SQLConf.get.ansiEnabled) { + combinations = combinations ++ Seq((1, 1)) + } + for ((precision, scale) <- combinations; useDictionary <- Seq(false, true)) { + withTempPath { dir => + val data = makeDecimalRDD(1000, DecimalType(precision, scale), useDictionary) + data.write.parquet(dir.getCanonicalPath) + readParquetFile(dir.getCanonicalPath) { df => + { + checkAnswer(df, data.collect().toSeq) } } }