From 2abd44fa24d92b8e49a47e11599bd36329398f4a Mon Sep 17 00:00:00 2001 From: Abanoub Doss Date: Tue, 19 May 2026 14:51:11 -0500 Subject: [PATCH 1/6] refactor(upsert): early rejection of unsupported join column types --- pyiceberg/table/__init__.py | 39 +++++++++++++++++++++++++++++++++---- tests/table/test_upsert.py | 37 ++++++++++++++++++++++++++++++++++- 2 files changed, 71 insertions(+), 5 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index b8d87143c9..5fbeb2248d 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -782,10 +782,7 @@ def upsert( if not when_matched_update_all and not when_not_matched_insert_all: raise ValueError("no upsert options selected...exiting") - if upsert_util.has_duplicate_rows(df, join_cols): - raise ValueError("Duplicate rows found in source dataset based on the key columns. No upsert executed") - - from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible + from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, schema_to_pyarrow downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False _check_pyarrow_schema_compatible( @@ -795,6 +792,40 @@ def upsert( format_version=self.table_metadata.format_version, ) + table_arrow_schema = schema_to_pyarrow(self.table_metadata.schema(), include_field_ids=False) + + for col in join_cols: + table_field = table_arrow_schema.field(col) + # Table-level rejections: These types are fundamentally unreliable or + # unsupported as join keys regardless of the input data format. + if pa.types.is_floating(table_field.type): + raise ValueError( + f"Floating point column '{col}' cannot be used as a join key in upsert. " + "Floating point equality is unreliable; please cast to Decimal or Integer." + ) + if pa.types.is_nested(table_field.type): + raise ValueError( + f"Nested column '{col}' of type '{table_field.type}' cannot be used as a join key in upsert. " + "Only primitive types are supported." + ) + + # Dataframe-level rejections: These implementation-specific formats (e.g., + # dictionary encoding) are not yet supported by the PyArrow join engine. + arr = df.column(col) + if pa.types.is_dictionary(arr.type): + raise NotImplementedError( + f"Dictionary-encoded column '{col}' is not currently supported as a join key in upsert." + ) + if pa.types.is_null(arr.type): + raise ValueError(f"Null-type column '{col}' cannot be used as a join key in upsert.") + if isinstance(arr.type, pa.BaseExtensionType): + raise NotImplementedError( + f"Extension type '{arr.type}' for column '{col}' is not currently supported as a join key in upsert." + ) + + if upsert_util.has_duplicate_rows(df, join_cols): + raise ValueError("Duplicate rows found in source dataset based on the key columns. No upsert executed") + # get list of rows that exist so we don't have to load the entire target table matched_predicate = upsert_util.create_match_filter(df, join_cols) diff --git a/tests/table/test_upsert.py b/tests/table/test_upsert.py index 08f90c6600..50b1647ba7 100644 --- a/tests/table/test_upsert.py +++ b/tests/table/test_upsert.py @@ -666,7 +666,7 @@ def test_upsert_with_struct_field_as_join_key(catalog: Catalog) -> None: ) with pytest.raises( - pa.lib.ArrowNotImplementedError, match="Keys of type struct" + ValueError, match="Nested column 'nested_type' of type 'struct' cannot be used as a join key in upsert" ): _ = tbl.upsert(update_data, join_cols=["nested_type"]) @@ -888,3 +888,38 @@ def test_upsert_snapshot_properties(catalog: Catalog) -> None: for snapshot in snapshots[initial_snapshot_count:]: assert snapshot.summary is not None assert snapshot.summary.additional_properties.get("test_prop") == "test_value" + +@pytest.mark.parametrize( + "arrow_type, expected_error, match", + [ + (pa.float32(), ValueError, "Floating point column 'k' cannot be used as a join key in upsert"), + (pa.float64(), ValueError, "Floating point column 'k' cannot be used as a join key in upsert"), + (pa.struct([("a", pa.int32())]), ValueError, "Nested column 'k' of type 'struct' cannot be used as a join key in upsert"), + (pa.list_(pa.int32()), ValueError, "Nested column 'k' of type 'list' cannot be used as a join key in upsert"), + (pa.dictionary(pa.int32(), pa.string()), NotImplementedError, "Dictionary-encoded column 'k' is not currently supported as a join key in upsert"), + (pa.null(), ValueError, "Null-type column 'k' cannot be used as a join key in upsert"), + (pa.uuid(), NotImplementedError, "is not currently supported as a join key in upsert"), + ], + ids=["float32", "float64", "struct", "list", "dictionary", "null", "uuid"], +) +def test_upsert_unsupported_join_column_types( + catalog: Catalog, arrow_type: pa.DataType, expected_error: type[Exception], match: str +) -> None: + """Upsert must clearly reject types that are unreliable (floats) or unsupported (extensions/complex) as join keys.""" + identifier = "default.test_upsert_unsupported_join_column_types" + try: + catalog.drop_table(identifier) + except NoSuchTableError: + pass + + # Create a simple table with a valid schema + table = catalog.create_table(identifier, pa.schema([("id", pa.int32()), ("payload", pa.string())])) + + # Source has the "bad" type + source = pa.Table.from_pylist( + [{"k": None, "payload": "val"}], + schema=pa.schema([("k", arrow_type), ("payload", pa.string())]), + ) + + with pytest.raises(expected_error, match=match): + table.upsert(source, join_cols=["k"]) From 12d978e24054ac27717b5d1945aa2f7dd3eb1ea3 Mon Sep 17 00:00:00 2001 From: Abanoub Doss Date: Tue, 19 May 2026 14:59:51 -0500 Subject: [PATCH 2/6] test(upsert): align test schemas with early compatibility check and update error expectations --- tests/table/test_upsert.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/tests/table/test_upsert.py b/tests/table/test_upsert.py index 50b1647ba7..36d7d61040 100644 --- a/tests/table/test_upsert.py +++ b/tests/table/test_upsert.py @@ -323,7 +323,7 @@ def test_key_cols_misaligned(catalog: Catalog) -> None: df_src = ctx.sql("select 1 as item_id, date '2021-05-01' as order_date, 'B' as order_type").to_arrow_table() - with pytest.raises(Exception, match=r"""Field ".*" does not exist in schema"""): + with pytest.raises(ValueError, match="PyArrow table contains more columns: item_id"): table.upsert(df=df_src, join_cols=["order_id"]) @@ -912,14 +912,21 @@ def test_upsert_unsupported_join_column_types( except NoSuchTableError: pass - # Create a simple table with a valid schema - table = catalog.create_table(identifier, pa.schema([("id", pa.int32()), ("payload", pa.string())])) + # Define the table schema to be compatible with the arrow_type but still trigger our check + if pa.types.is_dictionary(arrow_type): + table_type = pa.string() + elif pa.types.is_null(arrow_type): + table_type = pa.int32() + else: + table_type = arrow_type + + table = catalog.create_table(identifier, pa.schema([("k", table_type), ("payload", pa.string())])) # Source has the "bad" type source = pa.Table.from_pylist( [{"k": None, "payload": "val"}], schema=pa.schema([("k", arrow_type), ("payload", pa.string())]), ) - + with pytest.raises(expected_error, match=match): table.upsert(source, join_cols=["k"]) From 336dcf8c46891ab75341f8c397646fa34a12e0d9 Mon Sep 17 00:00:00 2001 From: Abanoub Doss Date: Tue, 19 May 2026 15:00:09 -0500 Subject: [PATCH 3/6] docs(upsert): add rationale for duplicate check ordering --- pyiceberg/table/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 5fbeb2248d..ef516fe4ab 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -823,6 +823,7 @@ def upsert( f"Extension type '{arr.type}' for column '{col}' is not currently supported as a join key in upsert." ) + # Validate uniqueness after type checks to avoid comparing/hashing unsupported types. if upsert_util.has_duplicate_rows(df, join_cols): raise ValueError("Duplicate rows found in source dataset based on the key columns. No upsert executed") From c97f724b16c37445156cae5b389e7ad8e9202c37 Mon Sep 17 00:00:00 2001 From: Abanoub Doss Date: Tue, 19 May 2026 15:13:34 -0500 Subject: [PATCH 4/6] style: fix linting issues (line length and whitespace) --- pyiceberg/table/__init__.py | 4 ++-- tests/table/test_upsert.py | 27 ++++++++++++++++++++++----- 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index ef516fe4ab..8cf9539a07 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -796,7 +796,7 @@ def upsert( for col in join_cols: table_field = table_arrow_schema.field(col) - # Table-level rejections: These types are fundamentally unreliable or + # Table-level rejections: These types are fundamentally unreliable or # unsupported as join keys regardless of the input data format. if pa.types.is_floating(table_field.type): raise ValueError( @@ -809,7 +809,7 @@ def upsert( "Only primitive types are supported." ) - # Dataframe-level rejections: These implementation-specific formats (e.g., + # Dataframe-level rejections: These implementation-specific formats (e.g., # dictionary encoding) are not yet supported by the PyArrow join engine. arr = df.column(col) if pa.types.is_dictionary(arr.type): diff --git a/tests/table/test_upsert.py b/tests/table/test_upsert.py index 36d7d61040..e63482db53 100644 --- a/tests/table/test_upsert.py +++ b/tests/table/test_upsert.py @@ -666,7 +666,11 @@ def test_upsert_with_struct_field_as_join_key(catalog: Catalog) -> None: ) with pytest.raises( - ValueError, match="Nested column 'nested_type' of type 'struct' cannot be used as a join key in upsert" + ValueError, + match=( + "Nested column 'nested_type' of type 'struct' " + "cannot be used as a join key in upsert" + ), ): _ = tbl.upsert(update_data, join_cols=["nested_type"]) @@ -889,14 +893,27 @@ def test_upsert_snapshot_properties(catalog: Catalog) -> None: assert snapshot.summary is not None assert snapshot.summary.additional_properties.get("test_prop") == "test_value" + @pytest.mark.parametrize( "arrow_type, expected_error, match", [ (pa.float32(), ValueError, "Floating point column 'k' cannot be used as a join key in upsert"), (pa.float64(), ValueError, "Floating point column 'k' cannot be used as a join key in upsert"), - (pa.struct([("a", pa.int32())]), ValueError, "Nested column 'k' of type 'struct' cannot be used as a join key in upsert"), - (pa.list_(pa.int32()), ValueError, "Nested column 'k' of type 'list' cannot be used as a join key in upsert"), - (pa.dictionary(pa.int32(), pa.string()), NotImplementedError, "Dictionary-encoded column 'k' is not currently supported as a join key in upsert"), + ( + pa.struct([("a", pa.int32())]), + ValueError, + "Nested column 'k' of type 'struct' cannot be used as a join key in upsert", + ), + ( + pa.list_(pa.int32()), + ValueError, + "Nested column 'k' of type 'list' cannot be used as a join key in upsert", + ), + ( + pa.dictionary(pa.int32(), pa.string()), + NotImplementedError, + "Dictionary-encoded column 'k' is not currently supported as a join key in upsert", + ), (pa.null(), ValueError, "Null-type column 'k' cannot be used as a join key in upsert"), (pa.uuid(), NotImplementedError, "is not currently supported as a join key in upsert"), ], @@ -911,7 +928,7 @@ def test_upsert_unsupported_join_column_types( catalog.drop_table(identifier) except NoSuchTableError: pass - + # Define the table schema to be compatible with the arrow_type but still trigger our check if pa.types.is_dictionary(arrow_type): table_type = pa.string() From fe60c9790368e03e38237051119d14d459012137 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 21 May 2026 00:11:21 +0000 Subject: [PATCH 5/6] test(upsert): expect large_list in unsupported-type error for list join key The error message renders the type from the Iceberg table's pyarrow schema, and schema_to_pyarrow converts pa.list_ into pa.large_list (see pyiceberg/io/pyarrow.py). The test regex must match the rendered large_list, not the source list. --- tests/table/test_upsert.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/table/test_upsert.py b/tests/table/test_upsert.py index e63482db53..175146aa21 100644 --- a/tests/table/test_upsert.py +++ b/tests/table/test_upsert.py @@ -907,7 +907,7 @@ def test_upsert_snapshot_properties(catalog: Catalog) -> None: ( pa.list_(pa.int32()), ValueError, - "Nested column 'k' of type 'list' cannot be used as a join key in upsert", + "Nested column 'k' of type 'large_list' cannot be used as a join key in upsert", ), ( pa.dictionary(pa.int32(), pa.string()), From 9928f50d80e310a886c138e6882b00328e6c1e94 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 21 May 2026 00:30:43 +0000 Subject: [PATCH 6/6] fix(upsert): run join-column type rejection before schema compat check A pa.null() source column was being rejected by _check_pyarrow_schema_compatible (format-version=2 forbids null) before the join-column validation could surface the intended "Null-type column ... cannot be used as a join key" error. Reordering the checks lets the upsert-specific rejection fire first, giving users the actionable message. Dataframe-level checks now skip columns that are absent from the source so the pre-existing _check_pyarrow_schema_compatible path still owns the "PyArrow table contains more columns" error in test_key_cols_misaligned. --- pyiceberg/table/__init__.py | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 8cf9539a07..7fa3e8d117 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -784,15 +784,8 @@ def upsert( from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, schema_to_pyarrow - downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False - _check_pyarrow_schema_compatible( - self.table_metadata.schema(), - provided_schema=df.schema, - downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, - format_version=self.table_metadata.format_version, - ) - table_arrow_schema = schema_to_pyarrow(self.table_metadata.schema(), include_field_ids=False) + df_column_names = set(df.schema.names) for col in join_cols: table_field = table_arrow_schema.field(col) @@ -809,8 +802,10 @@ def upsert( "Only primitive types are supported." ) - # Dataframe-level rejections: These implementation-specific formats (e.g., - # dictionary encoding) are not yet supported by the PyArrow join engine. + # Dataframe-level rejections: only validate when the column is present in the + # source; missing columns are surfaced by _check_pyarrow_schema_compatible below. + if col not in df_column_names: + continue arr = df.column(col) if pa.types.is_dictionary(arr.type): raise NotImplementedError( @@ -823,6 +818,14 @@ def upsert( f"Extension type '{arr.type}' for column '{col}' is not currently supported as a join key in upsert." ) + downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False + _check_pyarrow_schema_compatible( + self.table_metadata.schema(), + provided_schema=df.schema, + downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, + format_version=self.table_metadata.format_version, + ) + # Validate uniqueness after type checks to avoid comparing/hashing unsupported types. if upsert_util.has_duplicate_rows(df, join_cols): raise ValueError("Duplicate rows found in source dataset based on the key columns. No upsert executed")