From db0b76ff2c8d5f7fb8ddca3eb91a32216966e038 Mon Sep 17 00:00:00 2001 From: Nick Macholl Date: Wed, 12 Nov 2025 13:26:57 -0800 Subject: [PATCH 1/6] FIX: Fix missing Python version classifiers --- pyproject.toml | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index bf1388a..09dade3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,14 +6,7 @@ readme = "README.md" requires-python = ">=3.10" license = "Apache-2.0" authors = [{ name = "Databento", email = "support@databento.com" }] -classifiers = [ - "Development Status :: 4 - Beta", - "Operating System :: OS Independent", - "Topic :: Software Development :: Libraries", - "Topic :: Software Development :: Libraries :: Python Modules", - "Topic :: Office/Business :: Financial", - "Topic :: Office/Business :: Financial :: Investment", -] +dynamic = [ "classifiers" ] dependencies = [ "aiohttp>=3.8.3,<4.0.0; python_version < '3.12'", "aiohttp>=3.9.0,<4.0.0; python_version >= '3.12'", @@ -39,6 +32,14 @@ packages = [ { include = "databento" }, { include = "databento/py.typed" }, ] +classifiers = [ + "Development Status :: 4 - Beta", + "Operating System :: OS Independent", + "Topic :: Software Development :: Libraries", + "Topic :: Software Development :: Libraries :: Python Modules", + "Topic :: Office/Business :: Financial", + "Topic :: Office/Business :: Financial :: Investment", +] [tool.poetry.dependencies] python = ">=3.10,<3.14" From 0e724c27789bcca82910fd49c125fdc28d6948a9 Mon Sep 17 00:00:00 2001 From: Nick Macholl Date: Thu, 13 Nov 2025 09:54:39 -0800 Subject: [PATCH 2/6] ADD: Add session ID property to the Live client --- CHANGELOG.md | 5 +++++ databento/live/client.py | 17 ++++++++++++++++- databento/live/session.py | 13 +++++++++++-- tests/test_live_client.py | 7 +++++++ 4 files changed, 39 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f8dfa2e..26af5c9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Changelog +## 0.66.0 - TBD + +#### Enhancements +- Added a property `Live.session_id` which returns the streaming session ID when the client is connected + ## 0.65.0 - 2025-11-11 #### Deprecations diff --git a/databento/live/client.py b/databento/live/client.py index fae8c3c..6cdd619 100644 --- a/databento/live/client.py +++ b/databento/live/client.py @@ -214,6 +214,19 @@ def port(self) -> int: """ return self._port + @property + def session_id(self) -> str | None: + """ + Return the session ID for the current session. If `None`, the client is + not connected. + + Returns + ------- + str | None + + """ + return self._session.session_id + @property def symbology_map(self) -> dict[int, str | int]: """ @@ -365,7 +378,9 @@ def add_reconnect_callback( callback_name = getattr(reconnect_callback, "__name__", str(reconnect_callback)) logger.info("adding user reconnect callback %s", callback_name) - self._session._user_reconnect_callbacks.append((reconnect_callback, exception_callback)) + self._session._user_reconnect_callbacks.append( + (reconnect_callback, exception_callback), + ) def start( self, diff --git a/databento/live/session.py b/databento/live/session.py index 06852e2..424cba9 100644 --- a/databento/live/session.py +++ b/databento/live/session.py @@ -72,7 +72,12 @@ def disable(self) -> None: """ self._enabled.clear() - def put(self, item: DBNRecord, block: bool = True, timeout: float | None = None) -> None: + def put( + self, + item: DBNRecord, + block: bool = True, + timeout: float | None = None, + ) -> None: """ Put an item on the queue if the queue is enabled. @@ -498,6 +503,7 @@ def subscribe( with self._lock: if self._protocol is None: + self._session_id = None self._connect(dataset=dataset) self._subscription_counter += 1 @@ -654,7 +660,10 @@ async def _reconnect(self) -> None: should_restart = self.is_streaming() if self._protocol._last_ts_event is not None: - gap_start = pd.Timestamp(self._protocol._last_ts_event, tz="UTC") + gap_start = pd.Timestamp( + self._protocol._last_ts_event, + tz="UTC", + ) elif self._metadata.data is not None: gap_start = pd.Timestamp(self._metadata.data.start, tz="UTC") else: diff --git a/tests/test_live_client.py b/tests/test_live_client.py index c8625e9..c098ba8 100644 --- a/tests/test_live_client.py +++ b/tests/test_live_client.py @@ -254,6 +254,7 @@ async def test_live_connect_auth( assert message.auth.endswith(live_client.key[-BUCKET_ID_LENGTH:]) assert message.dataset == live_client.dataset assert message.encoding == Encoding.DBN + assert live_client.session_id is not None async def test_live_client_reuse( @@ -273,8 +274,10 @@ async def test_live_client_reuse( message_type=gateway.AuthenticationRequest, ) + first_session_id = live_client.session_id live_client.start() live_client.stop() + assert live_client.session_id == first_session_id await asyncio.sleep(1) @@ -287,10 +290,14 @@ async def test_live_client_reuse( message_type=gateway.AuthenticationRequest, ) + second_session_id = live_client.session_id live_client.start() live_client.stop() await live_client.wait_for_close() + assert live_client.session_id == second_session_id + assert first_session_id != second_session_id + async def test_live_connect_auth_with_heartbeat_interval( mock_live_server: MockLiveServerInterface, From f050f6bc8cb2370cf7f2770c0296867e6728c29a Mon Sep 17 00:00:00 2001 From: Nick Macholl Date: Tue, 18 Nov 2025 10:51:23 -0800 Subject: [PATCH 3/6] MOD: Upgrade databento-dbn to 0.44.0 --- CHANGELOG.md | 2 ++ README.md | 2 +- pyproject.toml | 2 +- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 26af5c9..b0418ff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ #### Enhancements - Added a property `Live.session_id` which returns the streaming session ID when the client is connected +- Upgraded `databento-dbn` to 0.44.0 + - Added logic to set `code` when upgrading version 1 `SystemMsg` to newer versions ## 0.65.0 - 2025-11-11 diff --git a/README.md b/README.md index 643e4dc..f316c4f 100644 --- a/README.md +++ b/README.md @@ -32,7 +32,7 @@ The library is fully compatible with distributions of Anaconda 2023.x and above. The minimum dependencies as found in the `pyproject.toml` are also listed below: - python = "^3.10" - aiohttp = "^3.8.3" -- databento-dbn = "~0.43.0" +- databento-dbn = "~0.44.0" - numpy = ">=1.23.5" - pandas = ">=1.5.3" - pip-system-certs = ">=4.0" (Windows only) diff --git a/pyproject.toml b/pyproject.toml index 09dade3..b9e5d59 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,7 +10,7 @@ dynamic = [ "classifiers" ] dependencies = [ "aiohttp>=3.8.3,<4.0.0; python_version < '3.12'", "aiohttp>=3.9.0,<4.0.0; python_version >= '3.12'", - "databento-dbn~=0.43.0", + "databento-dbn~=0.44.0", "numpy>=1.23.5; python_version < '3.12'", "numpy>=1.26.0; python_version >= '3.12'", "pandas>=1.5.3", From 5ab11459c273d35364a5ef7671bcf7119b3a9ee7 Mon Sep 17 00:00:00 2001 From: Nick Macholl Date: Mon, 17 Nov 2025 10:18:55 -0800 Subject: [PATCH 4/6] ADD: Live client stream exception warning --- CHANGELOG.md | 4 ++ databento/common/types.py | 127 +++++++++++++++++++++++++++++++++++++- databento/live/client.py | 26 +++----- databento/live/session.py | 56 ++++++++--------- tests/test_live_client.py | 2 +- 5 files changed, 164 insertions(+), 51 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b0418ff..f918df4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,9 +4,13 @@ #### Enhancements - Added a property `Live.session_id` which returns the streaming session ID when the client is connected +- Streams added with `Live.add_stream()` which do not define an exception handler will now emit a warning if an exception is raised while executing the callback - Upgraded `databento-dbn` to 0.44.0 - Added logic to set `code` when upgrading version 1 `SystemMsg` to newer versions +#### Bug fixes +- Streams opened by `Live.add_stream()` will now close properly when the streaming session is closed + ## 0.65.0 - 2025-11-11 #### Deprecations diff --git a/databento/common/types.py b/databento/common/types.py index 3dfca45..23298e3 100644 --- a/databento/common/types.py +++ b/databento/common/types.py @@ -1,12 +1,19 @@ import datetime as dt +import logging +import warnings from collections.abc import Callable -from typing import Generic +from os import PathLike +import pathlib +from typing import IO, Generic from typing import TypedDict from typing import TypeVar import databento_dbn import pandas as pd +from databento.common.error import BentoWarning + +logger = logging.getLogger(__name__) DBNRecord = ( databento_dbn.BBOMsg @@ -88,3 +95,121 @@ class MappingIntervalDict(TypedDict): start_date: dt.date end_date: dt.date symbol: str + + +class ClientStream: + def __init__( + self, + stream: IO[bytes] | PathLike[str] | str, + exc_fn: ExceptionCallback | None = None, + max_warnings: int = 10, + ) -> None: + is_managed = False + + if isinstance(stream, (str, PathLike)): + stream = pathlib.Path(stream).open("xb") + is_managed = True + + if not hasattr(stream, "write"): + raise ValueError(f"{type(stream).__name__} does not support write()") + + if not hasattr(stream, "writable") or not stream.writable(): + raise ValueError(f"{type(stream).__name__} is not a writable stream") + + if exc_fn is not None and not callable(exc_fn): + raise ValueError(f"{exc_fn} is not callable") + + self._stream = stream + self._exc_fn = exc_fn + self._max_warnings = max(0, max_warnings) + self._warning_count = 0 + self._is_managed = is_managed + + @property + def stream_name(self) -> str: + return getattr(self._stream, "__name__", str(self._stream)) + + @property + def is_closed(self) -> bool: + """ + Return `True` if the underlying stream is closed. + + Returns + ------- + bool + + """ + return self._stream.closed + + @property + def is_managed(self) -> bool: + """ + Return `True` if the underlying stream was opened by the + `ClientStream`. This can be used to determine if the stream should be + closed automatically. + + Returns + ------- + bool + + """ + return self._is_managed + + @property + def exc_callback_name(self) -> str: + return getattr(self._exc_fn, "__name__", str(self._exc_fn)) + + def close(self) -> None: + """ + Close the underlying stream. + """ + self._stream.close() + + def flush(self) -> None: + """ + Flush the underlying stream. + """ + self._stream.flush() + + def write(self, data: bytes) -> None: + """ + Write data to the underlying stream. Any exceptions encountered will be + dispatched to the exception callback, if defined. + + Parameters + ---------- + data : bytes + + """ + try: + self._stream.write(data) + except Exception as exc: + if self._exc_fn is None: + self._warn( + f"stream '{self.stream_name}' encountered an exception without an exception handler: {repr(exc)}", + ) + else: + try: + self._exc_fn(exc) + except Exception as inner_exc: + self._warn( + f"exception callback '{self.exc_callback_name}' encountered an exception: {repr(inner_exc)}", + ) + raise inner_exc from exc + raise exc + + def _warn(self, msg: str) -> None: + logger.warning(msg) + if self._warning_count < self._max_warnings: + self._warning_count += 1 + warnings.warn( + msg, + BentoWarning, + stacklevel=3, + ) + if self._warning_count == self._max_warnings: + warnings.warn( + f"suppressing further warnings for '{self.stream_name}'", + BentoWarning, + stacklevel=3, + ) diff --git a/databento/live/client.py b/databento/live/client.py index 6cdd619..a128856 100644 --- a/databento/live/client.py +++ b/databento/live/client.py @@ -3,7 +3,6 @@ import asyncio import logging import os -import pathlib import queue import threading from collections.abc import Iterable @@ -24,7 +23,7 @@ from databento.common.error import BentoError from databento.common.parsing import optional_datetime_to_unix_nanoseconds from databento.common.publishers import Dataset -from databento.common.types import DBNRecord +from databento.common.types import ClientStream, DBNRecord from databento.common.types import ExceptionCallback from databento.common.types import ReconnectCallback from databento.common.types import RecordCallback @@ -307,7 +306,9 @@ def add_stream( The IO stream to write to when handling live records as they arrive. exception_callback : Callable[[Exception], None], optional An error handling callback to process exceptions that are raised - when writing to the stream. + when writing to the stream. If no exception callback is provided, + any exceptions encountered will be logged and raised as warnings + for visibility. Raises ------ @@ -322,23 +323,12 @@ def add_stream( Live.add_callback """ - if isinstance(stream, (str, PathLike)): - stream = pathlib.Path(stream).open("xb") + client_stream = ClientStream(stream=stream, exc_fn=exception_callback) - if not hasattr(stream, "write"): - raise ValueError(f"{type(stream).__name__} does not support write()") - - if not hasattr(stream, "writable") or not stream.writable(): - raise ValueError(f"{type(stream).__name__} is not a writable stream") - - if exception_callback is not None and not callable(exception_callback): - raise ValueError(f"{exception_callback} is not callable") - - stream_name = getattr(stream, "name", str(stream)) - logger.info("adding user stream %s", stream_name) + logger.info("adding user stream %s", client_stream.stream_name) if self.metadata is not None: - stream.write(bytes(self.metadata)) - self._session._user_streams.append((stream, exception_callback)) + client_stream.write(self.metadata.encode()) + self._session._user_streams.append(client_stream) def add_reconnect_callback( self, diff --git a/databento/live/session.py b/databento/live/session.py index 424cba9..dfb73a0 100644 --- a/databento/live/session.py +++ b/databento/live/session.py @@ -8,7 +8,6 @@ import threading from collections.abc import Iterable from functools import partial -from typing import IO from typing import Final import databento_dbn @@ -20,7 +19,7 @@ from databento.common.enums import ReconnectPolicy from databento.common.error import BentoError from databento.common.publishers import Dataset -from databento.common.types import DBNRecord +from databento.common.types import ClientStream, DBNRecord from databento.common.types import ExceptionCallback from databento.common.types import ReconnectCallback from databento.common.types import RecordCallback @@ -148,6 +147,12 @@ class SessionMetadata: def __bool__(self) -> bool: return self.data is not None + @property + def has_ts_out(self) -> bool: + if self.data is None: + return False + return self.data.ts_out + def check(self, other: databento_dbn.Metadata) -> None: """ Verify the Metadata is compatible with another Metadata message. This @@ -191,7 +196,7 @@ def __init__( dataset: Dataset | str, dbn_queue: DBNQueue, user_callbacks: list[tuple[RecordCallback, ExceptionCallback | None]], - user_streams: list[tuple[IO[bytes], ExceptionCallback | None]], + user_streams: list[ClientStream], loop: asyncio.AbstractEventLoop, metadata: SessionMetadata, ts_out: bool = False, @@ -210,21 +215,15 @@ def received_metadata(self, metadata: databento_dbn.Metadata) -> None: if self._metadata: self._metadata.check(metadata) else: - metadata_bytes = metadata.encode() - for stream, exc_callback in self._user_streams: + for stream in self._user_streams: try: - stream.write(metadata_bytes) + stream.write(metadata.encode()) except Exception as exc: - stream_name = getattr(stream, "name", str(stream)) logger.error( - "error writing %d bytes to `%s` stream", - len(metadata_bytes), - stream_name, + "error writing metadata to `%s` stream", + stream.stream_name, exc_info=exc, ) - if exc_callback is not None: - exc_callback(exc) - self._metadata.data = metadata return super().received_metadata(metadata) @@ -252,26 +251,20 @@ def _dispatch_callbacks(self, record: DBNRecord) -> None: exc_callback(exc) def _dispatch_writes(self, record: DBNRecord) -> None: - if hasattr(record, "ts_out"): - ts_out_bytes = struct.pack("Q", record.ts_out) - else: - ts_out_bytes = b"" - - record_bytes = bytes(record) + ts_out_bytes - - for stream, exc_callback in self._user_streams: + record_bytes = bytes(record) + ts_out_bytes = struct.pack("Q", record.ts_out) if self._metadata.has_ts_out else b"" + for stream in self._user_streams: try: stream.write(record_bytes) + stream.write(ts_out_bytes) except Exception as exc: - stream_name = getattr(stream, "name", str(stream)) logger.error( - "error writing %d bytes to `%s` stream", - len(record_bytes), - stream_name, + "error writing %s record (%d bytes) to `%s` stream", + type(record).__name__, + len(record_bytes) + len(ts_out_bytes), + stream.stream_name, exc_info=exc, ) - if exc_callback is not None: - exc_callback(exc) def _queue_for_iteration(self, record: DBNRecord) -> None: self._dbn_queue.put(record) @@ -323,7 +316,7 @@ def __init__( self._metadata = SessionMetadata() self._user_gateway: str | None = user_gateway self._user_callbacks: list[tuple[RecordCallback, ExceptionCallback | None]] = [] - self._user_streams: list[tuple[IO[bytes], ExceptionCallback | None]] = [] + self._user_streams: list[ClientStream] = [] self._user_reconnect_callbacks: list[tuple[ReconnectCallback, ExceptionCallback | None]] = ( [] ) @@ -551,10 +544,11 @@ async def wait_for_close(self) -> None: def _cleanup(self) -> None: logger.debug("cleaning up session_id=%s", self.session_id) self._user_callbacks.clear() - for item in self._user_streams: - stream, _ = item - if not stream.closed: + for stream in self._user_streams: + if not stream.is_closed: stream.flush() + if stream.is_managed: + stream.close() self._user_callbacks.clear() self._user_streams.clear() diff --git a/tests/test_live_client.py b/tests/test_live_client.py index c098ba8..51a2afe 100644 --- a/tests/test_live_client.py +++ b/tests/test_live_client.py @@ -1008,7 +1008,7 @@ def test_live_add_stream( # Assert assert len(live_client._session._user_streams) == 1 - assert (stream, None) in live_client._session._user_streams + assert stream == live_client._session._user_streams[0]._stream def test_live_add_stream_invalid( From 52e8ce07861c50e5cdc59e0fe74cf7f0fe76e3b8 Mon Sep 17 00:00:00 2001 From: Nick Macholl Date: Thu, 13 Nov 2025 15:31:20 -0800 Subject: [PATCH 5/6] ADD: Live client callback exception warning --- CHANGELOG.md | 1 + databento/common/types.py | 84 ++++++++++++++++++++++++++++++++++++--- databento/live/client.py | 24 ++++++----- databento/live/session.py | 42 ++++++++++---------- tests/test_live_client.py | 6 +-- 5 files changed, 116 insertions(+), 41 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f918df4..95da9f2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ #### Enhancements - Added a property `Live.session_id` which returns the streaming session ID when the client is connected - Streams added with `Live.add_stream()` which do not define an exception handler will now emit a warning if an exception is raised while executing the callback +- Callback functions added with `Live.add_callback()` which do not define an exception handler will now emit a warning if an exception is raised while executing the callback - Upgraded `databento-dbn` to 0.44.0 - Added logic to set `code` when upgrading version 1 `SystemMsg` to newer versions diff --git a/databento/common/types.py b/databento/common/types.py index 23298e3..0241661 100644 --- a/databento/common/types.py +++ b/databento/common/types.py @@ -1,12 +1,13 @@ import datetime as dt import logging -import warnings from collections.abc import Callable from os import PathLike import pathlib -from typing import IO, Generic +from typing import Generic +from typing import IO from typing import TypedDict from typing import TypeVar +import warnings import databento_dbn import pandas as pd @@ -39,10 +40,6 @@ | databento_dbn.ErrorMsgV1 ) -RecordCallback = Callable[[DBNRecord], None] -ExceptionCallback = Callable[[Exception], None] -ReconnectCallback = Callable[[pd.Timestamp, pd.Timestamp], None] - _T = TypeVar("_T") @@ -97,6 +94,11 @@ class MappingIntervalDict(TypedDict): symbol: str +RecordCallback = Callable[[DBNRecord], None] +ExceptionCallback = Callable[[Exception], None] +ReconnectCallback = Callable[[pd.Timestamp, pd.Timestamp], None] + + class ClientStream: def __init__( self, @@ -213,3 +215,73 @@ def _warn(self, msg: str) -> None: BentoWarning, stacklevel=3, ) + + +class ClientRecordCallback: + def __init__( + self, + fn: RecordCallback, + exc_fn: ExceptionCallback | None = None, + max_warnings: int = 10, + ) -> None: + if not callable(fn): + raise ValueError(f"{fn} is not callable") + if exc_fn is not None and not callable(exc_fn): + raise ValueError(f"{exc_fn} is not callable") + + self._fn = fn + self._exc_fn = exc_fn + self._max_warnings = max(0, max_warnings) + self._warning_count = 0 + + @property + def callback_name(self) -> str: + return getattr(self._fn, "__name__", str(self._fn)) + + @property + def exc_callback_name(self) -> str: + return getattr(self._exc_fn, "__name__", str(self._exc_fn)) + + def call(self, record: DBNRecord) -> None: + """ + Execute the callback function, passing `record` in as the first + argument. Any exceptions encountered will be dispatched to the + exception callback, if defined. + + Parameters + ---------- + record : DBNRecord + + """ + try: + self._fn(record) + except Exception as exc: + if self._exc_fn is None: + self._warn( + f"callback '{self.callback_name}' encountered an exception without an exception callback: {repr(exc)}", + ) + else: + try: + self._exc_fn(exc) + except Exception as inner_exc: + self._warn( + f"exception callback '{self.exc_callback_name}' encountered an exception: {repr(inner_exc)}", + ) + raise inner_exc from exc + raise exc + + def _warn(self, msg: str) -> None: + logger.warning(msg) + if self._warning_count < self._max_warnings: + self._warning_count += 1 + warnings.warn( + msg, + BentoWarning, + stacklevel=3, + ) + if self._warning_count == self._max_warnings: + warnings.warn( + f"suppressing further warnings for '{self.callback_name}'", + BentoWarning, + stacklevel=3, + ) diff --git a/databento/live/client.py b/databento/live/client.py index a128856..8513fa2 100644 --- a/databento/live/client.py +++ b/databento/live/client.py @@ -23,7 +23,9 @@ from databento.common.error import BentoError from databento.common.parsing import optional_datetime_to_unix_nanoseconds from databento.common.publishers import Dataset -from databento.common.types import ClientStream, DBNRecord +from databento.common.types import ClientRecordCallback +from databento.common.types import ClientStream +from databento.common.types import DBNRecord from databento.common.types import ExceptionCallback from databento.common.types import ReconnectCallback from databento.common.types import RecordCallback @@ -110,7 +112,7 @@ def __init__( reconnect_policy=reconnect_policy, ) - self._session._user_callbacks.append((self._map_symbol, None)) + self._session._user_callbacks.append(ClientRecordCallback(self._map_symbol)) with Live._lock: if not Live._thread.is_alive(): @@ -269,7 +271,9 @@ def add_callback( A callback to register for handling live records as they arrive. exception_callback : Callable[[Exception], None], optional An error handling callback to process exceptions that are raised - in `record_callback`. + in `record_callback`. If no exception callback is provided, + any exceptions encountered will be logged and raised as warnings + for visibility. Raises ------ @@ -282,15 +286,13 @@ def add_callback( Live.add_stream """ - if not callable(record_callback): - raise ValueError(f"{record_callback} is not callable") - - if exception_callback is not None and not callable(exception_callback): - raise ValueError(f"{exception_callback} is not callable") + client_callback = ClientRecordCallback( + fn=record_callback, + exc_fn=exception_callback, + ) - callback_name = getattr(record_callback, "__name__", str(record_callback)) - logger.info("adding user callback %s", callback_name) - self._session._user_callbacks.append((record_callback, exception_callback)) + logger.info("adding user callback %s", client_callback.callback_name) + self._session._user_callbacks.append(client_callback) def add_stream( self, diff --git a/databento/live/session.py b/databento/live/session.py index dfb73a0..454c371 100644 --- a/databento/live/session.py +++ b/databento/live/session.py @@ -19,10 +19,11 @@ from databento.common.enums import ReconnectPolicy from databento.common.error import BentoError from databento.common.publishers import Dataset -from databento.common.types import ClientStream, DBNRecord +from databento.common.types import ClientRecordCallback +from databento.common.types import ClientStream +from databento.common.types import DBNRecord from databento.common.types import ExceptionCallback from databento.common.types import ReconnectCallback -from databento.common.types import RecordCallback from databento.live.gateway import SubscriptionRequest from databento.live.protocol import DatabentoLiveProtocol @@ -195,8 +196,8 @@ def __init__( api_key: str, dataset: Dataset | str, dbn_queue: DBNQueue, - user_callbacks: list[tuple[RecordCallback, ExceptionCallback | None]], user_streams: list[ClientStream], + user_callbacks: list[ClientRecordCallback], loop: asyncio.AbstractEventLoop, metadata: SessionMetadata, ts_out: bool = False, @@ -237,18 +238,16 @@ def received_record(self, record: DBNRecord) -> None: return super().received_record(record) def _dispatch_callbacks(self, record: DBNRecord) -> None: - for callback, exc_callback in self._user_callbacks: + for callback in self._user_callbacks: try: - callback(record) + callback.call(record) except Exception as exc: logger.error( "error dispatching %s to `%s` callback", type(record).__name__, - getattr(callback, "__name__", str(callback)), + callback.callback_name, exc_info=exc, ) - if exc_callback is not None: - exc_callback(exc) def _dispatch_writes(self, record: DBNRecord) -> None: record_bytes = bytes(record) @@ -315,8 +314,8 @@ def __init__( self._loop = loop self._metadata = SessionMetadata() self._user_gateway: str | None = user_gateway - self._user_callbacks: list[tuple[RecordCallback, ExceptionCallback | None]] = [] self._user_streams: list[ClientStream] = [] + self._user_callbacks: list[ClientRecordCallback] = [] self._user_reconnect_callbacks: list[tuple[ReconnectCallback, ExceptionCallback | None]] = ( [] ) @@ -527,19 +526,20 @@ async def wait_for_close(self) -> None: return try: - await self._protocol.authenticated - except Exception as exc: - raise BentoError(exc) from None - - try: - if self._reconnect_task is not None: - await self._reconnect_task - else: - await self._protocol.disconnected - except Exception as exc: - raise BentoError(exc) from None + try: + await self._protocol.authenticated + except Exception as exc: + raise BentoError(exc) from None - self._cleanup() + try: + if self._reconnect_task is not None: + await self._reconnect_task + else: + await self._protocol.disconnected + except Exception as exc: + raise BentoError(exc) from None + finally: + self._cleanup() def _cleanup(self) -> None: logger.debug("cleaning up session_id=%s", self.session_id) diff --git a/tests/test_live_client.py b/tests/test_live_client.py index 51a2afe..db500e0 100644 --- a/tests/test_live_client.py +++ b/tests/test_live_client.py @@ -983,15 +983,15 @@ def test_live_add_callback( """ # Arrange - def callback(_: object) -> None: + def test_callback(_: object) -> None: pass # Act - live_client.add_callback(callback) + live_client.add_callback(test_callback) # Assert assert len(live_client._session._user_callbacks) == 2 # include map_symbols callback - assert (callback, None) in live_client._session._user_callbacks + assert live_client._session._user_callbacks[-1].callback_name == "test_callback" def test_live_add_stream( From 0557d333064a203ba75dd7a90024997cf8a2a9ea Mon Sep 17 00:00:00 2001 From: Nick Macholl Date: Tue, 18 Nov 2025 11:58:27 -0800 Subject: [PATCH 6/6] VER: Release 0.66.0 --- CHANGELOG.md | 2 +- databento/version.py | 2 +- pyproject.toml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 95da9f2..053dee5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ # Changelog -## 0.66.0 - TBD +## 0.66.0 - 2025-11-18 #### Enhancements - Added a property `Live.session_id` which returns the streaming session ID when the client is connected diff --git a/databento/version.py b/databento/version.py index e082e13..8833468 100644 --- a/databento/version.py +++ b/databento/version.py @@ -1 +1 @@ -__version__ = "0.65.0" +__version__ = "0.66.0" diff --git a/pyproject.toml b/pyproject.toml index b9e5d59..72cadfd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "databento" -version = "0.65.0" +version = "0.66.0" description = "Official Python client library for Databento" readme = "README.md" requires-python = ">=3.10"