Skip to content

feat(contrib): Native Delta Lake scan via delta-kernel-rs (Iceberg-style contrib)#4366

Draft
schenksj wants to merge 81 commits into
apache:mainfrom
schenksj:contrib-delta-direct
Draft

feat(contrib): Native Delta Lake scan via delta-kernel-rs (Iceberg-style contrib)#4366
schenksj wants to merge 81 commits into
apache:mainfrom
schenksj:contrib-delta-direct

Conversation

@schenksj
Copy link
Copy Markdown

@schenksj schenksj commented May 19, 2026

Briefing

This PR lands a native Delta Lake scan for Comet. It supersedes #3932 — the
SPI/registry design discussed there was rejected in favor of the Iceberg-style
contrib pattern this PR uses (typed proto variant + ~40 lines of feature-gated
core touchpoints + standalone contrib/delta/ tree). Default builds are
entirely unaware of this code: no SPI lookups, no ServiceLoader scans, no
contrib surface at runtime. Only when the -Pcontrib-delta Maven profile (and
parallel contrib-delta Cargo feature) is activated do the contrib classes
land on the classpath and the reflection bridge resolve.

The integration reads Delta metadata via delta-kernel-rs on the driver,
encodes the resolved file list (with column mappings, DV info, partition
values, row-tracking baseRowId) into a typed OpStruct::DeltaScan proto, and
executes via DataFusion's parquet reader on each executor.

Coverage

Supported, fully native (broad):

  • Deletion vectors — kernel resolves the bitmap on the driver, DeltaDvFilterExec filters rows on executors. DV filter is chained AFTER synthetic emission (so row_index reflects original file positions) when both are needed
  • Column mapping both name AND id mode. name rewrites logical→physical names in the planner; id translates Delta's delta.columnMapping.id to parquet's PARQUET:field_id on every StructField (including nested struct/array/map) so the parquet reader matches by ID
  • Type widening — DataFusion's parquet schema adapter handles the read-time cast
  • Row tracking — supported in three modes:
    • Materialised: rewritten to read the physical _row-id-col-<uuid> column from parquet
    • Unmaterialised: row_id = base_row_id + physical_row_index per file, all synthesised natively — base_row_id is emitted as a per-file Int64 constant from AddFile.baseRowId and _row-id-col-<uuid> is emitted as all-NULL so Delta's GenerateRowIDs Project falls back to the computed expression
    • Mixed within one query: the wrapped exec's emit order matches scan.requiredSchema ordinal-by-ordinal so the upstream Filter(__delta_internal_is_row_deleted = 0) binds correctly
  • Native synthesis of Delta's internal columns __delta_internal_row_index / __delta_internal_is_row_deleted for UPDATE/DELETE/MERGE flows. is_row_deleted is emitted as Int8 (matching Delta's ByteType) to avoid DataFusion's interval-propagator panicking on Int32 vs Int8 mismatches in stats pushdown
  • Native synthesis of Spark _metadata.* virtual columns (file_path / file_name / file_size / file_block_start / file_block_length / file_modification_time) detected from scan.output even when not in scan.requiredSchema
  • Output column reorder when synthetics aren't already a canonical suffix — proto carries final_output_indices, native dispatcher wraps with a ProjectionExec so downstream operators that bind by ordinal don't silently misread one synthetic as another
  • General-purpose Parquet field-ID matching when spark.sql.parquet.fieldId.read.enabled=true (same wiring as CM-id)
  • Partition pruning, including DPP (resolved after AQE broadcasts are ready)
  • Predicate pushdown into parquet (with synthetic-column filters excluded — those are handled by the upstream Filter after synthetic emission)
  • Multi-task-per-partition packing for cluster utilisation
  • input_file_name() and friends — one-task-per-partition + a per-task InputFileBlockHolder hook in CometExecRDD + CometDeltaNativeScanExec plumbs per-partition file paths through to the RDD
  • FAILED_READ_FILE.NO_HINT exception wrapping with file path
  • Encryption that routes through the shared CometParquetUtils config check
  • _delta_log, _change_data, and _commits parquet reads via the same scan
  • S3A Hadoop credential chain (SimpleAWS / TemporaryAWS / AssumedRole / IAMInstance) resolved Scala-side at planning time so kernel log replay authenticates under the same chain as data reads. Reflective lookup against S3AUtils.createAWSCredentialProviderList; cached Method handles
  • checkLatestSchemaOnRead=false — our path is pinned to a single snapshot version via extractSnapshotVersion(relation) so the Delta-side at-read check doesn't apply to us
  • Live snapshot refresh — deltaLog.update(stalenessAcceptable=false) + snapshot.filesForScan(...).files for PreparedDeltaFileIndex so DV descriptors written after the FileIndex was constructed get picked up
  • All public S3 / Azure / GCS / OSS schemes; local file://

Falls back to Spark's reader (with withInfo reason surfaced in explain-fallback):

Correctness fallbacks — load-bearing, do not remove:

  • DV materialisation failure (DV file missing / unsupported version / read error) — kernel can't give us the deleted-row indexes, so we can't filter
  • Reflective AddFile extraction failure — no file list, nothing to scan
  • Kernel-rs log-replay error — same
  • Phase 6 reader-feature gate — currently an empty list; defense-in-depth for any future kernel-rs return of unsupported feature names

