From 0ac67d56fbd432e7e51d7403ba16fe0afd8eca5f Mon Sep 17 00:00:00 2001 From: zhouhongfeng Date: Thu, 28 May 2026 17:42:47 +0800 Subject: [PATCH 1/5] fix: Reuse RowGroupPageIndexReader for multiple columns in PageFilteredRowGroupReader::ReadFilteredRowGroup to avoid performance drop on wide tables --- .../parquet/page_filtered_row_group_reader.cpp | 17 ++++++++++------- .../parquet/page_filtered_row_group_reader.h | 2 +- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/src/paimon/format/parquet/page_filtered_row_group_reader.cpp b/src/paimon/format/parquet/page_filtered_row_group_reader.cpp index 6a372e2e5..77ea92868 100644 --- a/src/paimon/format/parquet/page_filtered_row_group_reader.cpp +++ b/src/paimon/format/parquet/page_filtered_row_group_reader.cpp @@ -138,7 +138,7 @@ std::pair PageFilteredRowGroupReader::ComputeCompressedRowRa Result> PageFilteredRowGroupReader::ReadFilteredColumn( const std::shared_ptr<::parquet::RowGroupReader>& row_group_reader, ::parquet::ParquetFileReader* parquet_reader, - const std::shared_ptr<::parquet::PageIndexReader>& page_index_reader, int32_t row_group_index, + const std::shared_ptr<::parquet::RowGroupPageIndexReader>& rg_page_index_reader, int32_t row_group_index, int32_t column_index, const RowRanges& row_ranges, const std::shared_ptr& field, int64_t row_group_row_count, ::arrow::MemoryPool* pool) { auto file_metadata = parquet_reader->metadata(); @@ -149,11 +149,8 @@ Result> PageFilteredRowGroupReader::ReadFil int64_t effective_row_count = row_group_row_count; std::shared_ptr<::parquet::OffsetIndex> offset_index; - if (page_index_reader) { - auto rg_page_index_reader = page_index_reader->RowGroup(row_group_index); - if (rg_page_index_reader) { - offset_index = rg_page_index_reader->GetOffsetIndex(column_index); - } + if (rg_page_index_reader) { + offset_index = rg_page_index_reader->GetOffsetIndex(column_index); } auto page_reader = row_group_reader->GetColumnPageReader(column_index); @@ -263,6 +260,12 @@ Result> PageFilteredRowGroupReader::Re int64_t row_group_row_count = rg_metadata->num_rows(); auto page_index_reader = parquet_reader->GetPageIndexReader(); + // reuse RowGroupPageIndexReader for multiple columns in the same row group to avoid redundant metadata reads + std::shared_ptr<::parquet::RowGroupPageIndexReader> rg_page_index_reader; + if (page_index_reader) { + rg_page_index_reader = page_index_reader->RowGroup(row_group_index); + } + // Read each column with page filtering std::vector> columns; columns.reserve(column_indices.size()); @@ -270,7 +273,7 @@ Result> PageFilteredRowGroupReader::Re for (size_t i = 0; i < column_indices.size(); ++i) { PAIMON_ASSIGN_OR_RAISE( std::shared_ptr chunked_array, - ReadFilteredColumn(row_group_reader, parquet_reader, page_index_reader, row_group_index, + ReadFilteredColumn(row_group_reader, parquet_reader, rg_page_index_reader, row_group_index, column_indices[i], row_ranges, arrow_schema->field(static_cast(i)), row_group_row_count, pool)); diff --git a/src/paimon/format/parquet/page_filtered_row_group_reader.h b/src/paimon/format/parquet/page_filtered_row_group_reader.h index 466f664c7..24c82e253 100644 --- a/src/paimon/format/parquet/page_filtered_row_group_reader.h +++ b/src/paimon/format/parquet/page_filtered_row_group_reader.h @@ -90,7 +90,7 @@ class PageFilteredRowGroupReader { static Result> ReadFilteredColumn( const std::shared_ptr<::parquet::RowGroupReader>& row_group_reader, ::parquet::ParquetFileReader* parquet_reader, - const std::shared_ptr<::parquet::PageIndexReader>& page_index_reader, + const std::shared_ptr<::parquet::RowGroupPageIndexReader>& rg_page_index_reader, int32_t row_group_index, int32_t column_index, const RowRanges& row_ranges, const std::shared_ptr& field, int64_t row_group_row_count, ::arrow::MemoryPool* pool); From f7c3ea2c00c6932ad5b4585ddac73560dc9fc2ec Mon Sep 17 00:00:00 2001 From: zhouhongfeng Date: Thu, 28 May 2026 18:24:16 +0800 Subject: [PATCH 2/5] style: clang-format --- .../parquet/page_filtered_row_group_reader.cpp | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/paimon/format/parquet/page_filtered_row_group_reader.cpp b/src/paimon/format/parquet/page_filtered_row_group_reader.cpp index 77ea92868..4594717f0 100644 --- a/src/paimon/format/parquet/page_filtered_row_group_reader.cpp +++ b/src/paimon/format/parquet/page_filtered_row_group_reader.cpp @@ -138,9 +138,10 @@ std::pair PageFilteredRowGroupReader::ComputeCompressedRowRa Result> PageFilteredRowGroupReader::ReadFilteredColumn( const std::shared_ptr<::parquet::RowGroupReader>& row_group_reader, ::parquet::ParquetFileReader* parquet_reader, - const std::shared_ptr<::parquet::RowGroupPageIndexReader>& rg_page_index_reader, int32_t row_group_index, - int32_t column_index, const RowRanges& row_ranges, const std::shared_ptr& field, - int64_t row_group_row_count, ::arrow::MemoryPool* pool) { + const std::shared_ptr<::parquet::RowGroupPageIndexReader>& rg_page_index_reader, + int32_t row_group_index, int32_t column_index, const RowRanges& row_ranges, + const std::shared_ptr& field, int64_t row_group_row_count, + ::arrow::MemoryPool* pool) { auto file_metadata = parquet_reader->metadata(); const auto* col_descriptor = file_metadata->schema()->Column(column_index); @@ -260,7 +261,8 @@ Result> PageFilteredRowGroupReader::Re int64_t row_group_row_count = rg_metadata->num_rows(); auto page_index_reader = parquet_reader->GetPageIndexReader(); - // reuse RowGroupPageIndexReader for multiple columns in the same row group to avoid redundant metadata reads + // reuse RowGroupPageIndexReader for multiple columns in the same row group to avoid redundant + // metadata reads std::shared_ptr<::parquet::RowGroupPageIndexReader> rg_page_index_reader; if (page_index_reader) { rg_page_index_reader = page_index_reader->RowGroup(row_group_index); @@ -273,8 +275,8 @@ Result> PageFilteredRowGroupReader::Re for (size_t i = 0; i < column_indices.size(); ++i) { PAIMON_ASSIGN_OR_RAISE( std::shared_ptr chunked_array, - ReadFilteredColumn(row_group_reader, parquet_reader, rg_page_index_reader, row_group_index, - column_indices[i], row_ranges, + ReadFilteredColumn(row_group_reader, parquet_reader, rg_page_index_reader, + row_group_index, column_indices[i], row_ranges, arrow_schema->field(static_cast(i)), row_group_row_count, pool)); From ee3bdf146d03049f3c612048e2d32e50f9479b42 Mon Sep 17 00:00:00 2001 From: zhouhongfeng Date: Mon, 1 Jun 2026 15:07:19 +0800 Subject: [PATCH 3/5] fix: patch arrow::InputStream::Advance with virtual and override it in arrow::CachedInputStream --- cmake_modules/arrow.diff | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/cmake_modules/arrow.diff b/cmake_modules/arrow.diff index f61b61cae..c6f3a43af 100644 --- a/cmake_modules/arrow.diff +++ b/cmake_modules/arrow.diff @@ -220,7 +220,7 @@ index 4d3acb491e..3906ff3c59 100644 --- a/cpp/src/parquet/file_reader.cc +++ b/cpp/src/parquet/file_reader.cc -@@ -207,6 +207,100 @@ +@@ -207,6 +207,109 @@ return {col_start, col_length}; } @@ -308,6 +308,16 @@ index 4d3acb491e..3906ff3c59 100644 + return std::shared_ptr<::arrow::Buffer>(std::move(buf)); + } + ++ // Override Advance to avoid real I/O for skipped pages. ++ // The default InputStream::Advance() calls Read() and discards the result, ++ // which would trigger source_->ReadAt() on cache miss — defeating page-level ++ // I/O skipping via data_page_filter. Since Advance() is only used to skip ++ // over data that will not be consumed, we can safely just move the position. ++ ::arrow::Status Advance(int64_t nbytes) override { ++ position_ += std::min(nbytes, length_ - position_); ++ return ::arrow::Status::OK(); ++ } ++ + private: + std::shared_ptr<::arrow::io::internal::ReadRangeCache> cache_; + std::shared_ptr source_; @@ -410,3 +420,13 @@ diff --git a/cpp/cmake_modules/BuildUtils.cmake b/cpp/cmake_modules/BuildUtils.c message(FATAL_ERROR "libtool found appears to be the incompatible GNU libtool: ${LIBTOOL_MACOS}" ) endif() + +diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h +--- a/cpp/src/arrow/io/interfaces.h ++++ b/cpp/src/arrow/io/interfaces.h +@@ -211,7 +211,7 @@ + /// \brief Advance or skip stream indicated number of bytes + /// \param[in] nbytes the number to move forward + /// \return Status +- Status Advance(int64_t nbytes); ++ virtual Status Advance(int64_t nbytes); From 69784e4c75bf71c47a28d03dda9dd5226b0289f4 Mon Sep 17 00:00:00 2001 From: zhouhongfeng Date: Mon, 1 Jun 2026 16:11:55 +0800 Subject: [PATCH 4/5] fix: fix build error and and negative check in CachedInputStream::Advance --- cmake_modules/arrow.diff | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/cmake_modules/arrow.diff b/cmake_modules/arrow.diff index c6f3a43af..f195ff4cc 100644 --- a/cmake_modules/arrow.diff +++ b/cmake_modules/arrow.diff @@ -220,10 +220,10 @@ index 4d3acb491e..3906ff3c59 100644 --- a/cpp/src/parquet/file_reader.cc +++ b/cpp/src/parquet/file_reader.cc -@@ -207,6 +207,109 @@ +@@ -207,6 +207,117 @@ return {col_start, col_length}; } - + +// CachedInputStream: InputStream adapter that reads through ReadRangeCache with +// zero-cost skip for non-cached pages. Used for page-level caching where only +// specific pages are pre-buffered. @@ -314,7 +314,14 @@ index 4d3acb491e..3906ff3c59 100644 + // I/O skipping via data_page_filter. Since Advance() is only used to skip + // over data that will not be consumed, we can safely just move the position. + ::arrow::Status Advance(int64_t nbytes) override { -+ position_ += std::min(nbytes, length_ - position_); ++ if (nbytes <= 0) { ++ return ::arrow::Status::OK(); ++ } ++ int64_t remaining = length_ - position_; ++ if (remaining <= 0) { ++ return ::arrow::Status::OK(); ++ } ++ position_ += std::min(nbytes, remaining); + return ::arrow::Status::OK(); + } + @@ -331,7 +338,7 @@ index 4d3acb491e..3906ff3c59 100644 // RowGroupReader::Contents implementation for the Parquet file specification class SerializedRowGroup : public RowGroupReader::Contents { public: -@@ -242,6 +336,11 @@ +@@ -242,6 +343,11 @@ // segments. PARQUET_ASSIGN_OR_THROW(auto buffer, cached_source_->Read(col_range)); stream = std::make_shared<::arrow::io::BufferReader>(buffer); @@ -343,7 +350,7 @@ index 4d3acb491e..3906ff3c59 100644 } else { stream = properties_.GetStream(source_, col_range.offset, col_range.length); } -@@ -417,6 +516,26 @@ +@@ -417,6 +523,26 @@ return cached_source_->WaitFor(ranges); } @@ -370,7 +377,7 @@ index 4d3acb491e..3906ff3c59 100644 // Metadata/footer parsing. Divided up to separate sync/async paths, and to use // exceptions for error handling (with the async path converting to Future/Status). -@@ -911,6 +1030,22 @@ +@@ -911,6 +1037,22 @@ return file->WhenBuffered(row_groups, column_indices); } From 36509594e282c5422ae2cc9ce16cbfede21d2ba4 Mon Sep 17 00:00:00 2001 From: zhouhongfeng Date: Mon, 1 Jun 2026 17:48:00 +0800 Subject: [PATCH 5/5] style: re-format arrow.diff --- cmake_modules/arrow.diff | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake_modules/arrow.diff b/cmake_modules/arrow.diff index f195ff4cc..ae7752097 100644 --- a/cmake_modules/arrow.diff +++ b/cmake_modules/arrow.diff @@ -223,7 +223,7 @@ index 4d3acb491e..3906ff3c59 100644 @@ -207,6 +207,117 @@ return {col_start, col_length}; } - + +// CachedInputStream: InputStream adapter that reads through ReadRangeCache with +// zero-cost skip for non-cached pages. Used for page-level caching where only +// specific pages are pre-buffered.