Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,31 @@
/** The array which is made up by several rows. */
public class DataEvolutionArray implements InternalArray {

/** Sentinel for "no fallback"; positions with rowOffsets[pos] < 0 stay null. */
public static final long NO_MISSING_FIELD_FALLBACK = Long.MIN_VALUE;

private final InternalArray[] rows;
private final int[] rowOffsets;
private final int[] fieldOffsets;

/**
* Value to return from getLong(pos) when {@code rowOffsets[pos] == -1}, i.e. the field is not
* physically present in any file in the group. Used by data-evolution null-count arrays to
* encode that case as "all rowCount rows are null" instead of "unknown stats". Other negative
* offsets (e.g. -2, stats not captured) never use this value.
* {@link #NO_MISSING_FIELD_FALLBACK} disables the fallback.
*/
private final long missingFieldLong;

/**
 * Creates an array without a missing-field fallback by delegating to the four-argument
 * constructor with {@link #NO_MISSING_FIELD_FALLBACK}.
 *
 * @param rowNumber number of slots allocated for the backing row arrays
 * @param rowOffsets per-position offsets; a negative entry makes {@code isNullAt} report null
 *     when no fallback is configured
 * @param fieldOffsets per-position offsets; presumably the index inside the chosen backing
 *     array (NOTE(review): confirm against offsetInRow)
 */
public DataEvolutionArray(int rowNumber, int[] rowOffsets, int[] fieldOffsets) {
this(rowNumber, rowOffsets, fieldOffsets, NO_MISSING_FIELD_FALLBACK);
}

/**
 * Creates an array with {@code rowNumber} empty backing-row slots and an optional fallback for
 * {@code getLong}.
 *
 * <p>The offset arrays are stored by reference, not copied.
 *
 * @param rowNumber number of slots allocated for the backing row arrays
 * @param rowOffsets per-position offsets; negative entries mark positions with no backing value
 * @param fieldOffsets per-position offsets; presumably the index inside the chosen backing
 *     array (NOTE(review): confirm against offsetInRow)
 * @param missingFieldLong value {@code getLong} returns for positions whose row offset is
 *     {@code -1}; pass {@link #NO_MISSING_FIELD_FALLBACK} to disable the fallback
 */
public DataEvolutionArray(
        int rowNumber, int[] rowOffsets, int[] fieldOffsets, long missingFieldLong) {
    this.missingFieldLong = missingFieldLong;
    this.fieldOffsets = fieldOffsets;
    this.rowOffsets = rowOffsets;
    this.rows = new InternalArray[rowNumber];
}

public void setRow(int pos, InternalArray row) {
Expand Down Expand Up @@ -73,6 +90,16 @@ private int offsetInRow(int pos) {

@Override
public boolean isNullAt(int pos) {
// rowOffsets[pos] == -1: field is absent from every file in the group, so every
// logical row is null for it. With missingFieldLong set, the null count is a known
// value (hence isNullAt returns false) rather than "unknown stats", so non-IS-NULL
// predicates can prune the file.
// rowOffsets[pos] == -2: field exists in a file but its stats were not captured
// (e.g. valueStatsCols did not include it). Treat as unknown stats so callers
// stay conservative.
if (rowOffsets[pos] == -1 && missingFieldLong != NO_MISSING_FIELD_FALLBACK) {
return false;
}
if (rowOffsets[pos] < 0) {
return true;
}
Expand Down Expand Up @@ -101,6 +128,9 @@ public int getInt(int pos) {

/**
 * Reads the long value at {@code pos}.
 *
 * <p>When the position's row offset is {@code -1} and a fallback is configured (anything other
 * than {@link #NO_MISSING_FIELD_FALLBACK}), the configured fallback is returned; otherwise the
 * read is delegated to the backing array chosen for {@code pos}.
 */
@Override
public long getLong(int pos) {
    final int rowOffset = rowOffsets[pos];
    final boolean useFallback =
            rowOffset == -1 && missingFieldLong != NO_MISSING_FIELD_FALLBACK;
    return useFallback ? missingFieldLong : chooseArray(pos).getLong(offsetInRow(pos));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,13 @@
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
Expand All @@ -60,16 +62,18 @@
import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
import static org.apache.paimon.manifest.ManifestFileMeta.allContainsRowId;
import static org.apache.paimon.types.VectorType.isVectorStoreFile;
import static org.apache.paimon.utils.Preconditions.checkNotNull;

/** {@link FileStoreScan} for data-evolution enabled table. */
public class DataEvolutionFileStoreScan extends AppendOnlyFileStoreScan {

private final ConcurrentMap<Pair<Long, List<String>>, List<String>> fileFields;

private boolean dropStats = false;
@Nullable private RowType readType;

// Cache file's physical field id set per (schemaId, writeCols) to avoid recomputing during
// per-file column pruning in postFilterManifestEntries.
private final ConcurrentMap<Pair<Long, List<String>>, Set<Integer>> fileFieldIdsCache =
new ConcurrentHashMap<>();

public DataEvolutionFileStoreScan(
ManifestsReader manifestsReader,
BucketSelectConverter bucketSelectConverter,
Expand All @@ -90,8 +94,6 @@ public DataEvolutionFileStoreScan(
false,
deletionVectorsEnabled,
true);

this.fileFields = new ConcurrentHashMap<>();
}

@Override
Expand Down Expand Up @@ -175,21 +177,22 @@ public Iterator<ManifestEntry> readManifestEntries(

/**
 * Enables post-filtering when there is group-level work to do: either a predicate to evaluate
 * against evolved stats ({@code inputFilter}) or a projection to prune files by ({@code
 * readType}).
 */
@Override
protected boolean postFilterManifestEntriesEnabled() {
    // The list-based filterByStats (predicate pruning) and pruneByReadType (per-file column
    // pruning) both need row-id-range grouping that the single-entry
    // filterByStats(ManifestEntry) cannot see, so either one being applicable is enough.
    return inputFilter != null || readType != null;
}

/**
 * Groups entries by overlapping row id range, drops groups rejected by the stats predicate,
 * prunes files per group by the read type, and optionally strips stats from what remains.
 *
 * @param entries manifest entries to post-filter
 * @return the surviving entries, flattened back into a single list
 */
@Override
protected List<ManifestEntry> postFilterManifestEntries(List<ManifestEntry> entries) {
    // group by row id range
    RangeHelper<ManifestEntry> rangeHelper =
            new RangeHelper<>(e -> e.file().nonNullRowIdRange());
    List<List<ManifestEntry>> splitByRowId = rangeHelper.mergeOverlappingRanges(entries);

    return splitByRowId.stream()
            // inputFilter may be null here: post-filtering is also enabled by readType alone,
            // so a null filter must keep the group instead of failing.
            .filter(group -> inputFilter == null || filterByStats(group))
            .flatMap(group -> pruneByReadType(group).stream())
            .map(entry -> dropStats ? dropStats(entry) : entry)
            .collect(Collectors.toList());
}
Expand All @@ -200,6 +203,62 @@ private boolean filterByStats(List<ManifestEntry> entries) {
stats.rowCount(), stats.minValues(), stats.maxValues(), stats.nullCounts());
}

/**
 * Prunes files inside one row-id-range group down to those whose physical columns intersect the
 * query's {@code readType}. In columnar-split data-evolution layouts a logical row is assembled
 * from several files of the same row id range, so a file that carries none of the requested
 * columns does not need to be read.
 *
 * <p>If no file in the group carries any requested column (e.g. projecting an ADD COLUMN field
 * over a group written before the ALTER), the first file is kept as a row-count representative
 * so the reader can still emit the right number of NULL-filled rows.
 *
 * @param group entries sharing one merged row id range
 * @return {@code group} itself when there is no read type or at most one entry; otherwise the
 *     overlapping entries, or a singleton list holding the first entry when none overlap
 */
private List<ManifestEntry> pruneByReadType(List<ManifestEntry> group) {
    // Nothing to prune without a projection, or when the group has at most one file.
    if (readType == null || group.size() <= 1) {
        return group;
    }

    Set<Integer> requestedIds = new HashSet<>();
    for (DataField field : readType.getFields()) {
        requestedIds.add(field.id());
    }

    List<ManifestEntry> overlapping = new ArrayList<>(group.size());
    for (ManifestEntry candidate : group) {
        // Keep the file as soon as it shares at least one field id with the projection.
        if (!Collections.disjoint(requestedIds, fileFieldIdsForEntry(candidate))) {
            overlapping.add(candidate);
        }
    }

    // The group must contribute at least one file so the reader sees rowCount and can
    // NULL-fill the missing columns.
    if (overlapping.isEmpty()) {
        return Collections.singletonList(group.get(0));
    }
    return overlapping;
}

/**
 * Returns the physical field ids of the entry's file, cached per (schemaId, writeCols) pair so
 * files written with the same schema and column list share one computed set.
 */
private Set<Integer> fileFieldIdsForEntry(ManifestEntry entry) {
    DataFileMeta file = entry.file();
    Pair<Long, List<String>> cacheKey = Pair.of(file.schemaId(), file.writeCols());
    return fileFieldIdsCache.computeIfAbsent(
            cacheKey, ignored -> computeFileFieldIds(this::scanTableSchema, file));
}

/**
 * Collects the ids of the fields physically stored in {@code file}. The lookup goes through the
 * schema the file was written under, projected to the file's write columns. Ids are used
 * instead of names because they stay stable across schema versions, so a renamed column still
 * matches an older file written under the previous name.
 *
 * @param scanTableSchema resolves a schema id to its {@link TableSchema}
 * @param file the data file whose physical columns are inspected
 * @return a new mutable set with the ids of the file's physical fields
 */
@VisibleForTesting
static Set<Integer> computeFileFieldIds(
        Function<Long, TableSchema> scanTableSchema, DataFileMeta file) {
    TableSchema writeSchema = scanTableSchema.apply(file.schemaId());
    Set<Integer> fieldIds = new HashSet<>();
    for (DataField field : writeSchema.project(file.writeCols()).fields()) {
        fieldIds.add(field.id());
    }
    return fieldIds;
}

/** TODO: Optimize implementation of this method. */
@VisibleForTesting
static EvolutionStats evolutionStats(
Expand Down Expand Up @@ -279,44 +338,34 @@ static EvolutionStats evolutionStats(
}
}

long groupRowCount = metas.get(0).file().rowCount();
DataEvolutionRow finalMin = new DataEvolutionRow(metas.size(), rowOffsets, fieldOffsets);
DataEvolutionRow finalMax = new DataEvolutionRow(metas.size(), rowOffsets, fieldOffsets);
// For null-count specifically, a field absent from every file in the group means every
// logical row is null for that field — encode as groupRowCount so stats predicates can
// prune non-null comparisons (e.g. `extra2 = 'x'`) instead of falling back to
// "unknown stats -> keep" in LeafPredicate.test.
DataEvolutionArray finalNullCounts =
new DataEvolutionArray(metas.size(), rowOffsets, fieldOffsets);
new DataEvolutionArray(metas.size(), rowOffsets, fieldOffsets, groupRowCount);

finalMin.setRows(min);
finalMax.setRows(max);
finalNullCounts.setRows(nullCounts);
return new EvolutionStats(
metas.get(0).file().rowCount(), finalMin, finalMax, finalNullCounts);
return new EvolutionStats(groupRowCount, finalMin, finalMax, finalNullCounts);
}

/** Note: Keep this thread-safe. */
@Override
protected boolean filterByStats(ManifestEntry entry) {
DataFileMeta file = entry.file();

if (readType != null) {
boolean containsReadCol = false;
List<String> fileFieldNmes =
fileFields.computeIfAbsent(
Pair.of(file.schemaId(), file.writeCols()),
pair ->
scanTableSchema(file.schemaId())
.project(file.writeCols())
.logicalRowType()
.getFieldNames());

for (String field : readType.getFieldNames()) {
if (fileFieldNmes.contains(field)) {
containsReadCol = true;
break;
}
}
if (!containsReadCol) {
return false;
}
}
// Do not drop a file based on read-column intersection. For data-evolution
// tables a field absent from a file is an implicit NULL across rowCount()
// rows, and predicates such as `new_col IS NULL` should still match those
// rows. Predicate-based stats pruning runs in
// filterByStats(List<ManifestEntry>), which evolves stats per file via
// DataEvolutionRow / DataEvolutionArray and correctly reports missing
// fields as null.

// If rowRanges is null, all entries should be kept
if (this.rowRangeIndex == null) {
Expand Down
Loading