diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index b8d87143c9..7fa3e8d117 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -782,10 +782,41 @@ def upsert( if not when_matched_update_all and not when_not_matched_insert_all: raise ValueError("no upsert options selected...exiting") - if upsert_util.has_duplicate_rows(df, join_cols): - raise ValueError("Duplicate rows found in source dataset based on the key columns. No upsert executed") + from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, schema_to_pyarrow - from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible + table_arrow_schema = schema_to_pyarrow(self.table_metadata.schema(), include_field_ids=False) + df_column_names = set(df.schema.names) + + for col in join_cols: + table_field = table_arrow_schema.field(col) + # Table-level rejections: These types are fundamentally unreliable or + # unsupported as join keys regardless of the input data format. + if pa.types.is_floating(table_field.type): + raise ValueError( + f"Floating point column '{col}' cannot be used as a join key in upsert. " + "Floating point equality is unreliable; please cast to Decimal or Integer." + ) + if pa.types.is_nested(table_field.type): + raise ValueError( + f"Nested column '{col}' of type '{table_field.type}' cannot be used as a join key in upsert. " + "Only primitive types are supported." + ) + + # Dataframe-level rejections: only validate when the column is present in the + # source; missing columns are surfaced by _check_pyarrow_schema_compatible below. + if col not in df_column_names: + continue + arr = df.column(col) + if pa.types.is_dictionary(arr.type): + raise NotImplementedError( + f"Dictionary-encoded column '{col}' is not currently supported as a join key in upsert." + ) + if pa.types.is_null(arr.type): + raise ValueError(f"Null-type column '{col}' cannot be used as a join key in upsert.") + if isinstance(arr.type, pa.BaseExtensionType): + raise NotImplementedError( + f"Extension type '{arr.type}' for column '{col}' is not currently supported as a join key in upsert." + ) downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False _check_pyarrow_schema_compatible( @@ -795,6 +826,10 @@ def upsert( format_version=self.table_metadata.format_version, ) + # Validate uniqueness after type checks to avoid comparing/hashing unsupported types. + if upsert_util.has_duplicate_rows(df, join_cols): + raise ValueError("Duplicate rows found in source dataset based on the key columns. No upsert executed") + # get list of rows that exist so we don't have to load the entire target table matched_predicate = upsert_util.create_match_filter(df, join_cols) diff --git a/tests/table/test_upsert.py b/tests/table/test_upsert.py index 08f90c6600..175146aa21 100644 --- a/tests/table/test_upsert.py +++ b/tests/table/test_upsert.py @@ -323,7 +323,7 @@ def test_key_cols_misaligned(catalog: Catalog) -> None: df_src = ctx.sql("select 1 as item_id, date '2021-05-01' as order_date, 'B' as order_type").to_arrow_table() - with pytest.raises(Exception, match=r"""Field ".*" does not exist in schema"""): + with pytest.raises(ValueError, match="PyArrow table contains more columns: item_id"): table.upsert(df=df_src, join_cols=["order_id"]) @@ -666,7 +666,11 @@ def test_upsert_with_struct_field_as_join_key(catalog: Catalog) -> None: ) with pytest.raises( - pa.lib.ArrowNotImplementedError, match="Keys of type struct" + ValueError, + match=( + "Nested column 'nested_type' of type 'struct' " + "cannot be used as a join key in upsert" + ), ): _ = tbl.upsert(update_data, join_cols=["nested_type"]) @@ -888,3 +892,58 @@ def test_upsert_snapshot_properties(catalog: Catalog) -> None: for snapshot in snapshots[initial_snapshot_count:]: assert snapshot.summary is not None assert snapshot.summary.additional_properties.get("test_prop") == "test_value" + + +@pytest.mark.parametrize( + "arrow_type, expected_error, match", + [ + (pa.float32(), ValueError, "Floating point column 'k' cannot be used as a join key in upsert"), + (pa.float64(), ValueError, "Floating point column 'k' cannot be used as a join key in upsert"), + ( + pa.struct([("a", pa.int32())]), + ValueError, + "Nested column 'k' of type 'struct' cannot be used as a join key in upsert", + ), + ( + pa.list_(pa.int32()), + ValueError, + "Nested column 'k' of type 'large_list' cannot be used as a join key in upsert", + ), + ( + pa.dictionary(pa.int32(), pa.string()), + NotImplementedError, + "Dictionary-encoded column 'k' is not currently supported as a join key in upsert", + ), + (pa.null(), ValueError, "Null-type column 'k' cannot be used as a join key in upsert"), + (pa.uuid(), NotImplementedError, "is not currently supported as a join key in upsert"), + ], + ids=["float32", "float64", "struct", "list", "dictionary", "null", "uuid"], +) +def test_upsert_unsupported_join_column_types( + catalog: Catalog, arrow_type: pa.DataType, expected_error: type[Exception], match: str +) -> None: + """Upsert must clearly reject types that are unreliable (floats) or unsupported (extensions/complex) as join keys.""" + identifier = "default.test_upsert_unsupported_join_column_types" + try: + catalog.drop_table(identifier) + except NoSuchTableError: + pass + + # Define the table schema to be compatible with the arrow_type but still trigger our check + if pa.types.is_dictionary(arrow_type): + table_type = pa.string() + elif pa.types.is_null(arrow_type): + table_type = pa.int32() + else: + table_type = arrow_type + + table = catalog.create_table(identifier, pa.schema([("k", table_type), ("payload", pa.string())])) + + # Source has the "bad" type + source = pa.Table.from_pylist( + [{"k": None, "payload": "val"}], + schema=pa.schema([("k", arrow_type), ("payload", pa.string())]), + ) + + with pytest.raises(expected_error, match=match): + table.upsert(source, join_cols=["k"])