[python][ray] Ray merge into support condition #8076
        rewritten: str, on_map: Mapping[str, str],
    ) -> str:
        for s_col, t_col in on_map.items():
            rewritten = rewritten.replace(f'"s.{s_col}"', f'"t.{t_col}"')
Could we avoid doing this raw replace across the whole rewritten SQL? rewrite_condition() keeps string literals intact, but this second pass can still mutate literals that happen to contain the quoted ON-key token. For example, a matched condition like s.note = '"s.id"' AND s.id = 1 is rewritten to "s.note" = '"s.id"' AND "s.id" = 1, then this remap turns it into "s.note" = '"t.id"' AND "t.id" = 1, so DataFusion filters against the wrong literal and can skip/update the wrong rows. It would be safer to apply the ON-key remap while still respecting SQL string-literal spans (similar to rewrite_condition()), and add a regression test for literals containing "s.<on-key>".
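One possible shape for a literal-aware remap, offered as a minimal sketch and not as the PR's actual fix: the function name `remap_on_keys` and the regex are assumptions for illustration. It only recognizes single-quoted SQL string literals (with `''` as the escaped quote) and does not handle a single quote inside a double-quoted identifier.

```python
import re
from typing import Mapping

# Matches a single-quoted SQL string literal; '' inside is an escaped quote.
_SQL_STRING_LITERAL = re.compile(r"'(?:[^']|'')*'")


def remap_on_keys(rewritten: str, on_map: Mapping[str, str]) -> str:
    """Replace "s.<source key>" with "t.<target key>" outside string literals.

    Hypothetical helper: the name and structure are illustrative only.
    """

    def remap(segment: str) -> str:
        # Same replacement as the raw version, applied to one non-literal span.
        for s_col, t_col in on_map.items():
            segment = segment.replace(f'"s.{s_col}"', f'"t.{t_col}"')
        return segment

    parts = []
    pos = 0
    for match in _SQL_STRING_LITERAL.finditer(rewritten):
        parts.append(remap(rewritten[pos:match.start()]))  # SQL text: remap
        parts.append(match.group(0))                       # literal: keep as-is
        pos = match.end()
    parts.append(remap(rewritten[pos:]))                   # trailing SQL text
    return "".join(parts)


if __name__ == "__main__":
    sql = '''"s.note" = '"s.id"' AND "s.id" = 1'''
    print(remap_on_keys(sql, {"id": "id"}))
    # prints: "s.note" = '"s.id"' AND "t.id" = 1
```

The literal `'"s.id"'` survives unchanged while the real column reference is remapped, which is the case a regression test would need to cover.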
Fixed
2f54041 to 8d8ef6a
Add condition support for WhenMatched and WhenNotMatched clauses using DataFusion SQL engine for expression evaluation.
- Condition filtering in both matched (update) and not-matched (insert) paths
- Rewrite and remap respect SQL string literal spans
- Validate: WhenNotMatched rejects t.* refs, blob column refs rejected
- Fail-fast datafusion availability check
- Source ON key remapped to target ON key in matched conditions
- Add datafusion>=52 to CI dependencies
- SessionContext cached per worker, empty batch handled safely
6bc2edc to 88abb4b
- Create fresh SessionContext per filter_batch call (no global state)
- Guard merge_condition import behind condition check
- Check WhenNotMatched target-ref before blob-ref for clearer errors
- Clarify num_matched semantics in comment
88abb4b to 23ef745
Local dev installs via requirements-dev.txt were missing datafusion, causing condition integration tests to fail outside CI.
LGTM. I went through the condition rewrite/evaluation path, matched and not-matched filtering, blob column validation, empty-batch handling, and the new test coverage. The current implementation looks reasonable to me. Two minor non-blocking suggestions:
Two source rows match the same target row (id=1). Without a condition, this would raise "multiple source rows". With the condition s.age > t.age, only the row with age=20 passes (the age=5 row is filtered out), so the update succeeds with exactly one matching row.
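The scenario can be mimicked in plain Python to show why the condition turns a failure into a valid update. This is an illustrative sketch, not the merge implementation: the helper names, the error text, and the target age of 10 are all assumed values chosen so that age=20 passes and age=5 does not.

```python
# Hypothetical rows; the target age of 10 is an assumption for illustration.
target = {"id": 1, "age": 10}
source = [{"id": 1, "age": 20}, {"id": 1, "age": 5}]


def matched_rows(source_rows, target_row, condition=None):
    """Return source rows that join on id and pass the optional condition."""
    rows = [s for s in source_rows if s["id"] == target_row["id"]]
    if condition is not None:
        rows = [s for s in rows if condition(s, target_row)]
    return rows


def apply_update(source_rows, target_row, condition=None):
    """Update the target row, rejecting ambiguous (multi-row) matches."""
    rows = matched_rows(source_rows, target_row, condition)
    if len(rows) > 1:
        # Stand-in for the "multiple source rows" error mentioned above.
        raise ValueError("multiple source rows match the same target row")
    return {**target_row, **rows[0]} if rows else target_row


def age_increased(s, t):
    """Python equivalent of the SQL condition s.age > t.age."""
    return s["age"] > t["age"]


if __name__ == "__main__":
    print(apply_update(source, target, age_increased))
    # prints: {'id': 1, 'age': 20}
```

Calling `apply_update(source, target)` with no condition raises the `ValueError`, because both source rows still match id=1.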
Check that s.* and t.* references in condition expressions exist in the source and target schemas at merge_into call time, instead of deferring to DataFusion runtime errors.
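A rough sketch of what such an up-front check could look like, assuming column references appear as unquoted `s.<name>` and `t.<name>` tokens. The function name `validate_condition_refs`, the regexes, and the error message are hypothetical and are not taken from the PR.

```python
import re
from typing import Iterable

# Single-quoted SQL string literal; '' inside is an escaped quote.
_SQL_STRING_LITERAL = re.compile(r"'(?:[^']|'')*'")
# Unquoted s.<col> / t.<col> references (assumed naming convention).
_COLUMN_REF = re.compile(r"\b([st])\.([A-Za-z_][A-Za-z0-9_]*)")


def validate_condition_refs(
    condition: str,
    source_cols: Iterable[str],
    target_cols: Iterable[str],
) -> None:
    """Raise ValueError if the condition references an unknown column."""
    # Blank out string literals so text like 't.missing' is not seen as a ref.
    stripped = _SQL_STRING_LITERAL.sub("''", condition)
    known = {"s": set(source_cols), "t": set(target_cols)}
    missing = sorted(
        {
            f"{alias}.{col}"
            for alias, col in _COLUMN_REF.findall(stripped)
            if col not in known[alias]
        }
    )
    if missing:
        raise ValueError(f"unknown columns in condition: {', '.join(missing)}")


if __name__ == "__main__":
    # Passes: the only t.missing occurrence is inside a string literal.
    validate_condition_refs(
        "s.age > t.age AND s.note = 't.missing'", ["age", "note"], ["age"]
    )
```

Failing at call time like this gives the caller the offending column name directly, instead of a planner error surfacing later on a worker.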
Add @unittest.skipIf decorator to all condition E2E tests so they gracefully skip on Python < 3.10 or environments without datafusion.
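For illustration, a skip guard along these lines would cover both cases. The class name, flag names, and the choice to decorate the class instead of each test method are assumptions; the PR may structure this differently.

```python
import importlib.util
import sys
import unittest

# True when the datafusion package can be located without importing it.
HAS_DATAFUSION = importlib.util.find_spec("datafusion") is not None
# Skip on old interpreters or when datafusion is not installed.
SKIP_CONDITION_TESTS = sys.version_info < (3, 10) or not HAS_DATAFUSION


@unittest.skipIf(SKIP_CONDITION_TESTS, "requires Python >= 3.10 and datafusion")
class ConditionMergeTest(unittest.TestCase):  # hypothetical test class
    def test_environment_supports_conditions(self):
        # Only runs when the guard above did not skip the class.
        self.assertTrue(HAS_DATAFUSION)
        self.assertGreaterEqual(sys.version_info[:2], (3, 10))


if __name__ == "__main__":
    unittest.main()
```

Using `importlib.util.find_spec` keeps the check cheap at collection time, since the module is not actually imported unless a test needs it.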
test_not_matched_condition_rejects_target_refs also requires datafusion (_prepare calls _require_datafusion before the ValueError check).
Thanks, updated.
Purpose
Tests