feat(streaming-v3) Add AsyncStreamingClient#197
Conversation
Adds an asyncio-native counterpart to the thread-based StreamingClient. Mirrors the public API one-to-one (connect / disconnect / stream / set_params / force_endpoint / on / create_temporary_token) as coroutines. Event handlers may be plain callables or async functions — async handlers are awaited inline by the single internal read task. Shared with the sync client: URL/header construction, parameter normalization, message parsing, and connection-closed error mapping. The async client deletes the cross-thread `_pending_close_error` hand-off — asyncio's `await ws.recv()` raises `ConnectionClosed` immediately on socket close, so the read task always sees the close naturally and remains the sole dispatch site. - New `assemblyai/streaming/v3/async_client.py` (`AsyncStreamingClient`, `_AsyncHTTPClient`) - Exported from `assemblyai.streaming.v3` - Helper extractions in `client.py` (`_build_uri`, `_build_headers`, `_emit_param_warnings`) reused by both clients - `tests/unit/test_streaming_async.py` covering connect, stream (bytes / sync iter / async iter), terminate, dedup, clean close, handler errors, sync+async handler mix, and temporary tokens - `pytest-asyncio` added to tox test deps Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The new asyncio client API (websockets >= 13) takes ``additional_headers``; the legacy ``websockets.client.connect`` on websockets 11/12 takes ``extra_headers``. Resolve once at import time and route through the ``websocket_connect_async`` wrapper so callers + tests see one entry point. Required for the websockets-11.0 leg of the tox matrix. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Adds an asyncio-native AsyncStreamingClient to assemblyai.streaming.v3 that mirrors the existing thread-based StreamingClient API, while refactoring shared URI/header/warning helpers into client.py and adding async unit tests.
Changes:
- Added
AsyncStreamingClientimplementation (async connect/disconnect/stream/etc.) with mixed sync+async event handlers. - Extracted shared
_build_uri,_build_headers, and_emit_param_warningshelpers from the sync client for reuse. - Added async-focused unit tests and included
pytest-asyncioin the tox test dependencies.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
tox.ini |
Adds pytest-asyncio to support running the new async unit tests. |
tests/unit/test_streaming_async.py |
Introduces async unit tests for the new client (connection, streaming, handlers, error paths). |
assemblyai/streaming/v3/client.py |
Refactors shared URI/header/warning helpers for sync+async clients. |
assemblyai/streaming/v3/async_client.py |
Implements the new asyncio-native streaming client and async token helper. |
assemblyai/streaming/v3/__init__.py |
Exports AsyncStreamingClient from the package. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| async def connect(self, params: StreamingParameters) -> None: | ||
| _emit_param_warnings(params) | ||
|
|
||
| uri = _build_uri(self._options.api_host, params) | ||
| headers = _build_headers(self._options) | ||
|
|
||
| try: | ||
| self._websocket = await asyncio.wait_for( | ||
| websocket_connect_async(uri, additional_headers=headers), | ||
| timeout=15, | ||
| ) | ||
| except websockets.exceptions.InvalidStatus as exc: | ||
| status_code = getattr(getattr(exc, "response", None), "status_code", None) | ||
| await self._report_connection_closed( | ||
| StreamingError( | ||
| message=f"WebSocket handshake rejected (HTTP {status_code})", | ||
| code=status_code, | ||
| ) | ||
| ) | ||
| return | ||
| except ( | ||
| websockets.exceptions.InvalidHandshake, | ||
| websockets.exceptions.ConnectionClosed, | ||
| OSError, | ||
| asyncio.TimeoutError, | ||
| TimeoutError, | ||
| ) as exc: | ||
| await self._report_connection_closed(exc) | ||
| return | ||
|
|
||
| self._read_task = asyncio.create_task( | ||
| self._read_loop(), name="AsyncStreamingClient._read_loop" | ||
| ) | ||
| self._write_task = asyncio.create_task( | ||
| self._write_loop(), name="AsyncStreamingClient._write_loop" | ||
| ) | ||
|
|
| class _AsyncHTTPClient: | ||
| def __init__(self, api_host: str, api_key: Optional[str] = None): | ||
| vi = sys.version_info | ||
| python_version = f"{vi.major}.{vi.minor}.{vi.micro}" | ||
| user_agent = ( | ||
| f"{httpx._client.USER_AGENT} AssemblyAI/1.0 " | ||
| f"(sdk=Python/{__version__} runtime_env=Python/{python_version})" | ||
| ) | ||
|
|
||
| headers = {"User-Agent": user_agent} | ||
|
|
||
| if api_key: | ||
| headers["Authorization"] = api_key | ||
|
|
||
| self._http_client = httpx.AsyncClient( | ||
| base_url="https://" + api_host, | ||
| headers=headers, | ||
| ) |
| deadline = asyncio.get_event_loop().time() + timeout | ||
| while asyncio.get_event_loop().time() < deadline: | ||
| read_done = client._read_task is None or client._read_task.done() | ||
| write_done = client._write_task is None or client._write_task.done() | ||
| if read_done and write_done and client._stop_event.is_set(): | ||
| return | ||
| await asyncio.sleep(0.01) | ||
|
|
||
|
|
| if self._stop_event.is_set(): | ||
| return | ||
|
|
Four fixes from the Copilot review of PR #197: 1. Read-loop race: the write task only set ``_stop_event`` on ``ConnectionClosed`` and trusted the read task's next ``recv()`` to raise. But the read task checks stop at the top of its loop, and if it processed a buffered message between recv calls and saw stop before the next ``recv()``, it would exit silently with the close undispatched. The write task now calls ``await self._report_connection_closed(exc)`` directly — that method's flag check + set is synchronous (no ``await`` between them), so the read task's parallel call is a safe no-op. 2. Reconnect guard: ``connect()`` now raises ``RuntimeError`` if the client already has an active websocket or read task, matching the sync client's "single-use" lifecycle. 3. HTTP client cleanup: ``_AsyncHTTPClient`` gains ``aclose()`` and ``disconnect()`` calls it, so ``httpx.AsyncClient`` doesn't leak its connection pool and trigger "unclosed client" warnings. 4. Test helper: ``_wait_for_tasks`` now raises ``AssertionError`` on timeout instead of returning silently (so stalls fail tests deterministically), and uses ``asyncio.get_running_loop().time()``. Adds three regression tests covering each behavioral fix. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
Closing — the SDK repo is a Copybara mirror of |
Summary
Adds
AsyncStreamingClient— an asyncio-native counterpart to the existing thread-basedStreamingClient. The public API mirrors the sync version one-to-one (connect/disconnect/stream/set_params/force_endpoint/on/create_temporary_token) but as coroutines. Zero changes to the sync API.Handlers may be plain callables or async functions — async handlers are awaited inline by the single internal read task.
Design notes
_build_uri/_build_headers/_emit_param_warningsinclient.py._pending_close_errorcross-thread hand-off (needed becauserecv(timeout=1)polls) is deleted entirely in the async version.await ws.recv()raisesConnectionClosedimmediately on socket close, so the read task always sees the close naturally and remains the sole handler-dispatch site — same error-dedup behavior, less machinery.disconnect()cancels the read/write tasks after_stop_eventis set (and after lettingTerminateSessiondrain whenterminate=True).stream()acceptsbytes, syncIterable[bytes], orAsyncIterable[bytes].websockets.asyncio.client.connectwhen available (websockets >= 13) and falls back towebsockets.client.connectfor the supported lower bound (websockets>=11.0persetup.py).pytest-asynciototox.initest deps only.Files
assemblyai/streaming/v3/async_client.pytests/unit/test_streaming_async.py(15 tests)assemblyai/streaming/v3/__init__.py(exportAsyncStreamingClient)assemblyai/streaming/v3/client.py(extract shared helpers)tox.ini(addpytest-asyncio)Test plan
pytest tests/unit/test_streaming_async.py— 15 new tests passpytest tests/unit/test_streaming.py— 33 existing sync tests pass (regression)ruff format --check+ruff checkclean on changed filesmypy --follow-imports=silent --ignore-missing-importsclean on changed files (matches CI lint)🤖 Generated with Claude Code