Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
# Changelog

## 0.66.0 - 2025-11-18

#### Enhancements
- Added a property `Live.session_id` which returns the streaming session ID when the client is connected
- Streams added with `Live.add_stream()` which do not define an exception handler will now emit a warning if an exception is raised while executing the callback
- Callback functions added with `Live.add_callback()` which do not define an exception handler will now emit a warning if an exception is raised while executing the callback
- Upgraded `databento-dbn` to 0.44.0
- Added logic to set `code` when upgrading version 1 `SystemMsg` to newer versions

#### Bug fixes
- Streams opened by `Live.add_stream()` will now close properly when the streaming session is closed

## 0.65.0 - 2025-11-11

#### Deprecations
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ The library is fully compatible with distributions of Anaconda 2023.x and above.
The minimum dependencies as found in the `pyproject.toml` are also listed below:
- python = "^3.10"
- aiohttp = "^3.8.3"
- databento-dbn = "~0.43.0"
- databento-dbn = "~0.44.0"
- numpy = ">=1.23.5"
- pandas = ">=1.5.3"
- pip-system-certs = ">=4.0" (Windows only)
Expand Down
205 changes: 201 additions & 4 deletions databento/common/types.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
import datetime as dt
import logging
from collections.abc import Callable
from os import PathLike
import pathlib
from typing import Generic
from typing import IO
from typing import TypedDict
from typing import TypeVar
import warnings

import databento_dbn
import pandas as pd

from databento.common.error import BentoWarning

logger = logging.getLogger(__name__)

DBNRecord = (
databento_dbn.BBOMsg
Expand All @@ -32,10 +40,6 @@
| databento_dbn.ErrorMsgV1
)

RecordCallback = Callable[[DBNRecord], None]
ExceptionCallback = Callable[[Exception], None]
ReconnectCallback = Callable[[pd.Timestamp, pd.Timestamp], None]

_T = TypeVar("_T")


Expand Down Expand Up @@ -88,3 +92,196 @@ class MappingIntervalDict(TypedDict):
start_date: dt.date
end_date: dt.date
symbol: str


RecordCallback = Callable[[DBNRecord], None]
ExceptionCallback = Callable[[Exception], None]
ReconnectCallback = Callable[[pd.Timestamp, pd.Timestamp], None]


class ClientStream:
def __init__(
self,
stream: IO[bytes] | PathLike[str] | str,
exc_fn: ExceptionCallback | None = None,
max_warnings: int = 10,
) -> None:
is_managed = False

if isinstance(stream, (str, PathLike)):
stream = pathlib.Path(stream).open("xb")
is_managed = True

if not hasattr(stream, "write"):
raise ValueError(f"{type(stream).__name__} does not support write()")

if not hasattr(stream, "writable") or not stream.writable():
raise ValueError(f"{type(stream).__name__} is not a writable stream")

if exc_fn is not None and not callable(exc_fn):
raise ValueError(f"{exc_fn} is not callable")

self._stream = stream
self._exc_fn = exc_fn
self._max_warnings = max(0, max_warnings)
self._warning_count = 0
self._is_managed = is_managed

@property
def stream_name(self) -> str:
return getattr(self._stream, "__name__", str(self._stream))

@property
def is_closed(self) -> bool:
"""
Return `True` if the underlying stream is closed.

Returns
-------
bool

"""
return self._stream.closed

@property
def is_managed(self) -> bool:
"""
Return `True` if the underlying stream was opened by the
`ClientStream`. This can be used to determine if the stream should be
closed automatically.

Returns
-------
bool

"""
return self._is_managed

@property
def exc_callback_name(self) -> str:
return getattr(self._exc_fn, "__name__", str(self._exc_fn))

def close(self) -> None:
"""
Close the underlying stream.
"""
self._stream.close()

def flush(self) -> None:
"""
Flush the underlying stream.
"""
self._stream.flush()

def write(self, data: bytes) -> None:
"""
Write data to the underlying stream. Any exceptions encountered will be
dispatched to the exception callback, if defined.

Parameters
----------
data : bytes

"""
try:
self._stream.write(data)
except Exception as exc:
if self._exc_fn is None:
self._warn(
f"stream '{self.stream_name}' encountered an exception without an exception handler: {repr(exc)}",
)
else:
try:
self._exc_fn(exc)
except Exception as inner_exc:
self._warn(
f"exception callback '{self.exc_callback_name}' encountered an exception: {repr(inner_exc)}",
)
raise inner_exc from exc
raise exc

def _warn(self, msg: str) -> None:
logger.warning(msg)
if self._warning_count < self._max_warnings:
self._warning_count += 1
warnings.warn(
msg,
BentoWarning,
stacklevel=3,
)
if self._warning_count == self._max_warnings:
warnings.warn(
f"suppressing further warnings for '{self.stream_name}'",
BentoWarning,
stacklevel=3,
)


class ClientRecordCallback:
def __init__(
self,
fn: RecordCallback,
exc_fn: ExceptionCallback | None = None,
max_warnings: int = 10,
) -> None:
if not callable(fn):
raise ValueError(f"{fn} is not callable")
if exc_fn is not None and not callable(exc_fn):
raise ValueError(f"{exc_fn} is not callable")

self._fn = fn
self._exc_fn = exc_fn
self._max_warnings = max(0, max_warnings)
self._warning_count = 0

