Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions src/paimon/format/parquet/page_filtered_row_group_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -316,18 +316,20 @@ std::vector<::arrow::io::ReadRange> PageFilteredRowGroupReader::ComputePageRange
for (int32_t col_idx : column_indices) {
auto col_chunk = rg_metadata->ColumnChunk(col_idx);
int64_t data_page_offset = col_chunk->data_page_offset();
int64_t total_compressed_size = col_chunk->total_compressed_size();
int64_t chunk_end = data_page_offset + total_compressed_size;

int64_t data_page_compressed_size = col_chunk->total_compressed_size();
// Dictionary page: always include if present
if (col_chunk->has_dictionary_page()) {
int64_t dict_offset = col_chunk->dictionary_page_offset();
int64_t dict_size = data_page_offset - dict_offset;
// total_compressed_size also counts the dictionary page, so subtract its size to get the byte length of the data pages alone
data_page_compressed_size -= dict_size;
if (dict_size > 0) {
ranges.push_back({dict_offset, dict_size});
}
}

int64_t chunk_end = data_page_offset + data_page_compressed_size;
Comment on lines 318 to +331

// Try to get OffsetIndex for page-level ranges
std::shared_ptr<::parquet::OffsetIndex> offset_index;
if (rg_page_index_reader) {
Expand All @@ -336,7 +338,7 @@ std::vector<::arrow::io::ReadRange> PageFilteredRowGroupReader::ComputePageRange

if (!offset_index) {
// No OffsetIndex: fall back to entire column chunk
ranges.push_back({data_page_offset, total_compressed_size});
ranges.push_back({data_page_offset, data_page_compressed_size});
continue;
}

Expand Down
107 changes: 107 additions & 0 deletions src/paimon/format/parquet/page_filtered_row_group_reader_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -719,4 +719,111 @@ TEST_F(PageFilteredRowGroupReaderTest, EndToEndPageLevelPreBuffer) {
ASSERT_EQ(10, offset);
}

/// Test: ComputePageRanges must not produce ranges that run past the end of a
/// dictionary-encoded column chunk.
///
/// When dictionary encoding is enabled, the column chunk layout is:
///   [Dictionary Page] [Data Page 0] [Data Page 1] ... [Data Page N]
/// and total_compressed_size covers the entire chunk starting from dictionary_page_offset.
///
/// The bug under test: chunk_end = data_page_offset + total_compressed_size is wrong because
/// total_compressed_size already includes the dictionary page size. The correct chunk_end
/// is dictionary_page_offset + total_compressed_size.
///
/// This test verifies that no computed range (in particular the last page's) exceeds the
/// actual column chunk boundary, and that no range exceeds the file size.
TEST_F(PageFilteredRowGroupReaderTest, ComputePageRangesWithDictionaryEncoding) {
    std::string file_name = dir_->Str() + "/compute_ranges_dict.parquet";

    // Use low-cardinality data to ensure dictionary encoding is actually used:
    // 100 rows with values cycling through 0..9 -> the dictionary will have 10 entries.
    arrow::Int32Builder val_builder;
    ASSERT_TRUE(val_builder.Reserve(100).ok());
    for (int32_t i = 0; i < 100; ++i) {
        val_builder.UnsafeAppend(i % 10);
    }
    auto val_array = val_builder.Finish().ValueOrDie();
    auto field = arrow::field("val", arrow::int32());
    auto struct_array = arrow::StructArray::Make({val_array}, {field}).ValueOrDie();

    // Write with dictionary encoding enabled (the key difference from other tests).
    auto data_type = struct_array->struct_type();
    auto data_schema = arrow::schema(data_type->fields());
    auto data_arrow_array = std::make_unique<ArrowArray>();
    ASSERT_TRUE(arrow::ExportArray(*struct_array, data_arrow_array.get()).ok());
    ASSERT_OK_AND_ASSIGN(std::shared_ptr<OutputStream> out,
                         fs_->Create(file_name, /*overwrite=*/false));
    ::parquet::WriterProperties::Builder builder;
    builder.write_batch_size(10);
    builder.max_row_group_length(100);
    builder.enable_dictionary();  // Dictionary page present -> exercises the chunk_end logic
    builder.enable_write_page_index();
    builder.data_pagesize(1);  // Force small pages
    auto writer_properties = builder.build();
    ASSERT_OK_AND_ASSIGN(
        auto format_writer,
        ParquetFormatWriter::Create(out, data_schema, writer_properties,
                                    DEFAULT_PARQUET_WRITER_MAX_MEMORY_USE, arrow_pool_));
    ASSERT_OK(format_writer->AddBatch(data_arrow_array.get()));
    ASSERT_OK(format_writer->Finish());
    ASSERT_OK(out->Close());

    // Re-open the file and read the column chunk metadata.
    ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> in, fs_->Open(file_name));
    ASSERT_OK_AND_ASSIGN(uint64_t length, in->Length());
    auto in_stream = std::make_shared<ArrowInputStreamAdapter>(in, arrow_pool_, length);
    auto parquet_reader = ::parquet::ParquetFileReader::Open(in_stream);
    ASSERT_TRUE(parquet_reader);

    auto file_metadata = parquet_reader->metadata();
    auto rg_metadata = file_metadata->RowGroup(0);
    auto col_chunk = rg_metadata->ColumnChunk(0);

    // Precondition: dictionary page must exist for this test to be meaningful.
    ASSERT_TRUE(col_chunk->has_dictionary_page())
        << "Dictionary page not present - test setup error";

    const int64_t dict_offset = col_chunk->dictionary_page_offset();
    const int64_t data_page_offset = col_chunk->data_page_offset();
    const int64_t total_compressed_size = col_chunk->total_compressed_size();

    // The true chunk end is measured from the dictionary page, the first byte of the chunk.
    const int64_t true_chunk_end = dict_offset + total_compressed_size;
    // The buggy chunk end is measured from the first data page instead.
    const int64_t buggy_chunk_end = data_page_offset + total_compressed_size;

    // Sanity: the dictionary page precedes the data pages, so buggy end > true end.
    ASSERT_LT(dict_offset, data_page_offset)
        << "Dictionary offset should be before data page offset";
    ASSERT_GT(buggy_chunk_end, true_chunk_end)
        << "Buggy chunk_end should exceed true chunk_end when dictionary is present";

    // Compute page ranges with every row of the row group selected.
    RowRanges row_ranges;
    row_ranges.Add(RowRanges::Range(0, 99));

    auto ranges = PageFilteredRowGroupReader::ComputePageRanges(
        parquet_reader.get(), /*row_group_index=*/0, row_ranges, /*column_indices=*/{0});

    ASSERT_FALSE(ranges.empty());

    // The critical check: no range may extend beyond the true chunk end.
    // With the bug, the last data page's range would use chunk_end = data_page_offset +
    // total_compressed_size, which overshoots by the dictionary page size.
    for (size_t i = 0; i < ranges.size(); ++i) {
        int64_t range_end = ranges[i].offset + ranges[i].length;
        EXPECT_LE(range_end, true_chunk_end)
            << "Range " << i << " [offset=" << ranges[i].offset << ", length=" << ranges[i].length
            << "] exceeds true chunk end (" << true_chunk_end << "). "
            << "This indicates chunk_end is computed as data_page_offset + "
               "total_compressed_size instead of dictionary_page_offset + "
               "total_compressed_size.";
    }

    // Also verify that no individual range extends past the end of the file.
    for (const auto& range : ranges) {
        EXPECT_LE(range.offset + range.length, static_cast<int64_t>(length))
            << "Range exceeds file size";
    }
}

} // namespace paimon::parquet::test
Loading