From a5cd7e70c99059efcc39d4e534d1326151b169bf Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Wed, 3 Jun 2026 20:07:00 +0800 Subject: [PATCH 1/8] [python][ray] Support partial SET and INSERT in merge_into Allow mapping-based update/insert specs with typed expression API: from pypaimon.ray import source_col, target_col, lit WhenMatched(update={"age": source_col("age"), "name": target_col("name")}) WhenNotMatched(insert={"id": source_col("id"), "status": lit("new")}) - Add SourceColumnRef, TargetColumnRef, LiteralValue types - Add source_col(), target_col(), lit() helpers - _normalize_set_spec converts all values to typed refs - Shorthand "s.col"/"t.col" strings still accepted - Validate keys against target schema, reject callables - Validate t.col refs exist, reject t.col in insert specs - Reject empty mapping specs - Only require referenced source columns, not all columns - Update docs with expression API examples --- docs/docs/pypaimon/ray-data.md | 15 +- paimon-python/pypaimon/ray/__init__.py | 8 + .../pypaimon/ray/data_evolution_merge_into.py | 90 ++++- .../ray/data_evolution_merge_transform.py | 38 +- .../ray_data_evolution_merge_into_test.py | 339 +++++++++++++++++- 5 files changed, 462 insertions(+), 28 deletions(-) diff --git a/docs/docs/pypaimon/ray-data.md b/docs/docs/pypaimon/ray-data.md index a3c59c9f68b8..9d77770d9f76 100644 --- a/docs/docs/pypaimon/ray-data.md +++ b/docs/docs/pypaimon/ray-data.md @@ -374,10 +374,17 @@ Conditions use SQL-style expressions with `s.` (source) and `t.` (target) column prefixes. `WhenNotMatched` conditions may only reference source columns (`s.*`). Requires the `datafusion` package: `pip install pypaimon[sql]`. -- `update` / `insert`: only `"*"` is supported in this PR. A future follow-up - will add mapping-based SET (e.g. `{"col": "s.col"}`) where values are - analyzable string expressions (`"s."`, `"t."`, or literals), - not Python callables. +- `update` / `insert`: `"*"` updates/inserts all non-blob columns from source. + A mapping selects specific columns: + ```python + from pypaimon.ray import source_col, target_col, lit + + WhenMatched(update={"age": source_col("age"), "name": target_col("name")}) + WhenNotMatched(insert={"id": source_col("id"), "status": lit("new")}) + ``` + Shorthand strings `"s."` and `"t."` are also accepted but ambiguous + for literals starting with `s.` or `t.`; use `lit()` for those cases. + Python callables are not supported. - `condition`: an optional SQL-style boolean expression. Use `s.` and `t.` to reference source and target columns. diff --git a/paimon-python/pypaimon/ray/__init__.py b/paimon-python/pypaimon/ray/__init__.py index 9161f3cbb3b7..4280187956e3 100644 --- a/paimon-python/pypaimon/ray/__init__.py +++ b/paimon-python/pypaimon/ray/__init__.py @@ -21,6 +21,11 @@ WhenNotMatched, merge_into, ) +from pypaimon.ray.data_evolution_merge_transform import ( + source_col, + target_col, + lit, +) __all__ = [ "read_paimon", @@ -28,4 +33,7 @@ "merge_into", "WhenMatched", "WhenNotMatched", + "source_col", + "target_col", + "lit", ] diff --git a/paimon-python/pypaimon/ray/data_evolution_merge_into.py b/paimon-python/pypaimon/ray/data_evolution_merge_into.py index 5be68f301e05..251b67c38491 100644 --- a/paimon-python/pypaimon/ray/data_evolution_merge_into.py +++ b/paimon-python/pypaimon/ray/data_evolution_merge_into.py @@ -30,8 +30,11 @@ distributed_write_collect_msgs, ) from pypaimon.ray.data_evolution_merge_transform import ( + LiteralValue, OnSpec, SetSpec, + SourceColumnRef, + TargetColumnRef, WhenMatched, WhenNotMatched, _NormalizedClause, @@ -163,6 +166,7 @@ def _prepare(target, source, catalog_options, when_matched, when_not_matched, on _NormalizedClause( spec=_normalize_set_spec( c.insert, settable_field_names, on_map, + allow_target_refs=False, ), condition=c.condition, ) @@ -172,7 +176,7 @@ def _prepare(target, source, catalog_options, when_matched, when_not_matched, on source_ds = _normalize_source(source, catalog_options) _validate_source_on_cols(source_ds, source_on_cols) _validate_source_has_target_cols( - source_ds, settable_field_names, on_map, + source_ds, matched_specs + not_matched_specs, ) if has_condition: @@ -398,8 +402,8 @@ def _needed_target_cols( set_by_all = set(update_cols) for clause in clauses: for value in clause.spec.values(): - if isinstance(value, str) and value.startswith("t."): - needed.add(value[2:]) + if isinstance(value, TargetColumnRef): + needed.add(value.column) set_by_all &= set(clause.spec.keys()) needed |= set(update_cols) - set_by_all return [c for c in all_target_cols if c in needed] @@ -427,15 +431,66 @@ def _normalize_set_spec( spec: SetSpec, target_field_names: Sequence[str], on_map: Optional[Mapping[str, str]] = None, + allow_target_refs: bool = True, ) -> Dict[str, Any]: on_map = on_map or {} - if spec != "*": - raise NotImplementedError( - "merge_into currently only supports '*' for update/insert; " - "partial SET will be added in a follow-up PR." + if spec == "*": + return { + col: SourceColumnRef(on_map.get(col, col)) + for col in target_field_names + } + if not isinstance(spec, Mapping): + raise TypeError( + f"SET spec must be '*' or a mapping, got {type(spec).__name__}" ) - # A renamed ON key resolves via the source's ON column, not its own name. - return {col: f"s.{on_map.get(col, col)}" for col in target_field_names} + if not spec: + raise ValueError("SET spec must not be empty") + target_set = set(target_field_names) + for key in spec: + if key not in target_set: + raise ValueError( + f"SET spec references unknown target column '{key}'" + ) + result: Dict[str, Any] = {} + for key, val in spec.items(): + if callable(val) and not isinstance(val, type): + raise TypeError( + "SET values must be source_col(), target_col(), " + "lit(), or literals, not callables" + ) + if isinstance(val, SourceColumnRef): + result[key] = val + elif isinstance(val, TargetColumnRef): + if not allow_target_refs: + raise ValueError( + "INSERT spec must not reference target columns " + f"(t.*), but found: 't.{val.column}'" + ) + if val.column not in target_set: + raise ValueError( + f"SET spec references unknown target column " + f"'{val.column}'" + ) + result[key] = val + elif isinstance(val, LiteralValue): + result[key] = val + elif isinstance(val, str) and val.startswith("s."): + result[key] = SourceColumnRef(val[2:]) + elif isinstance(val, str) and val.startswith("t."): + if not allow_target_refs: + raise ValueError( + "INSERT spec must not reference target columns " + f"(t.*), but found: '{val}'" + ) + ref = val[2:] + if ref not in target_set: + raise ValueError( + f"SET spec references unknown target column '{ref}'" + ) + result[key] = TargetColumnRef(ref) + else: + result[key] = LiteralValue(val) + return result def _normalize_source(source: Any, catalog_options: Dict[str, str]): @@ -483,17 +538,16 @@ def _validate_source_on_cols(source_ds, on: Sequence[str]) -> None: def _validate_source_has_target_cols( source_ds, - target_field_names: Sequence[str], - on_map: Mapping[str, str], + specs: List[_NormalizedClause], ) -> None: - """For update='*'/insert='*', source must carry every (non-blob) target - column; otherwise the SET spec resolves to null and silently overwrites.""" names = set(_source_schema_or_raise(source_ds).names) - expected = {on_map.get(c, c) for c in target_field_names} - missing = sorted(expected - names) + needed = set() + for clause in specs: + for val in clause.spec.values(): + if isinstance(val, SourceColumnRef): + needed.add(val.column) + missing = sorted(needed - names) if missing: raise ValueError( - f"source is missing target columns {missing}; " - f"update='*'/insert='*' requires the source to carry every " - f"(non-blob) target column." + f"source is missing columns {missing} referenced by SET spec" ) diff --git a/paimon-python/pypaimon/ray/data_evolution_merge_transform.py b/paimon-python/pypaimon/ray/data_evolution_merge_transform.py index ed786467f1e7..0e11b4b6c005 100644 --- a/paimon-python/pypaimon/ray/data_evolution_merge_transform.py +++ b/paimon-python/pypaimon/ray/data_evolution_merge_transform.py @@ -25,6 +25,33 @@ OnSpec = Union[Sequence[str], Mapping[str, str]] +@dataclass(frozen=True) +class SourceColumnRef: + column: str + + +@dataclass(frozen=True) +class TargetColumnRef: + column: str + + +@dataclass(frozen=True) +class LiteralValue: + value: Any + + +def source_col(name: str) -> SourceColumnRef: + return SourceColumnRef(name) + + +def target_col(name: str) -> TargetColumnRef: + return TargetColumnRef(name) + + +def lit(value: Any) -> LiteralValue: + return LiteralValue(value) + + @dataclass class WhenMatched: update: SetSpec @@ -105,17 +132,18 @@ def _resolve_spec_array( on_pairs: Sequence[Tuple[str, str]], out_type: pa.DataType, ): - if isinstance(val, str) and val.startswith("s."): - ref = val[2:] + if isinstance(val, LiteralValue): + return pa.array([val.value] * batch.num_rows, type=out_type) + if isinstance(val, SourceColumnRef): + ref = val.column if f"s.{ref}" in available: return batch.column(f"s.{ref}") for sk, tk in on_pairs: if sk == ref and f"t.{tk}" in available: return batch.column(f"t.{tk}") return pa.nulls(batch.num_rows, type=out_type) - if isinstance(val, str) and val.startswith("t."): - ref = val[2:] - col_name = f"t.{ref}" + if isinstance(val, TargetColumnRef): + col_name = f"t.{val.column}" return batch.column(col_name) if col_name in available else pa.nulls( batch.num_rows, type=out_type ) diff --git a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py index 47981088f2d3..379825951814 100644 --- a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py +++ b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py @@ -26,7 +26,10 @@ import ray from pypaimon import CatalogFactory, Schema -from pypaimon.ray import WhenMatched, WhenNotMatched, merge_into +from pypaimon.ray import ( + WhenMatched, WhenNotMatched, merge_into, + source_col, target_col, lit, +) try: import datafusion # noqa: F401 @@ -815,6 +818,340 @@ def test_duplicate_source_filtered_by_condition(self): self.assertEqual(out['name'], ['y']) self.assertEqual(out['age'], [20]) + def test_matched_partial_update(self): + target = self._create_table() + self._write( + target, + pa.Table.from_pydict( + { + 'id': pa.array([1, 2], type=pa.int32()), + 'name': ['a', 'b'], + 'age': pa.array([10, 20], type=pa.int32()), + }, + schema=self.pa_schema, + ), + ) + + source = pa.Table.from_pydict( + { + 'id': pa.array([1, 2], type=pa.int32()), + 'name': ['a2', 'b2'], + 'age': pa.array([99, 88], type=pa.int32()), + }, + schema=self.pa_schema, + ) + + merge_into( + target=target, + source=source, + catalog_options=self.catalog_options, + on=['id'], + when_matched=[WhenMatched(update={'age': 's.age'})], + num_partitions=_TEST_NUM_PARTITIONS, + ) + + out = self._read_sorted(target) + self.assertEqual(out['id'], [1, 2]) + self.assertEqual(out['name'], ['a', 'b']) + self.assertEqual(out['age'], [99, 88]) + + def test_insert_partial_mapping(self): + target = self._create_table() + + source = pa.Table.from_pydict( + { + 'id': pa.array([1, 2], type=pa.int32()), + 'name': ['a', 'b'], + 'age': pa.array([10, 20], type=pa.int32()), + }, + schema=self.pa_schema, + ) + + merge_into( + target=target, + source=source, + catalog_options=self.catalog_options, + on=['id'], + when_not_matched=[ + WhenNotMatched(insert={'id': 's.id', 'name': 's.name'}) + ], + num_partitions=_TEST_NUM_PARTITIONS, + ) + + out = self._read_sorted(target) + self.assertEqual(out['id'], [1, 2]) + self.assertEqual(out['name'], ['a', 'b']) + self.assertEqual(out['age'], [None, None]) + + def test_update_with_literal(self): + target = self._create_table() + self._write( + target, + pa.Table.from_pydict( + { + 'id': pa.array([1], type=pa.int32()), + 'name': ['old'], + 'age': pa.array([10], type=pa.int32()), + }, + schema=self.pa_schema, + ), + ) + + source = pa.Table.from_pydict( + { + 'id': pa.array([1], type=pa.int32()), + 'name': ['ignored'], + 'age': pa.array([99], type=pa.int32()), + }, + schema=self.pa_schema, + ) + + merge_into( + target=target, + source=source, + catalog_options=self.catalog_options, + on=['id'], + when_matched=[WhenMatched(update={'name': 'updated'})], + num_partitions=_TEST_NUM_PARTITIONS, + ) + + out = self._read_sorted(target) + self.assertEqual(out['name'], ['updated']) + self.assertEqual(out['age'], [10]) + + def test_invalid_target_column_rejected(self): + target = self._create_table() + with self.assertRaises(ValueError) as ctx: + merge_into( + target=target, + source=self._source(), + catalog_options=self.catalog_options, + on=['id'], + when_matched=[WhenMatched(update={'nonexistent': 's.id'})], + num_partitions=_TEST_NUM_PARTITIONS, + ) + self.assertIn('nonexistent', str(ctx.exception)) + + def test_invalid_target_ref_rejected(self): + target = self._create_table() + with self.assertRaises(ValueError) as ctx: + merge_into( + target=target, + source=self._source(), + catalog_options=self.catalog_options, + on=['id'], + when_matched=[WhenMatched(update={'name': 't.nme'})], + num_partitions=_TEST_NUM_PARTITIONS, + ) + self.assertIn('nme', str(ctx.exception)) + + def test_empty_mapping_rejected(self): + target = self._create_table() + with self.assertRaises(ValueError): + merge_into( + target=target, + source=self._source(), + catalog_options=self.catalog_options, + on=['id'], + when_matched=[WhenMatched(update={})], + num_partitions=_TEST_NUM_PARTITIONS, + ) + + def test_insert_target_ref_rejected(self): + target = self._create_table() + with self.assertRaises(ValueError) as ctx: + merge_into( + target=target, + source=self._source(), + catalog_options=self.catalog_options, + on=['id'], + when_not_matched=[ + WhenNotMatched(insert={'name': 't.name'}) + ], + num_partitions=_TEST_NUM_PARTITIONS, + ) + self.assertIn('t.', str(ctx.exception)) + + def test_matched_update_with_target_ref(self): + target = self._create_table() + self._write( + target, + pa.Table.from_pydict( + { + 'id': pa.array([1], type=pa.int32()), + 'name': ['old'], + 'age': pa.array([10], type=pa.int32()), + }, + schema=self.pa_schema, + ), + ) + + source = pa.Table.from_pydict( + { + 'id': pa.array([1], type=pa.int32()), + 'name': ['ignored'], + 'age': pa.array([99], type=pa.int32()), + }, + schema=self.pa_schema, + ) + + merge_into( + target=target, + source=source, + catalog_options=self.catalog_options, + on=['id'], + when_matched=[WhenMatched(update={'age': 's.age', 'name': 't.name'})], + num_partitions=_TEST_NUM_PARTITIONS, + ) + + out = self._read_sorted(target) + self.assertEqual(out['name'], ['old']) + self.assertEqual(out['age'], [99]) + + def test_callable_value_rejected(self): + target = self._create_table() + with self.assertRaises(TypeError): + merge_into( + target=target, + source=self._source(), + catalog_options=self.catalog_options, + on=['id'], + when_matched=[WhenMatched(update={'name': lambda r: r})], + num_partitions=_TEST_NUM_PARTITIONS, + ) + + def test_source_missing_referenced_col(self): + target = self._create_table() + source = pa.Table.from_pydict( + {'id': pa.array([1], type=pa.int32())}, + schema=pa.schema([('id', pa.int32())]), + ) + with self.assertRaises(ValueError) as ctx: + merge_into( + target=target, + source=source, + catalog_options=self.catalog_options, + on=['id'], + when_matched=[WhenMatched(update={'name': 's.name'})], + num_partitions=_TEST_NUM_PARTITIONS, + ) + self.assertIn('name', str(ctx.exception)) + + def test_lit_prevents_column_ref_interpretation(self): + target = self._create_table() + self._write( + target, + pa.Table.from_pydict( + { + 'id': pa.array([1], type=pa.int32()), + 'name': ['old'], + 'age': pa.array([10], type=pa.int32()), + }, + schema=self.pa_schema, + ), + ) + + source = pa.Table.from_pydict( + { + 'id': pa.array([1], type=pa.int32()), + 'name': ['ignored'], + 'age': pa.array([99], type=pa.int32()), + }, + schema=self.pa_schema, + ) + + merge_into( + target=target, + source=source, + catalog_options=self.catalog_options, + on=['id'], + when_matched=[WhenMatched(update={ + 'name': lit('s.active'), + })], + num_partitions=_TEST_NUM_PARTITIONS, + ) + + out = self._read_sorted(target) + self.assertEqual(out['name'], ['s.active']) + self.assertEqual(out['age'], [10]) + + def test_source_col_helper(self): + target = self._create_table() + self._write( + target, + pa.Table.from_pydict( + { + 'id': pa.array([1], type=pa.int32()), + 'name': ['old'], + 'age': pa.array([10], type=pa.int32()), + }, + schema=self.pa_schema, + ), + ) + + source = pa.Table.from_pydict( + { + 'id': pa.array([1], type=pa.int32()), + 'name': ['new'], + 'age': pa.array([99], type=pa.int32()), + }, + schema=self.pa_schema, + ) + + merge_into( + target=target, + source=source, + catalog_options=self.catalog_options, + on=['id'], + when_matched=[WhenMatched(update={ + 'age': source_col('age'), + })], + num_partitions=_TEST_NUM_PARTITIONS, + ) + + out = self._read_sorted(target) + self.assertEqual(out['name'], ['old']) + self.assertEqual(out['age'], [99]) + + def test_target_col_helper(self): + target = self._create_table() + self._write( + target, + pa.Table.from_pydict( + { + 'id': pa.array([1], type=pa.int32()), + 'name': ['keep'], + 'age': pa.array([10], type=pa.int32()), + }, + schema=self.pa_schema, + ), + ) + + source = pa.Table.from_pydict( + { + 'id': pa.array([1], type=pa.int32()), + 'name': ['ignored'], + 'age': pa.array([99], type=pa.int32()), + }, + schema=self.pa_schema, + ) + + merge_into( + target=target, + source=source, + catalog_options=self.catalog_options, + on=['id'], + when_matched=[WhenMatched(update={ + 'age': source_col('age'), + 'name': target_col('name'), + })], + num_partitions=_TEST_NUM_PARTITIONS, + ) + + out = self._read_sorted(target) + self.assertEqual(out['name'], ['keep']) + self.assertEqual(out['age'], [99]) + class TargetProjectionTest(unittest.TestCase): From b7c8cfbc07a45be3f97f3dbf037d49c371688c38 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Wed, 3 Jun 2026 20:33:12 +0800 Subject: [PATCH 2/8] [ray] Auto-fill ON keys in partial insert and fix rename mapping - Partial insert specs auto-fill missing ON key columns from source - Mapping specs with renamed ON keys apply on_map to SourceColumnRef so {"id": "s.id"} resolves correctly when on={"id": "uid"} - Add tests for both scenarios --- .../pypaimon/ray/data_evolution_merge_into.py | 27 +++++---- .../ray_data_evolution_merge_into_test.py | 59 +++++++++++++++++++ 2 files changed, 76 insertions(+), 10 deletions(-) diff --git a/paimon-python/pypaimon/ray/data_evolution_merge_into.py b/paimon-python/pypaimon/ray/data_evolution_merge_into.py index 251b67c38491..4c3892de3756 100644 --- a/paimon-python/pypaimon/ray/data_evolution_merge_into.py +++ b/paimon-python/pypaimon/ray/data_evolution_merge_into.py @@ -162,16 +162,18 @@ def _prepare(target, source, catalog_options, when_matched, when_not_matched, on f"condition must not reference blob columns, " f"but found: {sorted(blob_refs)}" ) - not_matched_specs = [ - _NormalizedClause( - spec=_normalize_set_spec( - c.insert, settable_field_names, on_map, - allow_target_refs=False, - ), - condition=c.condition, + not_matched_specs = [] + for c in when_not_matched: + spec = _normalize_set_spec( + c.insert, settable_field_names, on_map, + allow_target_refs=False, + ) + for tk, sk in on_map.items(): + if tk in settable_field_names and tk not in spec: + spec[tk] = SourceColumnRef(sk) + not_matched_specs.append( + _NormalizedClause(spec=spec, condition=c.condition) ) - for c in when_not_matched - ] source_ds = _normalize_source(source, catalog_options) _validate_source_on_cols(source_ds, source_on_cols) @@ -459,6 +461,8 @@ def _normalize_set_spec( "lit(), or literals, not callables" ) if isinstance(val, SourceColumnRef): + if key in on_map and val.column == key: + val = SourceColumnRef(on_map[key]) result[key] = val elif isinstance(val, TargetColumnRef): if not allow_target_refs: @@ -475,7 +479,10 @@ def _normalize_set_spec( elif isinstance(val, LiteralValue): result[key] = val elif isinstance(val, str) and val.startswith("s."): - result[key] = SourceColumnRef(val[2:]) + ref = val[2:] + if key in on_map and ref == key: + ref = on_map[key] + result[key] = SourceColumnRef(ref) elif isinstance(val, str) and val.startswith("t."): if not allow_target_refs: raise ValueError( diff --git a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py index 379825951814..c5a96ede6321 100644 --- a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py +++ b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py @@ -1037,6 +1037,65 @@ def test_source_missing_referenced_col(self): ) self.assertIn('name', str(ctx.exception)) + def test_partial_insert_auto_fills_on_key(self): + target = self._create_table() + + source = pa.Table.from_pydict( + { + 'id': pa.array([1, 2], type=pa.int32()), + 'name': ['a', 'b'], + 'age': pa.array([10, 20], type=pa.int32()), + }, + schema=self.pa_schema, + ) + + merge_into( + target=target, + source=source, + catalog_options=self.catalog_options, + on=['id'], + when_not_matched=[ + WhenNotMatched(insert={'name': 's.name'}) + ], + num_partitions=_TEST_NUM_PARTITIONS, + ) + + out = self._read_sorted(target) + self.assertEqual(out['id'], [1, 2]) + self.assertEqual(out['name'], ['a', 'b']) + + def test_partial_insert_renamed_on_key_auto_filled(self): + target = self._create_table() + + source_schema = pa.schema([ + ('uid', pa.int32()), + ('name', pa.string()), + ('age', pa.int32()), + ]) + source = pa.Table.from_pydict( + { + 'uid': pa.array([1, 2], type=pa.int32()), + 'name': ['a', 'b'], + 'age': pa.array([10, 20], type=pa.int32()), + }, + schema=source_schema, + ) + + merge_into( + target=target, + source=source, + catalog_options=self.catalog_options, + on={'id': 'uid'}, + when_not_matched=[ + WhenNotMatched(insert={'name': 's.name'}) + ], + num_partitions=_TEST_NUM_PARTITIONS, + ) + + out = self._read_sorted(target) + self.assertEqual(out['id'], [1, 2]) + self.assertEqual(out['name'], ['a', 'b']) + def test_lit_prevents_column_ref_interpretation(self): target = self._create_table() self._write( From 8209a8960ce7bedfe52d99e4a1b911967932a40e Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Wed, 3 Jun 2026 20:43:17 +0800 Subject: [PATCH 3/8] [ray] Add TypeError fallback in _resolve_spec_array Raise clear error for unexpected spec value types instead of implicit None return that would cause confusing downstream errors. --- paimon-python/pypaimon/ray/data_evolution_merge_transform.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-python/pypaimon/ray/data_evolution_merge_transform.py b/paimon-python/pypaimon/ray/data_evolution_merge_transform.py index 0e11b4b6c005..003977f3e7f2 100644 --- a/paimon-python/pypaimon/ray/data_evolution_merge_transform.py +++ b/paimon-python/pypaimon/ray/data_evolution_merge_transform.py @@ -147,4 +147,4 @@ def _resolve_spec_array( return batch.column(col_name) if col_name in available else pa.nulls( batch.num_rows, type=out_type ) - return pa.array([val] * batch.num_rows, type=out_type) + raise TypeError(f"unexpected spec value type: {type(val).__name__}") From fbc56920f63632aa4c3ab9d8019c11bc6707b14a Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Wed, 3 Jun 2026 20:46:40 +0800 Subject: [PATCH 4/8] [ray] Do not remap explicit source refs by ON key rename Explicit source_col("id") or "s.id" in mapping specs should refer to the source column as written, not be silently rewritten via the ON key rename map. The remap is only needed for update="*" expansion and insert ON-key auto-fill, not for user-specified mappings. Add test: source has both uid (ON key) and id columns, update={"id": source_col("id")} should write source id=999, not source uid=1. --- .../pypaimon/ray/data_evolution_merge_into.py | 7 +-- .../ray_data_evolution_merge_into_test.py | 46 +++++++++++++++++++ 2 files changed, 47 insertions(+), 6 deletions(-) diff --git a/paimon-python/pypaimon/ray/data_evolution_merge_into.py b/paimon-python/pypaimon/ray/data_evolution_merge_into.py index 4c3892de3756..35451bb744e9 100644 --- a/paimon-python/pypaimon/ray/data_evolution_merge_into.py +++ b/paimon-python/pypaimon/ray/data_evolution_merge_into.py @@ -461,8 +461,6 @@ def _normalize_set_spec( "lit(), or literals, not callables" ) if isinstance(val, SourceColumnRef): - if key in on_map and val.column == key: - val = SourceColumnRef(on_map[key]) result[key] = val elif isinstance(val, TargetColumnRef): if not allow_target_refs: @@ -479,10 +477,7 @@ def _normalize_set_spec( elif isinstance(val, LiteralValue): result[key] = val elif isinstance(val, str) and val.startswith("s."): - ref = val[2:] - if key in on_map and ref == key: - ref = on_map[key] - result[key] = SourceColumnRef(ref) + result[key] = SourceColumnRef(val[2:]) elif isinstance(val, str) and val.startswith("t."): if not allow_target_refs: raise ValueError( diff --git a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py index c5a96ede6321..b41ad6476ece 100644 --- a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py +++ b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py @@ -1096,6 +1096,52 @@ def test_partial_insert_renamed_on_key_auto_filled(self): self.assertEqual(out['id'], [1, 2]) self.assertEqual(out['name'], ['a', 'b']) + def test_explicit_source_ref_not_remapped_by_on_key(self): + target = self._create_table() + self._write( + target, + pa.Table.from_pydict( + { + 'id': pa.array([1], type=pa.int32()), + 'name': ['old'], + 'age': pa.array([10], type=pa.int32()), + }, + schema=self.pa_schema, + ), + ) + + source_schema = pa.schema([ + ('uid', pa.int32()), + ('id', pa.int32()), + ('name', pa.string()), + ('age', pa.int32()), + ]) + source = pa.Table.from_pydict( + { + 'uid': pa.array([1], type=pa.int32()), + 'id': pa.array([999], type=pa.int32()), + 'name': ['new'], + 'age': pa.array([99], type=pa.int32()), + }, + schema=source_schema, + ) + + merge_into( + target=target, + source=source, + catalog_options=self.catalog_options, + on={'id': 'uid'}, + when_matched=[WhenMatched(update={ + 'id': source_col('id'), + 'name': source_col('name'), + })], + num_partitions=_TEST_NUM_PARTITIONS, + ) + + out = self._read_sorted(target) + self.assertEqual(out['id'], [999]) + self.assertEqual(out['name'], ['new']) + def test_lit_prevents_column_ref_interpretation(self): target = self._create_table() self._write( From e5c68ac19525020db75c8fb8d347b0ac2fd0e33c Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Wed, 3 Jun 2026 20:59:03 +0800 Subject: [PATCH 5/8] [ray] Add test for missing source col with renamed ON key source_col("id") with on={"id": "uid"} should raise ValueError when source has no "id" column, not silently remap to "uid". --- .../ray_data_evolution_merge_into_test.py | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py index b41ad6476ece..989c33aebf0d 100644 --- a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py +++ b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py @@ -1142,6 +1142,34 @@ def test_explicit_source_ref_not_remapped_by_on_key(self): self.assertEqual(out['id'], [999]) self.assertEqual(out['name'], ['new']) + def test_renamed_on_key_missing_source_col_rejected(self): + target = self._create_table() + source_schema = pa.schema([ + ('uid', pa.int32()), + ('name', pa.string()), + ('age', pa.int32()), + ]) + source = pa.Table.from_pydict( + { + 'uid': pa.array([1], type=pa.int32()), + 'name': ['a'], + 'age': pa.array([10], type=pa.int32()), + }, + schema=source_schema, + ) + with self.assertRaises(ValueError) as ctx: + merge_into( + target=target, + source=source, + catalog_options=self.catalog_options, + on={'id': 'uid'}, + when_matched=[WhenMatched(update={ + 'id': source_col('id'), + })], + num_partitions=_TEST_NUM_PARTITIONS, + ) + self.assertIn('id', str(ctx.exception)) + def test_lit_prevents_column_ref_interpretation(self): target = self._create_table() self._write( From 11149c1260af62ba185f56934d6c354b4e2b4886 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Wed, 3 Jun 2026 21:21:09 +0800 Subject: [PATCH 6/8] [ray] Clarify mapping spec docs for target ref and auto-fill - t.col only accepted in WhenMatched update, not insert - Omitted cols: matched preserves target, insert writes NULL - ON key auto-filled from source in partial insert --- docs/docs/pypaimon/ray-data.md | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/docs/docs/pypaimon/ray-data.md b/docs/docs/pypaimon/ray-data.md index 9d77770d9f76..3e612d89d966 100644 --- a/docs/docs/pypaimon/ray-data.md +++ b/docs/docs/pypaimon/ray-data.md @@ -382,9 +382,13 @@ columns (`s.*`). Requires the `datafusion` package: `pip install pypaimon[sql]`. WhenMatched(update={"age": source_col("age"), "name": target_col("name")}) WhenNotMatched(insert={"id": source_col("id"), "status": lit("new")}) ``` - Shorthand strings `"s."` and `"t."` are also accepted but ambiguous - for literals starting with `s.` or `t.`; use `lit()` for those cases. - Python callables are not supported. + Shorthand strings `"s."` are accepted for both update and insert. + `"t."` and `target_col()` are only accepted in `WhenMatched(update=...)`; + insert specs cannot reference target columns. For literals starting with + `s.` or `t.`, use `lit()` to avoid ambiguity. Matched update specs that omit + a column preserve the target's original value; not-matched insert specs that + omit non-key columns write NULL, and omitted ON key columns are auto-filled + from the source. Python callables are not supported. - `condition`: an optional SQL-style boolean expression. Use `s.` and `t.` to reference source and target columns. From 9496ed9a7ec7390c02aed79f5502883618cc1857 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Wed, 3 Jun 2026 21:22:27 +0800 Subject: [PATCH 7/8] [ray] Simplify mapping spec docs --- docs/docs/pypaimon/ray-data.md | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/docs/docs/pypaimon/ray-data.md b/docs/docs/pypaimon/ray-data.md index 3e612d89d966..d160b4302f65 100644 --- a/docs/docs/pypaimon/ray-data.md +++ b/docs/docs/pypaimon/ray-data.md @@ -382,13 +382,8 @@ columns (`s.*`). Requires the `datafusion` package: `pip install pypaimon[sql]`. WhenMatched(update={"age": source_col("age"), "name": target_col("name")}) WhenNotMatched(insert={"id": source_col("id"), "status": lit("new")}) ``` - Shorthand strings `"s."` are accepted for both update and insert. - `"t."` and `target_col()` are only accepted in `WhenMatched(update=...)`; - insert specs cannot reference target columns. For literals starting with - `s.` or `t.`, use `lit()` to avoid ambiguity. Matched update specs that omit - a column preserve the target's original value; not-matched insert specs that - omit non-key columns write NULL, and omitted ON key columns are auto-filled - from the source. Python callables are not supported. + `"s."` / `"t."` shorthands also work (`t.*` only in update). + Use `lit()` for literals starting with `s.` or `t.`. - `condition`: an optional SQL-style boolean expression. Use `s.` and `t.` to reference source and target columns. From 9154599ee703caf80f40b19de92d1c196cb88a71 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Wed, 3 Jun 2026 21:23:13 +0800 Subject: [PATCH 8/8] [ray] Rephrase explicit source ref test to avoid ON key mutation Update non-key column (age) instead of ON key (id) to prove source_col("id") is not remapped, without implying we support changing join keys. --- .../pypaimon/tests/ray_data_evolution_merge_into_test.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py index 989c33aebf0d..00814057c23e 100644 --- a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py +++ b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py @@ -1119,7 +1119,7 @@ def test_explicit_source_ref_not_remapped_by_on_key(self): source = pa.Table.from_pydict( { 'uid': pa.array([1], type=pa.int32()), - 'id': pa.array([999], type=pa.int32()), + 'id': pa.array([42], type=pa.int32()), 'name': ['new'], 'age': pa.array([99], type=pa.int32()), }, @@ -1132,15 +1132,14 @@ def test_explicit_source_ref_not_remapped_by_on_key(self): catalog_options=self.catalog_options, on={'id': 'uid'}, when_matched=[WhenMatched(update={ - 'id': source_col('id'), - 'name': source_col('name'), + 'age': source_col('id'), })], num_partitions=_TEST_NUM_PARTITIONS, ) out = self._read_sorted(target) - self.assertEqual(out['id'], [999]) - self.assertEqual(out['name'], ['new']) + self.assertEqual(out['age'], [42]) + self.assertEqual(out['name'], ['old']) def test_renamed_on_key_missing_source_col_rejected(self): target = self._create_table()