Skip to content
14 changes: 10 additions & 4 deletions docs/docs/pypaimon/ray-data.md
Original file line number Diff line number Diff line change
Expand Up @@ -374,10 +374,16 @@ Conditions use SQL-style expressions with `s.` (source) and `t.` (target)
column prefixes. `WhenNotMatched` conditions may only reference source
columns (`s.*`). Requires the `datafusion` package: `pip install pypaimon[sql]`.

- `update` / `insert`: only `"*"` is supported in this PR. A future follow-up
will add mapping-based SET (e.g. `{"col": "s.col"}`) where values are
analyzable string expressions (`"s.<col>"`, `"t.<col>"`, or literals),
not Python callables.
- `update` / `insert`: `"*"` updates/inserts all non-blob columns from source.
A mapping selects specific columns:
```python
from pypaimon.ray import source_col, target_col, lit

WhenMatched(update={"age": source_col("age"), "name": target_col("name")})
WhenNotMatched(insert={"id": source_col("id"), "status": lit("new")})
```
`"s.<col>"` / `"t.<col>"` shorthands also work (`t.*` only in update).
Use `lit()` for literals starting with `s.` or `t.`.
- `condition`: an optional SQL-style boolean expression. Use `s.<col>` and
`t.<col>` to reference source and target columns.

Expand Down
8 changes: 8 additions & 0 deletions paimon-python/pypaimon/ray/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,19 @@
WhenNotMatched,
merge_into,
)
from pypaimon.ray.data_evolution_merge_transform import (
source_col,
target_col,
lit,
)

__all__ = [
"read_paimon",
"write_paimon",
"merge_into",
"WhenMatched",
"WhenNotMatched",
"source_col",
"target_col",
"lit",
]
108 changes: 82 additions & 26 deletions paimon-python/pypaimon/ray/data_evolution_merge_into.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@
distributed_write_collect_msgs,
)
from pypaimon.ray.data_evolution_merge_transform import (
LiteralValue,
OnSpec,
SetSpec,
SourceColumnRef,
TargetColumnRef,
WhenMatched,
WhenNotMatched,
_NormalizedClause,
Expand Down Expand Up @@ -159,20 +162,23 @@ def _prepare(target, source, catalog_options, when_matched, when_not_matched, on
f"condition must not reference blob columns, "
f"but found: {sorted(blob_refs)}"
)
not_matched_specs = [
_NormalizedClause(
spec=_normalize_set_spec(
c.insert, settable_field_names, on_map,
),
condition=c.condition,
not_matched_specs = []
for c in when_not_matched:
spec = _normalize_set_spec(
c.insert, settable_field_names, on_map,
allow_target_refs=False,
)
for tk, sk in on_map.items():
if tk in settable_field_names and tk not in spec:
spec[tk] = SourceColumnRef(sk)
not_matched_specs.append(
_NormalizedClause(spec=spec, condition=c.condition)
)
for c in when_not_matched
]

source_ds = _normalize_source(source, catalog_options)
_validate_source_on_cols(source_ds, source_on_cols)
_validate_source_has_target_cols(
source_ds, settable_field_names, on_map,
source_ds, matched_specs + not_matched_specs,
)

if has_condition:
Expand Down Expand Up @@ -398,8 +404,8 @@ def _needed_target_cols(
set_by_all = set(update_cols)
for clause in clauses:
for value in clause.spec.values():
if isinstance(value, str) and value.startswith("t."):
needed.add(value[2:])
if isinstance(value, TargetColumnRef):
needed.add(value.column)
set_by_all &= set(clause.spec.keys())
needed |= set(update_cols) - set_by_all
return [c for c in all_target_cols if c in needed]
Expand Down Expand Up @@ -427,15 +433,66 @@ def _normalize_set_spec(
spec: SetSpec,
target_field_names: Sequence[str],
on_map: Optional[Mapping[str, str]] = None,
allow_target_refs: bool = True,
) -> Dict[str, Any]:
on_map = on_map or {}
if spec != "*":
raise NotImplementedError(
"merge_into currently only supports '*' for update/insert; "
"partial SET will be added in a follow-up PR."
if spec == "*":
return {
col: SourceColumnRef(on_map.get(col, col))
for col in target_field_names
}
if not isinstance(spec, Mapping):
raise TypeError(
f"SET spec must be '*' or a mapping, got {type(spec).__name__}"
)
# A renamed ON key resolves via the source's ON column, not its own name.
return {col: f"s.{on_map.get(col, col)}" for col in target_field_names}
if not spec:
raise ValueError("SET spec must not be empty")
target_set = set(target_field_names)
for key in spec:
if key not in target_set:
raise ValueError(
f"SET spec references unknown target column '{key}'"
)
result: Dict[str, Any] = {}
for key, val in spec.items():
if callable(val) and not isinstance(val, type):
raise TypeError(
"SET values must be source_col(), target_col(), "
"lit(), or literals, not callables"
)
if isinstance(val, SourceColumnRef):
result[key] = val
elif isinstance(val, TargetColumnRef):
if not allow_target_refs:
raise ValueError(
"INSERT spec must not reference target columns "
f"(t.*), but found: 't.{val.column}'"
)
if val.column not in target_set:
raise ValueError(
f"SET spec references unknown target column "
f"'{val.column}'"
)
result[key] = val
elif isinstance(val, LiteralValue):
result[key] = val
elif isinstance(val, str) and val.startswith("s."):
result[key] = SourceColumnRef(val[2:])
elif isinstance(val, str) and val.startswith("t."):
if not allow_target_refs:
raise ValueError(
"INSERT spec must not reference target columns "
f"(t.*), but found: '{val}'"
)
ref = val[2:]
if ref not in target_set:
raise ValueError(
f"SET spec references unknown target column '{ref}'"
)
result[key] = TargetColumnRef(ref)
else:
result[key] = LiteralValue(val)
return result


def _normalize_source(source: Any, catalog_options: Dict[str, str]):
Expand Down Expand Up @@ -483,17 +540,16 @@ def _validate_source_on_cols(source_ds, on: Sequence[str]) -> None:

def _validate_source_has_target_cols(
source_ds,
target_field_names: Sequence[str],
on_map: Mapping[str, str],
specs: List[_NormalizedClause],
) -> None:
"""For update='*'/insert='*', source must carry every (non-blob) target
column; otherwise the SET spec resolves to null and silently overwrites."""
names = set(_source_schema_or_raise(source_ds).names)
expected = {on_map.get(c, c) for c in target_field_names}
missing = sorted(expected - names)
needed = set()
for clause in specs:
for val in clause.spec.values():
if isinstance(val, SourceColumnRef):
needed.add(val.column)
missing = sorted(needed - names)
if missing:
raise ValueError(
f"source is missing target columns {missing}; "
f"update='*'/insert='*' requires the source to carry every "
f"(non-blob) target column."
f"source is missing columns {missing} referenced by SET spec"
)
40 changes: 34 additions & 6 deletions paimon-python/pypaimon/ray/data_evolution_merge_transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,33 @@
OnSpec = Union[Sequence[str], Mapping[str, str]]


@dataclass(frozen=True)
class SourceColumnRef:
column: str


@dataclass(frozen=True)
class TargetColumnRef:
column: str


@dataclass(frozen=True)
class LiteralValue:
value: Any


def source_col(name: str) -> SourceColumnRef:
return SourceColumnRef(name)


def target_col(name: str) -> TargetColumnRef:
return TargetColumnRef(name)


def lit(value: Any) -> LiteralValue:
return LiteralValue(value)


@dataclass
class WhenMatched:
update: SetSpec
Expand Down Expand Up @@ -105,18 +132,19 @@ def _resolve_spec_array(
on_pairs: Sequence[Tuple[str, str]],
out_type: pa.DataType,
):
if isinstance(val, str) and val.startswith("s."):
ref = val[2:]
if isinstance(val, LiteralValue):
return pa.array([val.value] * batch.num_rows, type=out_type)
if isinstance(val, SourceColumnRef):
ref = val.column
if f"s.{ref}" in available:
return batch.column(f"s.{ref}")
for sk, tk in on_pairs:
if sk == ref and f"t.{tk}" in available:
return batch.column(f"t.{tk}")
return pa.nulls(batch.num_rows, type=out_type)
if isinstance(val, str) and val.startswith("t."):
ref = val[2:]
col_name = f"t.{ref}"
if isinstance(val, TargetColumnRef):
col_name = f"t.{val.column}"
return batch.column(col_name) if col_name in available else pa.nulls(
batch.num_rows, type=out_type
)
return pa.array([val] * batch.num_rows, type=out_type)
raise TypeError(f"unexpected spec value type: {type(val).__name__}")
Loading
Loading