From ef254af84f591dc6ecf27902fe419673c1706ce6 Mon Sep 17 00:00:00 2001 From: ShreyeshArangath Date: Sat, 14 Feb 2026 19:49:52 -0800 Subject: [PATCH 1/4] initial impl completed --- cpp/src/arrow/dataset/file_orc.cc | 154 ++++++++++++++++++-- cpp/src/arrow/dataset/file_orc.h | 41 ++++++ cpp/src/arrow/dataset/file_orc_test.cc | 188 +++++++++++++++++++++++++ 3 files changed, 370 insertions(+), 13 deletions(-) diff --git a/cpp/src/arrow/dataset/file_orc.cc b/cpp/src/arrow/dataset/file_orc.cc index 1393df57f9d..d5df9f79a5b 100644 --- a/cpp/src/arrow/dataset/file_orc.cc +++ b/cpp/src/arrow/dataset/file_orc.cc @@ -69,7 +69,8 @@ class OrcScanTask { struct Impl { static Result Make(const FileSource& source, const FileFormat& format, - const ScanOptions& scan_options) { + const ScanOptions& scan_options, + const std::vector& stripe_ids) { ARROW_ASSIGN_OR_RAISE( auto reader, OpenORCReader(source, std::make_shared(scan_options))); @@ -85,26 +86,97 @@ class OrcScanTask { included_fields.push_back(schema->field(match.indices()[0])->name()); } - std::shared_ptr record_batch_reader; - ARROW_ASSIGN_OR_RAISE( - record_batch_reader, - reader->GetRecordBatchReader(scan_options.batch_size, included_fields)); + // If stripe_ids is empty, read all stripes (backward compatible) + if (stripe_ids.empty()) { + std::shared_ptr record_batch_reader; + ARROW_ASSIGN_OR_RAISE( + record_batch_reader, + reader->GetRecordBatchReader(scan_options.batch_size, included_fields)); + + return RecordBatchIterator(Impl{std::move(record_batch_reader)}); + } - return RecordBatchIterator(Impl{std::move(record_batch_reader)}); + // Convert field names to indices using Schema::GetFieldIndex + std::vector included_indices; + for (const auto& field_name : included_fields) { + int idx = schema->GetFieldIndex(field_name); + if (idx >= 0) { + included_indices.push_back(idx); + } + } + + // Read only selected stripes + return RecordBatchIterator( + Impl{std::move(reader), stripe_ids, std::move(included_indices), + scan_options.batch_size}); } + // Constructor for full file read + explicit Impl(std::shared_ptr reader) + : record_batch_reader_(std::move(reader)), stripe_mode_(false) {} + + // Constructor for stripe-filtered read + Impl(std::unique_ptr reader, + std::vector stripe_ids, std::vector included_indices, + int64_t batch_size) + : orc_reader_(std::move(reader)), + stripe_ids_(std::move(stripe_ids)), + included_indices_(std::move(included_indices)), + batch_size_(batch_size), + current_stripe_idx_(0), + stripe_mode_(true) {} + Result> Next() { - std::shared_ptr batch; - RETURN_NOT_OK(record_batch_reader_->ReadNext(&batch)); - return batch; + if (!stripe_mode_) { + std::shared_ptr batch; + RETURN_NOT_OK(record_batch_reader_->ReadNext(&batch)); + return batch; + } + + // Stripe-filtered mode + while (true) { + if (record_batch_reader_) { + std::shared_ptr batch; + RETURN_NOT_OK(record_batch_reader_->ReadNext(&batch)); + if (batch) { + return batch; + } + record_batch_reader_.reset(); + current_stripe_idx_++; + } + + if (current_stripe_idx_ >= stripe_ids_.size()) { + return nullptr; + } + + // Seek to the next stripe and get a reader for it + int64_t stripe_id = stripe_ids_[current_stripe_idx_]; + // Get stripe information to find the first row of this stripe + auto stripe_info = orc_reader_->GetStripeInformation(stripe_id); + RETURN_NOT_OK(orc_reader_->Seek(stripe_info.first_row_id)); + ARROW_ASSIGN_OR_RAISE( + record_batch_reader_, + orc_reader_->NextStripeReader(batch_size_, included_indices_)); + } } std::shared_ptr record_batch_reader_; + std::unique_ptr orc_reader_; + std::vector stripe_ids_; + std::vector included_indices_; + int64_t batch_size_; + size_t current_stripe_idx_; + bool stripe_mode_; }; + std::vector stripe_ids; + if (auto* orc_fragment = dynamic_cast(fragment_.get())) { + stripe_ids = orc_fragment->stripe_ids(); + } + return Impl::Make(fragment_->source(), *checked_pointer_cast(fragment_)->format(), - *options_); + *options_, stripe_ids); } private: @@ -208,13 +280,20 @@ Future> OrcFileFormat::CountRows( return DeferNotOk(options->io_context.executor()->Submit( [self, file]() -> Result> { ARROW_ASSIGN_OR_RAISE(auto reader, OpenORCReader(file->source())); + auto* orc_fragment = dynamic_cast(file.get()); + if (orc_fragment && !orc_fragment->stripe_ids().empty()) { + int64_t count = 0; + for (int stripe_id : orc_fragment->stripe_ids()) { + auto stripe_info = reader->GetStripeInformation(stripe_id); + count += stripe_info.num_rows; + } + return count; + } return reader->NumberOfRows(); })); } -// // -// // OrcFileWriter, OrcFileWriteOptions -// // +// OrcFileWriter, OrcFileWriteOptions std::shared_ptr OrcFileFormat::DefaultWriteOptions() { // TODO (https://issues.apache.org/jira/browse/ARROW-13796) @@ -229,5 +308,54 @@ Result> OrcFileFormat::MakeWriter( return Status::NotImplemented("ORC writer not yet implemented."); } +// OrcFileFragment + +OrcFileFragment::OrcFileFragment(FileSource source, std::shared_ptr format, + compute::Expression partition_expression, + std::shared_ptr physical_schema, + std::optional> stripe_ids) + : FileFragment(std::move(source), std::move(format), + std::move(partition_expression), std::move(physical_schema)), + stripe_ids_(std::move(stripe_ids)) {} + +Result> OrcFileFragment::Subset(std::vector stripe_ids) { + return std::shared_ptr( + new OrcFileFragment(source_, format_, + partition_expression(), physical_schema_, + std::move(stripe_ids))); +} + +Result> OrcFileFormat::MakeFragment( + FileSource source, compute::Expression partition_expression, + std::shared_ptr physical_schema) { + // Return OrcFileFragment with no stripe filter (reads all stripes) + return std::shared_ptr( + new OrcFileFragment(std::move(source), shared_from_this(), + std::move(partition_expression), + std::move(physical_schema), std::nullopt)); +} + +Result> OrcFileFormat::MakeFragment( + FileSource source, compute::Expression partition_expression, + std::shared_ptr physical_schema, std::vector stripe_ids) { + // Validate stripe IDs + if (!stripe_ids.empty()) { + ARROW_ASSIGN_OR_RAISE(auto reader, OpenORCReader(source)); + int64_t num_stripes = reader->NumberOfStripes(); + for (int stripe_id : stripe_ids) { + if (stripe_id < 0 || stripe_id >= num_stripes) { + return Status::IndexError("Stripe ID ", stripe_id, + " is out of range. File has ", num_stripes, + " stripe(s), valid range is [0, ", + num_stripes - 1, "]"); + } + } + } + return std::shared_ptr( + new OrcFileFragment(std::move(source), shared_from_this(), + std::move(partition_expression), + std::move(physical_schema), std::move(stripe_ids))); +} + } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/dataset/file_orc.h b/cpp/src/arrow/dataset/file_orc.h index 5bfefd1e02b..8582587b9fa 100644 --- a/cpp/src/arrow/dataset/file_orc.h +++ b/cpp/src/arrow/dataset/file_orc.h @@ -20,7 +20,9 @@ #pragma once #include +#include #include +#include #include "arrow/dataset/file_base.h" #include "arrow/dataset/type_fwd.h" @@ -67,6 +69,45 @@ class ARROW_DS_EXPORT OrcFileFormat : public FileFormat { fs::FileLocator destination_locator) const override; std::shared_ptr DefaultWriteOptions() override; + + using FileFormat::MakeFragment; + + Result> MakeFragment( + FileSource source, compute::Expression partition_expression, + std::shared_ptr physical_schema) override; + + Result> MakeFragment( + FileSource source, compute::Expression partition_expression, + std::shared_ptr physical_schema, std::vector stripe_ids); +}; + +/// \brief A FileFragment with ORC-specific logic for stripe-level subsetting. +/// +/// OrcFileFragment provides the ability to scan ORC files at stripe granularity, +/// enabling parallel processing of sub-file splits. The caller can provide an +/// optional list of selected stripe IDs to limit the scan to specific stripes. +class ARROW_DS_EXPORT OrcFileFragment : public FileFragment { + public: + /// \brief Return the stripe IDs selected by this fragment. + /// Empty vector means all stripes. + const std::vector& stripe_ids() const { + if (stripe_ids_) return *stripe_ids_; + static std::vector empty; + return empty; + } + + /// \brief Return fragment which selects a subset of this fragment's stripes. + Result> Subset(std::vector stripe_ids); + + private: + OrcFileFragment(FileSource source, std::shared_ptr format, + compute::Expression partition_expression, + std::shared_ptr physical_schema, + std::optional> stripe_ids); + + std::optional> stripe_ids_; + + friend class OrcFileFormat; }; /// @} diff --git a/cpp/src/arrow/dataset/file_orc_test.cc b/cpp/src/arrow/dataset/file_orc_test.cc index 17be015de51..803b645c935 100644 --- a/cpp/src/arrow/dataset/file_orc_test.cc +++ b/cpp/src/arrow/dataset/file_orc_test.cc @@ -92,5 +92,193 @@ INSTANTIATE_TEST_SUITE_P(TestScan, TestOrcFileFormatScan, ::testing::ValuesIn(TestFormatParams::Values()), TestFormatParams::ToTestNameString); +class TestOrcFileFragment : public ::testing::Test { + public: + void SetUp() override { + format_ = std::make_shared(); + opts_ = std::make_shared(); + opts_->dataset_schema = schema({field("f64", float64())}); + SetSchema(opts_->dataset_schema->fields()); + } + + void SetSchema(std::vector> fields) { + opts_->dataset_schema = schema(std::move(fields)); + ASSERT_OK_AND_ASSIGN(input_, GetBufferFromNumBatches(4, /*batch_size=*/512)); + } + + Result> GetBufferFromNumBatches(int num_batches, + int batch_size) { + std::vector> batches; + for (int i = 0; i < num_batches; i++) { + batches.push_back( + ConstantArrayGenerator::Zeroes(batch_size, opts_->dataset_schema)); + } + ARROW_ASSIGN_OR_RAISE(auto table, + Table::FromRecordBatches(opts_->dataset_schema, batches)); + ARROW_ASSIGN_OR_RAISE(auto sink, io::BufferOutputStream::Create()); + ARROW_ASSIGN_OR_RAISE(auto writer, adapters::orc::ORCFileWriter::Open(sink.get())); + RETURN_NOT_OK(writer->Write(*table)); + RETURN_NOT_OK(writer->Close()); + return sink->Finish(); + } + + Result> MakeFragment(FileSource source) { + ARROW_ASSIGN_OR_RAISE(auto fragment, + format_->MakeFragment(std::move(source), literal(true), + opts_->dataset_schema)); + return std::dynamic_pointer_cast(fragment); + } + + Result> MakeFragment( + FileSource source, std::vector stripe_ids) { + return format_->MakeFragment(std::move(source), literal(true), + opts_->dataset_schema, std::move(stripe_ids)); + } + + void AssertScanEquals(std::shared_ptr fragment, int64_t expected_rows) { + int64_t total_rows = 0; + ASSERT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(opts_)); + for (auto maybe_scan_task : scan_task_it) { + ASSERT_OK_AND_ASSIGN(auto scan_task, maybe_scan_task); + ASSERT_OK_AND_ASSIGN(auto rb_it, scan_task->Execute()); + for (auto maybe_batch : rb_it) { + ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch); + total_rows += batch->num_rows(); + } + } + ASSERT_EQ(total_rows, expected_rows); + } + + protected: + std::shared_ptr input_; + std::shared_ptr opts_; + std::shared_ptr format_; +}; + +TEST_F(TestOrcFileFragment, Basics) { + // Test that MakeFragment returns OrcFileFragment + auto source = FileSource(input_); + ASSERT_OK_AND_ASSIGN(auto fragment, MakeFragment(source)); + ASSERT_NE(fragment, nullptr); + ASSERT_TRUE(fragment->stripe_ids().empty()); +} + +TEST_F(TestOrcFileFragment, MakeFragmentWithStripeIds) { + // Test that MakeFragment with stripe_ids works + auto source = FileSource(input_); + std::vector stripe_ids = {0, 1}; + ASSERT_OK_AND_ASSIGN(auto fragment, MakeFragment(source, stripe_ids)); + ASSERT_NE(fragment, nullptr); + ASSERT_EQ(fragment->stripe_ids(), stripe_ids); +} + +TEST_F(TestOrcFileFragment, Subset) { + // Test that Subset creates a new fragment with specified stripes + auto source = FileSource(input_); + ASSERT_OK_AND_ASSIGN(auto fragment, MakeFragment(source)); + + std::vector stripe_ids = {0}; + ASSERT_OK_AND_ASSIGN(auto subset_fragment, fragment->Subset(stripe_ids)); + ASSERT_NE(subset_fragment, nullptr); + + auto* orc_subset = dynamic_cast(subset_fragment.get()); + ASSERT_NE(orc_subset, nullptr); + ASSERT_EQ(orc_subset->stripe_ids(), stripe_ids); +} + +TEST_F(TestOrcFileFragment, ScanSubset) { + // Test that scanning a subset reads fewer rows than full scan + // Create a file with multiple stripes (4 batches of 512 rows each = 2048 rows) + auto source = FileSource(input_); + + // Full scan + ASSERT_OK_AND_ASSIGN(auto full_fragment, MakeFragment(source)); + AssertScanEquals(full_fragment, 2048); + + // Scan single stripe - should read fewer rows + // ORC may combine batches into stripes, so subset might still read all if it's 1 stripe + // For a more robust test, create larger file or check stripe count first + ASSERT_OK_AND_ASSIGN(auto reader_for_info, + adapters::orc::ORCFileReader::Open( + std::make_shared(input_), + default_memory_pool())); + int64_t num_stripes = reader_for_info->NumberOfStripes(); + + if (num_stripes > 1) { + // Test reading just first stripe + std::vector first_stripe = {0}; + ASSERT_OK_AND_ASSIGN(auto subset_fragment, MakeFragment(source, first_stripe)); + + // Count rows in first stripe + auto stripe_info = reader_for_info->GetStripeInformation(0); + int64_t expected_rows = stripe_info.num_rows; + + AssertScanEquals(subset_fragment, expected_rows); + + // Verify it's less than full file + ASSERT_LT(expected_rows, 2048); + } +} + +TEST_F(TestOrcFileFragment, InvalidStripeIdOutOfRange) { + auto source = FileSource(input_); + // Find how many stripes the file has + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open( + std::make_shared(input_), + default_memory_pool())); + int64_t num_stripes = reader->NumberOfStripes(); + + // Stripe ID equal to num_stripes should fail + std::vector invalid_ids = {static_cast(num_stripes)}; + ASSERT_RAISES(IndexError, MakeFragment(source, invalid_ids)); + + // Stripe ID way out of range should also fail + std::vector very_invalid_ids = {9999}; + ASSERT_RAISES(IndexError, MakeFragment(source, very_invalid_ids)); +} + +TEST_F(TestOrcFileFragment, InvalidStripeIdNegative) { + auto source = FileSource(input_); + std::vector negative_ids = {-1}; + ASSERT_RAISES(IndexError, MakeFragment(source, negative_ids)); +} + +TEST_F(TestOrcFileFragment, CountRowsWithStripeSubset) { + auto source = FileSource(input_); + ASSERT_OK_AND_ASSIGN(auto reader, + adapters::orc::ORCFileReader::Open( + std::make_shared(input_), + default_memory_pool())); + int64_t num_stripes = reader->NumberOfStripes(); + + if (num_stripes > 1) { + // Create fragment with first stripe only + std::vector first_stripe = {0}; + ASSERT_OK_AND_ASSIGN(auto fragment, MakeFragment(source, first_stripe)); + + auto stripe_info = reader->GetStripeInformation(0); + int64_t expected_rows = stripe_info.num_rows; + + // CountRows should return the stripe's row count, not the full file + auto count_result = format_->CountRows( + fragment, literal(true), opts_); + ASSERT_OK_AND_ASSIGN(auto count, count_result.result()); + ASSERT_TRUE(count.has_value()); + ASSERT_EQ(count.value(), expected_rows); + + // Full fragment should return total rows + ASSERT_OK_AND_ASSIGN(auto full_fragment, MakeFragment(source)); + auto full_count_result = format_->CountRows( + full_fragment, literal(true), opts_); + ASSERT_OK_AND_ASSIGN(auto full_count, full_count_result.result()); + ASSERT_TRUE(full_count.has_value()); + ASSERT_EQ(full_count.value(), 2048); + + // Verify subset count is less than full count + ASSERT_LT(count.value(), full_count.value()); + } +} + } // namespace dataset } // namespace arrow From d3c4972d624830a465fb49987643a21b20bb2b4a Mon Sep 17 00:00:00 2001 From: ShreyeshArangath Date: Sat, 14 Feb 2026 19:50:03 -0800 Subject: [PATCH 2/4] fixes and improvements --- cpp/src/arrow/dataset/file_orc.h | 2 + cpp/src/arrow/dataset/file_orc_test.cc | 132 ++++++++++++------------- 2 files changed, 64 insertions(+), 70 deletions(-) diff --git a/cpp/src/arrow/dataset/file_orc.h b/cpp/src/arrow/dataset/file_orc.h index 8582587b9fa..2105da6a738 100644 --- a/cpp/src/arrow/dataset/file_orc.h +++ b/cpp/src/arrow/dataset/file_orc.h @@ -39,6 +39,8 @@ namespace dataset { constexpr char kOrcTypeName[] = "orc"; +class OrcFileFragment; + /// \brief A FileFormat implementation that reads from and writes to ORC files class ARROW_DS_EXPORT OrcFileFormat : public FileFormat { public: diff --git a/cpp/src/arrow/dataset/file_orc_test.cc b/cpp/src/arrow/dataset/file_orc_test.cc index 803b645c935..93c74b9d44f 100644 --- a/cpp/src/arrow/dataset/file_orc_test.cc +++ b/cpp/src/arrow/dataset/file_orc_test.cc @@ -25,6 +25,7 @@ #include "arrow/dataset/discovery.h" #include "arrow/dataset/file_base.h" #include "arrow/dataset/partition.h" +#include "arrow/dataset/scanner.h" #include "arrow/dataset/test_util_internal.h" #include "arrow/io/memory.h" #include "arrow/record_batch.h" @@ -103,21 +104,24 @@ class TestOrcFileFragment : public ::testing::Test { void SetSchema(std::vector> fields) { opts_->dataset_schema = schema(std::move(fields)); - ASSERT_OK_AND_ASSIGN(input_, GetBufferFromNumBatches(4, /*batch_size=*/512)); + // Write batches individually with a tiny stripe size to force multiple stripes. + ASSERT_OK_AND_ASSIGN(input_, WriteMultiStripeBuffer(4, /*batch_size=*/512)); } - Result> GetBufferFromNumBatches(int num_batches, - int batch_size) { - std::vector> batches; + /// Write an ORC buffer with a small stripe size so each batch becomes its own stripe. + Result> WriteMultiStripeBuffer(int num_batches, + int batch_size) { + adapters::orc::WriteOptions write_opts; + write_opts.stripe_size = 1024; // 1 KiB -- forces a new stripe per batch + + ARROW_ASSIGN_OR_RAISE(auto sink, io::BufferOutputStream::Create()); + ARROW_ASSIGN_OR_RAISE(auto writer, + adapters::orc::ORCFileWriter::Open(sink.get(), write_opts)); for (int i = 0; i < num_batches; i++) { - batches.push_back( - ConstantArrayGenerator::Zeroes(batch_size, opts_->dataset_schema)); + auto batch = + ConstantArrayGenerator::Zeroes(batch_size, opts_->dataset_schema); + RETURN_NOT_OK(writer->Write(*batch)); } - ARROW_ASSIGN_OR_RAISE(auto table, - Table::FromRecordBatches(opts_->dataset_schema, batches)); - ARROW_ASSIGN_OR_RAISE(auto sink, io::BufferOutputStream::Create()); - ARROW_ASSIGN_OR_RAISE(auto writer, adapters::orc::ORCFileWriter::Open(sink.get())); - RETURN_NOT_OK(writer->Write(*table)); RETURN_NOT_OK(writer->Close()); return sink->Finish(); } @@ -136,17 +140,12 @@ class TestOrcFileFragment : public ::testing::Test { } void AssertScanEquals(std::shared_ptr fragment, int64_t expected_rows) { - int64_t total_rows = 0; - ASSERT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(opts_)); - for (auto maybe_scan_task : scan_task_it) { - ASSERT_OK_AND_ASSIGN(auto scan_task, maybe_scan_task); - ASSERT_OK_AND_ASSIGN(auto rb_it, scan_task->Execute()); - for (auto maybe_batch : rb_it) { - ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch); - total_rows += batch->num_rows(); - } - } - ASSERT_EQ(total_rows, expected_rows); + auto dataset = std::make_shared( + opts_->dataset_schema, FragmentVector{std::move(fragment)}); + ScannerBuilder builder(dataset, opts_); + ASSERT_OK_AND_ASSIGN(auto scanner, builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table, scanner->ToTable()); + ASSERT_EQ(table->num_rows(), expected_rows); } protected: @@ -187,37 +186,33 @@ TEST_F(TestOrcFileFragment, Subset) { } TEST_F(TestOrcFileFragment, ScanSubset) { - // Test that scanning a subset reads fewer rows than full scan - // Create a file with multiple stripes (4 batches of 512 rows each = 2048 rows) + // The fixture writes 4 batches of 512 rows with a tiny stripe size, + // producing multiple stripes. auto source = FileSource(input_); - // Full scan - ASSERT_OK_AND_ASSIGN(auto full_fragment, MakeFragment(source)); - AssertScanEquals(full_fragment, 2048); - - // Scan single stripe - should read fewer rows - // ORC may combine batches into stripes, so subset might still read all if it's 1 stripe - // For a more robust test, create larger file or check stripe count first - ASSERT_OK_AND_ASSIGN(auto reader_for_info, + ASSERT_OK_AND_ASSIGN(auto reader, adapters::orc::ORCFileReader::Open( std::make_shared(input_), default_memory_pool())); - int64_t num_stripes = reader_for_info->NumberOfStripes(); + int64_t num_stripes = reader->NumberOfStripes(); + ASSERT_GT(num_stripes, 1) << "Test file should have multiple stripes"; - if (num_stripes > 1) { - // Test reading just first stripe - std::vector first_stripe = {0}; - ASSERT_OK_AND_ASSIGN(auto subset_fragment, MakeFragment(source, first_stripe)); + // Full scan should read all 2048 rows + ASSERT_OK_AND_ASSIGN(auto full_fragment, MakeFragment(source)); + AssertScanEquals(full_fragment, 2048); - // Count rows in first stripe - auto stripe_info = reader_for_info->GetStripeInformation(0); - int64_t expected_rows = stripe_info.num_rows; + // Read just the first stripe via MakeFragment (validated path) + std::vector first_stripe = {0}; + ASSERT_OK_AND_ASSIGN(auto subset_fragment, MakeFragment(source, first_stripe)); - AssertScanEquals(subset_fragment, expected_rows); + auto stripe_info = reader->GetStripeInformation(0); + int64_t expected_rows = stripe_info.num_rows; + ASSERT_LT(expected_rows, 2048); + AssertScanEquals(subset_fragment, expected_rows); - // Verify it's less than full file - ASSERT_LT(expected_rows, 2048); - } + // Also test Subset() (the unvalidated fast path) + ASSERT_OK_AND_ASSIGN(auto subset_via_subset, full_fragment->Subset(first_stripe)); + AssertScanEquals(subset_via_subset, expected_rows); } TEST_F(TestOrcFileFragment, InvalidStripeIdOutOfRange) { @@ -251,33 +246,30 @@ TEST_F(TestOrcFileFragment, CountRowsWithStripeSubset) { std::make_shared(input_), default_memory_pool())); int64_t num_stripes = reader->NumberOfStripes(); + ASSERT_GT(num_stripes, 1) << "Test file should have multiple stripes"; - if (num_stripes > 1) { - // Create fragment with first stripe only - std::vector first_stripe = {0}; - ASSERT_OK_AND_ASSIGN(auto fragment, MakeFragment(source, first_stripe)); - - auto stripe_info = reader->GetStripeInformation(0); - int64_t expected_rows = stripe_info.num_rows; - - // CountRows should return the stripe's row count, not the full file - auto count_result = format_->CountRows( - fragment, literal(true), opts_); - ASSERT_OK_AND_ASSIGN(auto count, count_result.result()); - ASSERT_TRUE(count.has_value()); - ASSERT_EQ(count.value(), expected_rows); - - // Full fragment should return total rows - ASSERT_OK_AND_ASSIGN(auto full_fragment, MakeFragment(source)); - auto full_count_result = format_->CountRows( - full_fragment, literal(true), opts_); - ASSERT_OK_AND_ASSIGN(auto full_count, full_count_result.result()); - ASSERT_TRUE(full_count.has_value()); - ASSERT_EQ(full_count.value(), 2048); - - // Verify subset count is less than full count - ASSERT_LT(count.value(), full_count.value()); - } + // Create fragment with first stripe only + std::vector first_stripe = {0}; + ASSERT_OK_AND_ASSIGN(auto fragment, MakeFragment(source, first_stripe)); + + auto stripe_info = reader->GetStripeInformation(0); + int64_t expected_rows = stripe_info.num_rows; + + // CountRows should return the stripe's row count, not the full file + auto count_result = format_->CountRows(fragment, literal(true), opts_); + ASSERT_OK_AND_ASSIGN(auto count, count_result.result()); + ASSERT_TRUE(count.has_value()); + ASSERT_EQ(count.value(), expected_rows); + + // Full fragment should return total rows + ASSERT_OK_AND_ASSIGN(auto full_fragment, MakeFragment(source)); + auto full_count_result = format_->CountRows(full_fragment, literal(true), opts_); + ASSERT_OK_AND_ASSIGN(auto full_count, full_count_result.result()); + ASSERT_TRUE(full_count.has_value()); + ASSERT_EQ(full_count.value(), 2048); + + // Verify subset count is less than full count + ASSERT_LT(count.value(), full_count.value()); } } // namespace dataset From e947a85787ee0d6db0b687015f5cad49b555b92a Mon Sep 17 00:00:00 2001 From: ShreyeshArangath Date: Sat, 14 Feb 2026 20:59:05 -0800 Subject: [PATCH 3/4] cleaup code --- cpp/src/arrow/dataset/file_orc.cc | 11 ----------- cpp/src/arrow/dataset/file_orc_test.cc | 21 +-------------------- 2 files changed, 1 insertion(+), 31 deletions(-) diff --git a/cpp/src/arrow/dataset/file_orc.cc b/cpp/src/arrow/dataset/file_orc.cc index d5df9f79a5b..7f101e5eb0e 100644 --- a/cpp/src/arrow/dataset/file_orc.cc +++ b/cpp/src/arrow/dataset/file_orc.cc @@ -76,7 +76,6 @@ class OrcScanTask { OpenORCReader(source, std::make_shared(scan_options))); auto materialized_fields = scan_options.MaterializedFields(); - // filter out virtual columns std::vector included_fields; ARROW_ASSIGN_OR_RAISE(auto schema, reader->ReadSchema()); for (const auto& ref : materialized_fields) { @@ -86,7 +85,6 @@ class OrcScanTask { included_fields.push_back(schema->field(match.indices()[0])->name()); } - // If stripe_ids is empty, read all stripes (backward compatible) if (stripe_ids.empty()) { std::shared_ptr record_batch_reader; ARROW_ASSIGN_OR_RAISE( @@ -96,7 +94,6 @@ class OrcScanTask { return RecordBatchIterator(Impl{std::move(record_batch_reader)}); } - // Convert field names to indices using Schema::GetFieldIndex std::vector included_indices; for (const auto& field_name : included_fields) { int idx = schema->GetFieldIndex(field_name); @@ -105,17 +102,14 @@ class OrcScanTask { } } - // Read only selected stripes return RecordBatchIterator( Impl{std::move(reader), stripe_ids, std::move(included_indices), scan_options.batch_size}); } - // Constructor for full file read explicit Impl(std::shared_ptr reader) : record_batch_reader_(std::move(reader)), stripe_mode_(false) {} - // Constructor for stripe-filtered read Impl(std::unique_ptr reader, std::vector stripe_ids, std::vector included_indices, int64_t batch_size) @@ -133,7 +127,6 @@ class OrcScanTask { return batch; } - // Stripe-filtered mode while (true) { if (record_batch_reader_) { std::shared_ptr batch; @@ -149,9 +142,7 @@ class OrcScanTask { return nullptr; } - // Seek to the next stripe and get a reader for it int64_t stripe_id = stripe_ids_[current_stripe_idx_]; - // Get stripe information to find the first row of this stripe auto stripe_info = orc_reader_->GetStripeInformation(stripe_id); RETURN_NOT_OK(orc_reader_->Seek(stripe_info.first_row_id)); ARROW_ASSIGN_OR_RAISE( @@ -328,7 +319,6 @@ Result> OrcFileFragment::Subset(std::vector strip Result> OrcFileFormat::MakeFragment( FileSource source, compute::Expression partition_expression, std::shared_ptr physical_schema) { - // Return OrcFileFragment with no stripe filter (reads all stripes) return std::shared_ptr( new OrcFileFragment(std::move(source), shared_from_this(), std::move(partition_expression), @@ -338,7 +328,6 @@ Result> OrcFileFormat::MakeFragment( Result> OrcFileFormat::MakeFragment( FileSource source, compute::Expression partition_expression, std::shared_ptr physical_schema, std::vector stripe_ids) { - // Validate stripe IDs if (!stripe_ids.empty()) { ARROW_ASSIGN_OR_RAISE(auto reader, OpenORCReader(source)); int64_t num_stripes = reader->NumberOfStripes(); diff --git a/cpp/src/arrow/dataset/file_orc_test.cc b/cpp/src/arrow/dataset/file_orc_test.cc index 93c74b9d44f..0725bd84710 100644 --- a/cpp/src/arrow/dataset/file_orc_test.cc +++ b/cpp/src/arrow/dataset/file_orc_test.cc @@ -55,8 +55,6 @@ class OrcFormatHelper { class TestOrcFileFormat : public FileFormatFixtureMixin {}; -// TEST_F(TestOrcFileFormat, WriteRecordBatchReader) { TestWrite(); } - TEST_F(TestOrcFileFormat, InspectFailureWithRelevantError) { TestInspectFailureWithRelevantError(StatusCode::IOError, "ORC"); } @@ -104,15 +102,13 @@ class TestOrcFileFragment : public ::testing::Test { void SetSchema(std::vector> fields) { opts_->dataset_schema = schema(std::move(fields)); - // Write batches individually with a tiny stripe size to force multiple stripes. ASSERT_OK_AND_ASSIGN(input_, WriteMultiStripeBuffer(4, /*batch_size=*/512)); } - /// Write an ORC buffer with a small stripe size so each batch becomes its own stripe. Result> WriteMultiStripeBuffer(int num_batches, int batch_size) { adapters::orc::WriteOptions write_opts; - write_opts.stripe_size = 1024; // 1 KiB -- forces a new stripe per batch + write_opts.stripe_size = 1024; ARROW_ASSIGN_OR_RAISE(auto sink, io::BufferOutputStream::Create()); ARROW_ASSIGN_OR_RAISE(auto writer, @@ -155,7 +151,6 @@ class TestOrcFileFragment : public ::testing::Test { }; TEST_F(TestOrcFileFragment, Basics) { - // Test that MakeFragment returns OrcFileFragment auto source = FileSource(input_); ASSERT_OK_AND_ASSIGN(auto fragment, MakeFragment(source)); ASSERT_NE(fragment, nullptr); @@ -163,7 +158,6 @@ TEST_F(TestOrcFileFragment, Basics) { } TEST_F(TestOrcFileFragment, MakeFragmentWithStripeIds) { - // Test that MakeFragment with stripe_ids works auto source = FileSource(input_); std::vector stripe_ids = {0, 1}; ASSERT_OK_AND_ASSIGN(auto fragment, MakeFragment(source, stripe_ids)); @@ -172,7 +166,6 @@ TEST_F(TestOrcFileFragment, MakeFragmentWithStripeIds) { } TEST_F(TestOrcFileFragment, Subset) { - // Test that Subset creates a new fragment with specified stripes auto source = FileSource(input_); ASSERT_OK_AND_ASSIGN(auto fragment, MakeFragment(source)); @@ -186,8 +179,6 @@ TEST_F(TestOrcFileFragment, Subset) { } TEST_F(TestOrcFileFragment, ScanSubset) { - // The fixture writes 4 batches of 512 rows with a tiny stripe size, - // producing multiple stripes. auto source = FileSource(input_); ASSERT_OK_AND_ASSIGN(auto reader, @@ -197,11 +188,9 @@ TEST_F(TestOrcFileFragment, ScanSubset) { int64_t num_stripes = reader->NumberOfStripes(); ASSERT_GT(num_stripes, 1) << "Test file should have multiple stripes"; - // Full scan should read all 2048 rows ASSERT_OK_AND_ASSIGN(auto full_fragment, MakeFragment(source)); AssertScanEquals(full_fragment, 2048); - // Read just the first stripe via MakeFragment (validated path) std::vector first_stripe = {0}; ASSERT_OK_AND_ASSIGN(auto subset_fragment, MakeFragment(source, first_stripe)); @@ -210,25 +199,21 @@ TEST_F(TestOrcFileFragment, ScanSubset) { ASSERT_LT(expected_rows, 2048); AssertScanEquals(subset_fragment, expected_rows); - // Also test Subset() (the unvalidated fast path) ASSERT_OK_AND_ASSIGN(auto subset_via_subset, full_fragment->Subset(first_stripe)); AssertScanEquals(subset_via_subset, expected_rows); } TEST_F(TestOrcFileFragment, InvalidStripeIdOutOfRange) { auto source = FileSource(input_); - // Find how many stripes the file has ASSERT_OK_AND_ASSIGN(auto reader, adapters::orc::ORCFileReader::Open( std::make_shared(input_), default_memory_pool())); int64_t num_stripes = reader->NumberOfStripes(); - // Stripe ID equal to num_stripes should fail std::vector invalid_ids = {static_cast(num_stripes)}; ASSERT_RAISES(IndexError, MakeFragment(source, invalid_ids)); - // Stripe ID way out of range should also fail std::vector very_invalid_ids = {9999}; ASSERT_RAISES(IndexError, MakeFragment(source, very_invalid_ids)); } @@ -248,27 +233,23 @@ TEST_F(TestOrcFileFragment, CountRowsWithStripeSubset) { int64_t num_stripes = reader->NumberOfStripes(); ASSERT_GT(num_stripes, 1) << "Test file should have multiple stripes"; - // Create fragment with first stripe only std::vector first_stripe = {0}; ASSERT_OK_AND_ASSIGN(auto fragment, MakeFragment(source, first_stripe)); auto stripe_info = reader->GetStripeInformation(0); int64_t expected_rows = stripe_info.num_rows; - // CountRows should return the stripe's row count, not the full file auto count_result = format_->CountRows(fragment, literal(true), opts_); ASSERT_OK_AND_ASSIGN(auto count, count_result.result()); ASSERT_TRUE(count.has_value()); ASSERT_EQ(count.value(), expected_rows); - // Full fragment should return total rows ASSERT_OK_AND_ASSIGN(auto full_fragment, MakeFragment(source)); auto full_count_result = format_->CountRows(full_fragment, literal(true), opts_); ASSERT_OK_AND_ASSIGN(auto full_count, full_count_result.result()); ASSERT_TRUE(full_count.has_value()); ASSERT_EQ(full_count.value(), 2048); - // Verify subset count is less than full count ASSERT_LT(count.value(), full_count.value()); } From 6f9c86973ca60fce7f8092ef7aaf72157a1715e5 Mon Sep 17 00:00:00 2001 From: ShreyeshArangath Date: Sun, 15 Feb 2026 09:29:58 -0800 Subject: [PATCH 4/4] fix comment --- cpp/src/arrow/dataset/file_orc.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/dataset/file_orc.cc b/cpp/src/arrow/dataset/file_orc.cc index 7f101e5eb0e..b1a435da6f6 100644 --- a/cpp/src/arrow/dataset/file_orc.cc +++ b/cpp/src/arrow/dataset/file_orc.cc @@ -284,7 +284,9 @@ Future> OrcFileFormat::CountRows( })); } -// OrcFileWriter, OrcFileWriteOptions +// // +// // OrcFileWriter, OrcFileWriteOptions +// // std::shared_ptr OrcFileFormat::DefaultWriteOptions() { // TODO (https://issues.apache.org/jira/browse/ARROW-13796)