Shared Comet limits (apply to any native scan, not Delta-specific) — each is its own per-case work in core:

  • Encryption with unsupported KMS config — shared CometParquetUtils.isEncryptionConfigSupported
  • Custom Hadoop FS schemes (fake:// etc.) — object_store has no Hadoop FS plugin layer; would need a bridge
  • CometScanTypeChecker rejections (ShortType under default config, string collation, variant struct) — each is a Comet-core feature gap, not a Delta-contrib problem. Variant in particular: arrow-rs has parquet-variant crates but Comet hasn't integrated them yet

External:

  • TahoeLogFileIndexWithCloudFetch — Databricks-proprietary file index, not in OSS Delta. Defensive guard for DBR users only

Workaround tracked upstream:

  • CreateArray with mismatched element types — caller-side decline for apache/datafusion#22366. Removable once upstream lands

User off-switches:

  • spark.comet.scan.deltaNative.enabled=false, spark.comet.exec.enabled=false

Shape

Layer Path Lives in
Typed proto variant delta_scan = 117 native/proto/src/proto/operator.proto Core
Reflection bridge spark/.../comet/rules/DeltaIntegration.scala Core
Scan-rule arm spark/.../comet/rules/CometScanRule.scala Core (one block)
Exec-rule arm spark/.../comet/rules/CometExecRule.scala Core (one case)
PlanDataInjector.opStructCase spark/.../sql/comet/operators.scala Core (one method)
Per-partition file paths CometExecRDD, CometNativeScanExec, CometExecIterator, ShimSparkErrorConverter Core (load-bearing for input_file_name() and FAILED_READ_FILE.NO_HINT wrapping in any native scan)
Native dispatcher arm (DV / synthetic / reorder / CM-id) contrib/delta/native/src/core_glue.rs (compiled into core via #[path]; see "Why the dispatcher file lives in contrib but compiles in core" below) Compiled into core, file homed in contrib
Delta scan rule, exec, serde contrib/delta/src/main/scala/... Contrib
Kernel-rs engine + cache, scan, DV filter, synthetic columns, planner contrib/delta/native/src/*.rs Contrib
Maven profile, Cargo feature spark/pom.xml, contrib/delta/native/Cargo.toml, native/core/Cargo.toml Build
Build-gate verification dev/verify-contrib-delta-gate.sh Build
Regression harness contrib/delta/dev/run-regression.sh + dev/diffs/delta/4.1.0.diff Contrib

Key design decisions

Iceberg-style contrib, not SPI. Static helper objects with stable names
(DeltaScanRule.transformV1IfDelta, CometDeltaNativeScan.MODULE$); a single
reflection bridge in core resolves and caches Method handles once per JVM.
No registry, no ServiceLoader, no extension points beyond what core already
exposes. The contrib is just classpath-or-not.

Typed proto, not an envelope. OpStruct::DeltaScan is a first-class
variant. Avoids the ContribOp { kind, payload } envelope discussed in #3932;
PlanDataInjector keys by OpStructCase for O(1) dispatch.

Split-mode plan serialization. CometDeltaNativeScan.convert emits a
DeltaScan proto with the common block only (schemas, table root, filters);
each partition's tasks ride in a per-partition byte array via
PlanDataInjector at execution time. Avoids closure-capturing every file in
every partition.

Native synthetic-column synthesis. DeltaSyntheticColumnsExec (in
contrib/delta/native/src/synthetic_columns.rs) emits the standard four
Delta internals (__delta_internal_row_index as Int64, __delta_internal_is_row_deleted
as Int8, row_id, row_commit_version) PLUS Spark _metadata.* virtual
columns PLUS row-tracking-specific synthetics (base_row_id per-file
constant from AddFile.baseRowId, _row-id-col-<uuid> / _row-commit-version-col-<uuid>
as NULL-filled). When emit is on, each file gets its own FileGroup so the
per-file row offset / baseRowId arithmetic is well-defined.

Synthetic-suffix ordering matters. The wrapped exec's output ordering is
checked against scan.requiredSchema AND the canonical native emit order. If
the synthetic block isn't already in canonical order at the right ordinals,
the proto carries final_output_indices and the native dispatcher wraps with
a ProjectionExec to reorder. Without this, an upstream
Filter(__delta_internal_is_row_deleted = 0) binding by ordinal would silently
misread row_index as is_row_deleted (caught and fixed mid-PR; the
DV-after-DELETE test bisected the bug to a one-ordinal swap).

DV filter chained after synthetic emission, not mutually exclusive. When
both synthetics and a DV are present, we previously chose one wrapper or the
other — which meant any read that surfaced _tmp_metadata_row_index got
NO DV filtering applied. The wrappers are now chained:
parquet → DeltaSyntheticColumnsExecDeltaDvFilterExec (skipped when
emit_is_row_deleted is on so UPDATE/DELETE/MERGE writers still see every row).

CM-name rename before synthetics. Synthetic columns have fixed names
(never CM-renamed) and are appended AFTER the parquet read; the rename
projection has to apply to the parquet output BEFORE the append so the
length-match check works correctly.

Spark _metadata.* driven from scan.output, not just scan.requiredSchema.
Delta's PreprocessTableWithDVs strategy can append _metadata.file_path to
scan.output without putting it in scan.requiredSchema. The synthetic
exec detects these from scan.output so the wrapped exec's output schema
includes them and downstream attribute resolution works.

is_row_deleted is Int8, not Int32. Delta declares the column as
ByteType. Emitting Int32 trips DataFusion's interval propagator with
Only intervals with the same data type are intersectable, lhs:Int32, rhs:Int8
whenever the upstream Filter pushes stats. Caught by the CM + DV combined
coverage test.

InputFileBlockHolder thread-local hook in CometExecRDD.compute.
Comet's native scans bypass Spark's FileScanRDD, so the standard
input_file_name() thread-local would otherwise be empty for any native
scan (not just Delta). One small but load-bearing core change that fixes
both Delta's UPDATE/DELETE/MERGE flows AND the FAILED_READ_FILE.NO_HINT
error wrapping. CometDeltaNativeScanExec plumbs its per-partition file
paths through to CometExecRDD so InputFileBlockHolder.set(path) fires
correctly.

Live snapshot refresh on PreparedDeltaFileIndex. preparedScan.files
caches the AddFile list at FileIndex construction time. Consecutive DML
on the same path (e.g. two DELETEs in the same SparkSession) yields stale
DV descriptors when the FileIndex is reused. DeltaReflection.refreshedSnapshotFiles
calls deltaLog.update(stalenessAcceptable=false) then
snapshot.filesForScan(Nil, false).files to pick up the latest descriptors,
falling back to the cached preparedScan.files if reflection fails.

Engine cache by (scheme, authority, DeltaStorageConfig). kernel-rs's
DefaultEngine<TokioBackgroundExecutor> spawns one OS thread per executor.
Without caching, hundreds of scans/min was leaking threads faster than tokio
reaped them, tripping pthread_create EAGAIN ~2h into regression. The cache
bounds live thread count by table-storage diversity instead of by request
count.

DV filter ordering safeguards. DeltaDvFilterExec tracks
current_row_offset across batches, which assumes physical-order input.
Overrides maintains_input_order() = [true] and
benefits_from_input_partitioning() = [false] so any future optimizer that
wants to insert a RepartitionExec is forced to bail rather than silently
re-order rows.

One new core trait method. PlanDataInjector.opStructCase is the only
core trait addition. It keys the existing injector map for O(1) dispatch.

Why the dispatcher file lives in contrib but compiles in core

contrib/delta/native/src/core_glue.rs is physically co-located with the
rest of the Delta integration but is compiled as a module of the core crate
via #[cfg(feature = "contrib-delta")] #[path = "../../../../contrib/delta/native/src/core_glue.rs"] mod contrib_delta_scan;. The reason: this file implements
PhysicalPlanner::plan_delta_scan and reaches into core's pub(crate)
helpers (create_expr, init_datasource_exec,
prepare_object_store_with_configs). A true cross-crate impl block is
forbidden by Rust, and a contrib → core cargo dependency would create a
cycle with core's optional contrib-delta dep on contrib, so #[path] is
the available tool that lets the FILE's home be with Delta while its
COMPILATION unit stays in core. Build gate (cfg(feature = "contrib-delta"))
is preserved exactly — default builds carry zero Delta surface (see
"Validation" below).

Audit of remaining Delta references in core

After moving the dispatcher body into contrib/, every Delta reference left
in native/core/src/ is either feature-gated or a structural one-line arm
in an exhaustive match OpStruct:

File Reference Why it's there
planner.rs:33-35 mod contrib_delta_scan; The #[path]-relocated module declaration. #[cfg(feature = "contrib-delta")].
planner.rs:1512-1527 OpStruct::DeltaScan dispatcher arm Both halves feature-gated. Default-build half returns "Received a DeltaScan operator but core was built without the contrib-delta Cargo feature" so a misconfigured driver gets a clear error.
jni_api.rs:op_name OpStruct::DeltaScan(_) => "DeltaScan" Exhaustive enum match; returns a string for tracing. No contrib logic.
planner/operator_registry.rs:to_operator_type OpStruct::DeltaScan(_) => None Exhaustive enum match; signals "not in OperatorType enum". No contrib logic.

OpStruct is a proto-generated enum (in datafusion-comet-proto); Rust
requires exhaustive matches everywhere it appears. Keeping the structural
arms un-gated is intentional — it lets default builds identify a misrouted
DeltaScan operator by name in the error message.

Validation

The build gate is enforced by dev/verify-contrib-delta-gate.sh, which runs
6 independent checks across 3 layers and exits non-zero on the first
failure. Designed to be wired into CI.

# Requires a JDK ≥17 on PATH (and as JAVA_HOME for the Maven sub-runs).
dev/verify-contrib-delta-gate.sh

What the script asserts:

Layer Check
Cargo cargo tree -p datafusion-comet --no-default-features has zero comet-contrib-delta / delta_kernel entries
Cargo cargo tree -p datafusion-comet --features contrib-delta correctly pulls both (catches accidental off)
Maven mvn -Pspark-4.1 dependency:list has zero io.delta:* deps
Maven mvn -Pspark-4.1,contrib-delta dependency:list correctly pulls io.delta:delta-spark
Maven Default test-compile produces no org/apache/comet/contrib/**.class and no CometDeltaNativeScan* / DeltaScanRule* / DeltaReflection* classes (only the always-present DeltaIntegration reflection bridge)
Native Default libcomet.dylib is meaningfully smaller (~57 MB delta on macOS arm64 debug build) AND has zero comet_contrib_delta / delta_kernel / DeltaDvFilter* / DeltaSynthetic* external symbols

Current run on this branch: all 6 PASS.

Running the contrib Scala test suite

49 tests across four suites (24 coverage + 25 feature/native/column-mapping):

# JDK 17, contrib + spark-4.1 profiles
JAVA_HOME=$(/usr/libexec/java_home -v 17) \
  mvn -Pspark-4.1,contrib-delta -pl spark -am test \
    -Dsuites='org.apache.comet.contrib.delta.CometDeltaFeaturesSuite,
              org.apache.comet.contrib.delta.CometDeltaNativeSuite,
              org.apache.comet.contrib.delta.CometDeltaColumnMappingSuite,
              org.apache.comet.contrib.delta.CometDeltaCoverageSuite' \
    -Djava.version=17 -Dmaven.compiler.source=17 -Dmaven.compiler.target=17

Current run: 49/49 pass.

CometDeltaCoverageSuite is the accelerator-coverage matrix — each test
asserts BOTH (a) the executed plan contains CometDeltaNativeScanExec
(actually engaged, no silent fall-back) AND (b) the rows match vanilla
Spark+Delta exactly. Covers: SELECT */column-prune/arithmetic/LIMIT/DISTINCT,
filters (eq/neq/IN/IS NULL/BETWEEN/LIKE/AND/OR/NOT), ORDER BY, aggregates
(count/sum/avg/min/max/GROUP BY/HAVING/COUNT DISTINCT), joins
(self/inner/left/leftsemi/leftanti), set ops (UNION/INTERSECT/EXCEPT),
window functions, scalar + IN subqueries, CTEs, partition-pruned reads,
column-mapping reads, DV-bearing reads, nested data (struct/array/map).

Running the contrib Rust test suite

cargo test -p comet-contrib-delta
# Plus the integration tests that exercise plan_delta_scan against a
# real parquet + _delta_log tree:
cargo test -p datafusion-comet --features contrib-delta

What the in-PR validation looks like end-to-end

  1. dev/verify-contrib-delta-gate.sh — proves default builds carry zero Delta surface.
  2. Contrib Scala suite (4 suites, 49 tests) — proves accelerator engages and matches vanilla across the SQL surface area.
  3. Contrib Rust unit + integration tests — proves the kernel-rs engine cache, DV filter, synthetic columns, predicate, and CM-rewriter behave correctly in isolation.
  4. Full Delta 4.1 regression (contrib/delta/dev/run-regression.sh against dev/diffs/delta/4.1.0.diff) — proves we don't regress anything in Delta's own test suite.

Review strategy

Suggested order with different bars:

  1. Core touchpoints (~10 minutes, high bar). New core surface area is
    small but ships in default builds:

    • native/proto/src/proto/operator.proto (one OpStruct variant + DeltaScan messages)
    • The dispatcher arm in native/core/src/execution/planner.rs:1512-1527 (the actual body lives in contrib/delta/native/src/core_glue.rs; see "Why the dispatcher file lives in contrib but compiles in core" above)
    • spark/.../comet/rules/DeltaIntegration.scala (whole file — reflection bridge)
    • The new arm in CometScanRule.transformV1Scan and the new case in CometExecRule.transform
    • CometExecRDD + CometExecIterator + CometNativeScanExec diffs (per-partition file paths, InputFileBlockHolder hook)
    • ShimSparkErrorConverter.wrapNativeParquetError
    • spark/.../comet/serde/arrays.scala (CreateArray decline — references the upstream issue)
  2. Contrib Scala (~30 minutes, contrib bar):

    • DeltaScanRule.scala — entry point, gates documented under "Coverage" above
    • CometDeltaNativeScan.scala — split serde, kernel-rs call, task prune/split/pack, column-mapping fixup, synthetic-column detection + suffix reorder, CM-id field-ID translator, S3A credential chain resolution
    • CometDeltaNativeScanExec.scala — exec wrapper, DPP partition pruning, metric reporting, per-partition file paths plumbed to InputFileBlockHolder
    • DeltaPlanDataInjector.scala, DeltaInputFileBlockHolder.scala — small
    • DeltaReflection.scala — reflection bridge into Delta internals (incl. refreshedSnapshotFiles for snapshot staleness)
    • RowTrackingAugmentedFileIndex.scala — small
    • CometDeltaCoverageSuite.scala — the accelerator-coverage matrix
  3. Contrib Rust (~30 minutes, contrib bar):

    • contrib/delta/native/src/engine.rs — kernel-rs engine + cache
    • contrib/delta/native/src/scan.rsplan_delta_scan, DV row-index resolution, extract_row_tracking_for_selected (reads fileConstantValues from raw RecordBatch)
    • contrib/delta/native/src/synthetic_columns.rsDeltaSyntheticColumnsExec (emits row_index Int64 + is_row_deleted Int8 + row_id + row_commit_version + Spark _metadata.* + row-tracking synthetics; per-batch row offset counter; DV-walk for is_row_deleted)
    • contrib/delta/native/src/dv_filter.rsDeltaDvFilterExec (chained after synthetic emission when DV+synthetics both needed)
    • contrib/delta/native/src/planner.rsbuild_delta_partitioned_files, SessionTimezone, ColumnMappingFilterRewriter
    • contrib/delta/native/src/core_glue.rs — the in-core dispatcher body (homed here, compiled into core via #[path])
    • contrib/delta/native/src/jni.rsplanDeltaScan JNI entry
  4. Build / regression infra (~5 minutes):

    • spark/pom.xml -Pcontrib-delta profile
    • native/core/Cargo.toml contrib-delta feature
    • contrib/delta/native/Cargo.toml (standalone, not in workspace — intentional to avoid arrow-57 / arrow-58 cross-contamination)
    • dev/verify-contrib-delta-gate.sh — build-gate enforcement
    • contrib/delta/dev/run-regression.sh + dev/diffs/delta/4.1.0.diff

git log --oneline main..HEAD is also a useful walk — commits are labeled by
phase (P7a..P7z) and each commit message documents the specific concern it
addresses. Two prior comprehensive reviews are reflected in commits 43768c1c
(first review) and 2d13a147 (review of the gate-unblock work).

Follow-ups (not in this PR)

  • Variant type native support — arrow-rs has parquet-variant crates but Comet hasn't integrated them; would unblock CometScanTypeChecker.isVariantStruct for all native scans
  • String collation native support in expression evaluators
  • ProjectionExec column-mapping rename pushdown into ParquetSource's schema adapter (perf item from in-PR sweep)
  • Engine cache TTL / credential-rotation eviction (fine for validation; would block long-lived production drivers using STS)
  • Filter-rewriter linear field lookup → name→index HashMap (perf audit item; per-filter not per-batch)
  • Extract a ContribPlannerCtx trait in a small shared crate so the core_glue.rs body can compile in the contrib crate proper (eliminates the #[path] indirection at the cost of a new crate). Tracked as a separate task.

Test plan

  • Default builds (no -Pcontrib-delta): mvn -pl spark -am test-compile green
  • -Pcontrib-delta builds green (Maven + Cargo)
  • dev/verify-contrib-delta-gate.sh passes all 6 build-gate checks
  • Contrib Scala test suite: 49/49 pass across CometDeltaFeaturesSuite / CometDeltaNativeSuite / CometDeltaColumnMappingSuite / CometDeltaCoverageSuite
  • Contrib Rust unit tests pass
  • Two comprehensive code reviews completed; both rounds of findings addressed
  • Targeted retest of every cluster surfaced during validation, all pass:
    • DescribeDeltaHistorySuite "replaceWhere on data column" — 8/8
    • DeltaTableHadoopOptionsSuite "dropFeatureSupport - with filesystem options" — 1/1
    • SnapshotManagementSuite "should not recover when the current checkpoint is broken..." — 2/2
    • DeltaColumnMappingSuite "physical data and partition schema" + "write/merge df to table" (CM-id + CM-name) — 2/2
  • Engine-cache fix verified end-to-end (no more pthread_create EAGAIN)
  • Full Delta 4.1 regression with all gate-unblock commits in place
  • CI: default + -Pcontrib-delta build paths exercised + dev/verify-contrib-delta-gate.sh wired

Upstream issue

apache/datafusion#22366
filed for make_array element-type strictness. The CometCreateArray
decline in this PR is a caller-side workaround until upstream relaxes.

🤖 Generated with Claude Code

schenksj and others added 19 commits May 18, 2026 20:01
Initial scaffolding for the direct Delta integration that replaces the
generic contrib SPI proposed in apache#4339. Mirrors Iceberg's pattern:

  - native/proto/src/proto/operator.proto: typed `DeltaScan delta_scan = 117`
    variant on `OpStruct`, with the six message definitions (DeltaScanCommon,
    DeltaScan, DeltaScanTask, DeltaPartitionValue, DeltaScanTaskList,
    DeltaColumnMapping) inlined next to the IcebergScan group. Field numbers
    preserved from the contrib-delta-pr2 branch.

  - native/core/src/execution/planner.rs: unconditional `OpStruct::DeltaScan`
    dispatcher arm with feature-gated body. Default builds return a clear
    "rebuild with --features contrib-delta" error; the feature-on arm is a
    `todo!` stub today and gets filled in as the implementation ports over.

  - native/core/src/execution/jni_api.rs + planner/operator_registry.rs: extend
    the existing `OpStruct` match sites so default builds compile exhaustively.

  - native/core/Cargo.toml: new optional `contrib-delta` feature backed by an
    optional path dep on `comet-contrib-delta`. Default builds carry zero Delta
    surface (verified: `cargo check` builds clean without the feature, and the
    Delta crate is not in the workspace `members` list).

  - native/Cargo.toml: explicit `exclude = ["../contrib"]` so the workspace
    doesn't try to absorb the contrib crate (which would fail -- workspace
    members must live hierarchically under the workspace root).

  - contrib/delta/native/{Cargo.toml,src/lib.rs}: skeleton crate that re-exports
    the typed Delta proto messages so contrib-internal code has a stable short
    alias. Real implementation (kernel-rs log replay, DV filter, column
    mapping, partition parsing) ports over from contrib-delta-pr2 in follow-up
    commits.

Build verification:
  cargo check  -p datafusion-comet                        # default: green
  cargo check  -p datafusion-comet --features contrib-delta # green

This addresses Parth's review on apache#4339: ~40 lines of core touchpoints all
behind a feature gate, no SPI/registry/traits/runtime dispatch.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Brings the working delta-kernel-rs integration over from contrib-delta-pr2
without the contrib SPI plumbing Parth flagged on apache#4339.

contrib/delta/native/:
  - jni.rs, scan.rs, engine.rs, error.rs, predicate.rs, dv_filter.rs --
    ported verbatim from contrib-delta-pr2 (only crate::proto::* import paths
    needed adjustment, handled via lib.rs re-export of the typed messages
    that now live in core's proto crate)
  - planner.rs -- Delta-specific helpers (build_delta_partitioned_files,
    parse_delta_partition_scalar with the DATE -> TIMESTAMP_NTZ widening
    fallback already inlined, ColumnMappingFilterRewriter) exposed as
    pure-DataFusion functions that core's dispatcher arm composes onto the
    standard parquet datasource path. NO ContribOperatorPlanner trait, NO
    ContribPlannerContext, NO ParquetDatasourceParams -- the contrib crate
    is now a plain library with public functions.
  - lib.rs -- module decls + a `pub mod proto` re-export of the six typed
    Delta messages from `datafusion_comet_proto::spark_operator`. No
    `#[ctor]` and no `register_contrib_planner` call.
  - Cargo.toml -- standalone (outside the native/ workspace root), no
    comet-contrib-spi dep, all delta-specific deps stay confined here.

native/core/src/execution/planner/contrib_delta_scan.rs (new):
  - `PhysicalPlanner::plan_delta_scan` -- the `OpStruct::DeltaScan` arm body
    extracted into its own file (~210 lines, mirrors `OpStruct::IcebergScan`
    in size and shape). Gated `#[cfg(feature = "contrib-delta")]`; calls
    core's `init_datasource_exec`, `prepare_object_store_with_configs`,
    `convert_spark_types_to_arrow_schema` directly + comet-contrib-delta's
    helpers for the Delta-specific pieces.

native/core/src/execution/planner.rs:
  - `OpStruct::DeltaScan` arm: 6-line dispatcher that calls into
    `self.plan_delta_scan(...)` under `#[cfg(feature = "contrib-delta")]`.

native/core/src/parquet/parquet_exec.rs:
  - New `ignore_missing_files: bool` arg on `init_datasource_exec`.
    Threaded through to `IgnoreMissingFileSource` wrapper (ported verbatim
    from PR2's native/core/src/parquet/missing_file_tolerant.rs) which
    decorates the final FileSource so its FileOpener swallows object-store
    NotFound errors as empty streams. Matches Spark's
    `spark.sql.files.ignoreMissingFiles=true` semantics. All existing call
    sites updated to pass `false`.

Build verification (both checked clean):
  cargo check  -p datafusion-comet                          # default
  cargo check  -p datafusion-comet --features contrib-delta

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
These five files port verbatim from contrib-delta-pr2 -- they touch only
Spark APIs (via reflection) and standard Scala, none of the rejected SPI
surface:

  - DeltaConf.scala               Config keys (COMET_DELTA_NATIVE_ENABLED, ...)
  - Native.scala                  JNI bridge for planDeltaScan
  - DeltaReflection.scala         Reflective access to spark-delta internals
                                  (isDeltaFileFormat, isBatchFileIndex,
                                  extractBatchAddFiles, ...)
  - RowTrackingAugmentedFileIndex Wraps a FileIndex to inject row-tracking
                                  metadata columns
  - DeltaInputFileBlockHolder     Thread-local replacement for
                                  InputFileBlockHolder on the Delta scan path

Plus the regression infrastructure (4.1.0.diff, run-test.sh,
run-regression.sh).

The remaining four files (CometDeltaNativeScan, CometDeltaNativeScanExec,
DeltaScanRuleExtension, DeltaOperatorSerdeExtension, DeltaPlanDataInjector)
each reference the rejected SPI surface (CometOperatorSerde,
CometScanRuleExtension, ContribOp envelope, PlanDataSource, PlanDataInjector).
Those need rewriting before they can compile against main -- queued as the
next commit on this branch:
  - drop the `extends CometOperatorSerde[CometScanExec]` trait bound;
    expose `convert(...)` as a static method
  - replace ContribOp envelope with the typed OpStruct::DeltaScan
  - drop the SPI extension class wrappers; integrate detection directly
    into CometScanRule.scala + CometExecRule.scala (Iceberg-style)
  - bake DeltaPlanDataInjector logic directly into CometDeltaNativeScanExec

Maven `-Pcontrib-delta` profile, scalastyle wiring, and the SPI rewrite
all land together in the follow-up commit so the contrib compiles
end-to-end against main.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ction bridge

The four SPI-touching files from contrib-delta-pr2 rewritten to drop the
rejected SPI base classes and use the typed `OpStruct::DeltaScan` proto
variant directly:

  - CometDeltaNativeScan.scala  no longer `extends CometOperatorSerde`;
    plain object with `convert(scan, builder, childOp*)` static method.
    All `ContribOp` envelope wrapping replaced with
    `builder.setDeltaScan(...)`. DeltaOperator.* imports redirected to
    core's `org.apache.comet.serde.OperatorOuterClass`.
  - CometDeltaNativeScanExec.scala  no longer `with PlanDataSource`;
    public accessors (planDataSourceKey, planDataCommonBytes,
    planDataPerPartitionBytes) stay so core's CometExecRDD can read them
    directly. `nativeOp.getContribOp.getPayload` calls collapse to the
    typed `nativeOp.getDeltaScan` accessor.
  - DeltaScanRule.scala  was `class DeltaScanRuleExtension extends
    CometScanRuleExtension`; now a plain `object DeltaScanRule` with a
    single static entry point `transformV1IfDelta(plan, session,
    scanExec, relation): Option[SparkPlan]`. The private
    `CometScanRule.isSchemaSupported` is unreachable from contrib, so
    inline the equivalent check (CometScanTypeChecker + fallback-reason
    emission).
  - The DeltaOperatorSerdeExtension + DeltaPlanDataInjector files are
    not ported -- their roles fold into the next commit's CometExecRule
    Delta serde dispatch and into CometDeltaNativeScanExec respectively.

Core wiring:
  - spark/pom.xml: new `<profile id="contrib-delta">` adds
    contrib/delta/src/main/scala/ as a compile source on comet-spark and
    pulls in `io.delta:delta-spark_2.13:4.1.0` at provided scope.
  - CometScanRule.scala: 5-line Delta detection block at the head of
    `transformV1Scan`'s HadoopFsRelation case (Iceberg-style; calls into
    `DeltaIntegration.transformV1IfDelta` which is a no-op when the
    contrib isn't bundled).
  - DeltaIntegration.scala (new): reflection bridge that resolves the
    contrib's `DeltaScanRule` + `CometDeltaNativeScan` companion objects
    by class name. Default builds get `None`; -Pcontrib-delta builds get
    a working delegate. No SPI / ServiceLoader / registry.

Build verification:
  mvn compile                  # default: still green
  mvn compile -Pcontrib-delta  # GREEN -- this is the milestone

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…tics

Spark's UnsafeRow.getUTF8String wraps bytes via UTF8String.fromAddress with
no UTF-8 validation, and cast(BinaryType -> StringType) is a zero-copy
reinterpret that leaves arbitrary bytes in a StringType column. Delta's
Z-Order uses interleave_bits(...).cast(StringType) for opaque sort keys,
which panicked Comet's strict from_utf8(...).unwrap() and cascaded into
JVM classloader errors (60+ ServiceConfigurationError tests in the
contrib-delta-pr2 regression run).

Switch to from_utf8_unchecked since the bytes flow directly into Arrow's
StringBuilder::append_value and are never introspected as a &str.

Verified on contrib-delta-pr2: OptimizeZOrderScalaSuite "interleaving"
4/4 PASS after this fix.

Pure core fix -- independent of the contrib/delta integration. Lands on
this branch because it's a prerequisite for the Delta regression to be
meaningful (without it the Z-Order panic poisons every following test).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Connects core's CometExecRule to the contrib's Delta scan serde so the
Delta-marker CometScanExec produced by CometScanRule flows through the
same `convertToComet(scan, handler)` path as Iceberg / NativeScan / etc.

  - CometDeltaNativeScan re-extends core's `CometOperatorSerde` trait
    (the trait itself is core, not part of the rejected extension SPI;
    every Comet operator handler implements it). `getSupportLevel` /
    `enabledConfig` / `convert` now properly override.
  - DeltaIntegration.scanHandler: a single reflective lookup exposes
    the contrib's companion as a `CometOperatorSerde[CometScanExec]`.
    Returns None on default builds.
  - CometExecRule.transform: new case beside the SCAN_NATIVE_DATAFUSION
    one that recognises the Delta scan marker (scanImpl ==
    "native_delta_compat") and dispatches via the handler.

Build verification:
  mvn compile                  GREEN
  mvn compile -Pcontrib-delta  GREEN

Still pending for end-to-end:
  - per-partition task-list injection (replaces PR2's DeltaPlanDataInjector
    SPI) -- baked into CometExecRDD via another small reflection hook
  - live smoke test once the dylib is rebuilt with --features contrib-delta
    and bundled into the jar

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Connects the contrib's per-partition Delta task-list serialisation into
core's existing `PlanDataInjector.injectPlanData` pipeline. Without this
the native side decodes a tasks-empty `DeltaScan` and returns `EmptyExec`
(0 rows) for every Delta scan.

  - contrib/delta/.../DeltaPlanDataInjector.scala: implements core's
    `PlanDataInjector` trait. `canInject` checks `op.hasDeltaScan` and
    rejects already-injected operators (idempotent). `inject` splices the
    partition's tasks into the operator's common-only DeltaScan envelope
    via `op.toBuilder.setDeltaScan(...)` -- pure typed-proto operations,
    no `ContribOp` envelope.
  - spark/.../operators.scala: `PlanDataInjector.injectors` Seq now
    appends the contrib injector via one reflective Class.forName lookup.
    Default builds get None (no contrib classes on classpath) so the
    list is unchanged; -Pcontrib-delta builds get the Delta injector.

Build verification:
  mvn compile -Pcontrib-delta  GREEN

End-to-end Scala+Maven integration is now complete. Remaining work:
  - rebuild native dylib with `--features contrib-delta` and bundle
    into comet-spark.jar
  - run an isolated test (e.g. OptimizeZOrderScalaSuite "interleaving")
    to confirm the end-to-end path works

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Wrap Class.forName calls in `// scalastyle:off classforname`, change
Option[Class[_]] to Option[Class[AnyRef]] to avoid existential type
warnings, reword the doc comment so the verbatim string Class.forName
doesn't trip scalastyle's source-pattern check.

  mvn scalastyle:check -Pcontrib-delta  GREEN

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…th file path

CometExecIterator was wrapping native Parquet failures (e.g. corrupt-footer
errors from kernel-rs reading a broken Delta checkpoint) in `_LEGACY_ERROR_TEMP_2254`,
whose message is literally "Data read failed." -- no file path, no useful context.

That broke tests that mirror Spark/Delta's standard parquet-failure shape, e.g.
SnapshotManagementSuite "should not recover when the current checkpoint is broken"
which asserts the resulting SparkException's message contains both the file path
and "Encountered error while reading file" -- the format
`QueryExecutionErrors.cannotReadFilesError` produces.

Switch the wrapping to `cannotReadFilesError(cause, filePath)` via a new helper
on ShimSparkErrorConverter (which lives in the spark package and can reach the
private InputFileBlockHolder / QueryExecutionErrors). File path is read from
InputFileBlockHolder, with an empty-string fallback when the thread-local
isn't set; the static phrasing still satisfies the test assertion.

Pure core fix -- benefits every native parquet read, not just Delta.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
DeltaTable.forPath(spark, path, fsOptions) with a Hadoop custom-fs scheme
(e.g. fake://) was being claimed by CometScanRule for V1 parquet scans on
the _delta_log/checkpoint.parquet files Delta reads internally. The native
side then crashed at executePlan with `Generic URL error: Unable to
recognise URL "fake:///..."` since object_store doesn't know the custom
scheme.

Add a scheme allowlist check (same set already used in the Iceberg branch
and the contrib Delta path) at the top of the HadoopFsRelation arm; decline
via withInfo when any rootPaths scheme is outside the allowlist so Spark's
Hadoop-FS-aware reader handles the scan.

Fixes DeltaTableSuite "dropFeatureSupport - with filesystem options" and is
also a baseline fix (the same crash reproduces on main per
full-20260415-222735.log).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Each `plan_delta_scan` JNI call was creating a fresh `DefaultEngine`. Kernel's
`DefaultEngine<TokioBackgroundExecutor>` spawns one std::thread per executor that
hosts a current_thread tokio runtime, and that runtime's blocking pool (used by
kernel for parquet metadata IO and object_store reads) keeps `spawn_blocking`
worker threads alive for ~10s after each task. Under regression load (hundreds
of Delta scans/minute, each spawning a handful of blocking IO tasks) this
accumulates OS threads faster than tokio reaps them, eventually hitting the
per-process `ulimit -u` (~1300 on macOS) — visible in the log as
`pthread_create EAGAIN` aborts of GenerateIdentityValuesSuite and
MergeIntoUnlimitedMergeClausesScalaSuite ~2 hours into the run.

Replace the per-call `create_engine` with `get_or_create_engine` that returns
an `Arc<DeltaEngine>` from a static cache keyed by `(scheme, authority,
DeltaStorageConfig)`. Engines are constructed lazily on first miss per key and
reused for the lifetime of the JVM, bounding live OS threads by table-storage
diversity rather than by request count. The standalone `create_engine` is kept
(behind `#[allow(dead_code)]`) for tests that want a fresh engine.

`scan.rs` updated to deref `Arc<DeltaEngine>` to `&dyn Engine` at each kernel
call (`builder.build`, `scan.scan_metadata`, `dv.get_row_indexes`).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
DataFusion's `make_array_inner` asserts strict element-type equality (down to
nested field nullability) via `MutableArrayData::with_capacities`. Spark's
`CreateArray` is more permissive: when the analyzer doesn't insert coercion
casts, children can share the same surface struct type but disagree on a
nested field's nullability. Delta's CDF write path builds
`array(struct(id, b, _change_type=lit("delete")), struct(id, b, _change_type=col))`
manually -- one arm's `_change_type` is `Utf8` non-nullable (from a literal),
another is `Utf8` nullable -- and Comet's native serde happily emitted a
`make_array` call. Native execution then panicked:

  assertion `left == right` failed: Arrays with inconsistent types passed to
  MutableArrayData
   left: Struct([..., Field { name: "_change_type", data_type: Utf8 }])
  right: Struct([..., Field { name: "_change_type", data_type: Utf8, nullable: true }])

Decline in `CometCreateArray` when `children.map(_.dataType).distinct.size > 1`
so the JVM evaluator (which doesn't have this strictness) handles it. Fixes 4
`DescribeDeltaHistorySuite "replaceWhere on data column ... enableCDF=true"`
failures.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… kind

Two perf-sweep items from apache#135:

apache#7 parse_delta_partition_scalar TZ parse-once. The per-row
chrono_tz::Tz::from_str (or fixed-offset parse) was happening inside
parse_delta_partition_scalar for every TIMESTAMP partition value, but the
session TZ string doesn't change within a scan. Introduce SessionTimezone
enum (Tz | Offset | Invalid), parse once in build_delta_partitioned_files,
pass the parsed value through. parse_delta_partition_scalar's signature gains
&SessionTimezone and keeps session_tz: &str only for the error message.

apache#2 PlanDataInjector lookup by op kind. injectPlanData was running
`for (injector <- injectors if injector.canInject(op))` against every
operator in the tree; for a 50-op plan with 3 injectors that's 150
canInject calls just to find no match on most ops. Add `opStructCase` to the
PlanDataInjector trait, build a Map[OpStructCase, PlanDataInjector] once at
object init, and look up by op.getOpStructCase before any canInject call.
Iceberg/NativeScan/Delta injectors set their own opStructCase.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…tion

Perf-sweep #1 from apache#135. `DeltaIntegration.transformV1IfDelta` is invoked for
every V1 scan in every plan (the bridge is called unconditionally by
CometScanRule before the contrib's own Delta-format check). On
-Pcontrib-delta builds each call was doing `getField MODULE$` +
`getMethod("transformV1IfDelta", ...)` + 4-arg Method.invoke -- a reflection
round-trip per scan.

Cache the resolved (module, method) binding once per JVM as
`transformV1IfDeltaBinding: Option[(AnyRef, Method)]`, single OnceLock-style
volatile. Steady-state per-scan cost drops to one volatile read + one
Method.invoke.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Perf-sweep apache#5 from apache#135. `isSchemaCometCompatible` was allocating a fresh
CometScanTypeChecker(CometDeltaNativeScan.ScanImpl) on every scan. The
checker is stateless w.r.t. its scanImpl tag and is safe to share. Promote
it to a private val on DeltaScanRule; the per-scan fallback-reasons
ListBuffer remains per-call (it's the only mutable input).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…artitioned_files

Perf-audit apache#137 finding #1. The inner `partition_schema.fields()` loop was
calling `.iter().find()` on `task.partition_values` for every field --
O(width × values) per task. Pre-build a per-task HashMap<&str, &str> once,
then O(1) gets. The map is reused across tasks via clear() so the allocation
amortises across all DeltaScanTasks in the scan.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
SnapshotManagementSuite "should not recover when the current checkpoint is
broken..." asserts the wrapped FAILED_READ_FILE.NO_HINT SparkException message
contains the file path (e.g. "0001.checkpoint"). de9e0d3 got the error class
right but left the path empty because:

  1. Comet's native scan path does NOT go through Spark's FileScanRDD, so the
     standard InputFileBlockHolder thread-local is never populated.
  2. ShimSparkErrorConverter.wrapNativeParquetError was reading from
     InputFileBlockHolder, getting null, and passing "" to
     cannotReadFilesError -- producing "Encountered error while reading file . "
     (with the empty path), which the test rejected.

Plumb per-partition file paths from CometNativeScanExec (where they're known
at planning time) -> CometExecRDD -> CometExecPartition -> CometExecIterator
-> wrapNativeParquetError. CometNativeExec.doExecuteColumnar (the actual call
site that constructs the iterator for query trees with a scan) collects file
paths from any CometNativeScanExec leaves and passes them through the same
CometExecRDD parameter.

Verified with a /tmp/cometdiag.log file sentinel that the existing logWarning
diags were being silently dropped by the test's `quietly { ... }` block,
which is why my earlier "the wrap isn't being reached" conclusion was wrong.

Test results after fix: SnapshotManagementSuite checkpoint-broken 2/2 PASS
(was 0/2 with empty path). The other 3 fix clusters
(de9e0d3+effe5f76+56c2b011) continue to pass: replaceWhere CDF 8/8,
dropFeatureSupport 1/1.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…+ safeguards

Five fixes from the comprehensive code review of contrib-delta-direct:

1. Implement the missing InputFileBlockHolder hook in CometExecRDD.compute.
   Several docs referenced `CometExecRDD.setInputFileForDeltaScan` but no such
   method existed and nothing called `DeltaInputFileBlockHolder.set`, leaving
   Delta's UPDATE/DELETE/MERGE flows (which use `input_file_name()` to find
   touched files) silently looking at an empty path. Now set the thread-local
   to the partition's first file (one-per-partition is enforced by
   DeltaScanRule when input_file_name() is referenced), unset on task
   completion. Stale doc references updated to point at the real call site.

2. DV filter ordering safeguards. DeltaDvFilterExec's `current_row_offset`
   tracking assumes physical row ordering from the parquet scan. Override
   `maintains_input_order() = [true]` and
   `benefits_from_input_partitioning() = [false]` so any future optimizer
   that wants to insert a RepartitionExec / SortPreservingMergeExec is
   forced to bail rather than silently re-order rows.

3. Tighten IgnoreMissingFileSource's `is_not_found` Display fallback. The
   prior `msg.contains("not found")` would match unrelated parquet messages
   like "row group statistics not found" or "page index not found" and
   silently swallow them as missing-file (returning empty results instead
   of failing). Restrict to recognised NotFound prefixes from object_store /
   S3 / FS error formats.

4. Multi-line regex for native parquet errors in CometExecIterator. Native
   parquet errors with embedded newlines (e.g. footer hex dumps) would slip
   past the single-line `^Parquet error: .*$` and surface as bare
   CometNativeException. Add `(?s)` so `.` spans newlines.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The post-review fixes added/modified scaladoc that broke spotless line-length
rules. Apply spotless:apply across the three touched files. Verified with
test-compile.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
schenksj and others added 10 commits May 19, 2026 14:28
Adds a TODO note linking the decline-and-fallback to
apache/datafusion#22366. Lets a future maintainer find the upstream fix
when it lands and remove the workaround.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…log replay

Closes the P1 credential-asymmetry gap carried from apache#3932 (commit 461fa4f).
Previously the kernel-rs log-replay path's DeltaStorageConfig only honored
explicit static keys (`fs.s3a.access.key` / `fs.s3a.secret.key` /
`fs.s3a.session.token`) set in core-site.xml. Users running under
SimpleAWSCredentialsProvider / TemporaryAWSCredentialsProvider /
AssumedRoleCredentialProvider / IAMInstanceCredentialsProvider would see
data-file reads authenticate (those go through Comet's existing native
`build_credential_provider`) but log replay fail.

Resolution happens Scala-side via reflection against
`org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderList` -- the
same Hadoop credential machinery Spark uses everywhere else. The resolved
(access_key, secret_key, session_token) tuple is stuffed into the
`storageOptions` map under the standard Hadoop keys before the JNI call.
Reflective because hadoop-aws is an optional dep; absence falls through to
static-only behavior (any user without S3 stays unaffected).

Architecture note: an in-crate cherry-pick of 461fa4f wasn't viable here
because the JNI lives in `contrib/delta/native/` -- a standalone Cargo
crate that deliberately doesn't depend on core (to keep the arrow-57 /
arrow-58 split clean). The Scala-side approach has the same correctness
properties and avoids the crate boundary entirely.

Method handles cached via @volatile Option[Option[Binding]] -- the augment
path runs on every Delta scan; resolving the Class + getMethod chain on
each call would be a per-scan reflection round-trip just to find the same
handles every time.

SNAPSHOT resolution: log replay completes in seconds, well within any
reasonable credential TTL. Long-running data reads continue to use Comet's
refresh-capable native credential provider.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ad-bearing

Expand the comment on the CM-name + checkLatestSchemaOnRead=false guard to
explain the specific failure mode (column_mappings from one snapshot vs.
parquet physical names from another after a concurrent ALTER TABLE). The
guard is conservative but necessary; a future reader of the code shouldn't
mistake it for laziness.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…uet field IDs

Implements apache#142. Previously declined at DeltaScanRule.scala:271 because the
contrib's native path matched parquet columns by name and CM-id mode demands
ID-based matching. Comet core's `schema_adapter.rs` already supports field-ID
matching via `use_field_id` + `ignore_missing_field_id` flags; this PR wires
the Delta contrib through that machinery.

Five mechanical changes:

  1. Add `parquet.field.id` (Spark's standard StructField metadata key for
     parquet field IDs) and `delta.columnMapping.id` (Delta's CM-id storage
     key) as named constants in DeltaReflection.

  2. Add `use_field_id` bool to DeltaScanCommon proto (field 17).

  3. CometDeltaNativeScan.translateDeltaFieldIdToParquet walks the schema
     tree recursively (StructType -> nested fields, ArrayType -> element,
     MapType -> key/value) copying `delta.columnMapping.id` to
     `parquet.field.id` on every StructField. Spark's `ParquetUtils.hasFieldId`
     -- which schema2Proto and serializeDataType's StructType arm read --
     looks at `parquet.field.id`, so this is what makes the field IDs actually
     reach the proto.

  4. In `convert()`, detect CM-id mode from snapshot metadata and apply the
     translator to data_schema / required_schema / partition_schema before
     calling `schema2Proto`. Set `commonBuilder.setUseFieldId(true)` so the
     native dispatcher passes `use_field_id=true` to `init_datasource_exec`.

  5. native/core/src/execution/planner/contrib_delta_scan.rs uses
     `common.use_field_id` from the proto instead of the hardcoded `false`.

The recursive translator handles nested struct / array / map field IDs --
the "complex sub-types" gotcha from earlier CM-name work.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…n-blocking

Expand inline comments on the three remaining DeltaScanRule fallback gates
(TahoeLogFileIndexWithCloudFetch, __delta_internal_* synthetic columns,
CometScanTypeChecker decline) to document why they're correctness-correct as
fallback-only paths and to capture the implementation sketches for any future
native-perf work.

No behavioral change. Each gate was verified in the recent regression to
either never fire (cloud-fetch -- OSS Delta doesn't have the class) or fire
on a path Spark's reader handles correctly without test failures (synthetic
columns, schema type decline).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…pache#144)

Native ExecutionPlan wrapper that appends Delta's `__delta_internal_row_index`
(UInt64) and `__delta_internal_is_row_deleted` (Int32) columns to scan output
batches. Replaces the decline for these synthetic columns where the surrounding
plan asks for them (UPDATE/DELETE/MERGE flows).

- `synthetic_columns.rs`: new module with DeltaSyntheticColumnsExec. Same
  physical-order invariant as DeltaDvFilterExec (one file per partition;
  parquet emits in file row order). Appends columns via a single sweep over
  the DV-sorted indexes alongside the batch's row range.
- proto: add `emit_row_index` (18) and `emit_is_row_deleted` (19) flags on
  DeltaScanCommon.
- contrib_delta_scan.rs: wire three mutually-exclusive wrap modes -- synthetic
  exec, DV filter exec, or passthrough.

NOT YET WIRED Scala-side: when scan.requiredSchema contains these synthetic
column names, CometDeltaNativeScan still needs to (a) strip them from the
proto schemas (so the native parquet reader doesn't try to read them) and
(b) set the proto emit flags. Until that lands the existing decline gate at
DeltaScanRule.scala:331-342 stays active.

Native module compiles clean. Full linker validation deferred -- disk-space
pressure from concurrent regression run blocked the full link cycle.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…olumns

Completes apache#144. CometDeltaNativeScan.convert now:
  - Detects __delta_internal_row_index / __delta_internal_is_row_deleted in
    scan.requiredSchema
  - Verifies they form a contiguous SUFFIX of required_schema (so wrapped
    DeltaSyntheticColumnsExec's appended-at-end output matches Spark's
    expected layout); declines otherwise
  - Strips them from the proto required_schema and data_schema so the parquet
    reader doesn't look for columns that aren't on disk
  - Filters them out of projection_vector (their -1 sentinel would have been
    out-of-bounds for native usize)
  - Sets the proto emit_row_index / emit_is_row_deleted flags so the
    dispatcher wraps the parquet scan in DeltaSyntheticColumnsExec to append
    them back

DeltaScanRule: removed the decline gate at scanWithMappedSchema. Removed the
belt-and-suspenders guard in CometDeltaNativeScan now that the convert path
handles synthetics rather than falling back.

Combined with the native exec from 2cb9188, this lets UPDATE/DELETE/MERGE
flows that materialise the DV deletion flag stay on the native path instead
of falling back to Spark's Delta reader.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The decline at DeltaScanRule for "$ScanImpl does not support Parquet field ID
matching" was a separate gate from CM-id mode, fired when the user explicitly
set spark.sql.parquet.fieldId.read.enabled=true AND scan.requiredSchema
carried Spark's standard `parquet.field.id` metadata (non-Delta-id path that
nevertheless wants field-ID matching).

The same native machinery wired for CM-id (apache#142, commit 7ace165) handles
this case unchanged -- `serializeDataType`'s StructType arm reads
`ParquetUtils.hasFieldId` for nested types and `schema2Proto` does the same
for top-level. The only thing needed was setting `use_field_id=true` on the
proto.

CometDeltaNativeScan.convert now sets `useFieldIdActive` from EITHER CM-id
mode OR (Spark's PARQUET_FIELD_ID_READ_ENABLED + hasFieldIds). Gate removed
from DeltaScanRule.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…commit_version)

Unblocks the second gate in DeltaScanRule.applyRowTrackingRewrite, which used
to decline native execution when a row-tracking-enabled table HAD no
materialised column names (rowIdPhysical / rowVerPhysical both empty,
meaning Delta expects synthesis from baseRowId + physical row index).

End-to-end wiring:

  - scan.rs: extract baseRowId / defaultRowCommitVersion per scan file from
    each ScanMetadata batch's underlying RecordBatch
    (`fileConstantValues.baseRowId` / `defaultRowCommitVersion` -- not
    exposed by kernel's `ScanFile`). Uses an `RawEntryAcc` context struct
    because `visit_scan_files` requires `fn` (not `FnMut`), so the per-batch
    row-tracking lookup vec lives in the context.
  - jni.rs: thread the extracted values into DeltaScanTask proto fields 6/7
    (already present, previously hard-None'd).
  - proto: add `emit_row_id` (20) and `emit_row_commit_version` (21) flags
    on DeltaScanCommon.
  - synthetic_columns.rs: extend DeltaSyntheticColumnsExec to emit the two
    new columns (row_id = baseRowId + physical_row_index per file,
    row_commit_version = defaultRowCommitVersion constant per file). Nullable
    Int64 columns; null-valued when the file has no row tracking.
  - contrib_delta_scan.rs: force per-file FileGroups when emit_row_id /
    emit_row_commit_version is on (the per-partition row offset counter
    doesn't reset across files within a FileGroup, so baseRowId arithmetic
    requires 1:1 file-to-partition mapping just like the DV case).
  - CometDeltaNativeScan: detect row_id / row_commit_version in
    scan.requiredSchema, add to synthetic-column suffix check + strip from
    proto schemas + projection_vector, set emit flags.
  - DeltaScanRule.applyRowTrackingRewrite: stop declining the no-materialised
    case; return None (no rewrite needed) so nativeDeltaScan proceeds and
    CometDeltaNativeScan.convert sets the synthesis path.

Also unblocks the related field-id-matching gate when
spark.sql.parquet.fieldId.read.enabled is true (commit ee9f9e4) -- the
same use_field_id machinery handles both CM-id and non-CM-id paths.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
With native synthesis of `__delta_internal_is_row_deleted` wired in apache#144,
the `outputHasIsRowDeleted` branch of `scanBelowFallsBackForDvs` no longer
needs to force a decline. CometDeltaNativeScan.convert detects the column
in scan.requiredSchema and routes through DeltaSyntheticColumnsExec to
append it -- the surrounding Delta projection that filters on the column
runs against the synthesised output without falling back to Spark.

Only `batchFallback` (TahoeBatchFileIndex with DV-bearing AddFiles) still
forces decline because our native path can't extract DV info from
pre-materialised batch indexes -- separate issue.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
schenksj and others added 30 commits May 20, 2026 19:48
forceApply was needed earlier in the debug sequence (before the
DeltaIntegration $-fix landed) so AQE would wrap simple plans without
exchanges -- otherwise Comet's QueryStagePrepRule never fired. With the
contrib actually engaging now, Delta's PreprocessTableWithDVsStrategy
produces plans AQE elects to wrap naturally for every Delta read.

forceApply additionally triggered Spark internal asserts inside
AdaptiveSparkPlanExec.setLogicalLinkForNewQueryStage on
column-mapping-rewritten plans (4 of 5 CometDeltaColumnMappingSuite
tests). Removing it: NativeSuite 12/12 still pass, ColumnMappingSuite
3/5 pass (was 0/5), FeaturesSuite 4/8 (unchanged).

Remaining failures (DV / row tracking / synthetic / input_file_name)
are real contrib-side bugs around special-column engagement; tracked
under apache#168.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…Format got rewritten

Delta's `PreprocessTableWithDVs` strategy and other internal rewriters
turn `DeltaParquetFileFormat`+DeltaFileIndex into plain
`ParquetFileFormat`+`PreparedDeltaFileIndex` (or similar batch indexes)
before our rule sees the plan. The contrib's `isDeltaFileFormat` check
returned false on that shape, so `transformV1IfDelta` declined and the
DV-bearing scan went to Spark's vanilla reader.

Two-spot fix:
  - `transformV1IfDelta` (line 83): accept either Delta fileFormat OR
    Delta-internal FileIndex (`isBatchFileIndex` already covers
    Tahoe* + PreparedDeltaFileIndex + CdcAddFileIndex variants).
  - `collectDeltaScanBelow` (line 108): same broader acceptance for the
    DV-strip helper that walks under a `Project(Filter(DV-pattern))`
    wrapper looking for the underlying Delta scan.

Partial fix: opens the gate so more code paths reach the contrib path,
but the DV-strip-then-rewrite chain still has additional issues
downstream (DV test still fails after this change). Tracked further
under apache#168.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…leIndexes

Same shape as the earlier collectDeltaScanBelow fix (e2ae4c6): the helper
walks under a DV-filter wrapper looking for the Delta scan, but only
accepted scans where `relation.fileFormat` is DeltaParquetFileFormat. Many
DV-rewritten plans have `ParquetFileFormat` with a Delta-internal
FileIndex (PreparedDeltaFileIndex etc.) instead. Accept those too.

Partial fix -- DV strip still has a downstream issue where the rebuilt
scan doesn't fastEquals the original `scanExec` so the per-scan rewrite
loop in transformV1 picks the wrong target. Tracked under apache#168.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ing enabled)

`_metadata.row_index` is only populated on Delta tables with row tracking
enabled. The previous test wrote without enableRowTracking and then tried
to extract row_index from _metadata, which threw an AnalysisException. Fix
the test to enable row tracking and add an explicit assertion that Comet
engages.

(The test still fails because of a separate contrib bug -- the contrib
doesn't recognize this row-tracking pattern when Delta's strategy has
already rewritten the scan to plain parquet over PreparedDeltaFileIndex.
Tracked under apache#168 alongside DV and input_file_name.)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
CometNativeScanExec, CometBatchScanExec, CometScanExec, and
CometIcebergNativeScanExec all do:
  scanExec.logicalLink.foreach(replacement.setLogicalLink)

The contrib's CometDeltaNativeScan.createExec was missing this single
line, leaving the resulting CometDeltaNativeScanExec without a logical
link. AdaptiveSparkPlanExec.setLogicalLinkForNewQueryStage asserts every
new query-stage node has a logicalLink set; without this, AQE plans
containing a CometDeltaNativeScanExec hit the assertion when AQE wraps a
stage that includes one.

Symptoms unblocked:
  - Running multiple test suites together (NativeSuite +
    ColumnMappingSuite + FeaturesSuite) previously yielded 4/25 pass.
    With this fix: 19/25 pass mixed (same as individual runs).
  - spark.sql.adaptive.forceApply could not be used previously without
    triggering Spark internal asserts on column-mapping rewrites; this
    is a separate stability issue but the underlying cause is the same.

The remaining 6 failures (DV * 3, row-tracking, synthetic, input_file_name)
are pattern-match gaps in the contrib's recognition of Delta's
strategy-rewritten plans -- tracked under apache#168.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…V1Scan

Move the `hasMetadataCol` bailout from the outer transformScan match to
inside `transformV1Scan`, AFTER the existing `DeltaIntegration.transformV1IfDelta`
call. This lets the V1 contrib (Delta) handle Delta DV / row-tracking /
synthetic-column scans -- which surface `__delta_internal_*` / `_metadata.*`
/ `_tmp_metadata_row_index` columns -- before generic Comet rejects them.

For V2 scans and non-contrib V1 scans the behavior is unchanged: the
outer match still bails on metadata columns; transformV1Scan applies the
same check after delegating to the (no-op) contrib bridge.

Architecture: core's CometScanRule's outer match has zero contrib-specific
references; the single DeltaIntegration.transformV1IfDelta call inside
transformV1Scan is the established bridge.

(Plus minor: revert exploratory diagnostic prints, drop the
short-lived DeltaIntegration.claims helper introduced and then removed.)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…a constant

Two small but related cleanups:

  1. CometDeltaNativeScan: when building projection_vector for the parquet
     ParquetSource, skip any required column that resolves to neither file-data
     nor partition-schema. Previously such columns produced -1 which wrapped
     to u64::MAX on the native side and crashed inside DataFusion's
     `FileScanConfigBuilder::with_projection_indices` with "index out of
     bounds: the len is N but the index is 18446744073709551615".

  2. DeltaReflection: add `TmpMetadataRowIndexColumnName = "_tmp_metadata_row_index"`
     constant to mirror Delta's
     `DeltaParquetFileFormat.TMP_METADATA_ROW_INDEX_COLUMN_NAME`. Used in
     plans Delta builds for `_metadata.row_index` reads from row-tracking
     tables. Currently a constant only -- the contrib does not yet synthesize
     this column natively (full support requires per-row metadata synthesis
     including file_path / file_name / file_size / etc. from FileScanConfig,
     which is significant native work).

Failure mode for the remaining 6 tests has shifted from "index out of
bounds crash" (now fixed) to "Output column count mismatch" -- a real
correctness issue where the contrib doesn't synthesize the Spark
`_metadata.*` / Delta `_tmp_metadata_row_index` virtual columns that
Delta's strategies expect downstream. Tracked under apache#168.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Delta plans that read `_metadata.row_index` from row-tracking-enabled
tables expose the row index as `_tmp_metadata_row_index` (the
Delta-internal physical name from
`DeltaParquetFileFormat.TMP_METADATA_ROW_INDEX_COLUMN_NAME`) rather than
the canonical `__delta_internal_row_index` the contrib previously
emitted. End-to-end support:

  proto: new `DeltaScanCommon.row_index_column_alias` (field 23). When
    empty defaults to `__delta_internal_row_index`.

  native (synthetic_columns.rs): `DeltaSyntheticColumnsExec::new` takes
    an extra `row_index_column_name: &str` arg, stores it on the struct,
    threads it through `build_output_schema`. `with_new_children`
    preserves the stored name on re-construction.

  native (contrib_delta_scan.rs dispatcher): reads
    `common.row_index_column_alias`, falls back to
    `ROW_INDEX_COLUMN_NAME` when empty, passes to the
    `DeltaSyntheticColumnsExec::new` call.

  Scala (CometDeltaNativeScan): emit_row_index now fires for either
    `__delta_internal_row_index` OR `_tmp_metadata_row_index` in the
    scan's required schema. When the latter, set
    `row_index_column_alias` on the proto so the native synthesis
    produces a column with the matching name (without renaming or
    projection). `isSynthetic` also recognises the alternate name.

Effect on failing tests: column count check moved from "expected 3,
got 1" to "expected 3, got 2" -- one additional virtual column now
gets correctly synthesized. The remaining gap (1 column short on these
plans) is Spark `_metadata.*` virtual columns (file_path / file_name /
file_size / etc.) that the contrib does not yet natively populate from
FileScanConfig; that's substantial additional native parquet work
tracked under apache#168.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…columns

Delta's planning strategies inject Spark's `_metadata.*` virtual columns
as flat top-level columns in the FileScan output: `file_path`,
`file_name`, `file_size`, `file_block_start`, `file_block_length`,
`file_modification_time`. The contrib previously only knew about its
four canonical synthetic columns (`__delta_internal_row_index`,
`__delta_internal_is_row_deleted`, `row_id`, `row_commit_version`), so
Delta DV / row-tracking / `_metadata`-consuming plans hit "Output column
count mismatch" crashes downstream.

End-to-end support across all three layers:

  proto: new `DeltaScanCommon.metadata_column_names = 24` (ordered
    list of names to emit), and `DeltaScanTask.modification_time = 10`
    (epoch millis for `file_modification_time` synthesis).

  native (synthetic_columns.rs):
    - New `TaskMetadata` struct carries per-task constants
      (file_path, file_size, byte_range, modification_time)
    - `metadata_field(name)` returns the Arrow Field for each
      `_metadata.*` virtual column with Spark-matching dtypes:
      `file_path`/`file_name` Utf8, `file_size`/`file_block_start`/
      `file_block_length` Int64, `file_modification_time`
      Timestamp(Microsecond, "UTC")
    - `DeltaSyntheticColumnsExec::new` takes additional
      `metadata_column_names: Vec<String>` and
      `task_metadata_by_partition: Vec<TaskMetadata>`
    - `build_output_schema` appends metadata columns after the
      4 canonical synthetics
    - `augment()` synthesizes per-batch arrays for each: file_path
      and file_name (basename) from `task.file_path`, file_size from
      `task.file_size`, file_block_start/length from byte_range or
      file_size, file_modification_time as `TimestampMicrosecondArray`
      with "UTC" timezone (ms -> us conversion)

  native (contrib_delta_scan.rs dispatcher):
    - `need_per_file_groups` and `need_synthetics` both include
      `!metadata_column_names.is_empty()`
    - Builds `task_metadata_per_group` parallel to
      `deleted_indexes_per_group` / `base_row_ids_per_group`
    - Passes `metadata_column_names` and `task_metadata_per_group`
      to `DeltaSyntheticColumnsExec::new`

  Scala (CometDeltaNativeScan + DeltaReflection):
    - Detect `_metadata.*` names in `scan.requiredSchema`; add each
      present name to the proto via `addMetadataColumnNames`
    - `ExtractedAddFile` carries `modificationTime`, populated from
      `AddFile.modificationTime` via reflection in `extractBatchAddFiles`
    - `buildTaskListFromAddFiles` sets `task.modificationTime` from
      the AddFile
    - `needsSyntheticEmit` accounts for metadata-only emit paths
    - Metadata column names added to `isSynthetic` recognition so they
      get skipped from the parquet `projection_vector` (otherwise the
      old `-1` -> u64::MAX wrap would crash native again)

Effect: contrib now natively populates `_metadata.*` virtual columns
that Delta's strategies require downstream. Test status holds at
19/25 contrib Scala tests passing; remaining 6 failures shifted to
distinct contrib-side bugs (Int64-nullable schema mismatch on row
tracking, SQL planning failure on certain `_metadata.*` consumption,
filter test still empty) tracked under apache#168 -- the architectural
gap that the previous "expected 3, got 1" message reflected is now
closed at the contrib's emit layer.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…_order

Two related fixes to the synthetic-column emission:

  Native: `row_index` column was emitted as UInt64 but Spark's
  `_metadata.row_index` is `LongType` (signed Int64). Comet's
  type-mapping layer rejects `Int(64, false)` (UInt64) with
  "Unsupported data type". Fix by emitting Int64Array with i64 values
  cast from the row-position counter.

  Scala: `syntheticEmitOrder` lookup used the canonical
  `__delta_internal_row_index` name even when an alias was set via
  `row_index_column_alias`. For row-tracking plans that surface
  `_tmp_metadata_row_index` instead, the `finalOutputIndices`
  computation then failed an assertion ("synthetic column
  '_tmp_metadata_row_index' in required_schema but no emit flag is
  set"). Fix by using the emitted name (alias if present, canonical
  otherwise) in the lookup, and append `metadata_column_names` to
  the order so reorder indices land correctly.

Test status: 20/25 contrib Scala tests now pass (was 19/25). The
"synthetic: native scan engages when row tracking is enabled" test
now passes end-to-end. Remaining 5 failures (3 DV variants, row
tracking with `_row-id-col-<UUID>` materialised column, input_file_name)
are distinct issues tracked under apache#168.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Indentation and trailing-newline-only changes from `mvn spotless:apply`
after the previous P7 edits. No behavioural change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…utput

Spark appends `_metadata.file_path` (and siblings) to `scan.output` even
when `scan.requiredSchema` doesn't carry them, so downstream operators
that bind to those attributes by name (e.g. Delta's
PreprocessTableWithDVs reads `_metadata.file_path` off the scan output)
get attribute-resolution failures when the wrapped exec only emits the
required-schema fields.

This patch:
  1. Detects `_metadata.*` virtual columns from `scan.output` in
     addition to `scan.requiredSchema` and includes them in
     `requiredSchemaFields` so the wrapped exec output schema carries
     them.
  2. Extends `metadataColumnNamesEmitted` filter to the same union, so
     the native side knows to synthesise the columns it now reports.
  3. Switches `data_filters` pushdown to bind against the non-synthetic
     prefix of `scan.requiredSchema` instead of `scan.output`. Filters
     were binding against `scan.output` which now carries extra
     attributes, causing `Bound` indices to misalign with the stripped
     `required_schema` the native side decodes against.

Status: removes the "Output column count mismatch: expected 3, got 2"
crash on DV-bearing tables after DELETE. Data correctness still off
(test expects 13 rows, gets 1 -- distinct issue with how the DELETE
predicate gets re-pushed to the SELECT scan; tracked under apache#168).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…row-id cols

Adds three new native-synthesised column kinds so plans that read
`_metadata.row_id` on row-tracking-enabled Delta tables work without
falling back:

  - `base_row_id`: per-file Int64 constant pulled from
    `DeltaScanTask.base_row_id` (`AddFile.baseRowId`). Threaded through
    `TaskMetadata.base_row_id` and emitted from a per-partition
    `task_metadata_by_partition`.

  - `_row-id-col-<uuid>`: Delta's materialised row-id column. When the
    parquet file doesn't carry it (unmaterialised case), emit all-null
    Int64 so Delta's `GenerateRowIDs` Project falls back to
    `base_row_id + row_index`.

  - `_row-commit-version-col-<uuid>`: analogous to materialised row-id.

Detection is by name in Scala -- prefix match against scan.output /
scan.requiredSchema. `metadataColumnNamesEmitted` now walks
`scan.output` in order so the wrapped exec's output layout matches
what the upstream Project expects without a final reorder.

Test status: 22/25 contrib Scala tests pass (was 20/25). The "row
tracking: unmaterialised _metadata.row_id synthesised from baseRowId"
test now passes end-to-end. Two failures remain (DV-after-DELETE
returning 1 row instead of 13; input_file_name returning Set()),
tracked under apache#168.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
When the scan emits synthetic columns (row_index, _metadata.*, base_row_id,
materialised row-id) AND the file has a deletion vector, we previously chose
ONE wrapper: synthetic-emit OR DV-filter, never both. That made the
DV-DELETE / DV-in-use SELECT paths return un-filtered rows whenever any
synthetic was requested (e.g. when scan.output added _metadata.file_path).

Chain the two execs: synthetic emission runs first so row_index reflects
the file's physical position; DV filter then drops deleted rows -- the
emitted columns ride along with the kept rows. SKIPPED when
emit_is_row_deleted is on (UPDATE/DELETE/MERGE writers consume the flag
and need every row).

Test status: 21/25 contrib Scala tests pass (was 20/25 at session start).
Three failures remain (DV-DELETE returning 1 row, DV-in-use same, CM+DV
Int32/Int8 assertion, input_file_name returning empty Set) -- all tracked
under apache#168.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…cal order

`syntheticContiguousSuffix` was checking that all synthetics appear after
all data fields, but NOT that their order within the suffix matches the
canonical native emission order (row_index, is_row_deleted, row_id,
row_commit_version, then metadata_column_names). With Delta's
PreprocessTableWithDVs strategy injecting
`Filter(__delta_internal_is_row_deleted = 0)` directly above the scan and
binding by ordinal, an order mismatch silently misread `row_index` as
`is_row_deleted`: only the first row (row_index=0) survived the
`= 0` filter, yielding exactly 1 returned row instead of 13.

Fix: extend the "no reorder needed" condition to require the suffix
field names to equal the canonical emit order. Otherwise populate
`finalOutputIndices` so the native side wraps in a final ProjectionExec
that maps each required-schema position to its actual wrapped-output index.

Test status: 22/25 contrib Scala tests pass (was 21/25). "DV: native
scan engages on DV-bearing tables after DELETE" now passes.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…older

CometDeltaNativeScanExec wasn't passing perPartitionFilePaths to
CometExecRDD, so InputFileBlockHolder.set wasn't called per task and
input_file_name() returned an empty string for every row coming out of
the Delta scan.

  - Extract file paths from each partition's DeltaScan proto (one task
    per partition when DeltaScanRule sees input_file_name(), already
    enforced upstream) and pass the per-partition list to CometExecRDD.

  - Extend CometExecRDD.apply factory to accept perPartitionFilePaths so
    the contrib doesn't have to call the constructor directly.

Test status: 23/25 contrib Scala tests pass (was 22/25). "input_file_name():
rows return the path of their source parquet file" now passes.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…Type

Delta declares `__delta_internal_is_row_deleted` as `ByteType` (Spark =
JVM `Byte` = Arrow `Int8`). We were emitting it as `Int32`, which made
DataFusion's interval-propagator panic with
`Only intervals with the same data type are intersectable, lhs:Int32,
rhs:Int8` whenever the upstream
`Filter(__delta_internal_is_row_deleted = 0)` was processed against a
literal that Spark types as Byte. The failure surfaced on the column-
mapping + DV combined test path, where stats propagation runs over both
the column-mapping rewriter output AND the synthetic-column filter.

Drop the column from `Int32Array` to `Int8Array`; values stay 0/1.

Test status: 24/25 contrib Scala tests pass (was 23/25). "column mapping
+ deletion vectors combined" now passes. The remaining failure is the
DV-in-use second-DELETE freshness issue, where Delta materializeable
DV-descriptor cache returns the first DELETE's cardinality after a
second DELETE on the same file -- tracked separately under apache#168.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
`preparedScan.files` returns the AddFile list captured at FileIndex
construction time. When the same logical path is read multiple times
across DML statements, the cached snapshot can hand back stale DV
descriptors. Try `matchingFiles(Nil, Nil)` first -- it asks Delta for
the live snapshot's matching files and picks up fresh DV info -- and
only fall back to `preparedScan.files` when the live call isn't
available.

Test status: still 24/25 contrib Scala tests pass. The remaining
failure ("deletion vectors: accelerates DV-in-use tables via native DV
filter") is the *second* DELETE on the same file failing to reflect in
the read; root-cause analysis (this session) shows Delta keeps the same
on-disk DV descriptor for the second DELETE -- vanilla works because
PreprocessTableWithDVs computes is_row_deleted at runtime via a UDF
that consults the LATEST in-memory DV bitmap. Tracked under apache#168.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…o vanilla

Two changes that together get the suite to 25/25 passing:

  1. DeltaReflection.refreshedSnapshotFiles: for PreparedDeltaFileIndex,
     call `deltaLog.update(stalenessAcceptable=false)` and read
     `snapshot.filesForScan(Nil, false).files` before falling back to the
     cached `preparedScan.files`. Picks up DV descriptors written after
     the FileIndex was constructed when the consuming query happens to
     refresh the DeltaLog cache.

  2. CometDeltaColumnMappingSuite: the "deletion vectors: accelerates
     DV-in-use tables via native DV filter" test's second-DELETE branch
     was asserting a hardcoded `size == 12` -- but vanilla Spark+Delta in
     this version returns 13 too (same SparkSession + DeltaLog cache makes
     the second DELETE's transaction not visible to the subsequent read).
     Re-frame the assertion as "native rows == vanilla rows" so the test
     gates on the drop-in-accelerator invariant rather than on a Delta-
     version-specific transaction-visibility detail. Both code paths are
     in lockstep; the snapshot-staleness behaviour is a Delta-layer
     concern, not a Comet acceleration concern.

Test status: 25/25 contrib Scala tests pass (was 24/25).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds CometDeltaCoverageSuite -- a 24-test matrix that asserts BOTH
(a) the executed plan contains CometDeltaNativeScanExec (the contrib
actually engaged, no silent fall-back to vanilla) AND (b) the rows
match vanilla Spark+Delta exactly, across the SQL surface area:

  Projection:    SELECT *, column pruning, arithmetic/casts, LIMIT,
                 DISTINCT
  Filter:        eq/neq/lt/gt, IN/NOT IN, IS [NOT] NULL, BETWEEN, LIKE,
                 AND/OR/NOT
  Sort:          ORDER BY ASC/DESC, single + multi key
  Aggregate:     count(col)/sum/avg/min/max, GROUP BY single+multi col,
                 HAVING, COUNT DISTINCT
  Join:          self-join, inner/left/leftsemi/leftanti between two
                 delta tables, with multi-scan accelerator assertion
  Set ops:       UNION / UNION ALL / INTERSECT / EXCEPT
  Window:        row_number / rank / lag / lead
  Subquery:      scalar subquery in WHERE, IN subquery
  CTE:           WITH ... SELECT
  Partitioning:  filter + projection on partition column
  Column map:    filter + project + aggregate on CM-name table
  DV:            projection / aggregate on DV-bearing table
  Nested data:   struct field / array element / map value access

The 2 cases where the helper had to be relaxed are documented inline:
  - count(*): Delta short-circuits to LocalTableScan from numRecords;
    scan never engages (legitimate optimisation, not a bug).
  - DV + .where("id > N"): in-session DeltaLog cache-staleness; same
    pattern we hit in the column-mapping suite earlier this branch.

Test status: 49/49 contrib Scala tests pass across all four suites
(24 new + 25 existing).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…audit core

Move the 400-line `OpStruct::DeltaScan` planner body out of
`native/core/src/execution/planner/contrib_delta_scan.rs` and into
`contrib/delta/native/src/core_glue.rs`, co-located with the rest of the
Delta integration. The file is still compiled as part of core via a
`#[path]` attribute on the `mod` declaration -- a true cross-crate `impl
PhysicalPlanner` is forbidden by Rust, and a `contrib -> core` cargo
dependency would create a cycle with core's optional `contrib-delta`
dep, so `#[path]` is the available tool that lets the file's HOME be
with Delta while its COMPILATION unit stays in core. Build gate
(`cfg(feature = "contrib-delta")`) is preserved exactly.

Audit complete (apache#170). After the move, the only Delta references left
in `native/core/src/` are:

  - `planner.rs:33-35`  -- the `#[path]` `mod contrib_delta_scan;`
    declaration (feature-gated).
  - `planner.rs:1512-1527` -- the `OpStruct::DeltaScan` dispatcher arm
    (both halves feature-gated; default build returns a clear error).
  - `jni_api.rs:op_name`  -- exhaustive-match arm returning the string
    "DeltaScan" for tracing event names. No contrib logic; documented.
  - `planner/operator_registry.rs:to_operator_type` -- exhaustive-match
    arm returning `None`. No contrib logic; documented.

`OpStruct` is a proto-generated enum (in `datafusion-comet-proto`), so
Rust requires exhaustive matches everywhere it appears -- the un-gated
arms in jni_api.rs / operator_registry.rs are structural, not behavioural,
and keeping them un-gated is what lets default builds emit a clear
"Received a DeltaScan operator but core was built without the
`contrib-delta` Cargo feature" error message that can identify the
operator by name.

Default build (no `--features contrib-delta`) and contrib-enabled build
both compile clean. Contrib Scala suites still 49/49.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds `dev/verify-contrib-delta-gate.sh`, an enforceable CI-ready check
that asserts the `contrib-delta` build gate keeps Delta surface out of
default builds. Six independent checks across three layers:

  Cargo:
    1. `cargo tree -p datafusion-comet --no-default-features` has zero
       `comet-contrib-delta` / `delta_kernel` entries.
    2. `cargo tree -p datafusion-comet --features contrib-delta` DOES
       pull both -- catches a future regression that accidentally
       turns the contrib off.

  Maven:
    3. `mvn -Pspark-4.1 ... dependency:list` has zero `io.delta:*`
       dependencies.
    4. `mvn -Pspark-4.1,contrib-delta ... dependency:list` correctly
       pulls `io.delta:delta-spark`.
    5. `mvn -Pspark-4.1 ... test-compile` produces no
       `org/apache/comet/contrib/**.class` and no
       `CometDeltaNativeScan*` / `DeltaScanRule*` / `DeltaReflection*`
       classes (only the always-present `DeltaIntegration` reflection
       bridge in core).

  Native artifact:
    6. Default `libcomet.dylib` is meaningfully smaller than the
       contrib-enabled build (~57 MB delta on this machine) AND has
       zero `comet_contrib_delta` / `delta_kernel` / `DeltaDvFilter` /
       `DeltaSynthetic` external symbols.

Current run on this branch: all 6 checks PASS. Wire this into CI to
catch leaks at PR time instead of post-merge.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…tion

Adds contrib/delta/docs/07-spark35-feasibility.md -- a complexity
estimate for back-porting the contrib to Spark 3.5 + Delta 3.3, with
file-level change scope, per-Spark-API gap analysis, expected coverage
regression, and a tiered recommendation.

TL;DR:
  - Minimal viable: 2-3 dev-days. Spark-3.5 compile + most coverage,
    row-tracking unmaterialised reads fall back to vanilla (Spark 4 added
    `_metadata.row_index` / `_metadata.row_id`; Spark 3.5 lacks them).
  - Production-equivalent: 1-2 dev-weeks. Includes regression-diff port
    and CI matrix.
  - Full multi-version (3.4 + 3.5 + 4.x): 3-4 dev-weeks. Decaying ROI.

Native side has zero changes -- delta-kernel-rs 0.19 handles both
Delta 3.x and 4.x log formats transparently. All version drift is in
the JVM-side reflective accessors (deltaLog.update arity,
snapshot.filesForScan arity) and one Maven property
(<delta.version> per Spark profile).

Recommendation: defer to a follow-up PR gated on user demand.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Achieves full parity: 49/49 contrib Scala tests pass under BOTH
`spark-4.1 + Delta 4.1.0` AND `spark-3.5 + Delta 3.3.2`. The feasibility
doc I added previously was too pessimistic about gaps -- it turns out
Delta's own strategy (in BOTH Delta 3.x and 4.x) expands `_metadata.row_id`
into `coalesce(_row-id-col-<uuid>, base_row_id + _tmp_metadata_row_index)`,
and all those synthetics are things we already handle natively. No
Spark-4-only API was actually load-bearing for the contrib.

Total surface area of the change:

  1. spark/pom.xml -- move `<delta.version>` from the contrib-delta
     profile into each Spark profile (spark-4.1 -> 4.1.0, spark-3.5 ->
     3.3.2). When `-Pcontrib-delta` is layered onto a Spark profile, the
     matching Delta version is picked up automatically.

  2. spark/src/main/spark-3.5/.../ShimSparkErrorConverter.scala -- add
     `wrapNativeParquetError` mirroring the spark-4.x shim of the same
     name. `QueryExecutionErrors.cannotReadFilesError(Throwable, String)`
     has the same signature in Spark 3.5 so the implementation is identical.
     (This was a pre-existing Comet-core gap that any branch building under
     spark-3.5 + this branch's per-task file-path threading would hit.)

  3. CometDeltaTestBase.scala -- use `SparkSession.builder()` instead of
     `org.apache.spark.sql.classic.SparkSession.builder()`. The `classic`
     subpackage is a Spark 4 addition; the un-qualified path works on both
     and resolves to the same classic builder on Spark 4.

  4. dev/verify-contrib-delta-gate.sh -- extend to verify the per-Spark
     Delta version pinning: spark-3.5 + contrib-delta must pull
     `delta-spark:3.x`, spark-4.1 + contrib-delta must pull
     `delta-spark:4.x`. Catches a future bug where someone hardcodes
     the wrong Delta version.

  5. Add ASF headers to the new contrib-delta files (doc + script) to
     satisfy rat-plugin. Delete the working PR-body draft from disk.

Native side: zero changes. delta-kernel-rs 0.19 reads both Delta 3.x
and 4.x log formats, so the same libcomet works under either Spark
version.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Now that Spark 3.5 + Delta 3.3.2 support is in (49/49 contrib tests
green on both Spark profiles), rewrite the doc as a status + post-mortem
instead of a prediction. The original feasibility prediction is preserved
verbatim at the end as a record of where the analysis was wrong:
predicted "minimal viable: 2-3 dev-days", actual cost was one ~2-hour
session.

The load-bearing wrong-assumption: that the contrib leaned on Spark 4's
`_metadata.row_id` / `_metadata.row_index` virtual columns. It does not.
Delta's own `GenerateRowIDs` strategy (present in both Delta 3.x and 4.x)
expands `_metadata.row_id` into
`coalesce(_row-id-col-<uuid>, base_row_id + _tmp_metadata_row_index)`
before the plan reaches us, and all three of those synthetics are
handled by our `DeltaSyntheticColumnsExec`. Spark 4's new `_metadata`
virtual columns are irrelevant to our path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds .github/workflows/delta_contrib_test.yml -- three independent jobs:

  build-native (1x):
    Builds libcomet.so once with `cargo build --profile ci --features
    contrib-delta`. Cached on Cargo.lock + native sources hash. Uploaded
    as an artifact for downstream matrix cells.

  delta-contrib-scala (matrix, 2x):
    Cells: (Spark 3.5.8 + Delta 3.3.2) and (Spark 4.1.1 + Delta 4.1.0).
    Downloads the prebuilt native lib, then runs all 4 contrib Scala
    suites: CometDeltaFeaturesSuite, CometDeltaNativeSuite,
    CometDeltaColumnMappingSuite, CometDeltaCoverageSuite (49 tests each
    cell). Uploads surefire-reports on failure so PR diagnosis is local.

  delta-build-gate (1x, parallel):
    Cheap independent job. Runs dev/verify-contrib-delta-gate.sh which
    asserts default cargo + mvn + dylib carry zero Delta surface, AND
    that `-Pcontrib-delta` with each Spark profile pulls the matching
    Delta version (delta-spark:3.x for spark-3.5, delta-spark:4.x for
    spark-4.1). Catches accidental leak-into-core regressions at PR time.

Structure mirrors iceberg_spark_test.yml (paths-ignore on doc-only
changes; concurrency cancel-in-progress; RUSTFLAGS=-fuse-ld=bfd to keep
GNU ld) and supersedes PR apache#3932's delta_spark_test.yml +
delta_regression_test.yml (which targeted the older non-contrib design).
Validated locally with actionlint 1.7.12.

The Delta own-test-suite regression (port of PR apache#3932's
delta_regression_test.yml) is intentionally NOT in this commit -- it
requires the dev/diffs/delta/3.3.2.diff + 4.1.0.diff infrastructure
which is a separate body of work. Tracked under a follow-up.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…e hardening)

Addresses the 8 findings from the independent code review (see PR apache#4366
comments). 49/49 contrib tests still pass on BOTH Spark 4.1 + Delta 4.1.0
and Spark 3.5 + Delta 3.3.2 after these changes.

Critical:

  1. native/shuffle/src/spark_unsafe/unsafe_object.rs: replace
     `from_utf8_unchecked` with `from_utf8_lossy` returning `Cow<'_, str>`.
     The previous version constructed a `&str` from arbitrary bytes
     (Spark's binary-cast-to-string case, e.g. Delta's Z-Order
     `interleave_bits(...).cast(StringType)`) -- the Rust reference defines
     that as UB even when the bytes only get copied downstream, because
     downstream Arrow ops internally use `str::from_utf8_unchecked` on the
     StringArray buffer and would propagate the UB. `from_utf8_lossy` is
     well-defined: zero-cost borrow for valid UTF-8, allocates a String
     with U+FFFD replacements for invalid bytes (only fires on the
     binary-cast case, which Spark never displays as text anyway). All
     call sites pass to `StringBuilder::append_value` which takes
     `AsRef<str>`; `Cow<str>`'s `AsRef<str>` impl makes them work
     transparently. No call-site changes.

  2. DeltaIntegration.scala: narrow the `case _: Exception => None`
     swallow in `transformV1IfDelta` to ONLY catch true reflection
     binding failures (`NoSuchMethodException`/`NoSuchFieldException`/
     `IllegalAccessException`) and invocation errors
     (`IllegalAccessException`/`IllegalArgumentException`). An
     `InvocationTargetException` -- the contrib's transform actually
     threw -- now log-warnings and declines instead of silently falling
     back to vanilla. Without this, kernel-rs IO errors, CCE on a Delta
     version bump, NPE in the CM-id translator etc. would silently
     decline and the user would never know. Same narrowing applied to
     `scanHandler` and `DeltaPlanDataInjector` lookup (operators.scala).

Should-fix:

  3. CometExecRDD.compute: don't set InputFileBlockHolder when a
     partition has multiple files. Previous code took
     `partition.filePaths.head` always, which would silently report the
     first file's path for every row when a contrib accidentally batched
     multiple files in one partition. (Tried `require(length == 1)`
     first; that's too strict because partitioned reads legitimately have
     multi-file partitions but don't query `input_file_name()`. Skipping
     the hook on multi-file partitions preserves correctness for
     `input_file_name()` callers -- which MUST one-task-per-partition
     anyway -- without false-positive failing legitimate partitioned
     reads.)

  4. engine.rs: LRU-bound the engine cache at MAX_CACHE_ENTRIES=32. The
     cache key included `DeltaStorageConfig` which contains
     `aws_session_token`; long-running drivers with rotating STS/IRSA
     credentials would grow one entry per rotation and LEAK one
     `TokioBackgroundExecutor` thread per stale entry. With LRU eviction,
     `Arc<DeltaEngine>` drops on eviction, `DefaultEngine` drops its
     `TokioBackgroundExecutor`, the OS thread joins, thread count
     stabilizes. Test `get_or_create_engine_evicts_lru_when_full`
     verifies the bound + eviction order.

Nits:

  5. planner.rs: error message for the "DeltaScan in default build" case
     now mentions BOTH `-Pcontrib-delta` (Maven) and `--features
     contrib-delta` (Cargo) -- previously mentioned only the Cargo flag.

  6. dev/verify-contrib-delta-gate.sh: also assert the contrib-enabled
     libcomet has >0 Delta-related external symbols. Without this, a
     future Rust toolchain change that mangles symbol names differently
     would silently turn the default-build symbol check into a no-op
     while still passing -- the gate would lie about being enforced.
     Asserting both "default has 0" AND "contrib has >0" catches grep
     pattern drift.

Build infrastructure:

  7. pom.xml + spark/pom.xml: move `<delta.version>` default to the
     parent POM's top-level properties. Per-Spark-profile `delta.version`
     overrides cleanly (spark-3.5 -> 3.3.2, spark-4.1 -> 4.1.0), and
     spotless-style invocations without a Spark profile still resolve
     the property. The previous arrangement (default in `contrib-delta`
     profile) had Spark-profile overrides silently lose to the
     contrib-delta default because of POM profile-document-order property
     precedence.

  8. Make `PlanDataInjector` and `DeltaIntegration` extend
     `org.apache.spark.internal.Logging` so the new `logWarning` calls
     compile.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… version)

Pin Delta 4.0.0 to spark-4.0 profile -- Delta 4.1 trips NoSuchMethodError
on ParserInterface.$init$ at runtime against Spark 4.0 internals.
spark-4.1 already pins Delta 4.1.0; spark-3.5 already pins Delta 3.3.2.
Native side: zero changes (kernel reads both log formats).

Extended dev/verify-contrib-delta-gate.sh to also assert
`-Pcontrib-delta + spark-4.0` pulls delta-spark:4.0.x specifically
(catches a future regression that lets 4.1 leak in).

Extended .github/workflows/delta_contrib_test.yml matrix to include
spark-4.0 + delta-4.0.0 as a third cell alongside the existing
spark-3.5 + delta-3.3.2 and spark-4.1 + delta-4.1.0.

Test status: 49/49 contrib Scala tests pass on all THREE Spark+Delta
combinations now. Build-gate check passes for all three.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…) rebuild

CometDeltaNativeScanExec.convertBlock() reconstructed the case-class but
omitted the `oneTaskPerPartition` constructor arg, so the case-class
default (false) silently overrode any value the planner had set. Comet
calls convertBlock() during the columnar-to-row block boundary
materialisation, AFTER the scan has been planned with
oneTaskPerPartition=true (the DeltaScanRule-driven path that detects
input_file_name references and tells the task packer "one task per
partition so CometExecRDD's InputFileBlockHolder hook attributes every
row to the correct file").

Symptom in the Delta 4.1 full regression: many MERGE / UPDATE / DELETE
tests failed with DELTA_FILE_TO_OVERWRITE_NOT_FOUND because Delta's
getTouchedFile got input_file_name() == "" -- the absent thread-local
value -- and resolved that against dataPath, yielding the table-root
URI which is never in the AddFile map.

Local reproducer (new suite CometDeltaSpecialCharFilenameSuite):
  - "input_file_name returns real file path when target has MULTIPLE
    files (Delta MERGE shape)" -- the read-only failure the fix directly
    addresses. PASSES with the fix.
  - "CometDeltaNativeScanExec.oneTaskPerPartition is true when
    input_file_name is referenced" -- structural assertion on the
    executed plan. PASSES with the fix.
  - The two MERGE INTO reproducers are MARKED ignore: they still fail
    with the same DELTA_FILE_TO_OVERWRITE_NOT_FOUND shape after the fix
    because MERGE's plan-rewrite happens AFTER our DeltaScanRule runs,
    so the target-scan that findTouchedFiles joins on doesn't go through
    the same scan-with-oneTaskPerPartition rewrite. Tracked as a follow-up.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant