diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md index 391cca78b0..e1c6e27052 100644 --- a/mkdocs/docs/configuration.md +++ b/mkdocs/docs/configuration.md @@ -685,6 +685,9 @@ catalog: | hive.kerberos-authentication | true | Using authentication via Kerberos | | hive.kerberos-service-name | hive | Kerberos service name (default hive) | | ugi | t-1234:secret | Hadoop UGI for Hive client. | +| hive.metastore.authentication | DIGEST-MD5 | Auth mechanism: `NONE` (default), `KERBEROS`, or `DIGEST-MD5` | + +When using DIGEST-MD5 authentication, PyIceberg reads a Hive delegation token from the file pointed to by the `$HADOOP_TOKEN_FILE_LOCATION` environment variable. This is the standard mechanism used in secure Hadoop environments where delegation tokens are distributed to jobs. Install PyIceberg with `pip install "pyiceberg[hive]"` to get the required `pure-sasl` dependency. When using Hive 2.x, make sure to set the compatibility flag: diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index 1bec186ca8..0b3a0b9741 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -63,6 +63,7 @@ ) from pyiceberg.exceptions import ( CommitFailedException, + HiveAuthError, NamespaceAlreadyExistsError, NamespaceNotEmptyError, NoSuchIcebergTableError, @@ -109,6 +110,7 @@ UnknownType, UUIDType, ) +from pyiceberg.utils.hadoop_credentials import read_hive_delegation_token from pyiceberg.utils.properties import property_as_bool, property_as_float if TYPE_CHECKING: @@ -127,6 +129,9 @@ HIVE_KERBEROS_SERVICE_NAME = "hive.kerberos-service-name" HIVE_KERBEROS_SERVICE_NAME_DEFAULT = "hive" +HIVE_METASTORE_AUTH = "hive.metastore.authentication" +HIVE_METASTORE_AUTH_DEFAULT = "NONE" + LOCK_CHECK_MIN_WAIT_TIME = "lock-check-min-wait-time" LOCK_CHECK_MAX_WAIT_TIME = "lock-check-max-wait-time" LOCK_CHECK_RETRIES = "lock-check-retries" @@ -139,6 +144,20 @@ logger = logging.getLogger(__name__) +class 
_DigestMD5SaslTransport(TTransport.TSaslClientTransport): + """TSaslClientTransport subclass that works around THRIFT-5926. + + The upstream ``TSaslClientTransport.open()`` passes the first + ``sasl.process()`` response directly to ``_send_sasl_message()``, + but for DIGEST-MD5 the initial response is ``None`` (challenge-first + mechanism). Sending ``None`` causes a ``TypeError``. This subclass + coerces ``None`` to ``b""`` so the SASL handshake proceeds normally. + """ + + def send_sasl_msg(self, status: int, body: bytes | None) -> None: # type: ignore[override] + super().send_sasl_msg(status, body if body is not None else b"") + + class _HiveClient: """Helper class to nicely open and close the transport.""" @@ -150,21 +169,44 @@ def __init__( uri: str, ugi: str | None = None, kerberos_auth: bool | None = HIVE_KERBEROS_AUTH_DEFAULT, - kerberos_service_name: str | None = HIVE_KERBEROS_SERVICE_NAME, + kerberos_service_name: str | None = HIVE_KERBEROS_SERVICE_NAME_DEFAULT, + auth_mechanism: str | None = None, ): self._uri = uri - self._kerberos_auth = kerberos_auth self._kerberos_service_name = kerberos_service_name self._ugi = ugi.split(":") if ugi else None + + # Resolve auth mechanism: explicit auth_mechanism takes precedence, + # then fall back to legacy kerberos_auth boolean for backward compat. 
+ if auth_mechanism is not None: + self._auth_mechanism = auth_mechanism.upper() + elif kerberos_auth: + self._auth_mechanism = "KERBEROS" + else: + self._auth_mechanism = HIVE_METASTORE_AUTH_DEFAULT + self._transport = self._init_thrift_transport() def _init_thrift_transport(self) -> TTransport: url_parts = urlparse(self._uri) socket = TSocket.TSocket(url_parts.hostname, url_parts.port) - if not self._kerberos_auth: + + if self._auth_mechanism == "NONE": return TTransport.TBufferedTransport(socket) - else: + elif self._auth_mechanism == "KERBEROS": return TTransport.TSaslClientTransport(socket, host=url_parts.hostname, service=self._kerberos_service_name) + elif self._auth_mechanism == "DIGEST-MD5": + identifier, password = read_hive_delegation_token() + return _DigestMD5SaslTransport( + socket, + host=url_parts.hostname, + service=self._kerberos_service_name, + mechanism="DIGEST-MD5", + username=identifier, + password=password, + ) + else: + raise HiveAuthError(f"Unknown auth mechanism: {self._auth_mechanism!r}. 
Valid values: NONE, KERBEROS, DIGEST-MD5") def _client(self) -> Client: protocol = TBinaryProtocol.TBinaryProtocol(self._transport) @@ -319,6 +361,7 @@ def _create_hive_client(properties: dict[str, str]) -> _HiveClient: properties.get("ugi"), property_as_bool(properties, HIVE_KERBEROS_AUTH, HIVE_KERBEROS_AUTH_DEFAULT), properties.get(HIVE_KERBEROS_SERVICE_NAME, HIVE_KERBEROS_SERVICE_NAME_DEFAULT), + auth_mechanism=properties.get(HIVE_METASTORE_AUTH), ) except BaseException as e: last_exception = e diff --git a/pyiceberg/exceptions.py b/pyiceberg/exceptions.py index e755c73095..76a34a6e23 100644 --- a/pyiceberg/exceptions.py +++ b/pyiceberg/exceptions.py @@ -130,3 +130,7 @@ class WaitingForLockException(Exception): class ValidationException(Exception): """Raised when validation fails.""" + + +class HiveAuthError(Exception): + """Raised when Hive Metastore authentication fails.""" diff --git a/pyiceberg/utils/hadoop_credentials.py b/pyiceberg/utils/hadoop_credentials.py new file mode 100644 index 0000000000..6b345a83ee --- /dev/null +++ b/pyiceberg/utils/hadoop_credentials.py @@ -0,0 +1,142 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Hadoop Delegation Token Service (HDTS) file parser. 
+
+Reads delegation tokens from the binary token file pointed to by
+the ``$HADOOP_TOKEN_FILE_LOCATION`` environment variable.
+"""
+
+from __future__ import annotations
+
+import base64
+import os
+from io import BytesIO
+
+from pyiceberg.exceptions import HiveAuthError
+
+HADOOP_TOKEN_FILE_LOCATION = "HADOOP_TOKEN_FILE_LOCATION"
+HIVE_DELEGATION_TOKEN_KIND = "HIVE_DELEGATION_TOKEN"
+HDTS_MAGIC = b"HDTS"
+HDTS_SUPPORTED_VERSION = 0
+
+
+def _read_hadoop_vint(stream: BytesIO) -> int:
+    """Decode a Hadoop WritableUtils VInt/VLong from a byte stream.
+
+    Matches the encoding in Java's ``WritableUtils.readVInt``/``readVLong``:
+    - If the first byte (interpreted as signed) is >= -112, it *is* the value.
+    - Otherwise the first byte encodes both a negativity flag and the number
+      of additional big-endian payload bytes that carry the actual value.
+    """
+    first = stream.read(1)
+    if not first:
+        raise HiveAuthError("Unexpected end of token file while reading VInt")
+    # Reinterpret as signed byte to match Java's signed-byte semantics
+    b = first[0]
+    if b > 127:
+        b -= 256
+    if b >= -112:
+        return b
+    negative = b < -120
+    # Java's decodeVIntSize() yields the TOTAL encoded size including the
+    # prefix byte (-119 - b negative, -111 - b positive); the number of
+    # payload bytes still left to read is one less than that.
+    length = (-120 - b) if negative else (-112 - b)
+    extra = stream.read(length)
+    if len(extra) != length:
+        raise HiveAuthError("Unexpected end of token file while reading VInt")
+    result = 0
+    for byte_val in extra:
+        result = (result << 8) | byte_val
+    if negative:
+        result = ~result
+    return result
+
+
+def _read_hadoop_bytes(stream: BytesIO) -> bytes:
+    """Read a VInt-prefixed byte array from a Hadoop token stream."""
+    length = _read_hadoop_vint(stream)
+    if length < 0:
+        raise HiveAuthError(f"Invalid byte array length: {length}")
+    data = stream.read(length)
+    if len(data) != length:
+        raise HiveAuthError("Unexpected end of token file while reading byte array")
+    return data
+
+
+def _read_hadoop_text(stream: BytesIO) -> str:
+    """Read a VInt-prefixed UTF-8 string from a Hadoop token stream."""
+    raw = _read_hadoop_bytes(stream)
+    try:
+        return 
raw.decode("utf-8")
+    except UnicodeDecodeError as e:
+        raise HiveAuthError(f"Token file contains invalid UTF-8 in text field: {e}") from e
+
+
+def read_hive_delegation_token() -> tuple[str, str]:
+    """Read a Hive delegation token from ``$HADOOP_TOKEN_FILE_LOCATION``.
+
+    Returns:
+        A ``(identifier, password)`` tuple where both values are
+        base64-encoded strings suitable for SASL DIGEST-MD5 auth.
+
+    Raises:
+        HiveAuthError: If the token file is missing, malformed, or
+        does not contain a ``HIVE_DELEGATION_TOKEN``.
+    """
+    token_file = os.environ.get(HADOOP_TOKEN_FILE_LOCATION)
+    if not token_file:
+        raise HiveAuthError(
+            f"${HADOOP_TOKEN_FILE_LOCATION} environment variable is not set. "
+            "A Hadoop delegation token file is required for DIGEST-MD5 authentication."
+        )
+
+    try:
+        with open(token_file, "rb") as f:
+            data = f.read()
+    except OSError as e:
+        raise HiveAuthError(f"Cannot read Hadoop token file {token_file}: {e}") from e
+
+    stream = BytesIO(data)
+
+    magic = stream.read(4)
+    if magic != HDTS_MAGIC:
+        raise HiveAuthError(f"Invalid Hadoop token file magic: expected {HDTS_MAGIC!r}, got {magic!r}")
+
+    version_byte = stream.read(1)
+    if not version_byte:
+        raise HiveAuthError("Unexpected end of token file while reading version")
+    version = version_byte[0]
+    if version != HDTS_SUPPORTED_VERSION:
+        raise HiveAuthError(f"Unsupported Hadoop token file version: {version}")
+
+    num_tokens = _read_hadoop_vint(stream)
+
+    for _ in range(num_tokens):
+        # Each entry is written by Java's Credentials.write(): an alias Text
+        # first, then the Token itself as identifier_bytes, password_bytes,
+        # kind_text, service_text.
+        _alias = _read_hadoop_text(stream)
+        identifier_bytes = _read_hadoop_bytes(stream)
+        password_bytes = _read_hadoop_bytes(stream)
+        kind = _read_hadoop_text(stream)
+        _service = _read_hadoop_text(stream)
+
+        if kind == HIVE_DELEGATION_TOKEN_KIND:
+            return (
+                base64.b64encode(identifier_bytes).decode("ascii"),
+                base64.b64encode(password_bytes).decode("ascii"),
+            )
+
+    raise HiveAuthError(
+        f"No {HIVE_DELEGATION_TOKEN_KIND} found in token file: {token_file}. 
File contains {num_tokens} token(s)." + ) diff --git a/pyproject.toml b/pyproject.toml index adbbd4fe2f..6dfccc06f5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -74,7 +74,10 @@ bodo = ["bodo>=2025.7.4"] daft = ["daft>=0.5.0"] polars = ["polars>=1.21.0,<2"] snappy = ["python-snappy>=0.6.0,<1.0.0"] -hive = ["thrift>=0.13.0,<1.0.0"] +hive = [ + "thrift>=0.13.0,<1.0.0", + "pure-sasl>=0.6.0,<1.0.0", +] hive-kerberos = [ "thrift>=0.13.0,<1.0.0", "thrift-sasl>=0.4.3", diff --git a/tests/catalog/test_hive.py b/tests/catalog/test_hive.py index 88b653e44f..6821aaae68 100644 --- a/tests/catalog/test_hive.py +++ b/tests/catalog/test_hive.py @@ -41,6 +41,7 @@ ) from hive_metastore.ttypes import Database as HiveDatabase from hive_metastore.ttypes import Table as HiveTable +from thrift.transport import TSocket, TTransport from pyiceberg.catalog import PropertiesUpdateSummary from pyiceberg.catalog.hive import ( @@ -48,11 +49,13 @@ DO_NOT_UPDATE_STATS_DEFAULT, HIVE_KERBEROS_AUTH, HIVE_KERBEROS_SERVICE_NAME, + HIVE_METASTORE_AUTH, LOCK_CHECK_MAX_WAIT_TIME, LOCK_CHECK_MIN_WAIT_TIME, LOCK_CHECK_RETRIES, HiveCatalog, _construct_hive_storage_descriptor, + _DigestMD5SaslTransport, _HiveClient, ) from pyiceberg.exceptions import ( @@ -1326,7 +1329,7 @@ def test_create_hive_client_success() -> None: with patch("pyiceberg.catalog.hive._HiveClient", return_value=MagicMock()) as mock_hive_client: client = HiveCatalog._create_hive_client(properties) - mock_hive_client.assert_called_once_with("thrift://localhost:10000", "user", False, "hive") + mock_hive_client.assert_called_once_with("thrift://localhost:10000", "user", False, "hive", auth_mechanism=None) assert client is not None @@ -1339,7 +1342,7 @@ def test_create_hive_client_with_kerberos_success() -> None: } with patch("pyiceberg.catalog.hive._HiveClient", return_value=MagicMock()) as mock_hive_client: client = HiveCatalog._create_hive_client(properties) - mock_hive_client.assert_called_once_with("thrift://localhost:10000", 
"user", True, "hiveuser") + mock_hive_client.assert_called_once_with("thrift://localhost:10000", "user", True, "hiveuser", auth_mechanism=None) assert client is not None @@ -1352,7 +1355,10 @@ def test_create_hive_client_multiple_uris() -> None: client = HiveCatalog._create_hive_client(properties) assert mock_hive_client.call_count == 2 mock_hive_client.assert_has_calls( - [call("thrift://localhost:10000", "user", False, "hive"), call("thrift://localhost:10001", "user", False, "hive")] + [ + call("thrift://localhost:10000", "user", False, "hive", auth_mechanism=None), + call("thrift://localhost:10001", "user", False, "hive", auth_mechanism=None), + ] ) assert client is not None @@ -1407,3 +1413,102 @@ def test_create_hive_client_with_kerberos_using_context_manager( # closing and re-opening work as expected. with client as open_client: assert open_client._iprot.trans.isOpen() + + +def _fake_read_token() -> tuple[str, str]: + """Return a fake delegation token for tests.""" + return ("dGVzdC1pZA==", "dGVzdC1wdw==") + + +def test_auth_mechanism_none_creates_buffered_transport_explicit() -> None: + """When auth_mechanism is explicitly NONE, a TBufferedTransport is created.""" + client = _HiveClient(uri="thrift://localhost:9083", auth_mechanism="NONE") + assert isinstance(client._transport, TTransport.TBufferedTransport) + assert client._auth_mechanism == "NONE" + + +def test_auth_mechanism_kerberos_resolved(monkeypatch: pytest.MonkeyPatch) -> None: + """When auth_mechanism is KERBEROS, _auth_mechanism is resolved correctly.""" + # Stub TSaslClientTransport.__init__ to avoid requiring the kerberos C module + monkeypatch.setattr(TTransport.TSaslClientTransport, "__init__", lambda *a, **kw: None) + client = _HiveClient(uri="thrift://localhost:9083", auth_mechanism="KERBEROS") + assert client._auth_mechanism == "KERBEROS" + assert isinstance(client._transport, TTransport.TSaslClientTransport) + + +def test_auth_mechanism_digest_md5_creates_digest_transport(monkeypatch: 
pytest.MonkeyPatch) -> None: + """When auth_mechanism is DIGEST-MD5, a _DigestMD5SaslTransport is created.""" + monkeypatch.setattr("pyiceberg.catalog.hive.read_hive_delegation_token", _fake_read_token) + client = _HiveClient(uri="thrift://localhost:9083", auth_mechanism="DIGEST-MD5") + assert isinstance(client._transport, _DigestMD5SaslTransport) + + +def test_legacy_kerberos_auth_backward_compat(monkeypatch: pytest.MonkeyPatch) -> None: + """Legacy kerberos_auth=True resolves to KERBEROS auth_mechanism.""" + monkeypatch.setattr(TTransport.TSaslClientTransport, "__init__", lambda *a, **kw: None) + client = _HiveClient(uri="thrift://localhost:9083", kerberos_auth=True) + assert client._auth_mechanism == "KERBEROS" + assert isinstance(client._transport, TTransport.TSaslClientTransport) + + +def test_auth_mechanism_overrides_kerberos_auth() -> None: + """Explicit auth_mechanism takes precedence over kerberos_auth boolean.""" + client = _HiveClient(uri="thrift://localhost:9083", kerberos_auth=True, auth_mechanism="NONE") + assert isinstance(client._transport, TTransport.TBufferedTransport) + assert client._auth_mechanism == "NONE" + + +def test_auth_mechanism_unknown_raises() -> None: + """Unknown auth mechanism should raise HiveAuthError, not silently fall back.""" + from pyiceberg.exceptions import HiveAuthError + + with pytest.raises(HiveAuthError, match="Unknown auth mechanism.*PLAIN"): + _HiveClient(uri="thrift://localhost:9083", auth_mechanism="PLAIN") + + +def test_auth_mechanism_case_insensitive(monkeypatch: pytest.MonkeyPatch) -> None: + """Auth mechanism should be case-insensitive.""" + monkeypatch.setattr("pyiceberg.catalog.hive.read_hive_delegation_token", _fake_read_token) + client = _HiveClient(uri="thrift://localhost:9083", auth_mechanism="digest-md5") + assert isinstance(client._transport, _DigestMD5SaslTransport) + + +def test_create_hive_client_passes_auth_mechanism(monkeypatch: pytest.MonkeyPatch) -> None: + """_create_hive_client passes 
hive.metastore.authentication to _HiveClient.""" + monkeypatch.setattr("pyiceberg.catalog.hive.read_hive_delegation_token", _fake_read_token) + properties = { + "uri": "thrift://localhost:9083", + HIVE_METASTORE_AUTH: "DIGEST-MD5", + } + client = HiveCatalog._create_hive_client(properties) + assert client._auth_mechanism == "DIGEST-MD5" + + +def test_digest_md5_transport_send_sasl_msg_coerces_none(monkeypatch: pytest.MonkeyPatch) -> None: + """_DigestMD5SaslTransport.send_sasl_msg coerces None body to b''.""" + monkeypatch.setattr("pyiceberg.catalog.hive.read_hive_delegation_token", _fake_read_token) + + transport = _DigestMD5SaslTransport( + TSocket.TSocket("localhost", 9083), + host="localhost", + service="hive", + mechanism="DIGEST-MD5", + username="dGVzdC1pZA==", + password="dGVzdC1wdw==", + ) + + # Capture what the parent send_sasl_msg receives + captured_calls: list[tuple[int, bytes | None]] = [] + + def capture_send(self: TTransport.TSaslClientTransport, status: int, body: bytes | None) -> None: + captured_calls.append((status, body)) + + monkeypatch.setattr(TTransport.TSaslClientTransport, "send_sasl_msg", capture_send) + + # Send with None body — should be coerced to b"" + transport.send_sasl_msg(1, None) + # Send with real body — should pass through unchanged + transport.send_sasl_msg(2, b"real-data") + + assert captured_calls[0] == (1, b""), "None body should be coerced to b''" + assert captured_calls[1] == (2, b"real-data"), "Non-None body should pass through" diff --git a/tests/utils/test_hadoop_credentials.py b/tests/utils/test_hadoop_credentials.py new file mode 100644 index 0000000000..9ad4e21872 --- /dev/null +++ b/tests/utils/test_hadoop_credentials.py @@ -0,0 +1,274 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# pylint: disable=protected-access
+
+import base64
+import pathlib
+import struct
+from io import BytesIO
+
+import pytest
+
+from pyiceberg.exceptions import HiveAuthError
+from pyiceberg.utils.hadoop_credentials import (
+    HADOOP_TOKEN_FILE_LOCATION,
+    _read_hadoop_bytes,
+    _read_hadoop_text,
+    _read_hadoop_vint,
+    read_hive_delegation_token,
+)
+
+
+def _write_vint(value: int) -> bytes:
+    """Encode an integer as a Hadoop VInt (matching Java WritableUtils.writeVLong)."""
+    if -112 <= value <= 127:
+        return struct.pack("b", value)
+    negative = value < 0
+    work = ~value if negative else value
+    # Java: len starts at -120 (negative) or -112 (positive),
+    # decrements for each significant byte in the value
+    prefix = -120 if negative else -112
+    tmp = work
+    while tmp != 0:
+        tmp >>= 8
+        prefix -= 1
+    # Payload byte count mirrors Java writeVLong: -(prefix + 120) for
+    # negative values, -(prefix + 112) for positive ones.
+    num_bytes = (-120 - prefix) if negative else (-112 - prefix)
+    result = struct.pack("b", prefix)
+    result += work.to_bytes(num_bytes, byteorder="big", signed=False)
+    return result
+
+
+def _write_bytes(data: bytes) -> bytes:
+    """Write VInt-prefixed byte array."""
+    return _write_vint(len(data)) + data
+
+
+def _write_text(text: str) -> bytes:
+    """Write VInt-prefixed UTF-8 string."""
+    encoded = text.encode("utf-8")
+    return _write_vint(len(encoded)) + encoded
+
+
+def _build_token_file(tokens: list[tuple[bytes, bytes, str, str]]) -> bytes:
+    """Build a valid HDTS binary file with the given tokens.
+
+    Each token is (identifier_bytes, password_bytes, kind, service).
+    """
+    buf = bytearray()
+    buf.extend(b"HDTS")  # magic
+    buf.append(0)  # version
+    buf.extend(_write_vint(len(tokens)))
+    for identifier, password, kind, service in tokens:
+        # Hadoop's Credentials.write() stores an alias Text before each
+        # serialized Token; the parser only skips it, so reuse the kind.
+        buf.extend(_write_text(kind))
+        buf.extend(_write_bytes(identifier))
+        buf.extend(_write_bytes(password))
+        buf.extend(_write_text(kind))
+        buf.extend(_write_text(service))
+    return bytes(buf)
+
+
+# --- VInt unit tests ---
+
+
+def test_read_hadoop_vint_single_byte() -> None:
+    stream = BytesIO(bytes([42]))
+    assert _read_hadoop_vint(stream) == 42
+
+
+def test_read_hadoop_vint_zero() -> None:
+    stream = BytesIO(bytes([0]))
+    assert _read_hadoop_vint(stream) == 0
+
+
+def test_read_hadoop_vint_max_single_byte() -> None:
+    stream = BytesIO(bytes([0x7F]))
+    assert _read_hadoop_vint(stream) == 127
+
+
+def test_read_hadoop_vint_negative_single_byte() -> None:
+    """Values -112 through -1 are single-byte in Hadoop VInt."""
+    for value in [-1, -50, -112]:
+        encoded = _write_vint(value)
+        assert _read_hadoop_vint(BytesIO(encoded)) == value
+
+
+def test_read_hadoop_vint_multi_byte_positive() -> None:
+    """Values > 127 require multi-byte encoding."""
+    for value in [128, 255, 256, 1000, 65535]:
+        encoded = _write_vint(value)
+        assert _read_hadoop_vint(BytesIO(encoded)) == value
+
+
+def test_read_hadoop_vint_multi_byte_negative() -> None:
+    """Values < -112 require multi-byte encoding with negative flag."""
+    for value in [-113, -200, -1000]:
+        encoded = _write_vint(value)
+        assert _read_hadoop_vint(BytesIO(encoded)) == value
+
+
+def test_read_hadoop_vint_empty_stream() -> None:
+    stream = BytesIO(b"")
+    with pytest.raises(HiveAuthError, match="Unexpected end of token file"):
+        _read_hadoop_vint(stream)
+
+
+def test_read_hadoop_vint_truncated_multi_byte() -> None:
+    """Multi-byte VInt with missing payload bytes should raise."""
+    # Prefix byte announcing one payload byte, but no payload follows
+    stream = BytesIO(struct.pack("b", -113))
+    with 
pytest.raises(HiveAuthError, match="Unexpected end of token file"): + _read_hadoop_vint(stream) + + +# --- Bytes/Text unit tests --- + + +def test_read_hadoop_bytes() -> None: + data = b"hello" + stream = BytesIO(_write_bytes(data)) + assert _read_hadoop_bytes(stream) == data + + +def test_read_hadoop_text() -> None: + stream = BytesIO(_write_text("hello")) + assert _read_hadoop_text(stream) == "hello" + + +def test_read_hadoop_text_invalid_utf8() -> None: + """Invalid UTF-8 in text field should raise HiveAuthError.""" + invalid_bytes = b"\xff\xfe" + raw = _write_vint(len(invalid_bytes)) + invalid_bytes + with pytest.raises(HiveAuthError, match="invalid UTF-8"): + _read_hadoop_text(BytesIO(raw)) + + +# --- Token file tests --- + + +def test_read_hive_delegation_token_valid(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: + identifier = b"test-identifier-bytes" + password = b"test-password-bytes" + token_data = _build_token_file( + [ + (identifier, password, "HIVE_DELEGATION_TOKEN", "hive_service"), + ] + ) + + token_file = tmp_path / "token_file" + token_file.write_bytes(token_data) + + monkeypatch.setenv(HADOOP_TOKEN_FILE_LOCATION, str(token_file)) + result_id, result_pw = read_hive_delegation_token() + + assert result_id == base64.b64encode(identifier).decode("ascii") + assert result_pw == base64.b64encode(password).decode("ascii") + + +def test_read_hive_delegation_token_multiple_tokens(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: + """The parser should find the HIVE_DELEGATION_TOKEN even if other tokens come first.""" + identifier = b"hive-id" + password = b"hive-pw" + token_data = _build_token_file( + [ + (b"hdfs-id", b"hdfs-pw", "HDFS_DELEGATION_TOKEN", "hdfs_service"), + (identifier, password, "HIVE_DELEGATION_TOKEN", "hive_service"), + ] + ) + + token_file = tmp_path / "token_file" + token_file.write_bytes(token_data) + + monkeypatch.setenv(HADOOP_TOKEN_FILE_LOCATION, str(token_file)) + result_id, result_pw = 
read_hive_delegation_token() + + assert result_id == base64.b64encode(identifier).decode("ascii") + assert result_pw == base64.b64encode(password).decode("ascii") + + +def test_read_hive_delegation_token_env_not_set(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.delenv(HADOOP_TOKEN_FILE_LOCATION, raising=False) + with pytest.raises(HiveAuthError, match="HADOOP_TOKEN_FILE_LOCATION.*not set"): + read_hive_delegation_token() + + +def test_read_hive_delegation_token_file_not_found(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv(HADOOP_TOKEN_FILE_LOCATION, "/nonexistent/path/token_file") + with pytest.raises(HiveAuthError, match="Cannot read Hadoop token file"): + read_hive_delegation_token() + + +def test_read_hive_delegation_token_permission_error(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: + """Permission errors on the token file should raise HiveAuthError.""" + token_file = tmp_path / "token_file" + token_file.write_bytes(b"HDTS\x00\x00") + token_file.chmod(0o000) + + monkeypatch.setenv(HADOOP_TOKEN_FILE_LOCATION, str(token_file)) + try: + with pytest.raises(HiveAuthError, match="Cannot read Hadoop token file"): + read_hive_delegation_token() + finally: + token_file.chmod(0o644) + + +def test_read_hive_delegation_token_bad_magic(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: + token_file = tmp_path / "token_file" + token_file.write_bytes(b"BAAD\x00\x00") + + monkeypatch.setenv(HADOOP_TOKEN_FILE_LOCATION, str(token_file)) + with pytest.raises(HiveAuthError, match="Invalid Hadoop token file magic"): + read_hive_delegation_token() + + +def test_read_hive_delegation_token_unsupported_version(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: + token_file = tmp_path / "token_file" + token_file.write_bytes(b"HDTS\x01\x00") # version 1 + + monkeypatch.setenv(HADOOP_TOKEN_FILE_LOCATION, str(token_file)) + with pytest.raises(HiveAuthError, match="Unsupported.*version"): + 
read_hive_delegation_token() + + +def test_read_hive_delegation_token_no_hive_token(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: + token_data = _build_token_file( + [ + (b"hdfs-id", b"hdfs-pw", "HDFS_DELEGATION_TOKEN", "hdfs_service"), + ] + ) + + token_file = tmp_path / "token_file" + token_file.write_bytes(token_data) + + monkeypatch.setenv(HADOOP_TOKEN_FILE_LOCATION, str(token_file)) + with pytest.raises(HiveAuthError, match="No HIVE_DELEGATION_TOKEN found"): + read_hive_delegation_token() + + +def test_read_hive_delegation_token_truncated(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: + # Build a valid file and then truncate it + token_data = _build_token_file( + [ + (b"test-id", b"test-pw", "HIVE_DELEGATION_TOKEN", "hive_service"), + ] + ) + truncated = token_data[:10] # Cut off in the middle + + token_file = tmp_path / "token_file" + token_file.write_bytes(truncated) + + monkeypatch.setenv(HADOOP_TOKEN_FILE_LOCATION, str(token_file)) + with pytest.raises(HiveAuthError, match="Unexpected end of token file"): + read_hive_delegation_token() diff --git a/uv.lock b/uv.lock index f686b2eed9..46253d1bbd 100644 --- a/uv.lock +++ b/uv.lock @@ -1837,7 +1837,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/7d/ed/6bfa4109fcb23a58819600392564fea69cdc6551ffd5e69ccf1d52a40cbc/greenlet-3.2.4-cp310-cp310-macosx_11_0_universal2.whl", hash = "sha256:8c68325b0d0acf8d91dde4e6f930967dd52a5302cd4062932a6b2e7c2969f47c", size = 271061, upload-time = "2025-08-07T13:17:15.373Z" }, { url = "https://files.pythonhosted.org/packages/2a/fc/102ec1a2fc015b3a7652abab7acf3541d58c04d3d17a8d3d6a44adae1eb1/greenlet-3.2.4-cp310-cp310-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:94385f101946790ae13da500603491f04a76b6e4c059dab271b3ce2e283b2590", size = 629475, upload-time = "2025-08-07T13:42:54.009Z" }, { url = 
"https://files.pythonhosted.org/packages/c5/26/80383131d55a4ac0fb08d71660fd77e7660b9db6bdb4e8884f46d9f2cc04/greenlet-3.2.4-cp310-cp310-manylinux2014_ppc64le.manylinux_2_17_ppc64le.whl", hash = "sha256:f10fd42b5ee276335863712fa3da6608e93f70629c631bf77145021600abc23c", size = 640802, upload-time = "2025-08-07T13:45:25.52Z" }, - { url = "https://files.pythonhosted.org/packages/9f/7c/e7833dbcd8f376f3326bd728c845d31dcde4c84268d3921afcae77d90d08/greenlet-3.2.4-cp310-cp310-manylinux2014_s390x.manylinux_2_17_s390x.whl", hash = "sha256:c8c9e331e58180d0d83c5b7999255721b725913ff6bc6cf39fa2a45841a4fd4b", size = 636703, upload-time = "2025-08-07T13:53:12.622Z" }, { url = "https://files.pythonhosted.org/packages/e9/49/547b93b7c0428ede7b3f309bc965986874759f7d89e4e04aeddbc9699acb/greenlet-3.2.4-cp310-cp310-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:58b97143c9cc7b86fc458f215bd0932f1757ce649e05b640fea2e79b54cedb31", size = 635417, upload-time = "2025-08-07T13:18:25.189Z" }, { url = "https://files.pythonhosted.org/packages/7f/91/ae2eb6b7979e2f9b035a9f612cf70f1bf54aad4e1d125129bef1eae96f19/greenlet-3.2.4-cp310-cp310-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:c2ca18a03a8cfb5b25bc1cbe20f3d9a4c80d8c3b13ba3df49ac3961af0b1018d", size = 584358, upload-time = "2025-08-07T13:18:23.708Z" }, { url = "https://files.pythonhosted.org/packages/f7/85/433de0c9c0252b22b16d413c9407e6cb3b41df7389afc366ca204dbc1393/greenlet-3.2.4-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:9fe0a28a7b952a21e2c062cd5756d34354117796c6d9215a87f55e38d15402c5", size = 1113550, upload-time = "2025-08-07T13:42:37.467Z" }, @@ -1848,7 +1847,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/a4/de/f28ced0a67749cac23fecb02b694f6473f47686dff6afaa211d186e2ef9c/greenlet-3.2.4-cp311-cp311-macosx_11_0_universal2.whl", hash = "sha256:96378df1de302bc38e99c3a9aa311967b7dc80ced1dcc6f171e99842987882a2", size = 272305, upload-time = "2025-08-07T13:15:41.288Z" }, { url = 
"https://files.pythonhosted.org/packages/09/16/2c3792cba130000bf2a31c5272999113f4764fd9d874fb257ff588ac779a/greenlet-3.2.4-cp311-cp311-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:1ee8fae0519a337f2329cb78bd7a8e128ec0f881073d43f023c7b8d4831d5246", size = 632472, upload-time = "2025-08-07T13:42:55.044Z" }, { url = "https://files.pythonhosted.org/packages/ae/8f/95d48d7e3d433e6dae5b1682e4292242a53f22df82e6d3dda81b1701a960/greenlet-3.2.4-cp311-cp311-manylinux2014_ppc64le.manylinux_2_17_ppc64le.whl", hash = "sha256:94abf90142c2a18151632371140b3dba4dee031633fe614cb592dbb6c9e17bc3", size = 644646, upload-time = "2025-08-07T13:45:26.523Z" }, - { url = "https://files.pythonhosted.org/packages/d5/5e/405965351aef8c76b8ef7ad370e5da58d57ef6068df197548b015464001a/greenlet-3.2.4-cp311-cp311-manylinux2014_s390x.manylinux_2_17_s390x.whl", hash = "sha256:4d1378601b85e2e5171b99be8d2dc85f594c79967599328f95c1dc1a40f1c633", size = 640519, upload-time = "2025-08-07T13:53:13.928Z" }, { url = "https://files.pythonhosted.org/packages/25/5d/382753b52006ce0218297ec1b628e048c4e64b155379331f25a7316eb749/greenlet-3.2.4-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:0db5594dce18db94f7d1650d7489909b57afde4c580806b8d9203b6e79cdc079", size = 639707, upload-time = "2025-08-07T13:18:27.146Z" }, { url = "https://files.pythonhosted.org/packages/1f/8e/abdd3f14d735b2929290a018ecf133c901be4874b858dd1c604b9319f064/greenlet-3.2.4-cp311-cp311-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:2523e5246274f54fdadbce8494458a2ebdcdbc7b802318466ac5606d3cded1f8", size = 587684, upload-time = "2025-08-07T13:18:25.164Z" }, { url = "https://files.pythonhosted.org/packages/5d/65/deb2a69c3e5996439b0176f6651e0052542bb6c8f8ec2e3fba97c9768805/greenlet-3.2.4-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:1987de92fec508535687fb807a5cea1560f6196285a4cde35c100b8cd632cc52", size = 1116647, upload-time = "2025-08-07T13:42:38.655Z" }, @@ -1859,7 +1857,6 @@ 
wheels = [ { url = "https://files.pythonhosted.org/packages/44/69/9b804adb5fd0671f367781560eb5eb586c4d495277c93bde4307b9e28068/greenlet-3.2.4-cp312-cp312-macosx_11_0_universal2.whl", hash = "sha256:3b67ca49f54cede0186854a008109d6ee71f66bd57bb36abd6d0a0267b540cdd", size = 274079, upload-time = "2025-08-07T13:15:45.033Z" }, { url = "https://files.pythonhosted.org/packages/46/e9/d2a80c99f19a153eff70bc451ab78615583b8dac0754cfb942223d2c1a0d/greenlet-3.2.4-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:ddf9164e7a5b08e9d22511526865780a576f19ddd00d62f8a665949327fde8bb", size = 640997, upload-time = "2025-08-07T13:42:56.234Z" }, { url = "https://files.pythonhosted.org/packages/3b/16/035dcfcc48715ccd345f3a93183267167cdd162ad123cd93067d86f27ce4/greenlet-3.2.4-cp312-cp312-manylinux2014_ppc64le.manylinux_2_17_ppc64le.whl", hash = "sha256:f28588772bb5fb869a8eb331374ec06f24a83a9c25bfa1f38b6993afe9c1e968", size = 655185, upload-time = "2025-08-07T13:45:27.624Z" }, - { url = "https://files.pythonhosted.org/packages/31/da/0386695eef69ffae1ad726881571dfe28b41970173947e7c558d9998de0f/greenlet-3.2.4-cp312-cp312-manylinux2014_s390x.manylinux_2_17_s390x.whl", hash = "sha256:5c9320971821a7cb77cfab8d956fa8e39cd07ca44b6070db358ceb7f8797c8c9", size = 649926, upload-time = "2025-08-07T13:53:15.251Z" }, { url = "https://files.pythonhosted.org/packages/68/88/69bf19fd4dc19981928ceacbc5fd4bb6bc2215d53199e367832e98d1d8fe/greenlet-3.2.4-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:c60a6d84229b271d44b70fb6e5fa23781abb5d742af7b808ae3f6efd7c9c60f6", size = 651839, upload-time = "2025-08-07T13:18:30.281Z" }, { url = "https://files.pythonhosted.org/packages/19/0d/6660d55f7373b2ff8152401a83e02084956da23ae58cddbfb0b330978fe9/greenlet-3.2.4-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:3b3812d8d0c9579967815af437d96623f45c0f2ae5f04e366de62a12d83a8fb0", size = 607586, upload-time = "2025-08-07T13:18:28.544Z" }, { url 
= "https://files.pythonhosted.org/packages/8e/1a/c953fdedd22d81ee4629afbb38d2f9d71e37d23caace44775a3a969147d4/greenlet-3.2.4-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:abbf57b5a870d30c4675928c37278493044d7c14378350b3aa5d484fa65575f0", size = 1123281, upload-time = "2025-08-07T13:42:39.858Z" }, @@ -1870,7 +1867,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/49/e8/58c7f85958bda41dafea50497cbd59738c5c43dbbea5ee83d651234398f4/greenlet-3.2.4-cp313-cp313-macosx_11_0_universal2.whl", hash = "sha256:1a921e542453fe531144e91e1feedf12e07351b1cf6c9e8a3325ea600a715a31", size = 272814, upload-time = "2025-08-07T13:15:50.011Z" }, { url = "https://files.pythonhosted.org/packages/62/dd/b9f59862e9e257a16e4e610480cfffd29e3fae018a68c2332090b53aac3d/greenlet-3.2.4-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:cd3c8e693bff0fff6ba55f140bf390fa92c994083f838fece0f63be121334945", size = 641073, upload-time = "2025-08-07T13:42:57.23Z" }, { url = "https://files.pythonhosted.org/packages/f7/0b/bc13f787394920b23073ca3b6c4a7a21396301ed75a655bcb47196b50e6e/greenlet-3.2.4-cp313-cp313-manylinux2014_ppc64le.manylinux_2_17_ppc64le.whl", hash = "sha256:710638eb93b1fa52823aa91bf75326f9ecdfd5e0466f00789246a5280f4ba0fc", size = 655191, upload-time = "2025-08-07T13:45:29.752Z" }, - { url = "https://files.pythonhosted.org/packages/f2/d6/6adde57d1345a8d0f14d31e4ab9c23cfe8e2cd39c3baf7674b4b0338d266/greenlet-3.2.4-cp313-cp313-manylinux2014_s390x.manylinux_2_17_s390x.whl", hash = "sha256:c5111ccdc9c88f423426df3fd1811bfc40ed66264d35aa373420a34377efc98a", size = 649516, upload-time = "2025-08-07T13:53:16.314Z" }, { url = "https://files.pythonhosted.org/packages/7f/3b/3a3328a788d4a473889a2d403199932be55b1b0060f4ddd96ee7cdfcad10/greenlet-3.2.4-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:d76383238584e9711e20ebe14db6c88ddcedc1829a9ad31a584389463b5aa504", size = 652169, upload-time = "2025-08-07T13:18:32.861Z" }, { url = 
"https://files.pythonhosted.org/packages/ee/43/3cecdc0349359e1a527cbf2e3e28e5f8f06d3343aaf82ca13437a9aa290f/greenlet-3.2.4-cp313-cp313-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:23768528f2911bcd7e475210822ffb5254ed10d71f4028387e5a99b4c6699671", size = 610497, upload-time = "2025-08-07T13:18:31.636Z" }, { url = "https://files.pythonhosted.org/packages/b8/19/06b6cf5d604e2c382a6f31cafafd6f33d5dea706f4db7bdab184bad2b21d/greenlet-3.2.4-cp313-cp313-musllinux_1_1_aarch64.whl", hash = "sha256:00fadb3fedccc447f517ee0d3fd8fe49eae949e1cd0f6a611818f4f6fb7dc83b", size = 1121662, upload-time = "2025-08-07T13:42:41.117Z" }, @@ -1881,7 +1877,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/22/5c/85273fd7cc388285632b0498dbbab97596e04b154933dfe0f3e68156c68c/greenlet-3.2.4-cp314-cp314-macosx_11_0_universal2.whl", hash = "sha256:49a30d5fda2507ae77be16479bdb62a660fa51b1eb4928b524975b3bde77b3c0", size = 273586, upload-time = "2025-08-07T13:16:08.004Z" }, { url = "https://files.pythonhosted.org/packages/d1/75/10aeeaa3da9332c2e761e4c50d4c3556c21113ee3f0afa2cf5769946f7a3/greenlet-3.2.4-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:299fd615cd8fc86267b47597123e3f43ad79c9d8a22bebdce535e53550763e2f", size = 686346, upload-time = "2025-08-07T13:42:59.944Z" }, { url = "https://files.pythonhosted.org/packages/c0/aa/687d6b12ffb505a4447567d1f3abea23bd20e73a5bed63871178e0831b7a/greenlet-3.2.4-cp314-cp314-manylinux2014_ppc64le.manylinux_2_17_ppc64le.whl", hash = "sha256:c17b6b34111ea72fc5a4e4beec9711d2226285f0386ea83477cbb97c30a3f3a5", size = 699218, upload-time = "2025-08-07T13:45:30.969Z" }, - { url = "https://files.pythonhosted.org/packages/dc/8b/29aae55436521f1d6f8ff4e12fb676f3400de7fcf27fccd1d4d17fd8fecd/greenlet-3.2.4-cp314-cp314-manylinux2014_s390x.manylinux_2_17_s390x.whl", hash = "sha256:b4a1870c51720687af7fa3e7cda6d08d801dae660f75a76f3845b642b4da6ee1", size = 694659, upload-time = "2025-08-07T13:53:17.759Z" }, { url = 
"https://files.pythonhosted.org/packages/92/2e/ea25914b1ebfde93b6fc4ff46d6864564fba59024e928bdc7de475affc25/greenlet-3.2.4-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:061dc4cf2c34852b052a8620d40f36324554bc192be474b9e9770e8c042fd735", size = 695355, upload-time = "2025-08-07T13:18:34.517Z" }, { url = "https://files.pythonhosted.org/packages/72/60/fc56c62046ec17f6b0d3060564562c64c862948c9d4bc8aa807cf5bd74f4/greenlet-3.2.4-cp314-cp314-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:44358b9bf66c8576a9f57a590d5f5d6e72fa4228b763d0e43fee6d3b06d3a337", size = 657512, upload-time = "2025-08-07T13:18:33.969Z" }, { url = "https://files.pythonhosted.org/packages/23/6e/74407aed965a4ab6ddd93a7ded3180b730d281c77b765788419484cdfeef/greenlet-3.2.4-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:2917bdf657f5859fbf3386b12d68ede4cf1f04c90c3a6bc1f013dd68a22e2269", size = 1612508, upload-time = "2025-11-04T12:42:23.427Z" }, @@ -4414,6 +4409,7 @@ hf = [ { name = "huggingface-hub" }, ] hive = [ + { name = "pure-sasl" }, { name = "thrift" }, ] hive-kerberos = [ @@ -4520,6 +4516,7 @@ requires-dist = [ { name = "pandas", marker = "extra == 'ray'", specifier = ">=1.0.0" }, { name = "polars", marker = "extra == 'polars'", specifier = ">=1.21.0,<2" }, { name = "psycopg2-binary", marker = "extra == 'sql-postgres'", specifier = ">=2.9.6" }, + { name = "pure-sasl", marker = "extra == 'hive'", specifier = ">=0.6.0,<1.0.0" }, { name = "pyarrow", marker = "extra == 'duckdb'", specifier = ">=17.0.0" }, { name = "pyarrow", marker = "extra == 'pandas'", specifier = ">=17.0.0" }, { name = "pyarrow", marker = "extra == 'pyarrow'", specifier = ">=17.0.0" },