diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py index 39954ef561..2e38b20744 100644 --- a/pyiceberg/catalog/rest/__init__.py +++ b/pyiceberg/catalog/rest/__init__.py @@ -392,6 +392,14 @@ class ListViewsResponse(IcebergBaseModel): _PLANNING_RESPONSE_ADAPTER = TypeAdapter(PlanningResponse) +_S3_COMPATIBLE_SCHEMES = ("s3://", "s3a://", "s3n://", "oss://") + + +def _is_hadoop_only_config(config: dict[str, str]) -> bool: + """Return True if every key is a Hadoop ``fs.*`` key — pyiceberg has no HadoopFileIO to consume them.""" + return bool(config) and all(k.startswith("fs.") for k in config) + + class RestCatalog(Catalog): uri: str _session: Session @@ -458,22 +466,30 @@ def _create_session(self) -> Session: @staticmethod def _resolve_storage_credentials(storage_credentials: list[StorageCredential], location: str | None) -> Properties: - """Resolve the best-matching storage credential by longest prefix match. + """Pick the longest-prefix storage credential for ``location``. - Mirrors the Java implementation in S3FileIO.clientForStoragePath() which iterates - over storage credential prefixes and selects the one with the longest match. + Mirrors Java ``S3FileIO.clientForStoragePath``. Hadoop-only (``fs.*``) + credentials are filtered out since pyiceberg has no HadoopFileIO to + consume them — otherwise a catalog vending both ``fs.*`` and ``s3.*`` + bundles per location could strand the FileIO with unusable keys. See: https://github.com/apache/iceberg/blob/main/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java """ if not storage_credentials or not location: return {} + consumable = [c for c in storage_credentials if not _is_hadoop_only_config(c.config)] + best_match: StorageCredential | None = None - for cred in storage_credentials: + for cred in consumable: if location.startswith(cred.prefix): if best_match is None or len(cred.prefix) > len(best_match.prefix): best_match = cred + # Mirrors Java S3FileIO.clientForStoragePath() ROOT_PREFIX fallback. + if best_match is None and location.startswith(_S3_COMPATIBLE_SCHEMES): + best_match = next((c for c in consumable if c.prefix == "s3"), None) + return best_match.config if best_match else {} def _load_file_io(self, properties: Properties = EMPTY_DICT, location: str | None = None) -> FileIO: diff --git a/tests/catalog/test_rest.py b/tests/catalog/test_rest.py index df2f96a392..59df8ada43 100644 --- a/tests/catalog/test_rest.py +++ b/tests/catalog/test_rest.py @@ -2845,6 +2845,81 @@ def test_resolve_storage_credentials_empty() -> None: assert RestCatalog._resolve_storage_credentials([], None) == {} +def test_resolve_storage_credentials_skips_hadoop_only() -> None: + from pyiceberg.catalog.rest.scan_planning import StorageCredential + + # The longer fs.* prefix would win a blind longest-match; the filter drops it. + credentials = [ + StorageCredential(prefix="s3://warehouse/jindo", config={"fs.s3.access-key": "hadoop-k"}), + StorageCredential(prefix="s3://warehouse", config={"s3.access-key-id": "native-k"}), + ] + result = RestCatalog._resolve_storage_credentials(credentials, "s3://warehouse/jindo/table/data") + assert result == {"s3.access-key-id": "native-k"} + + +def test_resolve_storage_credentials_mixed_prefix_namespaces_preserved() -> None: + from pyiceberg.catalog.rest.scan_planning import StorageCredential + + credentials = [ + StorageCredential(prefix="gs", config={"gs.oauth2.token": "tok"}), + StorageCredential(prefix="s3", config={"s3.access-key-id": "native-k"}), + ] + result = RestCatalog._resolve_storage_credentials(credentials, "gs://bucket/path") + assert result == {"gs.oauth2.token": "tok"} + + +def test_resolve_storage_credentials_all_hadoop_only_returns_empty() -> None: + from pyiceberg.catalog.rest.scan_planning import StorageCredential + + credentials = [ + StorageCredential(prefix="custom", config={"fs.custom.access-key": "hadoop-k"}), + ] + assert RestCatalog._resolve_storage_credentials(credentials, "custom://bucket/path") == {} + + +def test_resolve_storage_credentials_s3a_s3n_match_root_prefix_directly() -> None: + from pyiceberg.catalog.rest.scan_planning import StorageCredential + + # s3a:// and s3n:// start with "s3", so they match prefix="s3" via + # startswith at the normal matching stage — no fallback needed. + credentials = [ + StorageCredential(prefix="s3", config={"s3.access-key-id": "native-k"}), + ] + for scheme in ("s3a", "s3n"): + result = RestCatalog._resolve_storage_credentials(credentials, f"{scheme}://bucket/path") + assert result == {"s3.access-key-id": "native-k"}, f"{scheme}:// should match prefix='s3' directly" + + +def test_resolve_storage_credentials_root_prefix_fallback_for_oss() -> None: + from pyiceberg.catalog.rest.scan_planning import StorageCredential + + # oss:// is S3-API-compatible; pyiceberg routes it through pyarrow's + # S3FileSystem, so the ROOT_PREFIX "s3" credential applies. + credentials = [ + StorageCredential(prefix="s3", config={"s3.access-key-id": "native-k"}), + ] + result = RestCatalog._resolve_storage_credentials(credentials, "oss://bucket/path") + assert result == {"s3.access-key-id": "native-k"} + + +def test_resolve_storage_credentials_root_prefix_fallback_respects_consumable() -> None: + from pyiceberg.catalog.rest.scan_planning import StorageCredential + + credentials = [ + StorageCredential(prefix="s3", config={"fs.s3.access-key": "hadoop-k"}), + ] + assert RestCatalog._resolve_storage_credentials(credentials, "s3://bucket/path") == {} + + +def test_resolve_storage_credentials_fallback_skipped_for_non_s3_scheme() -> None: + from pyiceberg.catalog.rest.scan_planning import StorageCredential + + credentials = [ + StorageCredential(prefix="s3", config={"s3.access-key-id": "native-k"}), + ] + assert RestCatalog._resolve_storage_credentials(credentials, "gs://bucket/path") == {} + + def test_load_table_with_storage_credentials(rest_mock: Mocker, example_table_metadata_with_snapshot_v1: dict[str, Any]) -> None: metadata_location = "s3://warehouse/database/table/metadata/00001.metadata.json" rest_mock.get(