Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 130 additions & 11 deletions cpp/src/arrow/dataset/file_orc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,13 @@ class OrcScanTask {
struct Impl {
static Result<RecordBatchIterator> Make(const FileSource& source,
const FileFormat& format,
const ScanOptions& scan_options) {
const ScanOptions& scan_options,
const std::vector<int>& stripe_ids) {
ARROW_ASSIGN_OR_RAISE(
auto reader,
OpenORCReader(source, std::make_shared<ScanOptions>(scan_options)));

auto materialized_fields = scan_options.MaterializedFields();
// filter out virtual columns
std::vector<std::string> included_fields;
ARROW_ASSIGN_OR_RAISE(auto schema, reader->ReadSchema());
for (const auto& ref : materialized_fields) {
Expand All @@ -85,26 +85,89 @@ class OrcScanTask {
included_fields.push_back(schema->field(match.indices()[0])->name());
}

std::shared_ptr<RecordBatchReader> record_batch_reader;
ARROW_ASSIGN_OR_RAISE(
record_batch_reader,
reader->GetRecordBatchReader(scan_options.batch_size, included_fields));
if (stripe_ids.empty()) {
std::shared_ptr<RecordBatchReader> record_batch_reader;
ARROW_ASSIGN_OR_RAISE(
record_batch_reader,
reader->GetRecordBatchReader(scan_options.batch_size, included_fields));

return RecordBatchIterator(Impl{std::move(record_batch_reader)});
}

return RecordBatchIterator(Impl{std::move(record_batch_reader)});
std::vector<int> included_indices;
for (const auto& field_name : included_fields) {
int idx = schema->GetFieldIndex(field_name);
if (idx >= 0) {
included_indices.push_back(idx);
}
}

return RecordBatchIterator(
Impl{std::move(reader), stripe_ids, std::move(included_indices),
scan_options.batch_size});
}

explicit Impl(std::shared_ptr<RecordBatchReader> reader)
: record_batch_reader_(std::move(reader)), stripe_mode_(false) {}

Impl(std::unique_ptr<arrow::adapters::orc::ORCFileReader> reader,
std::vector<int> stripe_ids, std::vector<int> included_indices,
int64_t batch_size)
: orc_reader_(std::move(reader)),
stripe_ids_(std::move(stripe_ids)),
included_indices_(std::move(included_indices)),
batch_size_(batch_size),
current_stripe_idx_(0),
stripe_mode_(true) {}

Result<std::shared_ptr<RecordBatch>> Next() {
std::shared_ptr<RecordBatch> batch;
RETURN_NOT_OK(record_batch_reader_->ReadNext(&batch));
return batch;
if (!stripe_mode_) {
std::shared_ptr<RecordBatch> batch;
RETURN_NOT_OK(record_batch_reader_->ReadNext(&batch));
return batch;
}

while (true) {
if (record_batch_reader_) {
std::shared_ptr<RecordBatch> batch;
RETURN_NOT_OK(record_batch_reader_->ReadNext(&batch));
if (batch) {
return batch;
}
record_batch_reader_.reset();
current_stripe_idx_++;
}

if (current_stripe_idx_ >= stripe_ids_.size()) {
return nullptr;
}

int64_t stripe_id = stripe_ids_[current_stripe_idx_];
auto stripe_info = orc_reader_->GetStripeInformation(stripe_id);
RETURN_NOT_OK(orc_reader_->Seek(stripe_info.first_row_id));
ARROW_ASSIGN_OR_RAISE(
record_batch_reader_,
orc_reader_->NextStripeReader(batch_size_, included_indices_));
}
}

std::shared_ptr<RecordBatchReader> record_batch_reader_;
std::unique_ptr<arrow::adapters::orc::ORCFileReader> orc_reader_;
std::vector<int> stripe_ids_;
std::vector<int> included_indices_;
int64_t batch_size_;
size_t current_stripe_idx_;
bool stripe_mode_;
};

std::vector<int> stripe_ids;
if (auto* orc_fragment = dynamic_cast<OrcFileFragment*>(fragment_.get())) {
stripe_ids = orc_fragment->stripe_ids();
}

return Impl::Make(fragment_->source(),
*checked_pointer_cast<FileFragment>(fragment_)->format(),
*options_);
*options_, stripe_ids);
}

private:
Expand Down Expand Up @@ -208,6 +271,15 @@ Future<std::optional<int64_t>> OrcFileFormat::CountRows(
return DeferNotOk(options->io_context.executor()->Submit(
[self, file]() -> Result<std::optional<int64_t>> {
ARROW_ASSIGN_OR_RAISE(auto reader, OpenORCReader(file->source()));
auto* orc_fragment = dynamic_cast<OrcFileFragment*>(file.get());
if (orc_fragment && !orc_fragment->stripe_ids().empty()) {
int64_t count = 0;
for (int stripe_id : orc_fragment->stripe_ids()) {
auto stripe_info = reader->GetStripeInformation(stripe_id);
count += stripe_info.num_rows;
}
return count;
}
return reader->NumberOfRows();
}));
}
Expand All @@ -229,5 +301,52 @@ Result<std::shared_ptr<FileWriter>> OrcFileFormat::MakeWriter(
return Status::NotImplemented("ORC writer not yet implemented.");
}

