diff --git a/CHANGELOG.md b/CHANGELOG.md index f8dfa2e..053dee5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,17 @@ # Changelog +## 0.66.0 - 2025-11-18 + +#### Enhancements +- Added a property `Live.session_id` which returns the streaming session ID when the client is connected +- Streams added with `Live.add_stream()` which do not define an exception handler will now emit a warning if an exception is raised while executing the callback +- Callback functions added with `Live.add_callback()` which do not define an exception handler will now emit a warning if an exception is raised while executing the callback +- Upgraded `databento-dbn` to 0.44.0 + - Added logic to set `code` when upgrading version 1 `SystemMsg` to newer versions + +#### Bug fixes +- Streams opened by `Live.add_stream()` will now close properly when the streaming session is closed + ## 0.65.0 - 2025-11-11 #### Deprecations diff --git a/README.md b/README.md index 643e4dc..f316c4f 100644 --- a/README.md +++ b/README.md @@ -32,7 +32,7 @@ The library is fully compatible with distributions of Anaconda 2023.x and above. The minimum dependencies as found in the `pyproject.toml` are also listed below: - python = "^3.10" - aiohttp = "^3.8.3" -- databento-dbn = "~0.43.0" +- databento-dbn = "~0.44.0" - numpy = ">=1.23.5" - pandas = ">=1.5.3" - pip-system-certs = ">=4.0" (Windows only) diff --git a/databento/common/types.py b/databento/common/types.py index 3dfca45..0241661 100644 --- a/databento/common/types.py +++ b/databento/common/types.py @@ -1,12 +1,20 @@ import datetime as dt +import logging from collections.abc import Callable +from os import PathLike +import pathlib from typing import Generic +from typing import IO from typing import TypedDict from typing import TypeVar +import warnings import databento_dbn import pandas as pd +from databento.common.error import BentoWarning + +logger = logging.getLogger(__name__) DBNRecord = ( databento_dbn.BBOMsg @@ -32,10 +40,6 @@ | databento_dbn.ErrorMsgV1 ) -RecordCallback = Callable[[DBNRecord], None] -ExceptionCallback = Callable[[Exception], None] -ReconnectCallback = Callable[[pd.Timestamp, pd.Timestamp], None] - _T = TypeVar("_T") @@ -88,3 +92,196 @@ class MappingIntervalDict(TypedDict): start_date: dt.date end_date: dt.date symbol: str + + +RecordCallback = Callable[[DBNRecord], None] +ExceptionCallback = Callable[[Exception], None] +ReconnectCallback = Callable[[pd.Timestamp, pd.Timestamp], None] + + +class ClientStream: + def __init__( + self, + stream: IO[bytes] | PathLike[str] | str, + exc_fn: ExceptionCallback | None = None, + max_warnings: int = 10, + ) -> None: + is_managed = False + + if isinstance(stream, (str, PathLike)): + stream = pathlib.Path(stream).open("xb") + is_managed = True + + if not hasattr(stream, "write"): + raise ValueError(f"{type(stream).__name__} does not support write()") + + if not hasattr(stream, "writable") or not stream.writable(): + raise ValueError(f"{type(stream).__name__} is not a writable stream") + + if exc_fn is not None and not callable(exc_fn): + raise ValueError(f"{exc_fn} is not callable") + + self._stream = stream + self._exc_fn = exc_fn + self._max_warnings = max(0, max_warnings) + self._warning_count = 0 + self._is_managed = is_managed + + @property + def stream_name(self) -> str: + return getattr(self._stream, "__name__", str(self._stream)) + + @property + def is_closed(self) -> bool: + """ + Return `True` if the underlying stream is closed. + + Returns + ------- + bool + + """ + return self._stream.closed + + @property + def is_managed(self) -> bool: + """ + Return `True` if the underlying stream was opened by the + `ClientStream`. This can be used to determine if the stream should be + closed automatically. + + Returns + ------- + bool + + """ + return self._is_managed + + @property + def exc_callback_name(self) -> str: + return getattr(self._exc_fn, "__name__", str(self._exc_fn)) + + def close(self) -> None: + """ + Close the underlying stream. + """ + self._stream.close() + + def flush(self) -> None: + """ + Flush the underlying stream. + """ + self._stream.flush() + + def write(self, data: bytes) -> None: + """ + Write data to the underlying stream. Any exceptions encountered will be + dispatched to the exception callback, if defined. + + Parameters + ---------- + data : bytes + + """ + try: + self._stream.write(data) + except Exception as exc: + if self._exc_fn is None: + self._warn( + f"stream '{self.stream_name}' encountered an exception without an exception handler: {repr(exc)}", + ) + else: + try: + self._exc_fn(exc) + except Exception as inner_exc: + self._warn( + f"exception callback '{self.exc_callback_name}' encountered an exception: {repr(inner_exc)}", + ) + raise inner_exc from exc + raise exc + + def _warn(self, msg: str) -> None: + logger.warning(msg) + if self._warning_count < self._max_warnings: + self._warning_count += 1 + warnings.warn( + msg, + BentoWarning, + stacklevel=3, + ) + if self._warning_count == self._max_warnings: + warnings.warn( + f"suppressing further warnings for '{self.stream_name}'", + BentoWarning, + stacklevel=3, + ) + + +class ClientRecordCallback: + def __init__( + self, + fn: RecordCallback, + exc_fn: ExceptionCallback | None = None, + max_warnings: int = 10, + ) -> None: + if not callable(fn): + raise ValueError(f"{fn} is not callable") + if exc_fn is not None and not callable(exc_fn): + raise ValueError(f"{exc_fn} is not callable") + + self._fn = fn + self._exc_fn = exc_fn + self._max_warnings = max(0, max_warnings) + self._warning_count = 0 + + @property + def callback_name(self) -> str: + return getattr(self._fn, "__name__", str(self._fn)) + + @property + def exc_callback_name(self) -> str: + return getattr(self._exc_fn, "__name__", str(self._exc_fn)) + + def call(self, record: DBNRecord) -> None: + """ + Execute the callback function, passing `record` in as the first + argument. Any exceptions encountered will be dispatched to the + exception callback, if defined. + + Parameters + ---------- + record : DBNRecord + + """ + try: + self._fn(record) + except Exception as exc: + if self._exc_fn is None: + self._warn( + f"callback '{self.callback_name}' encountered an exception without an exception callback: {repr(exc)}", + ) + else: + try: + self._exc_fn(exc) + except Exception as inner_exc: + self._warn( + f"exception callback '{self.exc_callback_name}' encountered an exception: {repr(inner_exc)}", + ) + raise inner_exc from exc + raise exc + + def _warn(self, msg: str) -> None: + logger.warning(msg) + if self._warning_count < self._max_warnings: + self._warning_count += 1 + warnings.warn( + msg, + BentoWarning, + stacklevel=3, + ) + if self._warning_count == self._max_warnings: + warnings.warn( + f"suppressing further warnings for '{self.callback_name}'", + BentoWarning, + stacklevel=3, + ) diff --git a/databento/live/client.py b/databento/live/client.py index fae8c3c..8513fa2 100644 --- a/databento/live/client.py +++ b/databento/live/client.py @@ -3,7 +3,6 @@ import asyncio import logging import os -import pathlib import queue import threading from collections.abc import Iterable @@ -24,6 +23,8 @@ from databento.common.error import BentoError from databento.common.parsing import optional_datetime_to_unix_nanoseconds from databento.common.publishers import Dataset +from databento.common.types import ClientRecordCallback +from databento.common.types import ClientStream from databento.common.types import DBNRecord from databento.common.types import ExceptionCallback from databento.common.types import ReconnectCallback @@ -111,7 +112,7 @@ def __init__( reconnect_policy=reconnect_policy, ) - self._session._user_callbacks.append((self._map_symbol, None)) + self._session._user_callbacks.append(ClientRecordCallback(self._map_symbol)) with Live._lock: if not Live._thread.is_alive(): @@ -214,6 +215,19 @@ def port(self) -> int: """ return self._port + @property + def session_id(self) -> str | None: + """ + Return the session ID for the current session. If `None`, the client is + not connected. + + Returns + ------- + str | None + + """ + return self._session.session_id + @property def symbology_map(self) -> dict[int, str | int]: """ @@ -257,7 +271,9 @@ def add_callback( A callback to register for handling live records as they arrive. exception_callback : Callable[[Exception], None], optional An error handling callback to process exceptions that are raised - in `record_callback`. + in `record_callback`. If no exception callback is provided, + any exceptions encountered will be logged and raised as warnings + for visibility. Raises ------ @@ -270,15 +286,13 @@ def add_callback( Live.add_stream """ - if not callable(record_callback): - raise ValueError(f"{record_callback} is not callable") - - if exception_callback is not None and not callable(exception_callback): - raise ValueError(f"{exception_callback} is not callable") + client_callback = ClientRecordCallback( + fn=record_callback, + exc_fn=exception_callback, + ) - callback_name = getattr(record_callback, "__name__", str(record_callback)) - logger.info("adding user callback %s", callback_name) - self._session._user_callbacks.append((record_callback, exception_callback)) + logger.info("adding user callback %s", client_callback.callback_name) + self._session._user_callbacks.append(client_callback) def add_stream( self, @@ -294,7 +308,9 @@ def add_stream( The IO stream to write to when handling live records as they arrive. exception_callback : Callable[[Exception], None], optional An error handling callback to process exceptions that are raised - when writing to the stream. + when writing to the stream. If no exception callback is provided, + any exceptions encountered will be logged and raised as warnings + for visibility. Raises ------ @@ -309,23 +325,12 @@ def add_stream( Live.add_callback """ - if isinstance(stream, (str, PathLike)): - stream = pathlib.Path(stream).open("xb") - - if not hasattr(stream, "write"): - raise ValueError(f"{type(stream).__name__} does not support write()") - - if not hasattr(stream, "writable") or not stream.writable(): - raise ValueError(f"{type(stream).__name__} is not a writable stream") + client_stream = ClientStream(stream=stream, exc_fn=exception_callback) - if exception_callback is not None and not callable(exception_callback): - raise ValueError(f"{exception_callback} is not callable") - - stream_name = getattr(stream, "name", str(stream)) - logger.info("adding user stream %s", stream_name) + logger.info("adding user stream %s", client_stream.stream_name) if self.metadata is not None: - stream.write(bytes(self.metadata)) - self._session._user_streams.append((stream, exception_callback)) + client_stream.write(self.metadata.encode()) + self._session._user_streams.append(client_stream) def add_reconnect_callback( self, @@ -365,7 +370,9 @@ def add_reconnect_callback( callback_name = getattr(reconnect_callback, "__name__", str(reconnect_callback)) logger.info("adding user reconnect callback %s", callback_name) - self._session._user_reconnect_callbacks.append((reconnect_callback, exception_callback)) + self._session._user_reconnect_callbacks.append( + (reconnect_callback, exception_callback), + ) def start( self, diff --git a/databento/live/session.py b/databento/live/session.py index 06852e2..454c371 100644 --- a/databento/live/session.py +++ b/databento/live/session.py @@ -8,7 +8,6 @@ import threading from collections.abc import Iterable from functools import partial -from typing import IO from typing import Final import databento_dbn @@ -20,10 +19,11 @@ from databento.common.enums import ReconnectPolicy from databento.common.error import BentoError from databento.common.publishers import Dataset +from databento.common.types import ClientRecordCallback +from databento.common.types import ClientStream from databento.common.types import DBNRecord from databento.common.types import ExceptionCallback from databento.common.types import ReconnectCallback -from databento.common.types import RecordCallback from databento.live.gateway import SubscriptionRequest from databento.live.protocol import DatabentoLiveProtocol @@ -72,7 +72,12 @@ def disable(self) -> None: """ self._enabled.clear() - def put(self, item: DBNRecord, block: bool = True, timeout: float | None = None) -> None: + def put( + self, + item: DBNRecord, + block: bool = True, + timeout: float | None = None, + ) -> None: """ Put an item on the queue if the queue is enabled. @@ -143,6 +148,12 @@ class SessionMetadata: def __bool__(self) -> bool: return self.data is not None + @property + def has_ts_out(self) -> bool: + if self.data is None: + return False + return self.data.ts_out + def check(self, other: databento_dbn.Metadata) -> None: """ Verify the Metadata is compatible with another Metadata message. This @@ -185,8 +196,8 @@ def __init__( api_key: str, dataset: Dataset | str, dbn_queue: DBNQueue, - user_callbacks: list[tuple[RecordCallback, ExceptionCallback | None]], - user_streams: list[tuple[IO[bytes], ExceptionCallback | None]], + user_streams: list[ClientStream], + user_callbacks: list[ClientRecordCallback], loop: asyncio.AbstractEventLoop, metadata: SessionMetadata, ts_out: bool = False, @@ -205,21 +216,15 @@ def received_metadata(self, metadata: databento_dbn.Metadata) -> None: if self._metadata: self._metadata.check(metadata) else: - metadata_bytes = metadata.encode() - for stream, exc_callback in self._user_streams: + for stream in self._user_streams: try: - stream.write(metadata_bytes) + stream.write(metadata.encode()) except Exception as exc: - stream_name = getattr(stream, "name", str(stream)) logger.error( - "error writing %d bytes to `%s` stream", - len(metadata_bytes), - stream_name, + "error writing metadata to `%s` stream", + stream.stream_name, exc_info=exc, ) - if exc_callback is not None: - exc_callback(exc) - self._metadata.data = metadata return super().received_metadata(metadata) @@ -233,40 +238,32 @@ def received_record(self, record: DBNRecord) -> None: return super().received_record(record) def _dispatch_callbacks(self, record: DBNRecord) -> None: - for callback, exc_callback in self._user_callbacks: + for callback in self._user_callbacks: try: - callback(record) + callback.call(record) except Exception as exc: logger.error( "error dispatching %s to `%s` callback", type(record).__name__, - getattr(callback, "__name__", str(callback)), + callback.callback_name, exc_info=exc, ) - if exc_callback is not None: - exc_callback(exc) def _dispatch_writes(self, record: DBNRecord) -> None: - if hasattr(record, "ts_out"): - ts_out_bytes = struct.pack("Q", record.ts_out) - else: - ts_out_bytes = b"" - - record_bytes = bytes(record) + ts_out_bytes - - for stream, exc_callback in self._user_streams: + record_bytes = bytes(record) + ts_out_bytes = struct.pack("Q", record.ts_out) if self._metadata.has_ts_out else b"" + for stream in self._user_streams: try: stream.write(record_bytes) + stream.write(ts_out_bytes) except Exception as exc: - stream_name = getattr(stream, "name", str(stream)) logger.error( - "error writing %d bytes to `%s` stream", - len(record_bytes), - stream_name, + "error writing %s record (%d bytes) to `%s` stream", + type(record).__name__, + len(record_bytes) + len(ts_out_bytes), + stream.stream_name, exc_info=exc, ) - if exc_callback is not None: - exc_callback(exc) def _queue_for_iteration(self, record: DBNRecord) -> None: self._dbn_queue.put(record) @@ -317,8 +314,8 @@ def __init__( self._loop = loop self._metadata = SessionMetadata() self._user_gateway: str | None = user_gateway - self._user_callbacks: list[tuple[RecordCallback, ExceptionCallback | None]] = [] - self._user_streams: list[tuple[IO[bytes], ExceptionCallback | None]] = [] + self._user_streams: list[ClientStream] = [] + self._user_callbacks: list[ClientRecordCallback] = [] self._user_reconnect_callbacks: list[tuple[ReconnectCallback, ExceptionCallback | None]] = ( [] ) @@ -498,6 +495,7 @@ def subscribe( with self._lock: if self._protocol is None: + self._session_id = None self._connect(dataset=dataset) self._subscription_counter += 1 @@ -528,27 +526,29 @@ async def wait_for_close(self) -> None: return try: - await self._protocol.authenticated - except Exception as exc: - raise BentoError(exc) from None - - try: - if self._reconnect_task is not None: - await self._reconnect_task - else: - await self._protocol.disconnected - except Exception as exc: - raise BentoError(exc) from None + try: + await self._protocol.authenticated + except Exception as exc: + raise BentoError(exc) from None - self._cleanup() + try: + if self._reconnect_task is not None: + await self._reconnect_task + else: + await self._protocol.disconnected + except Exception as exc: + raise BentoError(exc) from None + finally: + self._cleanup() def _cleanup(self) -> None: logger.debug("cleaning up session_id=%s", self.session_id) self._user_callbacks.clear() - for item in self._user_streams: - stream, _ = item - if not stream.closed: + for stream in self._user_streams: + if not stream.is_closed: stream.flush() + if stream.is_managed: + stream.close() self._user_callbacks.clear() self._user_streams.clear() @@ -654,7 +654,10 @@ async def _reconnect(self) -> None: should_restart = self.is_streaming() if self._protocol._last_ts_event is not None: - gap_start = pd.Timestamp(self._protocol._last_ts_event, tz="UTC") + gap_start = pd.Timestamp( + self._protocol._last_ts_event, + tz="UTC", + ) elif self._metadata.data is not None: gap_start = pd.Timestamp(self._metadata.data.start, tz="UTC") else: diff --git a/databento/version.py b/databento/version.py index e082e13..8833468 100644 --- a/databento/version.py +++ b/databento/version.py @@ -1 +1 @@ -__version__ = "0.65.0" +__version__ = "0.66.0" diff --git a/pyproject.toml b/pyproject.toml index bf1388a..72cadfd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,23 +1,16 @@ [project] name = "databento" -version = "0.65.0" +version = "0.66.0" description = "Official Python client library for Databento" readme = "README.md" requires-python = ">=3.10" license = "Apache-2.0" authors = [{ name = "Databento", email = "support@databento.com" }] -classifiers = [ - "Development Status :: 4 - Beta", - "Operating System :: OS Independent", - "Topic :: Software Development :: Libraries", - "Topic :: Software Development :: Libraries :: Python Modules", - "Topic :: Office/Business :: Financial", - "Topic :: Office/Business :: Financial :: Investment", -] +dynamic = [ "classifiers" ] dependencies = [ "aiohttp>=3.8.3,<4.0.0; python_version < '3.12'", "aiohttp>=3.9.0,<4.0.0; python_version >= '3.12'", - "databento-dbn~=0.43.0", + "databento-dbn~=0.44.0", "numpy>=1.23.5; python_version < '3.12'", "numpy>=1.26.0; python_version >= '3.12'", "pandas>=1.5.3", @@ -39,6 +32,14 @@ packages = [ { include = "databento" }, { include = "databento/py.typed" }, ] +classifiers = [ + "Development Status :: 4 - Beta", + "Operating System :: OS Independent", + "Topic :: Software Development :: Libraries", + "Topic :: Software Development :: Libraries :: Python Modules", + "Topic :: Office/Business :: Financial", + "Topic :: Office/Business :: Financial :: Investment", +] [tool.poetry.dependencies] python = ">=3.10,<3.14" diff --git a/tests/test_live_client.py b/tests/test_live_client.py index c8625e9..db500e0 100644 --- a/tests/test_live_client.py +++ b/tests/test_live_client.py @@ -254,6 +254,7 @@ async def test_live_connect_auth( assert message.auth.endswith(live_client.key[-BUCKET_ID_LENGTH:]) assert message.dataset == live_client.dataset assert message.encoding == Encoding.DBN + assert live_client.session_id is not None async def test_live_client_reuse( @@ -273,8 +274,10 @@ async def test_live_client_reuse( message_type=gateway.AuthenticationRequest, ) + first_session_id = live_client.session_id live_client.start() live_client.stop() + assert live_client.session_id == first_session_id await asyncio.sleep(1) @@ -287,10 +290,14 @@ async def test_live_client_reuse( message_type=gateway.AuthenticationRequest, ) + second_session_id = live_client.session_id live_client.start() live_client.stop() await live_client.wait_for_close() + assert live_client.session_id == second_session_id + assert first_session_id != second_session_id + async def test_live_connect_auth_with_heartbeat_interval( mock_live_server: MockLiveServerInterface, @@ -976,15 +983,15 @@ def test_live_add_callback( """ # Arrange - def callback(_: object) -> None: + def test_callback(_: object) -> None: pass # Act - live_client.add_callback(callback) + live_client.add_callback(test_callback) # Assert assert len(live_client._session._user_callbacks) == 2 # include map_symbols callback - assert (callback, None) in live_client._session._user_callbacks + assert live_client._session._user_callbacks[-1].callback_name == "test_callback" def test_live_add_stream( @@ -1001,7 +1008,7 @@ def test_live_add_stream( # Assert assert len(live_client._session._user_streams) == 1 - assert (stream, None) in live_client._session._user_streams + assert stream == live_client._session._user_streams[0]._stream def test_live_add_stream_invalid(