Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,51 +14,55 @@
package org.apache.hadoop.hive.ql.io.parquet;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.BlobStorageUtils;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.io.DataCache;
import org.apache.hadoop.hive.common.io.FileMetadataCache;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedSupport;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.InputFormatChecker;
import org.apache.hadoop.hive.ql.io.LlapCacheOnlyInputFormatInterface;
import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetTableUtils;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;

import org.apache.hadoop.security.UserGroupInformation;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.util.concurrent.ThreadFactoryBuilder;


/**
 * A Parquet InputFormat for Hive (with the deprecated package mapred).
 *
 * NOTE: HIVE-9235 removed "implements VectorizedParquetInputFormat" because not all data types
 * were supported at that time, which turned vectorization off. The class as declared here
 * implements VectorizedInputFormatInterface.
 */
public class MapredParquetInputFormat extends FileInputFormat<NullWritable, ArrayWritable>
implements InputFormatChecker, VectorizedInputFormatInterface, LlapCacheOnlyInputFormatInterface {

private static final Logger LOG = LoggerFactory.getLogger(MapredParquetInputFormat.class);

private static ExecutorService threadPool;

private final ParquetInputFormat<ArrayWritable> realInput;

private final transient VectorizedParquetInputFormat vectorizedSelf;
Expand All @@ -72,6 +76,90 @@ protected MapredParquetInputFormat(final ParquetInputFormat<ArrayWritable> input
vectorizedSelf = new VectorizedParquetInputFormat();
}

/**
* On blob storage with multiple recursive input directories, list them in parallel instead of the
* default serial per-directory listing that dominates split generation. Listed files flow through
* the inherited {@link FileInputFormat#getSplits} unchanged; all other cases defer to the default.
*/
@Override
protected FileStatus[] listStatus(JobConf job) throws IOException {
Path[] dirs = getInputPaths(job);
Comment thread
deniskuzZ marked this conversation as resolved.
// Only the recursive case (the Tez default) takes the parallel path; non-recursive listing has
// subtler sub-directory semantics, so defer to the default.
if (dirs.length <= 1
|| !job.getBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, false)
|| !BlobStorageUtils.isBlobStorageFileSystem(job, dirs[0].getFileSystem(job))) {
return super.listStatus(job);
}
Comment thread
deniskuzZ marked this conversation as resolved.

long start = System.currentTimeMillis();
// List as the caller's end-user, not the pool threads' login user; FileSystem.get is UGI-keyed.
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();

int numThreads = Math.max(2, HiveConf.getIntVar(job, HiveConf.ConfVars.HIVE_COMPUTE_SPLITS_NUM_THREADS));
ExecutorService pool = getThreadPool(numThreads);
CompletionService<List<FileStatus>> completionService = new ExecutorCompletionService<>(pool);

Comment thread
deniskuzZ marked this conversation as resolved.
List<Future<List<FileStatus>>> pathFutures = new ArrayList<>(dirs.length);
List<FileStatus> files = new ArrayList<>();
try {
for (Path dir : dirs) {
pathFutures.add(completionService.submit(() -> listDirectory(job, dir, ugi)));
}
for (int resultsLeft = dirs.length; resultsLeft > 0; resultsLeft--) {
files.addAll(completionService.take().get());
}
} catch (InterruptedException e) {
cancelFutures(pathFutures);
Thread.currentThread().interrupt();
throw new IOException("Interrupted while listing input directories", e);

} catch (ExecutionException e) {
cancelFutures(pathFutures);
Throwable cause = e.getCause();

if (cause instanceof InterruptedException) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while listing input directories", cause);
}
if (cause instanceof IOException) {
throw (IOException) cause;
}
throw new IOException("Failed to list input directories", cause);
}

LOG.info("Parquet parallel listStatus: {} files from {} dirs in {} ms ({} threads)",
files.size(), dirs.length, System.currentTimeMillis() - start, numThreads);
return files.toArray(new FileStatus[0]);
}

/**
 * Recursively lists one input directory while running as the given user.
 *
 * @param job configuration used to resolve the file system of {@code dir}
 * @param dir root directory to list
 * @param ugi user on whose behalf the listing is performed
 * @return the statuses collected by the recursive listing
 * @throws IOException if resolving the file system or listing fails
 * @throws InterruptedException if the privileged action is interrupted
 */
private static List<FileStatus> listDirectory(JobConf job, Path dir, UserGroupInformation ugi)
    throws IOException, InterruptedException {
  PrivilegedExceptionAction<List<FileStatus>> recursiveListing = () -> {
    // Resolve the file system inside the action so the lookup happens as the supplied user.
    FileSystem fs = dir.getFileSystem(job);
    List<FileStatus> collected = new ArrayList<>();
    // Synthetic directory status for the root; only its path and directory flag are set.
    FileStatus root = new FileStatus(0, true, 0, 0, 0, dir);
    FileUtils.listStatusRecursively(fs, root, collected);
    return collected;
  };
  return ugi.doAs(recursiveListing);
}

/**
 * Returns the shared listing pool, creating it on the first call.
 *
 * <p>The pool is created once and then reused; {@code numThreads} is honoured only by the call
 * that creates it and is ignored afterwards.
 *
 * @param numThreads size of the fixed pool if it has to be created
 * @return the shared executor backed by daemon threads named {@code PARQUET_GET_SPLITS #n}
 */
private static synchronized ExecutorService getThreadPool(int numThreads) {
  if (threadPool != null) {
    return threadPool;
  }
  // Daemon threads, so the never-shut-down static pool does not keep the JVM alive.
  ThreadFactoryBuilder workerThreads = new ThreadFactoryBuilder()
      .setNameFormat("PARQUET_GET_SPLITS #%d")
      .setDaemon(true);
  threadPool = Executors.newFixedThreadPool(numThreads, workerThreads.build());
  return threadPool;
}

/**
 * Requests cancellation of every given future, interrupting tasks that are already running.
 *
 * @param futures futures to cancel; futures that have already completed are unaffected
 * @param <T> result type of the futures
 */
private static <T> void cancelFutures(List<Future<T>> futures) {
  futures.forEach(pending -> pending.cancel(true));
}

@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public org.apache.hadoop.mapred.RecordReader<NullWritable, ArrayWritable> getRecordReader(
Expand Down
Loading