@property
def callback_name(self) -> str:
return getattr(self._fn, "__name__", str(self._fn))

@property
def exc_callback_name(self) -> str:
return getattr(self._exc_fn, "__name__", str(self._exc_fn))

def call(self, record: DBNRecord) -> None:
"""
Execute the callback function, passing `record` in as the first
argument. Any exceptions encountered will be dispatched to the
exception callback, if defined.

Parameters
----------
record : DBNRecord

"""
try:
self._fn(record)
except Exception as exc:
if self._exc_fn is None:
self._warn(
f"callback '{self.callback_name}' encountered an exception without an exception callback: {repr(exc)}",
)
else:
try:
self._exc_fn(exc)
except Exception as inner_exc:
self._warn(
f"exception callback '{self.exc_callback_name}' encountered an exception: {repr(inner_exc)}",
)
raise inner_exc from exc
raise exc

def _warn(self, msg: str) -> None:
logger.warning(msg)
if self._warning_count < self._max_warnings:
self._warning_count += 1
warnings.warn(
msg,
BentoWarning,
stacklevel=3,
)
if self._warning_count == self._max_warnings:
warnings.warn(
f"suppressing further warnings for '{self.callback_name}'",
BentoWarning,
stacklevel=3,
)
63 changes: 35 additions & 28 deletions databento/live/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import asyncio
import logging
import os
import pathlib
import queue
import threading
from collections.abc import Iterable
Expand All @@ -24,6 +23,8 @@
from databento.common.error import BentoError
from databento.common.parsing import optional_datetime_to_unix_nanoseconds
from databento.common.publishers import Dataset
from databento.common.types import ClientRecordCallback
from databento.common.types import ClientStream
from databento.common.types import DBNRecord
from databento.common.types import ExceptionCallback
from databento.common.types import ReconnectCallback
Expand Down Expand Up @@ -111,7 +112,7 @@ def __init__(
reconnect_policy=reconnect_policy,
)

self._session._user_callbacks.append((self._map_symbol, None))
self._session._user_callbacks.append(ClientRecordCallback(self._map_symbol))

with Live._lock:
if not Live._thread.is_alive():
Expand Down Expand Up @@ -214,6 +215,19 @@ def port(self) -> int:
"""
return self._port

@property
def session_id(self) -> str | None:
"""
Return the session ID for the current session. If `None`, the client is
not connected.

Returns
-------
str | None

"""
return self._session.session_id

@property
def symbology_map(self) -> dict[int, str | int]:
"""
Expand Down Expand Up @@ -257,7 +271,9 @@ def add_callback(
A callback to register for handling live records as they arrive.
exception_callback : Callable[[Exception], None], optional
An error handling callback to process exceptions that are raised
in `record_callback`.
in `record_callback`. If no exception callback is provided,
any exceptions encountered will be logged and raised as warnings
for visibility.

Raises
------
Expand All @@ -270,15 +286,13 @@ def add_callback(
Live.add_stream

"""
if not callable(record_callback):
raise ValueError(f"{record_callback} is not callable")

if exception_callback is not None and not callable(exception_callback):
raise ValueError(f"{exception_callback} is not callable")
client_callback = ClientRecordCallback(
fn=record_callback,
exc_fn=exception_callback,
)

callback_name = getattr(record_callback, "__name__", str(record_callback))
logger.info("adding user callback %s", callback_name)
self._session._user_callbacks.append((record_callback, exception_callback))
logger.info("adding user callback %s", client_callback.callback_name)
self._session._user_callbacks.append(client_callback)

def add_stream(
self,
Expand All @@ -294,7 +308,9 @@ def add_stream(
The IO stream to write to when handling live records as they arrive.
exception_callback : Callable[[Exception], None], optional
An error handling callback to process exceptions that are raised
when writing to the stream.
when writing to the stream. If no exception callback is provided,
any exceptions encountered will be logged and raised as warnings
for visibility.

Raises
------
Expand All @@ -309,23 +325,12 @@ def add_stream(
Live.add_callback

"""
if isinstance(stream, (str, PathLike)):
stream = pathlib.Path(stream).open("xb")

if not hasattr(stream, "write"):
raise ValueError(f"{type(stream).__name__} does not support write()")

if not hasattr(stream, "writable") or not stream.writable():
raise ValueError(f"{type(stream).__name__} is not a writable stream")
client_stream = ClientStream(stream=stream, exc_fn=exception_callback)

if exception_callback is not None and not callable(exception_callback):
raise ValueError(f"{exception_callback} is not callable")

stream_name = getattr(stream, "name", str(stream))
logger.info("adding user stream %s", stream_name)
logger.info("adding user stream %s", client_stream.stream_name)
if self.metadata is not None:
stream.write(bytes(self.metadata))
self._session._user_streams.append((stream, exception_callback))
client_stream.write(self.metadata.encode())
self._session._user_streams.append(client_stream)

def add_reconnect_callback(
self,
Expand Down Expand Up @@ -365,7 +370,9 @@ def add_reconnect_callback(

callback_name = getattr(reconnect_callback, "__name__", str(reconnect_callback))
logger.info("adding user reconnect callback %s", callback_name)
self._session._user_reconnect_callbacks.append((reconnect_callback, exception_callback))
self._session._user_reconnect_callbacks.append(
(reconnect_callback, exception_callback),
)

def start(
self,
Expand Down
Loading