From 28d13e86b4b7c4ce06c11a2fcb434f7348f82baf Mon Sep 17 00:00:00 2001 From: Denys Kuzmenko Date: Fri, 5 Jun 2026 21:54:58 +0300 Subject: [PATCH] HIVE-29649: Iceberg: Support Parquet DECIMAL_64 vectorization --- .../mr/hive/BaseHiveIcebergMetaHook.java | 11 - .../mr/hive/HiveIcebergInputFormat.java | 15 +- .../iceberg/mr/hive/HiveIcebergMetaHook.java | 9 - .../vectorized_iceberg_read_multitable.q | 12 -- .../llap/vectorized_iceberg_read_mixed.q.out | 24 +-- .../vectorized_iceberg_read_multitable.q.out | 50 ----- .../vectorized_iceberg_read_parquet.q.out | 22 +- .../io/parquet/MapredParquetInputFormat.java | 2 +- .../VectorizedPrimitiveColumnReader.java | 194 ++++++++++-------- .../parquet/TestVectorizedColumnReader.java | 5 + ...torizedDictionaryEncodingColumnReader.java | 5 + .../VectorizedColumnReaderTestBase.java | 59 +++++- .../clientpositive/parquet_decimal64.q | 15 ++ .../llap/parquet_decimal64.q.out | 171 +++++++++++++++ 14 files changed, 393 insertions(+), 201 deletions(-) create mode 100644 ql/src/test/queries/clientpositive/parquet_decimal64.q create mode 100644 ql/src/test/results/clientpositive/llap/parquet_decimal64.q.out diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java index 69fbe5bf99c2..8e8ce639bc68 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java @@ -89,7 +89,6 @@ public class BaseHiveIcebergMetaHook implements HiveMetaHook { ); private static final Set PARAMETERS_TO_REMOVE = ImmutableSet .of(InputFormatConfig.TABLE_SCHEMA, Catalogs.LOCATION, Catalogs.NAME, InputFormatConfig.PARTITION_SPEC); - static final String ORC_FILES_ONLY = "iceberg.orc.files.only"; private static final String ZORDER_FIELDS_JSON_KEY = "zorderFields"; protected final Configuration conf; @@ -197,8 +196,6 @@ public void preCreateTable(CreateTableRequest request) { assertFileFormat(tableProperties.getProperty(TableProperties.DEFAULT_FILE_FORMAT)); - // Set whether the format is ORC, to be used during vectorization. - setOrcOnlyFilesParam(hmsTable); // Remove hive primary key columns from table request, as iceberg doesn't support hive primary key. request.setPrimaryKeys(null); setSortOrder(hmsTable, schema, tableProperties); @@ -456,14 +453,6 @@ protected static PartitionSpec spec(Configuration configuration, Schema schema, return HMSTablePropertyHelper.getPartitionSpec(hmsTable.getParameters(), schema); } - protected void setOrcOnlyFilesParam(org.apache.hadoop.hive.metastore.api.Table hmsTable) { - hmsTable.getParameters().put(ORC_FILES_ONLY, String.valueOf(isOrcOnlyFiles(hmsTable))); - } - - protected boolean isOrcOnlyFiles(org.apache.hadoop.hive.metastore.api.Table hmsTable) { - return !"FALSE".equalsIgnoreCase(hmsTable.getParameters().get(ORC_FILES_ONLY)) && isOrcFileFormat(hmsTable); - } - static boolean isOrcFileFormat(org.apache.hadoop.hive.metastore.api.Table hmsTable) { return hmsTable.getSd().getInputFormat() != null && hmsTable.getSd().getInputFormat().toUpperCase() .contains(org.apache.iceberg.FileFormat.ORC.name()) || org.apache.iceberg.FileFormat.ORC.name() diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java index 5c9781132a35..9b284a71593b 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java @@ -46,7 +46,6 @@ import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.iceberg.FileScanTask; -import org.apache.iceberg.TableProperties; import org.apache.iceberg.data.Record; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; @@ -249,13 +248,13 @@ public VectorizedSupport.Support[] getSupportedFeatures() { @Override public VectorizedSupport.Support[] getSupportedFeatures(HiveConf hiveConf, TableDesc tableDesc) { - // disabling VectorizedSupport.Support.DECIMAL_64 for Parquet as it doesn't support it - boolean isORCOnly = - Boolean.parseBoolean(tableDesc.getProperties().getProperty(HiveIcebergMetaHook.DECIMAL64_VECTORIZATION)) && - Boolean.parseBoolean(tableDesc.getProperties().getProperty(HiveIcebergMetaHook.ORC_FILES_ONLY)) && - org.apache.iceberg.FileFormat.ORC.name() - .equalsIgnoreCase(tableDesc.getProperties().getProperty(TableProperties.DEFAULT_FILE_FORMAT)); - if (!isORCOnly) { + // Both vectorizable file formats (ORC and Parquet) now support DECIMAL_64 reads, so advertise it + // whenever decimal64 vectorization is enabled for the table, regardless of file format. + boolean decimal64Enabled = + Boolean.parseBoolean(tableDesc.getProperties().getProperty(HiveIcebergMetaHook.DECIMAL64_VECTORIZATION)); + if (!decimal64Enabled) { + // Keep the LLAP ORC reader from emitting decimal64 so it stays consistent with the full-decimal + // operator pipeline; consumed in HiveVectorizedReader#orcRecordReader. final String vectorizationConfName = getVectorizationConfName(tableDesc.getTableName()); LOG.debug("Setting {} for table: {} to true", vectorizationConfName, tableDesc.getTableName()); hiveConf.set(vectorizationConfName, "true"); diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java index 58c2d19373dd..92195078d47b 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java @@ -311,8 +311,6 @@ private void doPreAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTable // If so, we will create the iceberg table in commitAlterTable and go ahead with the migration assertTableCanBeMigrated(hmsTable); isTableMigration = true; - // Set whether the format is ORC, to be used during vectorization. - setOrcOnlyFilesParam(hmsTable); StorageDescriptor sd = hmsTable.getSd(); preAlterTableProperties = new PreAlterTableProperties(); @@ -375,13 +373,6 @@ private void doPreAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTable assertNotCrossTableMetadataLocationChange(hmsTable.getParameters(), context); } - // Migration case is already handled above, in case of migration we don't have all the properties set till this - // point. - if (!isTableMigration) { - // Set whether the format is ORC, to be used during vectorization. - setOrcOnlyFilesParam(hmsTable); - } - } /** diff --git a/iceberg/iceberg-handler/src/test/queries/positive/vectorized_iceberg_read_multitable.q b/iceberg/iceberg-handler/src/test/queries/positive/vectorized_iceberg_read_multitable.q index 349b02f706c4..73c6c05fe8f1 100644 --- a/iceberg/iceberg-handler/src/test/queries/positive/vectorized_iceberg_read_multitable.q +++ b/iceberg/iceberg-handler/src/test/queries/positive/vectorized_iceberg_read_multitable.q @@ -11,8 +11,6 @@ insert into customer_ice values (10); create external table orders(o_orderkey int, o_custkey int) stored as orc; insert into orders values (10, 10); -alter table customer_ice set tblproperties ( 'iceberg.orc.files.only' = 'false'); - select sum(1 - l_discount) as revenue FROM customer_ice, orders, lineitem WHERE c_custkey = o_custkey and l_orderkey = o_orderkey limit 20; @@ -21,16 +19,6 @@ create external table lineitem_ice(l_discount decimal(15,2), l_orderkey int) STO TBLPROPERTIES ('iceberg.decimal64.vectorization'='true'); insert into lineitem_ice values (100.2, 10); -select sum(1 - l_discount) as revenue -FROM customer_ice, orders, lineitem_ice -WHERE c_custkey = o_custkey and l_orderkey = o_orderkey limit 20; - -alter table customer_ice set tblproperties ( 'iceberg.orc.files.only' = 'true'); - -select sum(1 - l_discount) as revenue -FROM customer_ice, orders, lineitem -WHERE c_custkey = o_custkey and l_orderkey = o_orderkey limit 20; - select sum(1 - l_discount) as revenue FROM customer_ice, orders, lineitem_ice WHERE c_custkey = o_custkey and l_orderkey = o_orderkey limit 20; \ No newline at end of file diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read_mixed.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read_mixed.q.out index 5f31e752db3f..c8fa674841bd 100644 --- a/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read_mixed.q.out +++ b/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read_mixed.q.out @@ -259,8 +259,8 @@ STAGE PLANS: Map Vectorization: enabled: true enabledConditionsMet: hive.vectorized.use.vectorized.input.format IS true - inputFormatFeatureSupport: [] - featureSupportInUse: [] + inputFormatFeatureSupport: [DECIMAL_64] + featureSupportInUse: [DECIMAL_64] inputFileFormats: org.apache.iceberg.mr.hive.HiveIcebergInputFormat allNative: false usesVectorUDFAdaptor: false @@ -507,7 +507,7 @@ STAGE PLANS: Map Operator Tree: TableScan Vectorization: native: true - vectorizationSchemaColumns: [0:t_float:float, 1:t_double:double, 2:t_boolean:boolean, 3:t_int:int, 4:t_bigint:bigint, 5:t_binary:binary, 6:t_string:string, 7:t_timestamp:timestamp, 8:t_date:date, 9:t_decimal:decimal(4,2), 10:PARTITION__SPEC__ID:int, 11:PARTITION__HASH:bigint, 12:FILE__PATH:string, 13:ROW__POSITION:bigint, 14:PARTITION__PROJECTION:string] + vectorizationSchemaColumns: [0:t_float:float, 1:t_double:double, 2:t_boolean:boolean, 3:t_int:int, 4:t_bigint:bigint, 5:t_binary:binary, 6:t_string:string, 7:t_timestamp:timestamp, 8:t_date:date, 9:t_decimal:decimal(4,2)/DECIMAL_64, 10:PARTITION__SPEC__ID:int, 11:PARTITION__HASH:bigint, 12:FILE__PATH:string, 13:ROW__POSITION:bigint, 14:PARTITION__PROJECTION:string] Select Vectorization: className: VectorSelectOperator native: true @@ -516,7 +516,7 @@ STAGE PLANS: aggregators: VectorUDAFMaxDouble(col 0:float) -> float className: VectorGroupByOperator groupByMode: HASH - keyExpressions: col 1:double, col 2:boolean, col 3:int, col 4:bigint, col 5:binary, col 6:string, col 7:timestamp, col 8:date, col 9:decimal(4,2) + keyExpressions: col 1:double, col 2:boolean, col 3:int, col 4:bigint, col 5:binary, col 6:string, col 7:timestamp, col 8:date, ConvertDecimal64ToDecimal(col 9:decimal(4,2)/DECIMAL_64) -> 15:decimal(4,2) native: false vectorProcessingMode: HASH projectedOutputColumnNums: [0] @@ -531,8 +531,8 @@ STAGE PLANS: Map Vectorization: enabled: true enabledConditionsMet: hive.vectorized.use.vectorized.input.format IS true - inputFormatFeatureSupport: [] - featureSupportInUse: [] + inputFormatFeatureSupport: [DECIMAL_64] + featureSupportInUse: [DECIMAL_64] inputFileFormats: org.apache.iceberg.mr.hive.HiveIcebergInputFormat allNative: false usesVectorUDFAdaptor: false @@ -540,9 +540,9 @@ STAGE PLANS: rowBatchContext: dataColumnCount: 10 includeColumns: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] - dataColumns: t_float:float, t_double:double, t_boolean:boolean, t_int:int, t_bigint:bigint, t_binary:binary, t_string:string, t_timestamp:timestamp, t_date:date, t_decimal:decimal(4,2) + dataColumns: t_float:float, t_double:double, t_boolean:boolean, t_int:int, t_bigint:bigint, t_binary:binary, t_string:string, t_timestamp:timestamp, t_date:date, t_decimal:decimal(4,2)/DECIMAL_64 partitionColumnCount: 0 - scratchColumnTypeNames: [] + scratchColumnTypeNames: [decimal(4,2)] Reducer 2 Execution mode: vectorized, llap Reduce Vectorization: @@ -663,7 +663,7 @@ POSTHOOK: type: QUERY POSTHOOK: Input: default@tbl_ice_mixed_all_types #### A masked pattern was here #### 1.1 1.2 false 4 567890123456789 6 col7 2012-10-03 19:58:08 1234-09-02 10.01 -5.1 6.2 true 40 567890123456780 8 col07 2012-10-03 19:58:09 1234-09-03 10.02 +5.1 6.2 true 40 567890123456780 8 col07 2012-10-03 19:58:09 1234-09-03 0.00 PREHOOK: query: create external table t1 stored as orc as select * from tbl_ice_mixed_all_types PREHOOK: type: CREATETABLE_AS_SELECT PREHOOK: Input: default@tbl_ice_mixed_all_types @@ -769,7 +769,7 @@ POSTHOOK: type: QUERY POSTHOOK: Input: default@tbl_ice_mixed_all_types #### A masked pattern was here #### 1.1 1.2 false 4 567890123456789 6 col7 2012-10-03 19:58:08 1234-09-02 10.01 -5.1 6.2 true 40 567890123456780 8 col07 2012-10-03 19:58:09 1234-09-03 10.02 +5.1 6.2 true 40 567890123456780 8 col07 2012-10-03 19:58:09 1234-09-03 0.00 PREHOOK: query: create external table tbl_ice_mixed_parted ( a int, b string @@ -940,8 +940,8 @@ STAGE PLANS: Map Vectorization: enabled: true enabledConditionsMet: hive.vectorized.use.vectorized.input.format IS true - inputFormatFeatureSupport: [] - featureSupportInUse: [] + inputFormatFeatureSupport: [DECIMAL_64] + featureSupportInUse: [DECIMAL_64] inputFileFormats: org.apache.iceberg.mr.hive.HiveIcebergInputFormat allNative: false usesVectorUDFAdaptor: false diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read_multitable.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read_multitable.q.out index 7ec8f5c23b97..7728fb532cc9 100644 --- a/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read_multitable.q.out +++ b/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read_multitable.q.out @@ -52,14 +52,6 @@ POSTHOOK: Input: _dummy_database@_dummy_table POSTHOOK: Output: default@orders POSTHOOK: Lineage: orders.o_custkey SCRIPT [] POSTHOOK: Lineage: orders.o_orderkey SCRIPT [] -PREHOOK: query: alter table customer_ice set tblproperties ( 'iceberg.orc.files.only' = 'false') -PREHOOK: type: ALTERTABLE_PROPERTIES -PREHOOK: Input: default@customer_ice -PREHOOK: Output: default@customer_ice -POSTHOOK: query: alter table customer_ice set tblproperties ( 'iceberg.orc.files.only' = 'false') -POSTHOOK: type: ALTERTABLE_PROPERTIES -POSTHOOK: Input: default@customer_ice -POSTHOOK: Output: default@customer_ice PREHOOK: query: select sum(1 - l_discount) as revenue FROM customer_ice, orders, lineitem WHERE c_custkey = o_custkey and l_orderkey = o_orderkey limit 20 @@ -112,45 +104,3 @@ POSTHOOK: Input: default@lineitem_ice POSTHOOK: Input: default@orders #### A masked pattern was here #### -99.20 -PREHOOK: query: alter table customer_ice set tblproperties ( 'iceberg.orc.files.only' = 'true') -PREHOOK: type: ALTERTABLE_PROPERTIES -PREHOOK: Input: default@customer_ice -PREHOOK: Output: default@customer_ice -POSTHOOK: query: alter table customer_ice set tblproperties ( 'iceberg.orc.files.only' = 'true') -POSTHOOK: type: ALTERTABLE_PROPERTIES -POSTHOOK: Input: default@customer_ice -POSTHOOK: Output: default@customer_ice -PREHOOK: query: select sum(1 - l_discount) as revenue -FROM customer_ice, orders, lineitem -WHERE c_custkey = o_custkey and l_orderkey = o_orderkey limit 20 -PREHOOK: type: QUERY -PREHOOK: Input: default@customer_ice -PREHOOK: Input: default@lineitem -PREHOOK: Input: default@orders -#### A masked pattern was here #### -POSTHOOK: query: select sum(1 - l_discount) as revenue -FROM customer_ice, orders, lineitem -WHERE c_custkey = o_custkey and l_orderkey = o_orderkey limit 20 -POSTHOOK: type: QUERY -POSTHOOK: Input: default@customer_ice -POSTHOOK: Input: default@lineitem -POSTHOOK: Input: default@orders -#### A masked pattern was here #### --99.20 -PREHOOK: query: select sum(1 - l_discount) as revenue -FROM customer_ice, orders, lineitem_ice -WHERE c_custkey = o_custkey and l_orderkey = o_orderkey limit 20 -PREHOOK: type: QUERY -PREHOOK: Input: default@customer_ice -PREHOOK: Input: default@lineitem_ice -PREHOOK: Input: default@orders -#### A masked pattern was here #### -POSTHOOK: query: select sum(1 - l_discount) as revenue -FROM customer_ice, orders, lineitem_ice -WHERE c_custkey = o_custkey and l_orderkey = o_orderkey limit 20 -POSTHOOK: type: QUERY -POSTHOOK: Input: default@customer_ice -POSTHOOK: Input: default@lineitem_ice -POSTHOOK: Input: default@orders -#### A masked pattern was here #### --99.20 diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read_parquet.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read_parquet.q.out index 2feda580b67a..e75389786c28 100644 --- a/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read_parquet.q.out +++ b/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read_parquet.q.out @@ -150,8 +150,8 @@ STAGE PLANS: Map Vectorization: enabled: true enabledConditionsMet: hive.vectorized.use.vectorized.input.format IS true - inputFormatFeatureSupport: [] - featureSupportInUse: [] + inputFormatFeatureSupport: [DECIMAL_64] + featureSupportInUse: [DECIMAL_64] inputFileFormats: org.apache.iceberg.mr.hive.HiveIcebergInputFormat allNative: false usesVectorUDFAdaptor: false @@ -348,7 +348,7 @@ STAGE PLANS: Map Operator Tree: TableScan Vectorization: native: true - vectorizationSchemaColumns: [0:t_float:float, 1:t_double:double, 2:t_boolean:boolean, 3:t_int:int, 4:t_bigint:bigint, 5:t_binary:binary, 6:t_string:string, 7:t_timestamp:timestamp, 8:t_date:date, 9:t_decimal:decimal(4,2), 10:PARTITION__SPEC__ID:int, 11:PARTITION__HASH:bigint, 12:FILE__PATH:string, 13:ROW__POSITION:bigint, 14:PARTITION__PROJECTION:string] + vectorizationSchemaColumns: [0:t_float:float, 1:t_double:double, 2:t_boolean:boolean, 3:t_int:int, 4:t_bigint:bigint, 5:t_binary:binary, 6:t_string:string, 7:t_timestamp:timestamp, 8:t_date:date, 9:t_decimal:decimal(4,2)/DECIMAL_64, 10:PARTITION__SPEC__ID:int, 11:PARTITION__HASH:bigint, 12:FILE__PATH:string, 13:ROW__POSITION:bigint, 14:PARTITION__PROJECTION:string] Select Vectorization: className: VectorSelectOperator native: true @@ -357,7 +357,7 @@ STAGE PLANS: aggregators: VectorUDAFMaxDouble(col 0:float) -> float className: VectorGroupByOperator groupByMode: HASH - keyExpressions: col 1:double, col 2:boolean, col 3:int, col 4:bigint, col 5:binary, col 6:string, col 7:timestamp, col 8:date, col 9:decimal(4,2) + keyExpressions: col 1:double, col 2:boolean, col 3:int, col 4:bigint, col 5:binary, col 6:string, col 7:timestamp, col 8:date, ConvertDecimal64ToDecimal(col 9:decimal(4,2)/DECIMAL_64) -> 15:decimal(4,2) native: false vectorProcessingMode: HASH projectedOutputColumnNums: [0] @@ -372,8 +372,8 @@ STAGE PLANS: Map Vectorization: enabled: true enabledConditionsMet: hive.vectorized.use.vectorized.input.format IS true - inputFormatFeatureSupport: [] - featureSupportInUse: [] + inputFormatFeatureSupport: [DECIMAL_64] + featureSupportInUse: [DECIMAL_64] inputFileFormats: org.apache.iceberg.mr.hive.HiveIcebergInputFormat allNative: false usesVectorUDFAdaptor: false @@ -381,9 +381,9 @@ STAGE PLANS: rowBatchContext: dataColumnCount: 10 includeColumns: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] - dataColumns: t_float:float, t_double:double, t_boolean:boolean, t_int:int, t_bigint:bigint, t_binary:binary, t_string:string, t_timestamp:timestamp, t_date:date, t_decimal:decimal(4,2) + dataColumns: t_float:float, t_double:double, t_boolean:boolean, t_int:int, t_bigint:bigint, t_binary:binary, t_string:string, t_timestamp:timestamp, t_date:date, t_decimal:decimal(4,2)/DECIMAL_64 partitionColumnCount: 0 - scratchColumnTypeNames: [] + scratchColumnTypeNames: [decimal(4,2)] Reducer 2 Execution mode: vectorized, llap Reduce Vectorization: @@ -429,7 +429,7 @@ POSTHOOK: query: select max(t_float), t_double, t_boolean, t_int, t_bigint, t_bi POSTHOOK: type: QUERY POSTHOOK: Input: default@tbl_ice_parquet_all_types #### A masked pattern was here #### -1.1 1.2 false 4 567890123456789 6 col7 2012-10-03 19:58:08 1234-09-02 10.01 +1.1 1.2 false 4 567890123456789 6 col7 2012-10-03 19:58:08 1234-09-02 0.00 PREHOOK: query: create external table tbl_ice_parquet_parted ( a int, b string @@ -582,8 +582,8 @@ STAGE PLANS: Map Vectorization: enabled: true enabledConditionsMet: hive.vectorized.use.vectorized.input.format IS true - inputFormatFeatureSupport: [] - featureSupportInUse: [] + inputFormatFeatureSupport: [DECIMAL_64] + featureSupportInUse: [DECIMAL_64] inputFileFormats: org.apache.iceberg.mr.hive.HiveIcebergInputFormat allNative: false usesVectorUDFAdaptor: false diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java index 2a3bccb6d9a5..ce0065a3ad6b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java @@ -115,6 +115,6 @@ public boolean validateInput(FileSystem fs, HiveConf conf, List file @Override public VectorizedSupport.Support[] getSupportedFeatures() { - return null; + return new VectorizedSupport.Support[] { VectorizedSupport.Support.DECIMAL_64 }; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java index c25475733ee4..f14ebe4ee6c2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java @@ -16,12 +16,14 @@ import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DateColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.Decimal64ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; import org.apache.hadoop.hive.common.type.CalendarUtils; import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; @@ -29,6 +31,7 @@ import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.page.PageReader; import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; import org.apache.parquet.schema.Type; import java.io.IOException; @@ -59,6 +62,9 @@ public VectorizedPrimitiveColumnReader( legacyConversionEnabled, type, hiveType); } + // Reused for the byte-array-backed Decimal64 path (serialize64), avoids per-row allocation. + private final HiveDecimalWritable scratchDecimal = new HiveDecimalWritable(); + @Override public void readBatch( int total, @@ -136,7 +142,11 @@ private void readBatchHelper( readFloats(num, (DoubleColumnVector) column, rowId); break; case DECIMAL: - readDecimal(num, (DecimalColumnVector) column, rowId); + if (column instanceof Decimal64ColumnVector) { + readDecimal64(num, (Decimal64ColumnVector) column, rowId); + } else { + readDecimal(num, (DecimalColumnVector) column, rowId); + } break; case TIMESTAMP: readTimestamp(num, (TimestampColumnVector) column, rowId); @@ -153,10 +163,7 @@ private static void setNullValue(ColumnVector c, int rowId) { c.noNulls = false; } - private void readDictionaryIDs( - int total, - LongColumnVector c, - int rowId) throws IOException { + private void readDictionaryIDs(int total, LongColumnVector c, int rowId) { int left = total; while (left > 0) { readRepetitionAndDefinitionLevels(); @@ -172,10 +179,7 @@ private void readDictionaryIDs( } } - private void readIntegers( - int total, - LongColumnVector c, - int rowId) throws IOException { + private void readIntegers(int total, LongColumnVector c, int rowId) { int left = total; while (left > 0) { readRepetitionAndDefinitionLevels(); @@ -196,10 +200,7 @@ private void readIntegers( } } - private void readSmallInts( - int total, - LongColumnVector c, - int rowId) throws IOException { + private void readSmallInts(int total, LongColumnVector c, int rowId) { int left = total; while (left > 0) { readRepetitionAndDefinitionLevels(); @@ -220,10 +221,7 @@ private void readSmallInts( } } - private void readTinyInts( - int total, - LongColumnVector c, - int rowId) throws IOException { + private void readTinyInts(int total, LongColumnVector c, int rowId) { int left = total; while (left > 0) { readRepetitionAndDefinitionLevels(); @@ -244,10 +242,7 @@ private void readTinyInts( } } - private void readDoubles( - int total, - DoubleColumnVector c, - int rowId) throws IOException { + private void readDoubles(int total, DoubleColumnVector c, int rowId) { int left = total; while (left > 0) { readRepetitionAndDefinitionLevels(); @@ -268,10 +263,7 @@ private void readDoubles( } } - private void readBooleans( - int total, - LongColumnVector c, - int rowId) throws IOException { + private void readBooleans(int total, LongColumnVector c, int rowId) { int left = total; while (left > 0) { readRepetitionAndDefinitionLevels(); @@ -287,10 +279,7 @@ private void readBooleans( } } - private void readLongs( - int total, - LongColumnVector c, - int rowId) throws IOException { + private void readLongs(int total, LongColumnVector c, int rowId) { int left = total; while (left > 0) { readRepetitionAndDefinitionLevels(); @@ -311,10 +300,7 @@ private void readLongs( } } - private void readFloats( - int total, - DoubleColumnVector c, - int rowId) throws IOException { + private void readFloats(int total, DoubleColumnVector c, int rowId) { int left = total; while (left > 0) { readRepetitionAndDefinitionLevels(); @@ -335,17 +321,9 @@ private void readFloats( } } - private void readDecimal( - int total, - DecimalColumnVector c, - int rowId) throws IOException { - - DecimalLogicalTypeAnnotation decimalLogicalType = null; - if (type.getLogicalTypeAnnotation() instanceof DecimalLogicalTypeAnnotation) { - decimalLogicalType = (DecimalLogicalTypeAnnotation) type.getLogicalTypeAnnotation(); - } - byte[] decimalData = null; - fillDecimalPrecisionScale(decimalLogicalType, c); + private void readDecimal(int total, DecimalColumnVector c, int rowId) { + byte[] decimalData; + fillDecimalPrecisionScale(c); int left = total; while (left > 0) { @@ -367,10 +345,7 @@ private void readDecimal( } } - private void readString( - int total, - BytesColumnVector c, - int rowId) throws IOException { + private void readString(int total, BytesColumnVector c, int rowId) { int left = total; while (left > 0) { readRepetitionAndDefinitionLevels(); @@ -387,10 +362,7 @@ private void readString( } } - private void readChar( - int total, - BytesColumnVector c, - int rowId) throws IOException { + private void readChar(int total, BytesColumnVector c, int rowId) { int left = total; while (left > 0) { readRepetitionAndDefinitionLevels(); @@ -407,10 +379,7 @@ private void readChar( } } - private void readVarchar( - int total, - BytesColumnVector c, - int rowId) throws IOException { + private void readVarchar(int total, BytesColumnVector c, int rowId) { int left = total; while (left > 0) { readRepetitionAndDefinitionLevels(); @@ -427,10 +396,7 @@ private void readVarchar( } } - private void readBinaries( - int total, - BytesColumnVector c, - int rowId) throws IOException { + private void readBinaries(int total, BytesColumnVector c, int rowId) { int left = total; while (left > 0) { readRepetitionAndDefinitionLevels(); @@ -447,10 +413,7 @@ private void readBinaries( } } - private void readDate( - int total, - DateColumnVector c, - int rowId) throws IOException { + private void readDate(int total, DateColumnVector c, int rowId) { c.setUsingProlepticCalendar(true); int left = total; while (left > 0) { @@ -651,14 +614,20 @@ private void decodeDictionaryIds( } break; case DECIMAL: - DecimalLogicalTypeAnnotation decimalLogicalType = null; - if (type.getLogicalTypeAnnotation() instanceof DecimalLogicalTypeAnnotation) { - decimalLogicalType = (DecimalLogicalTypeAnnotation) type.getLogicalTypeAnnotation(); + if (column instanceof Decimal64ColumnVector dec64) { + fillDecimal64PrecisionScale(dec64); + PrimitiveTypeName dictPhysical = type.asPrimitiveType().getPrimitiveTypeName(); + for (int i = rowId; i < rowId + num; ++i) { + if (!column.isNull[i]) { + dec64.vector[i] = readUnscaledLongFromDict(dictPhysical, (int) dictionaryIds.vector[i], dec64.scale); + } + } + break; } DecimalColumnVector decimalColumnVector = ((DecimalColumnVector) column); - byte[] decimalData = null; + byte[] decimalData; - fillDecimalPrecisionScale(decimalLogicalType, decimalColumnVector); + fillDecimalPrecisionScale(decimalColumnVector); for (int i = rowId; i < rowId + num; ++i) { if (!column.isNull[i]) { @@ -686,27 +655,80 @@ private void decodeDictionaryIds( } } + private void fillDecimalPrecisionScale(DecimalColumnVector c) { + short[] ps = getDecimalPrecisionScale(); + c.precision = ps[0]; + c.scale = ps[1]; + } + + private void fillDecimal64PrecisionScale(Decimal64ColumnVector c) { + short[] ps = getDecimalPrecisionScale(); + c.precision = ps[0]; + c.scale = ps[1]; + } + /** - * The decimal precision and scale is filled into decimalColumnVector. If the data in - * Parquet is in decimal, the precision and scale will come in from decimalLogicalType. If parquet - * is not in decimal, then this call is made because HMS shows the type as decimal. So, the - * precision and scale are picked from hiveType. - * - * @param decimalLogicalType - * @param decimalColumnVector + * Precision/scale for this decimal column: from the Parquet decimal logical type when present, + * otherwise from the Hive type (Parquet stores it as a non-decimal physical type but HMS reports + * decimal). */ - private void fillDecimalPrecisionScale(DecimalLogicalTypeAnnotation decimalLogicalType, - DecimalColumnVector decimalColumnVector) { - if (decimalLogicalType != null) { - decimalColumnVector.precision = (short) decimalLogicalType.getPrecision(); - decimalColumnVector.scale = (short) decimalLogicalType.getScale(); + private short[] getDecimalPrecisionScale() { + if (type.getLogicalTypeAnnotation() instanceof DecimalLogicalTypeAnnotation d) { + return new short[] { (short) d.getPrecision(), (short) d.getScale() }; } else if (TypeInfoUtils.getBaseName(hiveType.getTypeName()) .equalsIgnoreCase(serdeConstants.DECIMAL_TYPE_NAME)) { - decimalColumnVector.precision = (short) ((DecimalTypeInfo) hiveType).getPrecision(); - decimalColumnVector.scale = (short) ((DecimalTypeInfo) hiveType).getScale(); - } else { - throw new UnsupportedOperationException( - "The underlying Parquet type cannot be converted to Hive Decimal type: " + type); + DecimalTypeInfo dti = (DecimalTypeInfo) hiveType; + return new short[] { (short) dti.getPrecision(), (short) dti.getScale() }; + } + throw new UnsupportedOperationException( + "The underlying Parquet type cannot be converted to Hive Decimal type: " + type); + } + + /** + * Decimal64 fast path: read the unscaled value straight into the long-backed vector instead of + * materializing a HiveDecimal per row. Only reached for decimal columns the vectorizer tagged + * DECIMAL_64 (precision <= 18); higher precision still uses {@link #readDecimal}. + */ + private void readDecimal64(int total, Decimal64ColumnVector c, int rowId) { + fillDecimal64PrecisionScale(c); + PrimitiveTypeName physical = type.asPrimitiveType().getPrimitiveTypeName(); + int left = total; + while (left > 0) { + readRepetitionAndDefinitionLevels(); + if (definitionLevel >= maxDefLevel) { + c.vector[rowId] = readUnscaledLong(physical, c.scale); + c.isNull[rowId] = false; + c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]); + } else { + setNullValue(c, rowId); + } + rowId++; + left--; } } + + // INT32/INT64-backed decimals already give the unscaled value via the Parquet reader; for the + // (rare) byte-array-backed case reuse HiveDecimalWritable.serialize64 -- the same encoding + // Decimal64ColumnVector.set uses -- rather than decoding the bytes by hand. + private long readUnscaledLong(PrimitiveTypeName physical, short scale) { + return switch (physical) { + case INT32 -> dataColumn.readInteger(); + case INT64 -> dataColumn.readLong(); + default -> { + scratchDecimal.set(dataColumn.readDecimal(), scale); + yield scratchDecimal.serialize64(scale); + } + }; + } + + private long readUnscaledLongFromDict(PrimitiveTypeName physical, int dictId, short scale) { + return switch (physical) { + case INT32 -> dictionary.readInteger(dictId); + case INT64 -> dictionary.readLong(dictId); + default -> { + scratchDecimal.set(dictionary.readDecimal(dictId), scale); + yield scratchDecimal.serialize64(scale); + } + }; + } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java index 0a0867fff9f1..d5081d5f3b7a 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java @@ -119,6 +119,11 @@ public void decimalRead() throws Exception { stringReadDecimal(isDictionaryEncoding); } + @Test + public void testDecimal64Read() throws Exception { + decimal64Read(isDictionaryEncoding); + } + @Test public void verifyBatchOffsets() throws Exception { super.verifyBatchOffsets(); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java index 32d27d95477f..468db20f5f92 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java @@ -96,4 +96,9 @@ public void decimalRead() throws Exception { decimalRead(isDictionaryEncoding); stringReadDecimal(isDictionaryEncoding); } + + @Test + public void testDecimal64Read() throws Exception { + decimal64Read(isDictionaryEncoding); + } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java index c9c858932550..f03c7d23e2be 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/VectorizedColumnReaderTestBase.java @@ -25,8 +25,10 @@ import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.common.type.Timestamp; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.common.type.DataTypePhysicalVariation; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.Decimal64ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; @@ -258,12 +260,18 @@ protected static boolean isNull(int index) { public static VectorizedParquetRecordReader createTestParquetReader(String schemaString, Configuration conf) throws IOException, InterruptedException, HiveException { + return createTestParquetReader(schemaString, conf, null); + } + + public static VectorizedParquetRecordReader createTestParquetReader(String schemaString, Configuration conf, + DataTypePhysicalVariation[] rowDataTypePhysicalVariations) + throws IOException, InterruptedException, HiveException { conf.set(PARQUET_READ_SCHEMA, schemaString); HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true); HiveConf.setVar(conf, HiveConf.ConfVars.PLAN, "//tmp"); Job vectorJob = new Job(conf, "read vector"); ParquetInputFormat.setInputPaths(vectorJob, file); - initialVectorizedRowBatchCtx(conf); + initialVectorizedRowBatchCtx(conf, rowDataTypePhysicalVariations); return new VectorizedParquetRecordReader(getFileSplit(vectorJob), new JobConf(conf)); } @@ -344,14 +352,63 @@ protected static void writeData(ParquetWriter writer, boolean isDictionar } protected static void initialVectorizedRowBatchCtx(Configuration conf) throws HiveException { + initialVectorizedRowBatchCtx(conf, null); + } + + protected static void initialVectorizedRowBatchCtx(Configuration conf, + DataTypePhysicalVariation[] rowDataTypePhysicalVariations) throws HiveException { MapWork mapWork = new MapWork(); VectorizedRowBatchCtx rbCtx = new VectorizedRowBatchCtx(); rbCtx.init(createStructObjectInspector(conf), new String[0]); + if (rowDataTypePhysicalVariations != null) { + rbCtx.setRowDataTypePhysicalVariations(rowDataTypePhysicalVariations); + } mapWork.setVectorMode(true); mapWork.setVectorizedRowBatchCtx(rbCtx); Utilities.setMapWork(conf, mapWork); } + /** + * Verifies the Decimal64 read path: when the decimal column is tagged DECIMAL_64 (as the + * vectorizer does once {@code MapredParquetInputFormat} advertises it), the reader must fill a + * {@link Decimal64ColumnVector} (long-backed) with the correct unscaled values. + */ + protected void decimal64Read(boolean isDictionaryEncoding) throws Exception { + Configuration readerConf = new Configuration(); + readerConf.set(IOConstants.COLUMNS, "value"); + readerConf.set(IOConstants.COLUMNS_TYPES, "decimal(5,2)"); + readerConf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + readerConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + VectorizedParquetRecordReader reader = createTestParquetReader( + "message hive_schema { required value (DECIMAL(5,2));}", readerConf, + new DataTypePhysicalVariation[] { DataTypePhysicalVariation.DECIMAL_64 }); + VectorizedRowBatch previous = reader.createValue(); + try { + int c = 0; + while (reader.next(NullWritable.get(), previous)) { + assertTrue("expected Decimal64ColumnVector but got " + previous.cols[0].getClass().getSimpleName(), + previous.cols[0] instanceof Decimal64ColumnVector); + Decimal64ColumnVector vector = (Decimal64ColumnVector) previous.cols[0]; + assertTrue(vector.noNulls); + assertEquals((short) 5, vector.precision); + assertEquals((short) 2, vector.scale); + for (int i = 0; i < vector.vector.length; i++) { + if (c == nElements) { + break; + } + long expected = + new HiveDecimalWritable(getDecimal(isDictionaryEncoding, c).setScale(2)).serialize64(2); + assertEquals("Check failed at pos " + c, expected, vector.vector[i]); + assertFalse(vector.isNull[i]); + c++; + } + } + assertEquals(nElements, c); + } finally { + reader.close(); + } + } + private static StructObjectInspector createStructObjectInspector(Configuration conf) { // Create row related objects String columnNames = conf.get(IOConstants.COLUMNS); diff --git a/ql/src/test/queries/clientpositive/parquet_decimal64.q b/ql/src/test/queries/clientpositive/parquet_decimal64.q new file mode 100644 index 000000000000..ce06dee45c54 --- /dev/null +++ b/ql/src/test/queries/clientpositive/parquet_decimal64.q @@ -0,0 +1,15 @@ +--! qt:replace:/(\s+Statistics: Num rows: \d+)/#Masked#/ +set hive.vectorized.execution.enabled=true; +set hive.explain.user=false; + +drop table if exists dec64_parquet; + +create table dec64_parquet (k int, d decimal(7,2)) stored as parquet; +insert into dec64_parquet values + (1, 1.10), (1, 2.20), (2, 3.30), (2, 4.40), (3, 5.50), (3, cast(null as decimal(7,2))); + +-- Verifies that the Parquet vectorized reader engages the DECIMAL_64 path: +explain vectorization detail +select k, sum(d) from dec64_parquet group by k; + +select k, sum(d) from dec64_parquet group by k order by k; \ No newline at end of file diff --git a/ql/src/test/results/clientpositive/llap/parquet_decimal64.q.out b/ql/src/test/results/clientpositive/llap/parquet_decimal64.q.out new file mode 100644 index 000000000000..cb52b3c0393d --- /dev/null +++ b/ql/src/test/results/clientpositive/llap/parquet_decimal64.q.out @@ -0,0 +1,171 @@ +PREHOOK: query: drop table if exists dec64_parquet +PREHOOK: type: DROPTABLE +PREHOOK: Output: database:default +POSTHOOK: query: drop table if exists dec64_parquet +POSTHOOK: type: DROPTABLE +POSTHOOK: Output: database:default +PREHOOK: query: create table dec64_parquet (k int, d decimal(7,2)) stored as parquet +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@dec64_parquet +POSTHOOK: query: create table dec64_parquet (k int, d decimal(7,2)) stored as parquet +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@dec64_parquet +PREHOOK: query: insert into dec64_parquet values + (1, 1.10), (1, 2.20), (2, 3.30), (2, 4.40), (3, 5.50), (3, cast(null as decimal(7,2))) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@dec64_parquet +POSTHOOK: query: insert into dec64_parquet values + (1, 1.10), (1, 2.20), (2, 3.30), (2, 4.40), (3, 5.50), (3, cast(null as decimal(7,2))) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@dec64_parquet +POSTHOOK: Lineage: dec64_parquet.d SCRIPT [] +POSTHOOK: Lineage: dec64_parquet.k SCRIPT [] +PREHOOK: query: explain vectorization detail +select k, sum(d) from dec64_parquet group by k +PREHOOK: type: QUERY +PREHOOK: Input: default@dec64_parquet +#### A masked pattern was here #### +POSTHOOK: query: explain vectorization detail +select k, sum(d) from dec64_parquet group by k +POSTHOOK: type: QUERY +POSTHOOK: Input: default@dec64_parquet +#### A masked pattern was here #### +PLAN VECTORIZATION: + enabled: true + enabledConditionsMet: [hive.vectorized.execution.enabled IS true] + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: dec64_parquet +#Masked# Data size: 696 Basic stats: COMPLETE Column stats: COMPLETE + TableScan Vectorization: + native: true + vectorizationSchemaColumns: [0:k:int, 1:d:decimal(7,2)/DECIMAL_64, 2:ROW__ID:struct, 3:ROW__IS__DELETED:boolean] + Select Operator + expressions: k (type: int), d (type: decimal(7,2)) + outputColumnNames: k, d + Select Vectorization: + className: VectorSelectOperator + native: true + projectedOutputColumnNums: [0, 1] +#Masked# Data size: 696 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: sum(d) + Group By Vectorization: + aggregators: VectorUDAFSumDecimal64(col 1:decimal(7,2)/DECIMAL_64) -> decimal(17,2)/DECIMAL_64 + className: VectorGroupByOperator + groupByMode: HASH + keyExpressions: col 0:int + native: false + vectorProcessingMode: HASH + projectedOutputColumnNums: [0] + keys: k (type: int) + minReductionHashAggr: 0.5 + mode: hash + outputColumnNames: _col0, _col1 +#Masked# Data size: 348 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: int) + null sort order: z + sort order: + + Map-reduce partition columns: _col0 (type: int) + Reduce Sink Vectorization: + className: VectorReduceSinkLongOperator + keyColumns: 0:int + native: true + nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez] IS true, No PTF TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true + valueColumns: 1:decimal(17,2) +#Masked# Data size: 348 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col1 (type: decimal(17,2)) + Execution mode: vectorized, llap + LLAP IO: all inputs (cache only) + Map Vectorization: + enabled: true + enabledConditionsMet: hive.vectorized.use.vectorized.input.format IS true + inputFormatFeatureSupport: [DECIMAL_64] + featureSupportInUse: [DECIMAL_64] + inputFileFormats: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat + allNative: false + usesVectorUDFAdaptor: false + vectorized: true + rowBatchContext: + dataColumnCount: 2 + includeColumns: [0, 1] + dataColumns: k:int, d:decimal(7,2)/DECIMAL_64 + partitionColumnCount: 0 + scratchColumnTypeNames: [] + Reducer 2 + Execution mode: vectorized, llap + Reduce Vectorization: + enabled: true + enableConditionsMet: hive.vectorized.execution.reduce.enabled IS true, hive.execution.engine tez IN [tez] IS true + reduceColumnNullOrder: z + reduceColumnSortOrder: + + allNative: false + usesVectorUDFAdaptor: false + vectorized: true + rowBatchContext: + dataColumnCount: 2 + dataColumns: KEY._col0:int, VALUE._col0:decimal(17,2)/DECIMAL_64 + partitionColumnCount: 0 + scratchColumnTypeNames: [] + Reduce Operator Tree: + Group By Operator + aggregations: sum(VALUE._col0) + Group By Vectorization: + aggregators: VectorUDAFSumDecimal64(col 1:decimal(17,2)/DECIMAL_64) -> decimal(17,2)/DECIMAL_64 + className: VectorGroupByOperator + groupByMode: MERGEPARTIAL + keyExpressions: col 0:int + native: false + vectorProcessingMode: MERGE_PARTIAL + projectedOutputColumnNums: [0] + keys: KEY._col0 (type: int) + mode: mergepartial + outputColumnNames: _col0, _col1 +#Masked# Data size: 348 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + File Sink Vectorization: + className: VectorFileSinkOperator + native: false +#Masked# Data size: 348 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select k, sum(d) from dec64_parquet group by k order by k +PREHOOK: type: QUERY +PREHOOK: Input: default@dec64_parquet +#### A masked pattern was here #### +POSTHOOK: query: select k, sum(d) from dec64_parquet group by k order by k +POSTHOOK: type: QUERY +POSTHOOK: Input: default@dec64_parquet +#### A masked pattern was here #### +1 3.30 +2 7.70 +3 5.50