// OrcFileFragment

OrcFileFragment::OrcFileFragment(FileSource source, std::shared_ptr<FileFormat> format,
compute::Expression partition_expression,
std::shared_ptr<Schema> physical_schema,
std::optional<std::vector<int>> stripe_ids)
: FileFragment(std::move(source), std::move(format),
std::move(partition_expression), std::move(physical_schema)),
stripe_ids_(std::move(stripe_ids)) {}

Result<std::shared_ptr<Fragment>> OrcFileFragment::Subset(std::vector<int> stripe_ids) {
return std::shared_ptr<Fragment>(
new OrcFileFragment(source_, format_,
partition_expression(), physical_schema_,
std::move(stripe_ids)));
}

Result<std::shared_ptr<FileFragment>> OrcFileFormat::MakeFragment(
FileSource source, compute::Expression partition_expression,
std::shared_ptr<Schema> physical_schema) {
return std::shared_ptr<FileFragment>(
new OrcFileFragment(std::move(source), shared_from_this(),
std::move(partition_expression),
std::move(physical_schema), std::nullopt));
}

Result<std::shared_ptr<OrcFileFragment>> OrcFileFormat::MakeFragment(
FileSource source, compute::Expression partition_expression,
std::shared_ptr<Schema> physical_schema, std::vector<int> stripe_ids) {
if (!stripe_ids.empty()) {
ARROW_ASSIGN_OR_RAISE(auto reader, OpenORCReader(source));
int64_t num_stripes = reader->NumberOfStripes();
for (int stripe_id : stripe_ids) {
if (stripe_id < 0 || stripe_id >= num_stripes) {
return Status::IndexError("Stripe ID ", stripe_id,
" is out of range. File has ", num_stripes,
" stripe(s), valid range is [0, ",
num_stripes - 1, "]");
}
}
}
return std::shared_ptr<OrcFileFragment>(
new OrcFileFragment(std::move(source), shared_from_this(),
std::move(partition_expression),
std::move(physical_schema), std::move(stripe_ids)));
}

} // namespace dataset
} // namespace arrow
43 changes: 43 additions & 0 deletions cpp/src/arrow/dataset/file_orc.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
#pragma once

#include <memory>
#include <optional>
#include <string>
#include <vector>

#include "arrow/dataset/file_base.h"
#include "arrow/dataset/type_fwd.h"
Expand All @@ -37,6 +39,8 @@ namespace dataset {

constexpr char kOrcTypeName[] = "orc";

class OrcFileFragment;

/// \brief A FileFormat implementation that reads from and writes to ORC files
class ARROW_DS_EXPORT OrcFileFormat : public FileFormat {
public:
Expand Down Expand Up @@ -67,6 +71,45 @@ class ARROW_DS_EXPORT OrcFileFormat : public FileFormat {
fs::FileLocator destination_locator) const override;

std::shared_ptr<FileWriteOptions> DefaultWriteOptions() override;

using FileFormat::MakeFragment;

Result<std::shared_ptr<FileFragment>> MakeFragment(
FileSource source, compute::Expression partition_expression,
std::shared_ptr<Schema> physical_schema) override;

Result<std::shared_ptr<OrcFileFragment>> MakeFragment(
FileSource source, compute::Expression partition_expression,
std::shared_ptr<Schema> physical_schema, std::vector<int> stripe_ids);
};

/// \brief A FileFragment with ORC-specific logic for stripe-level subsetting.
///
/// OrcFileFragment provides the ability to scan ORC files at stripe granularity,
/// enabling parallel processing of sub-file splits. The caller can provide an
/// optional list of selected stripe IDs to limit the scan to specific stripes.
class ARROW_DS_EXPORT OrcFileFragment : public FileFragment {
public:
/// \brief Return the stripe IDs selected by this fragment.
/// Empty vector means all stripes.
const std::vector<int>& stripe_ids() const {
if (stripe_ids_) return *stripe_ids_;
static std::vector<int> empty;
return empty;
}

/// \brief Return fragment which selects a subset of this fragment's stripes.
Result<std::shared_ptr<Fragment>> Subset(std::vector<int> stripe_ids);

private:
OrcFileFragment(FileSource source, std::shared_ptr<FileFormat> format,
compute::Expression partition_expression,
std::shared_ptr<Schema> physical_schema,
std::optional<std::vector<int>> stripe_ids);

std::optional<std::vector<int>> stripe_ids_;

friend class OrcFileFormat;
};

/// @}
Expand Down
Loading