From 5feb8932c06bee2695e1abe69cccc7396619828d Mon Sep 17 00:00:00 2001 From: Denys Kuzmenko Date: Thu, 4 Jun 2026 18:41:25 +0300 Subject: [PATCH 1/2] HIVE-29647: Parallelize Parquet split generation directory listing on blob storage --- .../io/parquet/MapredParquetInputFormat.java | 98 ++++++++++++++++--- 1 file changed, 82 insertions(+), 16 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java index 2a3bccb6d9a5..e9accf68343a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java @@ -14,45 +14,46 @@ package org.apache.hadoop.hive.ql.io.parquet; import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; import java.util.List; -import java.util.Map; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.BlobStorageUtils; +import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.io.DataCache; import org.apache.hadoop.hive.common.io.FileMetadataCache; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; import org.apache.hadoop.hive.ql.exec.vector.VectorizedSupport; -import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import org.apache.hadoop.hive.ql.io.InputFormatChecker; import org.apache.hadoop.hive.ql.io.LlapCacheOnlyInputFormatInterface; -import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetTableUtils; -import org.apache.hadoop.hive.ql.plan.MapWork; -import org.apache.hadoop.hive.ql.plan.PartitionDesc; -import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.mapred.JobConf; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport; import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; - +import org.apache.hadoop.security.UserGroupInformation; import org.apache.parquet.hadoop.ParquetInputFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; /** - * - * A Parquet InputFormat for Hive (with the deprecated package mapred) - * - * NOTE: With HIVE-9235 we removed "implements VectorizedParquetInputFormat" since all data types - * are not currently supported. Removing the interface turns off vectorization. + * A Parquet InputFormat for Hive (with the deprecated package mapred). */ public class MapredParquetInputFormat extends FileInputFormat implements InputFormatChecker, VectorizedInputFormatInterface, LlapCacheOnlyInputFormatInterface { @@ -72,6 +73,71 @@ protected MapredParquetInputFormat(final ParquetInputFormat input vectorizedSelf = new VectorizedParquetInputFormat(); } + /** + * On blob storage with multiple recursive input directories, list them in parallel instead of the + * default serial per-directory listing that dominates split generation. Listed files flow through + * the inherited {@link FileInputFormat#getSplits} unchanged; all other cases defer to the default. + */ + @Override + protected FileStatus[] listStatus(JobConf job) throws IOException { + Path[] dirs = getInputPaths(job); + // Only the recursive case (the Tez default) takes the parallel path; non-recursive listing has + // subtler sub-directory semantics, so defer to the default. + if (dirs.length <= 1 + || !job.getBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, false) + || !BlobStorageUtils.isBlobStorageFileSystem(job, dirs[0].getFileSystem(job))) { + return super.listStatus(job); + } + + long start = System.currentTimeMillis(); + // List as the caller's end-user, not the pool threads' login user; FileSystem.get is UGI-keyed. + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + + int numThreads = Math.max(2, HiveConf.getIntVar(job, HiveConf.ConfVars.HIVE_COMPUTE_SPLITS_NUM_THREADS)); + ExecutorService pool = newWorkerPool(numThreads); + CompletionService> completionService = new ExecutorCompletionService<>(pool); + + List files = new ArrayList<>(); + try { + for (Path dir : dirs) { + completionService.submit(() -> ugi.doAs( + (PrivilegedExceptionAction>) () -> { + FileSystem dirFs = dir.getFileSystem(job); + List dirFiles = new ArrayList<>(); + FileUtils.listStatusRecursively(dirFs, new FileStatus(0, true, 0, 0, 0, dir), dirFiles); + return dirFiles; + })); + } + for (int resultsLeft = dirs.length; resultsLeft > 0; resultsLeft--) { + files.addAll(completionService.take().get()); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while listing input directories", e); + + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof IOException) { + throw (IOException) cause; + } + throw new IOException("Failed to list input directories", cause); + } finally { + pool.shutdownNow(); + } + + LOG.info("Parquet parallel listStatus: {} files from {} dirs in {} ms ({} threads)", + files.size(), dirs.length, System.currentTimeMillis() - start, numThreads); + return files.toArray(new FileStatus[0]); + } + + private static ExecutorService newWorkerPool(int numThreads) { + return Executors.newFixedThreadPool(numThreads, + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("PARQUET_GET_SPLITS #%d") + .build()); + } + @SuppressWarnings({ "unchecked", "rawtypes" }) @Override public org.apache.hadoop.mapred.RecordReader getRecordReader( From 09b3b9f674c30218741990300c729908b6e100cc Mon Sep 17 00:00:00 2001 From: Denys Kuzmenko Date: Thu, 4 Jun 2026 19:39:07 +0300 Subject: [PATCH 2/2] review comments #1 --- .../io/parquet/MapredParquetInputFormat.java | 54 +++++++++++++------ 1 file changed, 38 insertions(+), 16 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java index e9accf68343a..ba9e1ca54606 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java @@ -22,6 +22,7 @@ import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -60,6 +61,8 @@ public class MapredParquetInputFormat extends FileInputFormat realInput; private final transient VectorizedParquetInputFormat vectorizedSelf; @@ -94,35 +97,35 @@ protected FileStatus[] listStatus(JobConf job) throws IOException { UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); int numThreads = Math.max(2, HiveConf.getIntVar(job, HiveConf.ConfVars.HIVE_COMPUTE_SPLITS_NUM_THREADS)); - ExecutorService pool = newWorkerPool(numThreads); + ExecutorService pool = getThreadPool(numThreads); CompletionService> completionService = new ExecutorCompletionService<>(pool); + List>> pathFutures = new ArrayList<>(dirs.length); List files = new ArrayList<>(); try { for (Path dir : dirs) { - completionService.submit(() -> ugi.doAs( - (PrivilegedExceptionAction>) () -> { - FileSystem dirFs = dir.getFileSystem(job); - List dirFiles = new ArrayList<>(); - FileUtils.listStatusRecursively(dirFs, new FileStatus(0, true, 0, 0, 0, dir), dirFiles); - return dirFiles; - })); + pathFutures.add(completionService.submit(() -> listDirectory(job, dir, ugi))); } for (int resultsLeft = dirs.length; resultsLeft > 0; resultsLeft--) { files.addAll(completionService.take().get()); } } catch (InterruptedException e) { + cancelFutures(pathFutures); Thread.currentThread().interrupt(); throw new IOException("Interrupted while listing input directories", e); } catch (ExecutionException e) { + cancelFutures(pathFutures); Throwable cause = e.getCause(); + + if (cause instanceof InterruptedException) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while listing input directories", cause); + } if (cause instanceof IOException) { throw (IOException) cause; } throw new IOException("Failed to list input directories", cause); - } finally { - pool.shutdownNow(); } LOG.info("Parquet parallel listStatus: {} files from {} dirs in {} ms ({} threads)", @@ -130,12 +133,31 @@ protected FileStatus[] listStatus(JobConf job) throws IOException { return files.toArray(new FileStatus[0]); } - private static ExecutorService newWorkerPool(int numThreads) { - return Executors.newFixedThreadPool(numThreads, - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("PARQUET_GET_SPLITS #%d") - .build()); + private static List listDirectory(JobConf job, Path dir, UserGroupInformation ugi) + throws IOException, InterruptedException { + return ugi.doAs((PrivilegedExceptionAction>) () -> { + FileSystem dirFs = dir.getFileSystem(job); + List dirFiles = new ArrayList<>(); + FileUtils.listStatusRecursively(dirFs, new FileStatus(0, true, 0, 0, 0, dir), dirFiles); + return dirFiles; + }); + } + + private static synchronized ExecutorService getThreadPool(int numThreads) { + if (threadPool == null) { + threadPool = Executors.newFixedThreadPool(numThreads, + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("PARQUET_GET_SPLITS #%d") + .build()); + } + return threadPool; + } + + private static void cancelFutures(List> futures) { + for (Future future : futures) { + future.cancel(true); + } } @SuppressWarnings({ "unchecked", "rawtypes" })