Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 38 additions & 3 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -782,10 +782,41 @@ def upsert(
if not when_matched_update_all and not when_not_matched_insert_all:
raise ValueError("no upsert options selected...exiting")

if upsert_util.has_duplicate_rows(df, join_cols):
raise ValueError("Duplicate rows found in source dataset based on the key columns. No upsert executed")
from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, schema_to_pyarrow

from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible
table_arrow_schema = schema_to_pyarrow(self.table_metadata.schema(), include_field_ids=False)
df_column_names = set(df.schema.names)

for col in join_cols:
table_field = table_arrow_schema.field(col)
# Table-level rejections: These types are fundamentally unreliable or
# unsupported as join keys regardless of the input data format.
if pa.types.is_floating(table_field.type):
raise ValueError(
f"Floating point column '{col}' cannot be used as a join key in upsert. "
"Floating point equality is unreliable; please cast to Decimal or Integer."
)
if pa.types.is_nested(table_field.type):
raise ValueError(
f"Nested column '{col}' of type '{table_field.type}' cannot be used as a join key in upsert. "
"Only primitive types are supported."
)

# Dataframe-level rejections: only validate when the column is present in the
# source; missing columns are surfaced by _check_pyarrow_schema_compatible below.
if col not in df_column_names:
continue
arr = df.column(col)
if pa.types.is_dictionary(arr.type):
raise NotImplementedError(
f"Dictionary-encoded column '{col}' is not currently supported as a join key in upsert."
)
if pa.types.is_null(arr.type):
raise ValueError(f"Null-type column '{col}' cannot be used as a join key in upsert.")
if isinstance(arr.type, pa.BaseExtensionType):
raise NotImplementedError(
f"Extension type '{arr.type}' for column '{col}' is not currently supported as a join key in upsert."
)

downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
_check_pyarrow_schema_compatible(
Expand All @@ -795,6 +826,10 @@ def upsert(
format_version=self.table_metadata.format_version,
)

# Validate uniqueness after type checks to avoid comparing/hashing unsupported types.
if upsert_util.has_duplicate_rows(df, join_cols):
raise ValueError("Duplicate rows found in source dataset based on the key columns. No upsert executed")

# get list of rows that exist so we don't have to load the entire target table
matched_predicate = upsert_util.create_match_filter(df, join_cols)

Expand Down
63 changes: 61 additions & 2 deletions tests/table/test_upsert.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ def test_key_cols_misaligned(catalog: Catalog) -> None:

df_src = ctx.sql("select 1 as item_id, date '2021-05-01' as order_date, 'B' as order_type").to_arrow_table()

with pytest.raises(Exception, match=r"""Field ".*" does not exist in schema"""):
with pytest.raises(ValueError, match="PyArrow table contains more columns: item_id"):
table.upsert(df=df_src, join_cols=["order_id"])


Expand Down Expand Up @@ -666,7 +666,11 @@ def test_upsert_with_struct_field_as_join_key(catalog: Catalog) -> None:
)

with pytest.raises(
pa.lib.ArrowNotImplementedError, match="Keys of type struct<sub1: large_string not null, sub2: large_string not null>"
ValueError,
match=(
"Nested column 'nested_type' of type 'struct<sub1: large_string not null, sub2: large_string not null>' "
"cannot be used as a join key in upsert"
),
):
_ = tbl.upsert(update_data, join_cols=["nested_type"])

Expand Down Expand Up @@ -888,3 +892,58 @@ def test_upsert_snapshot_properties(catalog: Catalog) -> None:
for snapshot in snapshots[initial_snapshot_count:]:
assert snapshot.summary is not None
assert snapshot.summary.additional_properties.get("test_prop") == "test_value"


@pytest.mark.parametrize(
    "arrow_type, expected_error, match",
    [
        pytest.param(
            pa.float32(),
            ValueError,
            "Floating point column 'k' cannot be used as a join key in upsert",
            id="float32",
        ),
        pytest.param(
            pa.float64(),
            ValueError,
            "Floating point column 'k' cannot be used as a join key in upsert",
            id="float64",
        ),
        pytest.param(
            pa.struct([("a", pa.int32())]),
            ValueError,
            "Nested column 'k' of type 'struct<a: int32>' cannot be used as a join key in upsert",
            id="struct",
        ),
        pytest.param(
            pa.list_(pa.int32()),
            ValueError,
            "Nested column 'k' of type 'large_list<element: int32>' cannot be used as a join key in upsert",
            id="list",
        ),
        pytest.param(
            pa.dictionary(pa.int32(), pa.string()),
            NotImplementedError,
            "Dictionary-encoded column 'k' is not currently supported as a join key in upsert",
            id="dictionary",
        ),
        pytest.param(
            pa.null(),
            ValueError,
            "Null-type column 'k' cannot be used as a join key in upsert",
            id="null",
        ),
        pytest.param(
            pa.uuid(),
            NotImplementedError,
            "is not currently supported as a join key in upsert",
            id="uuid",
        ),
    ],
)
def test_upsert_unsupported_join_column_types(
    catalog: Catalog, arrow_type: pa.DataType, expected_error: type[Exception], match: str
) -> None:
    """Check that ``upsert`` rejects join column ``k`` for each parametrized Arrow type.

    For every case a fresh table with columns ``k`` and ``payload`` is created,
    a one-row source table whose ``k`` column has ``arrow_type`` is built, and
    ``table.upsert(..., join_cols=["k"])`` is expected to raise
    ``expected_error`` with a message matching ``match``.
    """
    table_name = "default.test_upsert_unsupported_join_column_types"

    # Start from a clean slate; a missing table simply means there is nothing to drop.
    try:
        catalog.drop_table(table_name)
    except NoSuchTableError:
        pass

    # Pick the key column type for the target table. A null-typed source is paired
    # with an int32 table column and a dictionary-encoded source with a string table
    # column; every other case uses the parametrized type on both sides.
    # NOTE(review): presumably the substitutions keep the source schema-compatible so
    # that the join-key validation is what raises -- confirm against upsert().
    if pa.types.is_null(arrow_type):
        key_type = pa.int32()
    elif pa.types.is_dictionary(arrow_type):
        key_type = pa.string()
    else:
        key_type = arrow_type

    target_schema = pa.schema([("k", key_type), ("payload", pa.string())])
    table = catalog.create_table(table_name, target_schema)

    # The source always carries the type under test in its "k" column; the single
    # row has a None key and a string payload.
    source_schema = pa.schema([("k", arrow_type), ("payload", pa.string())])
    source = pa.Table.from_pylist([{"k": None, "payload": "val"}], schema=source_schema)

    with pytest.raises(expected_error, match=match):
        table.upsert(source, join_cols=["k"])
Loading