From 198bc3acfc9a345bcd83985df78fd5513b942154 Mon Sep 17 00:00:00 2001 From: Luke Oliff Date: Thu, 15 Jan 2026 11:35:49 +0000 Subject: [PATCH 1/8] refactor(examples): reorganize with scalable numbering and production-ready patterns Reorganize all example files with a more scalable numbering system organized by feature area: - 01-09: Authentication - 10-19: Transcription (Listen) - 20-29: Text-to-Speech (Speak) - 30-39: Voice Agent - 40-49: Text Intelligence (Read) - 50-59: Management API - 60-69: On-Premises - 70-79: Configuration & Advanced Changes: - Renamed all examples to follow new numbering scheme - Updated WebSocket examples (13, 14) with production-ready streaming patterns - Removed artificial delays that don't reflect real usage - Simplified to straightforward file streaming approach - Added clear async implementation examples in comments - Updated README.md to reflect new organization The new numbering makes it easier to add future examples without renumbering existing ones. --- examples/07-transcription-live-websocket.py | 61 ---------- ...py => 10-transcription-prerecorded-url.py} | 0 ...y => 11-transcription-prerecorded-file.py} | 0 ... 12-transcription-prerecorded-callback.py} | 0 examples/13-transcription-live-websocket.py | 102 ++++++++++++++++ .../14-transcription-live-websocket-v2.py | 109 ++++++++++++++++++ ...y => 15-transcription-advanced-options.py} | 0 ...-single.py => 20-text-to-speech-single.py} | 0 ...ming.py => 21-text-to-speech-streaming.py} | 0 .../26-transcription-live-websocket-v2.py | 55 --------- .../{09-voice-agent.py => 30-voice-agent.py} | 0 ...ntelligence.py => 40-text-intelligence.py} | 0 ...-projects.py => 50-management-projects.py} | 0 ...nagement-keys.py => 51-management-keys.py} | 0 ...nt-members.py => 52-management-members.py} | 0 ...nt-invites.py => 53-management-invites.py} | 0 ...gement-usage.py => 54-management-usage.py} | 0 ...nt-billing.py => 55-management-billing.py} | 0 ...ment-models.py => 56-management-models.py} | 0 ...redentials.py => 60-onprem-credentials.py} | 0 ...quest-options.py => 70-request-options.py} | 0 ...error-handling.py => 71-error-handling.py} | 0 examples/README.md | 58 +++++----- 23 files changed, 240 insertions(+), 145 deletions(-) delete mode 100644 examples/07-transcription-live-websocket.py rename examples/{04-transcription-prerecorded-url.py => 10-transcription-prerecorded-url.py} (100%) rename examples/{05-transcription-prerecorded-file.py => 11-transcription-prerecorded-file.py} (100%) rename examples/{06-transcription-prerecorded-callback.py => 12-transcription-prerecorded-callback.py} (100%) create mode 100644 examples/13-transcription-live-websocket.py create mode 100644 examples/14-transcription-live-websocket-v2.py rename examples/{22-transcription-advanced-options.py => 15-transcription-advanced-options.py} (100%) rename examples/{10-text-to-speech-single.py => 20-text-to-speech-single.py} (100%) rename examples/{11-text-to-speech-streaming.py => 21-text-to-speech-streaming.py} (100%) delete mode 100644 examples/26-transcription-live-websocket-v2.py rename examples/{09-voice-agent.py => 30-voice-agent.py} (100%) rename examples/{12-text-intelligence.py => 40-text-intelligence.py} (100%) rename examples/{13-management-projects.py => 50-management-projects.py} (100%) rename examples/{14-management-keys.py => 51-management-keys.py} (100%) rename examples/{15-management-members.py => 52-management-members.py} (100%) rename examples/{16-management-invites.py => 53-management-invites.py} (100%) rename examples/{17-management-usage.py => 54-management-usage.py} (100%) rename examples/{18-management-billing.py => 55-management-billing.py} (100%) rename examples/{19-management-models.py => 56-management-models.py} (100%) rename examples/{20-onprem-credentials.py => 60-onprem-credentials.py} (100%) rename examples/{23-request-options.py => 70-request-options.py} (100%) rename examples/{24-error-handling.py => 71-error-handling.py} (100%) diff --git a/examples/07-transcription-live-websocket.py b/examples/07-transcription-live-websocket.py deleted file mode 100644 index e12c9f67..00000000 --- a/examples/07-transcription-live-websocket.py +++ /dev/null @@ -1,61 +0,0 @@ -""" -Example: Live Transcription with WebSocket (Listen V1) - -This example shows how to stream audio for real-time transcription using WebSocket. -""" - -from typing import Union - -from dotenv import load_dotenv - -load_dotenv() - -from deepgram import DeepgramClient -from deepgram.core.events import EventType -from deepgram.listen.v1.types import ( - ListenV1Metadata, - ListenV1Results, - ListenV1SpeechStarted, - ListenV1UtteranceEnd, -) - -ListenV1SocketClientResponse = Union[ListenV1Results, ListenV1Metadata, ListenV1UtteranceEnd, ListenV1SpeechStarted] - -client = DeepgramClient() - -try: - with client.listen.v1.connect(model="nova-3") as connection: - - def on_message(message: ListenV1SocketClientResponse) -> None: - msg_type = getattr(message, "type", "Unknown") - print(f"Received {msg_type} event") - - # Extract transcription from Results events - if isinstance(message, ListenV1Results): - if message.channel and message.channel.alternatives: - transcript = message.channel.alternatives[0].transcript - if transcript: - print(f"Transcript: {transcript}") - - connection.on(EventType.OPEN, lambda _: print("Connection opened")) - connection.on(EventType.MESSAGE, on_message) - connection.on(EventType.CLOSE, lambda _: print("Connection closed")) - connection.on(EventType.ERROR, lambda error: print(f"Error: {error}")) - - # Start listening - this blocks until the connection closes - # In production, you would send audio data here: - # audio_path = os.path.join(os.path.dirname(__file__), "..", "fixtures", "audio.wav") - # with open(audio_path, "rb") as audio_file: - # audio_data = audio_file.read() - # connection.send_listen_v_1_media(audio_data) - - connection.start_listening() - - # For async version: - # from deepgram import AsyncDeepgramClient - # async with client.listen.v1.connect(model="nova-3") as connection: - # # ... same event handlers ... - # await connection.start_listening() - -except Exception as e: - print(f"Error: {e}") diff --git a/examples/04-transcription-prerecorded-url.py b/examples/10-transcription-prerecorded-url.py similarity index 100% rename from examples/04-transcription-prerecorded-url.py rename to examples/10-transcription-prerecorded-url.py diff --git a/examples/05-transcription-prerecorded-file.py b/examples/11-transcription-prerecorded-file.py similarity index 100% rename from examples/05-transcription-prerecorded-file.py rename to examples/11-transcription-prerecorded-file.py diff --git a/examples/06-transcription-prerecorded-callback.py b/examples/12-transcription-prerecorded-callback.py similarity index 100% rename from examples/06-transcription-prerecorded-callback.py rename to examples/12-transcription-prerecorded-callback.py diff --git a/examples/13-transcription-live-websocket.py b/examples/13-transcription-live-websocket.py new file mode 100644 index 00000000..c181c7f0 --- /dev/null +++ b/examples/13-transcription-live-websocket.py @@ -0,0 +1,102 @@ +""" +Example: Live Transcription with WebSocket (Listen V1) + +This example demonstrates how to use WebSocket for real-time audio transcription. +In production, you would stream audio from a microphone or other live source. +This example uses an audio file to demonstrate the streaming pattern. +""" + +import os +import threading +from typing import Union + +from dotenv import load_dotenv + +load_dotenv() + +from deepgram import DeepgramClient +from deepgram.core.events import EventType +from deepgram.listen.v1.types import ( + ListenV1Metadata, + ListenV1Results, + ListenV1SpeechStarted, + ListenV1UtteranceEnd, +) + +ListenV1SocketClientResponse = Union[ListenV1Results, ListenV1Metadata, ListenV1UtteranceEnd, ListenV1SpeechStarted] + +# Chunk size in bytes (e.g., 8KB chunks for efficient streaming) +CHUNK_SIZE = 8192 + +client = DeepgramClient() + +try: + with client.listen.v1.connect(model="nova-3") as connection: + + def on_message(message: ListenV1SocketClientResponse) -> None: + # Extract transcription from Results events + if isinstance(message, ListenV1Results): + if message.channel and message.channel.alternatives: + transcript = message.channel.alternatives[0].transcript + if transcript: + print(f"Transcript: {transcript}") + + def on_open(_) -> None: + print("Connection opened") + + def on_close(_) -> None: + print("Connection closed") + + def on_error(error) -> None: + print(f"Error: {error}") + + # Register event handlers + connection.on(EventType.OPEN, on_open) + connection.on(EventType.MESSAGE, on_message) + connection.on(EventType.CLOSE, on_close) + connection.on(EventType.ERROR, on_error) + + # Start listening in a background thread + threading.Thread(target=connection.start_listening, daemon=True).start() + + # Stream audio file + # In production, replace this with audio from microphone or other live source + audio_path = os.path.join(os.path.dirname(__file__), "fixtures", "audio.wav") + + with open(audio_path, "rb") as audio_file: + print(f"Streaming audio from {audio_path}") + + while True: + chunk = audio_file.read(CHUNK_SIZE) + if not chunk: + break + + connection.send_listen_v_1_media(chunk) + + print("Finished sending audio") + + # For async version: + # from deepgram import AsyncDeepgramClient + # + # async with client.listen.v1.connect(model="nova-3") as connection: + # async def on_message(message): + # if isinstance(message, ListenV1Results): + # if message.channel and message.channel.alternatives: + # transcript = message.channel.alternatives[0].transcript + # if transcript: + # print(f"Transcript: {transcript}") + # + # connection.on(EventType.MESSAGE, on_message) + # + # # Start listening + # listen_task = asyncio.create_task(connection.start_listening()) + # + # # Stream audio + # with open(audio_path, "rb") as audio_file: + # while chunk := audio_file.read(CHUNK_SIZE): + # await connection.send_listen_v_1_media(chunk) + # + # await listen_task + +except Exception as e: + print(f"Error: {e}") diff --git a/examples/14-transcription-live-websocket-v2.py b/examples/14-transcription-live-websocket-v2.py new file mode 100644 index 00000000..6c7f6520 --- /dev/null +++ b/examples/14-transcription-live-websocket-v2.py @@ -0,0 +1,109 @@ +""" +Example: Live Transcription with WebSocket V2 (Listen V2) + +This example demonstrates how to use Listen V2 for advanced conversational speech +recognition with contextual turn detection. + +Note: Listen V2 requires 16kHz linear16 PCM audio format. +In production, you would stream audio from a microphone or other live source. +This example uses an audio file to demonstrate the streaming pattern. +""" + +import os +import threading +from typing import Union + +from dotenv import load_dotenv + +load_dotenv() + +from deepgram import DeepgramClient +from deepgram.core.events import EventType +from deepgram.listen.v2.types import ( + ListenV2Connected, + ListenV2FatalError, + ListenV2TurnInfo, +) + +ListenV2SocketClientResponse = Union[ListenV2Connected, ListenV2TurnInfo, ListenV2FatalError] + +# Chunk size in bytes (e.g., 8KB chunks for efficient streaming) +CHUNK_SIZE = 8192 + +client = DeepgramClient() + +try: + # Listen V2 requires specific audio format: 16kHz linear16 PCM + with client.listen.v2.connect( + model="flux-general-en", + encoding="linear16", + sample_rate="16000" + ) as connection: + + def on_message(message: ListenV2SocketClientResponse) -> None: + # Handle TurnInfo events containing transcription and turn metadata + if isinstance(message, ListenV2TurnInfo): + print(f"Turn {message.turn_index}: {message.transcript}") + print(f" Event: {message.event}") + + def on_open(_) -> None: + print("Connection opened") + + def on_close(_) -> None: + print("Connection closed") + + def on_error(error) -> None: + print(f"Error: {error}") + + # Register event handlers + connection.on(EventType.OPEN, on_open) + connection.on(EventType.MESSAGE, on_message) + connection.on(EventType.CLOSE, on_close) + connection.on(EventType.ERROR, on_error) + + # Start listening in a background thread + threading.Thread(target=connection.start_listening, daemon=True).start() + + # Stream audio file + # In production, replace this with audio from microphone or other live source + # IMPORTANT: Audio must be 16kHz linear16 PCM for Listen V2 + audio_path = os.path.join(os.path.dirname(__file__), "fixtures", "audio.wav") + + with open(audio_path, "rb") as audio_file: + print(f"Streaming audio from {audio_path}") + + while True: + chunk = audio_file.read(CHUNK_SIZE) + if not chunk: + break + + connection.send_listen_v_2_media(chunk) + + print("Finished sending audio") + + # For async version: + # from deepgram import AsyncDeepgramClient + # + # async with client.listen.v2.connect( + # model="flux-general-en", + # encoding="linear16", + # sample_rate="16000" + # ) as connection: + # async def on_message(message): + # if isinstance(message, ListenV2TurnInfo): + # print(f"Turn {message.turn_index}: {message.transcript}") + # + # connection.on(EventType.MESSAGE, on_message) + # + # # Start listening + # listen_task = asyncio.create_task(connection.start_listening()) + # + # # Stream audio + # with open(audio_path, "rb") as audio_file: + # while chunk := audio_file.read(CHUNK_SIZE): + # await connection.send_listen_v_2_media(chunk) + # + # await listen_task + +except Exception as e: + print(f"Error: {e}") diff --git a/examples/22-transcription-advanced-options.py b/examples/15-transcription-advanced-options.py similarity index 100% rename from examples/22-transcription-advanced-options.py rename to examples/15-transcription-advanced-options.py diff --git a/examples/10-text-to-speech-single.py b/examples/20-text-to-speech-single.py similarity index 100% rename from examples/10-text-to-speech-single.py rename to examples/20-text-to-speech-single.py diff --git a/examples/11-text-to-speech-streaming.py b/examples/21-text-to-speech-streaming.py similarity index 100% rename from examples/11-text-to-speech-streaming.py rename to examples/21-text-to-speech-streaming.py diff --git a/examples/26-transcription-live-websocket-v2.py b/examples/26-transcription-live-websocket-v2.py deleted file mode 100644 index 222c1441..00000000 --- a/examples/26-transcription-live-websocket-v2.py +++ /dev/null @@ -1,55 +0,0 @@ -""" -Example: Live Transcription with WebSocket V2 (Listen V2) - -This example shows how to use Listen V2 for advanced conversational speech recognition -with contextual turn detection. -""" - -from typing import Union - -from dotenv import load_dotenv - -load_dotenv() - -from deepgram import DeepgramClient -from deepgram.core.events import EventType -from deepgram.listen.v2.types import ( - ListenV2Connected, - ListenV2FatalError, - ListenV2TurnInfo, -) - -ListenV2SocketClientResponse = Union[ListenV2Connected, ListenV2TurnInfo, ListenV2FatalError] - -client = DeepgramClient() - -try: - with client.listen.v2.connect(model="flux-general-en", encoding="linear16", sample_rate="16000") as connection: - - def on_message(message: ListenV2SocketClientResponse) -> None: - msg_type = getattr(message, "type", "Unknown") - print(f"Received {msg_type} event") - - # Extract transcription from TurnInfo events - if isinstance(message, ListenV2TurnInfo): - print(f"Turn transcript: {message.transcript}") - print(f"Turn event: {message.event}") - print(f"Turn index: {message.turn_index}") - - connection.on(EventType.OPEN, lambda _: print("Connection opened")) - connection.on(EventType.MESSAGE, on_message) - connection.on(EventType.CLOSE, lambda _: print("Connection closed")) - connection.on(EventType.ERROR, lambda error: print(f"Error: {error}")) - - # Start listening - this blocks until the connection closes - # In production, you would send audio data here using connection.send_listen_v_2_media() - connection.start_listening() - - # For async version: - # from deepgram import AsyncDeepgramClient - # async with client.listen.v2.connect(...) as connection: - # # ... same event handlers ... - # await connection.start_listening() - -except Exception as e: - print(f"Error: {e}") diff --git a/examples/09-voice-agent.py b/examples/30-voice-agent.py similarity index 100% rename from examples/09-voice-agent.py rename to examples/30-voice-agent.py diff --git a/examples/12-text-intelligence.py b/examples/40-text-intelligence.py similarity index 100% rename from examples/12-text-intelligence.py rename to examples/40-text-intelligence.py diff --git a/examples/13-management-projects.py b/examples/50-management-projects.py similarity index 100% rename from examples/13-management-projects.py rename to examples/50-management-projects.py diff --git a/examples/14-management-keys.py b/examples/51-management-keys.py similarity index 100% rename from examples/14-management-keys.py rename to examples/51-management-keys.py diff --git a/examples/15-management-members.py b/examples/52-management-members.py similarity index 100% rename from examples/15-management-members.py rename to examples/52-management-members.py diff --git a/examples/16-management-invites.py b/examples/53-management-invites.py similarity index 100% rename from examples/16-management-invites.py rename to examples/53-management-invites.py diff --git a/examples/17-management-usage.py b/examples/54-management-usage.py similarity index 100% rename from examples/17-management-usage.py rename to examples/54-management-usage.py diff --git a/examples/18-management-billing.py b/examples/55-management-billing.py similarity index 100% rename from examples/18-management-billing.py rename to examples/55-management-billing.py diff --git a/examples/19-management-models.py b/examples/56-management-models.py similarity index 100% rename from examples/19-management-models.py rename to examples/56-management-models.py diff --git a/examples/20-onprem-credentials.py b/examples/60-onprem-credentials.py similarity index 100% rename from examples/20-onprem-credentials.py rename to examples/60-onprem-credentials.py diff --git a/examples/23-request-options.py b/examples/70-request-options.py similarity index 100% rename from examples/23-request-options.py rename to examples/70-request-options.py diff --git a/examples/24-error-handling.py b/examples/71-error-handling.py similarity index 100% rename from examples/24-error-handling.py rename to examples/71-error-handling.py diff --git a/examples/README.md b/examples/README.md index 4499527e..32e099e8 100644 --- a/examples/README.md +++ b/examples/README.md @@ -1,54 +1,54 @@ # Deepgram Python SDK Examples -This directory contains comprehensive examples demonstrating how to use the Deepgram Python SDK. These examples cover all major use cases and demonstrate production-ready patterns. +This directory contains comprehensive examples demonstrating how to use the Deepgram Python SDK. Examples are organized by feature area, with each section starting at a multiple of 10. ## Examples Overview -### Authentication +### 01-09: Authentication - **01-authentication-api-key.py** - API key authentication - **02-authentication-access-token.py** - Access token authentication -### Transcription +### 10-19: Transcription (Listen) -- **04-transcription-prerecorded-url.py** - Transcribe audio from URL -- **05-transcription-prerecorded-file.py** - Transcribe audio from local file -- **06-transcription-prerecorded-callback.py** - Async transcription with callbacks -- **07-transcription-live-websocket.py** - Live transcription via WebSocket (Listen V1) -- **22-transcription-advanced-options.py** - Advanced transcription options -- **26-transcription-live-websocket-v2.py** - Live transcription via WebSocket (Listen V2) +- **10-transcription-prerecorded-url.py** - Transcribe audio from URL +- **11-transcription-prerecorded-file.py** - Transcribe audio from local file +- **12-transcription-prerecorded-callback.py** - Async transcription with callbacks +- **13-transcription-live-websocket.py** - Live transcription via WebSocket (Listen V1) +- **14-transcription-live-websocket-v2.py** - Live transcription via WebSocket (Listen V2) +- **15-transcription-advanced-options.py** - Advanced transcription options -### Voice Agent +### 20-29: Text-to-Speech (Speak) -- **09-voice-agent.py** - Voice Agent configuration and usage +- **20-text-to-speech-single.py** - Single request TTS (REST API) +- **21-text-to-speech-streaming.py** - Streaming TTS via WebSocket -### Text-to-Speech +### 30-39: Voice Agent -- **10-text-to-speech-single.py** - Single request TTS -- **11-text-to-speech-streaming.py** - Streaming TTS via WebSocket +- **30-voice-agent.py** - Voice Agent configuration and usage -### Text Intelligence +### 40-49: Text Intelligence (Read) -- **12-text-intelligence.py** - Text analysis using AI features +- **40-text-intelligence.py** - Text analysis using AI features -### Management API +### 50-59: Management API -- **13-management-projects.py** - Project management (list, get, update, delete) -- **14-management-keys.py** - API key management (list, get, create, delete) -- **15-management-members.py** - Member management (list, remove, scopes) -- **16-management-invites.py** - Invitation management (list, send, delete, leave) -- **17-management-usage.py** - Usage statistics and request information -- **18-management-billing.py** - Billing and balance information -- **19-management-models.py** - Model information +- **50-management-projects.py** - Project management (list, get, update, delete) +- **51-management-keys.py** - API key management (list, get, create, delete) +- **52-management-members.py** - Member management (list, remove, scopes) +- **53-management-invites.py** - Invitation management (list, send, delete, leave) +- **54-management-usage.py** - Usage statistics and request information +- **55-management-billing.py** - Billing and balance information +- **56-management-models.py** - Model information -### On-Premises +### 60-69: On-Premises -- **20-onprem-credentials.py** - On-premises credentials management +- **60-onprem-credentials.py** - On-premises credentials management -### Configuration & Advanced +### 70-79: Configuration & Advanced -- **23-request-options.py** - Request options including additional query parameters -- **24-error-handling.py** - Error handling patterns +- **70-request-options.py** - Request options including additional query parameters +- **71-error-handling.py** - Error handling patterns ## Usage From d257ca2eca9c1b2ef553237d6332dde702e5c5f5 Mon Sep 17 00:00:00 2001 From: Luke Oliff Date: Thu, 15 Jan 2026 11:40:21 +0000 Subject: [PATCH 2/8] style(examples): clean up whitespace in WebSocket examples Remove trailing whitespace and format code consistently in WebSocket streaming examples. --- examples/13-transcription-live-websocket.py | 14 ++++++------- .../14-transcription-live-websocket-v2.py | 20 ++++++++----------- 2 files changed, 15 insertions(+), 19 deletions(-) diff --git a/examples/13-transcription-live-websocket.py b/examples/13-transcription-live-websocket.py index c181c7f0..ba0ec00a 100644 --- a/examples/13-transcription-live-websocket.py +++ b/examples/13-transcription-live-websocket.py @@ -62,17 +62,17 @@ def on_error(error) -> None: # Stream audio file # In production, replace this with audio from microphone or other live source audio_path = os.path.join(os.path.dirname(__file__), "fixtures", "audio.wav") - + with open(audio_path, "rb") as audio_file: print(f"Streaming audio from {audio_path}") - + while True: chunk = audio_file.read(CHUNK_SIZE) if not chunk: break - + connection.send_listen_v_1_media(chunk) - + print("Finished sending audio") # For async version: @@ -87,15 +87,15 @@ def on_error(error) -> None: # print(f"Transcript: {transcript}") # # connection.on(EventType.MESSAGE, on_message) - # + # # # Start listening # listen_task = asyncio.create_task(connection.start_listening()) - # + # # # Stream audio # with open(audio_path, "rb") as audio_file: # while chunk := audio_file.read(CHUNK_SIZE): # await connection.send_listen_v_1_media(chunk) - # + # # await listen_task except Exception as e: diff --git a/examples/14-transcription-live-websocket-v2.py b/examples/14-transcription-live-websocket-v2.py index 6c7f6520..617ebc77 100644 --- a/examples/14-transcription-live-websocket-v2.py +++ b/examples/14-transcription-live-websocket-v2.py @@ -34,11 +34,7 @@ try: # Listen V2 requires specific audio format: 16kHz linear16 PCM - with client.listen.v2.connect( - model="flux-general-en", - encoding="linear16", - sample_rate="16000" - ) as connection: + with client.listen.v2.connect(model="flux-general-en", encoding="linear16", sample_rate="16000") as connection: def on_message(message: ListenV2SocketClientResponse) -> None: # Handle TurnInfo events containing transcription and turn metadata @@ -68,17 +64,17 @@ def on_error(error) -> None: # In production, replace this with audio from microphone or other live source # IMPORTANT: Audio must be 16kHz linear16 PCM for Listen V2 audio_path = os.path.join(os.path.dirname(__file__), "fixtures", "audio.wav") - + with open(audio_path, "rb") as audio_file: print(f"Streaming audio from {audio_path}") - + while True: chunk = audio_file.read(CHUNK_SIZE) if not chunk: break - + connection.send_listen_v_2_media(chunk) - + print("Finished sending audio") # For async version: @@ -94,15 +90,15 @@ def on_error(error) -> None: # print(f"Turn {message.turn_index}: {message.transcript}") # # connection.on(EventType.MESSAGE, on_message) - # + # # # Start listening # listen_task = asyncio.create_task(connection.start_listening()) - # + # # # Stream audio # with open(audio_path, "rb") as audio_file: # while chunk := audio_file.read(CHUNK_SIZE): # await connection.send_listen_v_2_media(chunk) - # + # # await listen_task except Exception as e: From b026d609ea2012b4a21b287ca53b08f2f22e8eef Mon Sep 17 00:00:00 2001 From: Luke Oliff Date: Thu, 15 Jan 2026 11:53:40 +0000 Subject: [PATCH 3/8] fix(examples): update WebSocket method names to match SDK v6 API Update all WebSocket examples to use the correct method names: Listen V1/V2: - send_media() instead of send_listen_v_1_media() or send_listen_v_2_media() Speak V1: - send_text() instead of send_speak_v_1_text() - send_flush() instead of send_speak_v_1_flush() - send_close() instead of send_speak_v_1_close() Agent V1: - send_settings() instead of send_agent_v_1_settings() - send_media() instead of send_agent_v_1_media() Updated in examples: - 13-transcription-live-websocket.py - 14-transcription-live-websocket-v2.py - 21-text-to-speech-streaming.py - 30-voice-agent.py --- examples/13-transcription-live-websocket.py | 4 ++-- examples/14-transcription-live-websocket-v2.py | 4 ++-- examples/21-text-to-speech-streaming.py | 12 ++++++------ examples/30-voice-agent.py | 6 +++--- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/examples/13-transcription-live-websocket.py b/examples/13-transcription-live-websocket.py index ba0ec00a..d9119a82 100644 --- a/examples/13-transcription-live-websocket.py +++ b/examples/13-transcription-live-websocket.py @@ -71,7 +71,7 @@ def on_error(error) -> None: if not chunk: break - connection.send_listen_v_1_media(chunk) + connection.send_media(chunk) print("Finished sending audio") @@ -94,7 +94,7 @@ def on_error(error) -> None: # # Stream audio # with open(audio_path, "rb") as audio_file: # while chunk := audio_file.read(CHUNK_SIZE): - # await connection.send_listen_v_1_media(chunk) + # await connection.send_media(chunk) # # await listen_task diff --git a/examples/14-transcription-live-websocket-v2.py b/examples/14-transcription-live-websocket-v2.py index 617ebc77..801e3d67 100644 --- a/examples/14-transcription-live-websocket-v2.py +++ b/examples/14-transcription-live-websocket-v2.py @@ -73,7 +73,7 @@ def on_error(error) -> None: if not chunk: break - connection.send_listen_v_2_media(chunk) + connection.send_media(chunk) print("Finished sending audio") @@ -97,7 +97,7 @@ def on_error(error) -> None: # # Stream audio # with open(audio_path, "rb") as audio_file: # while chunk := audio_file.read(CHUNK_SIZE): - # await connection.send_listen_v_2_media(chunk) + # await connection.send_media(chunk) # # await listen_task diff --git a/examples/21-text-to-speech-streaming.py b/examples/21-text-to-speech-streaming.py index cb8f31f5..23d43638 100644 --- a/examples/21-text-to-speech-streaming.py +++ b/examples/21-text-to-speech-streaming.py @@ -40,15 +40,15 @@ def on_message(message: SpeakV1SocketClientResponse) -> None: # Note: start_listening() blocks, so send all messages first # For better control with bidirectional communication, use the async version text_message = SpeakV1Text(text="Hello, this is a text to speech example.") - connection.send_speak_v_1_text(text_message) + connection.send_text(text_message) # Flush to ensure all text is processed flush_message = SpeakV1Flush() - connection.send_speak_v_1_flush(flush_message) + connection.send_flush(flush_message) # Close the connection when done close_message = SpeakV1Close() - connection.send_speak_v_1_close(close_message) + connection.send_close(close_message) # Start listening - this blocks until the connection closes # All messages should be sent before calling this in sync mode @@ -58,9 +58,9 @@ def on_message(message: SpeakV1SocketClientResponse) -> None: # from deepgram import AsyncDeepgramClient # async with client.speak.v1.connect(...) as connection: # listen_task = asyncio.create_task(connection.start_listening()) - # await connection.send_speak_v_1_text(SpeakV1Text(text="...")) - # await connection.send_speak_v_1_flush(SpeakV1Flush()) - # await connection.send_speak_v_1_close(SpeakV1Close()) + # await connection.send_text(SpeakV1Text(text="...")) + # await connection.send_flush(SpeakV1Flush()) + # await connection.send_close(SpeakV1Close()) # await listen_task except Exception as e: diff --git a/examples/30-voice-agent.py b/examples/30-voice-agent.py index 82ed081e..35f40540 100644 --- a/examples/30-voice-agent.py +++ b/examples/30-voice-agent.py @@ -65,7 +65,7 @@ ) print("Sending agent settings...") - agent.send_agent_v_1_settings(settings) + agent.send_settings(settings) def on_message(message: AgentV1SocketClientResponse) -> None: if isinstance(message, bytes): @@ -84,7 +84,7 @@ def on_message(message: AgentV1SocketClientResponse) -> None: # In production, you would send audio from your microphone or audio source: # with open("audio.wav", "rb") as audio_file: # audio_data = audio_file.read() - # agent.send_agent_v_1_media(audio_data) + # agent.send_media(audio_data) agent.start_listening() @@ -92,7 +92,7 @@ def on_message(message: AgentV1SocketClientResponse) -> None: # from deepgram import AsyncDeepgramClient # async with client.agent.v1.connect() as agent: # # ... same configuration ... - # await agent.send_agent_v_1_settings(settings) + # await agent.send_settings(settings) # await agent.start_listening() except Exception as e: From 973d2b4f797157552736cddfb327dc3096c03893 Mon Sep 17 00:00:00 2001 From: Luke Oliff Date: Thu, 15 Jan 2026 21:50:04 +0000 Subject: [PATCH 4/8] refactor(examples): add real-time streaming simulation to websocket examples - Add chunk delay calculation to simulate microphone audio streaming - Refactor audio sending into background thread functions - Align v2 example chunking behavior with v1 example - Improve async examples with proper streaming delays --- examples/13-transcription-live-websocket.py | 70 +++++++++++++------ .../14-transcription-live-websocket-v2.py | 69 ++++++++++++------ 2 files changed, 98 insertions(+), 41 deletions(-) diff --git a/examples/13-transcription-live-websocket.py b/examples/13-transcription-live-websocket.py index d9119a82..5deaee98 100644 --- a/examples/13-transcription-live-websocket.py +++ b/examples/13-transcription-live-websocket.py @@ -8,6 +8,7 @@ import os import threading +import time from typing import Union from dotenv import load_dotenv @@ -17,6 +18,7 @@ from deepgram import DeepgramClient from deepgram.core.events import EventType from deepgram.listen.v1.types import ( + ListenV1Finalize, ListenV1Metadata, ListenV1Results, ListenV1SpeechStarted, @@ -25,8 +27,15 @@ ListenV1SocketClientResponse = Union[ListenV1Results, ListenV1Metadata, ListenV1UtteranceEnd, ListenV1SpeechStarted] -# Chunk size in bytes (e.g., 8KB chunks for efficient streaming) -CHUNK_SIZE = 8192 +# Audio streaming configuration +CHUNK_SIZE = 8192 # Bytes to send at a time +SAMPLE_RATE = 44100 # Hz (typical for WAV files) +SAMPLE_WIDTH = 2 # 16-bit audio = 2 bytes per sample +CHANNELS = 1 # Mono audio + +# Calculate delay between chunks to simulate real-time streaming +# This makes the audio stream at its natural playback rate +CHUNK_DELAY = CHUNK_SIZE / (SAMPLE_RATE * SAMPLE_WIDTH * CHANNELS) client = DeepgramClient() @@ -56,26 +65,36 @@ def on_error(error) -> None: connection.on(EventType.CLOSE, on_close) connection.on(EventType.ERROR, on_error) - # Start listening in a background thread - threading.Thread(target=connection.start_listening, daemon=True).start() + # Define a function to send audio in a background thread + def send_audio(): + audio_path = os.path.join(os.path.dirname(__file__), "fixtures", "audio.wav") + + with open(audio_path, "rb") as audio_file: + print(f"Streaming audio from {audio_path}") + + while True: + chunk = audio_file.read(CHUNK_SIZE) + if not chunk: + break - # Stream audio file - # In production, replace this with audio from microphone or other live source - audio_path = os.path.join(os.path.dirname(__file__), "fixtures", "audio.wav") + connection.send_media(chunk) - with open(audio_path, "rb") as audio_file: - print(f"Streaming audio from {audio_path}") + # Simulate real-time streaming by adding delay between chunks + time.sleep(CHUNK_DELAY) - while True: - chunk = audio_file.read(CHUNK_SIZE) - if not chunk: - break + print("Finished sending audio") - connection.send_media(chunk) + connection.send_finalize(ListenV1Finalize(type="Finalize")) - print("Finished sending audio") + # Start sending audio in a background thread + threading.Thread(target=send_audio, daemon=True).start() + + # Start listening - this blocks until the connection closes or times out + # The connection will stay open until the server closes it or it times out + connection.start_listening() # For async version: + # import asyncio # from deepgram import AsyncDeepgramClient # # async with client.listen.v1.connect(model="nova-3") as connection: @@ -88,14 +107,25 @@ def on_error(error) -> None: # # connection.on(EventType.MESSAGE, on_message) # - # # Start listening + # # Define coroutine to send audio + # async def send_audio(): + # audio_path = os.path.join(os.path.dirname(__file__), "fixtures", "audio.wav") + # with open(audio_path, "rb") as audio_file: + # while chunk := audio_file.read(CHUNK_SIZE): + # await connection.send_media(chunk) + # # Simulate real-time streaming + # await asyncio.sleep(CHUNK_DELAY) + # print("Finished sending audio") + # await connection.send_finalize(ListenV1Finalize(type="Finalize")) + # + # # Start both tasks # listen_task = asyncio.create_task(connection.start_listening()) + # send_task = asyncio.create_task(send_audio()) # - # # Stream audio - # with open(audio_path, "rb") as audio_file: - # while chunk := audio_file.read(CHUNK_SIZE): - # await connection.send_media(chunk) + # # Wait for send to complete + # await send_task # + # # Continue listening until connection closes or times out # await listen_task except Exception as e: diff --git a/examples/14-transcription-live-websocket-v2.py b/examples/14-transcription-live-websocket-v2.py index 801e3d67..e84b11d3 100644 --- a/examples/14-transcription-live-websocket-v2.py +++ b/examples/14-transcription-live-websocket-v2.py @@ -11,6 +11,7 @@ import os import threading +import time from typing import Union from dotenv import load_dotenv @@ -27,8 +28,16 @@ ListenV2SocketClientResponse = Union[ListenV2Connected, ListenV2TurnInfo, ListenV2FatalError] -# Chunk size in bytes (e.g., 8KB chunks for efficient streaming) -CHUNK_SIZE = 8192 +# Audio streaming configuration +# IMPORTANT: Listen V2 requires 16kHz linear16 PCM audio +CHUNK_SIZE = 8192 # Bytes to send at a time +SAMPLE_RATE = 16000 # Hz (required for Listen V2) +SAMPLE_WIDTH = 2 # 16-bit audio = 2 bytes per sample +CHANNELS = 1 # Mono audio + +# Calculate delay between chunks to simulate real-time streaming +# This makes the audio stream at its natural playback rate +CHUNK_DELAY = CHUNK_SIZE / (SAMPLE_RATE * SAMPLE_WIDTH * CHANNELS) client = DeepgramClient() @@ -57,27 +66,35 @@ def on_error(error) -> None: connection.on(EventType.CLOSE, on_close) connection.on(EventType.ERROR, on_error) - # Start listening in a background thread - threading.Thread(target=connection.start_listening, daemon=True).start() + # Define a function to send audio in a background thread + def send_audio(): + # IMPORTANT: Audio must be 16kHz linear16 PCM for Listen V2 + audio_path = os.path.join(os.path.dirname(__file__), "fixtures", "audio.wav") + + with open(audio_path, "rb") as audio_file: + print(f"Streaming audio from {audio_path}") + + while True: + chunk = audio_file.read(CHUNK_SIZE) + if not chunk: + break - # Stream audio file - # In production, replace this with audio from microphone or other live source - # IMPORTANT: Audio must be 16kHz linear16 PCM for Listen V2 - audio_path = os.path.join(os.path.dirname(__file__), "fixtures", "audio.wav") + connection.send_media(chunk) - with open(audio_path, "rb") as audio_file: - print(f"Streaming audio from {audio_path}") + # Simulate real-time streaming by adding delay between chunks + time.sleep(CHUNK_DELAY) - while True: - chunk = audio_file.read(CHUNK_SIZE) - if not chunk: - break + print("Finished sending audio") - connection.send_media(chunk) + # Start sending audio in a background thread + threading.Thread(target=send_audio, daemon=True).start() - print("Finished sending audio") + # Start listening - this blocks until the connection closes or times out + # The connection will stay open until the server closes it or it times out + connection.start_listening() # For async version: + # import asyncio # from deepgram import AsyncDeepgramClient # # async with client.listen.v2.connect( @@ -91,14 +108,24 @@ def on_error(error) -> None: # # connection.on(EventType.MESSAGE, on_message) # - # # Start listening + # # Define coroutine to send audio + # async def send_audio(): + # audio_path = os.path.join(os.path.dirname(__file__), "fixtures", "audio.wav") + # with open(audio_path, "rb") as audio_file: + # while chunk := audio_file.read(CHUNK_SIZE): + # await connection.send_media(chunk) + # # Simulate real-time streaming + # await asyncio.sleep(CHUNK_DELAY) + # print("Finished sending audio") + # + # # Start both tasks # listen_task = asyncio.create_task(connection.start_listening()) + # send_task = asyncio.create_task(send_audio()) # - # # Stream audio - # with open(audio_path, "rb") as audio_file: - # while chunk := audio_file.read(CHUNK_SIZE): - # await connection.send_media(chunk) + # # Wait for send to complete + # await send_task # + # # Continue listening until connection closes or times out # await listen_task except Exception as e: From 6c90292c2b356c91b3b789c84851e0409ad715c6 Mon Sep 17 00:00:00 2001 From: Luke Oliff Date: Tue, 20 Jan 2026 15:04:43 +0000 Subject: [PATCH 5/8] feat(examples): add voice agent connection and interaction examples MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add minimal connection example (30-voice-agent-connection.py) - Add comprehensive interaction example (31-voice-agent-interaction.py) - Follow proper order: connect → register listeners → wait for Welcome → send settings → wait for SettingsApplied → send user message - Use valid Anthropic model (claude-sonnet-4-20250514) - Save binary audio data to output file similar to TTS examples --- examples/30-voice-agent-connection.py | 29 ++++ examples/30-voice-agent.py | 99 -------------- examples/31-voice-agent-interaction.py | 180 +++++++++++++++++++++++++ 3 files changed, 209 insertions(+), 99 deletions(-) create mode 100644 examples/30-voice-agent-connection.py delete mode 100644 examples/30-voice-agent.py create mode 100644 examples/31-voice-agent-interaction.py diff --git a/examples/30-voice-agent-connection.py b/examples/30-voice-agent-connection.py new file mode 100644 index 00000000..b8c0be49 --- /dev/null +++ b/examples/30-voice-agent-connection.py @@ -0,0 +1,29 @@ +""" +Example: Voice Agent (Agent V1) - Basic Connection + +This example shows how to connect to the Deepgram Voice Agent WebSocket endpoint. +Connects to agent.deepgram.com/v1/agent/converse +""" + +from dotenv import load_dotenv + +load_dotenv() + +from deepgram import DeepgramClient + +client = DeepgramClient() + +try: + # Connect to agent.deepgram.com/v1/agent/converse WebSocket endpoint + with client.agent.v1.connect() as agent: + print("Connected to Voice Agent WebSocket") + print("Connection ready") + + # Connection will remain open until context manager exits + # In a real application, you would register event handlers and start listening here + +except Exception as e: + import traceback + + print(f"Error: {e}") + traceback.print_exc() diff --git a/examples/30-voice-agent.py b/examples/30-voice-agent.py deleted file mode 100644 index 35f40540..00000000 --- a/examples/30-voice-agent.py +++ /dev/null @@ -1,99 +0,0 @@ -""" -Example: Voice Agent (Agent V1) - -This example shows how to set up a voice agent that can listen, think, and speak. -""" - -from typing import Union - -from dotenv import load_dotenv - -load_dotenv() - -from deepgram import DeepgramClient -from deepgram.agent.v1.types import ( - AgentV1Agent, - AgentV1AudioConfig, - AgentV1AudioInput, - AgentV1DeepgramSpeakProvider, - AgentV1Listen, - AgentV1ListenProvider, - AgentV1OpenAiThinkProvider, - AgentV1Settings, - AgentV1SpeakProviderConfig, - AgentV1Think, -) -from deepgram.core.events import EventType - -AgentV1SocketClientResponse = Union[str, bytes] - -client = DeepgramClient() - -try: - with client.agent.v1.connect() as agent: - # Configure the agent settings - settings = AgentV1Settings( - audio=AgentV1AudioConfig( - input=AgentV1AudioInput( - encoding="linear16", - sample_rate=44100, - ) - ), - agent=AgentV1Agent( - listen=AgentV1Listen( - provider=AgentV1ListenProvider( - type="deepgram", - model="nova-3", - smart_format=True, - ) - ), - think=AgentV1Think( - provider=AgentV1OpenAiThinkProvider( - type="open_ai", - model="gpt-4o-mini", - temperature=0.7, - ), - prompt="You are a helpful AI assistant.", - ), - speak=AgentV1SpeakProviderConfig( - provider=AgentV1DeepgramSpeakProvider( - type="deepgram", - model="aura-2-asteria-en", - ) - ), - ), - ) - - print("Sending agent settings...") - agent.send_settings(settings) - - def on_message(message: AgentV1SocketClientResponse) -> None: - if isinstance(message, bytes): - print("Received audio data") - # In production, you would play this audio or write it to a file - else: - msg_type = getattr(message, "type", "Unknown") - print(f"Received {msg_type} event") - - agent.on(EventType.OPEN, lambda _: print("Connection opened")) - agent.on(EventType.MESSAGE, on_message) - agent.on(EventType.CLOSE, lambda _: print("Connection closed")) - agent.on(EventType.ERROR, lambda error: print(f"Error: {error}")) - - # Start listening - this blocks until the connection closes - # In production, you would send audio from your microphone or audio source: - # with open("audio.wav", "rb") as audio_file: - # audio_data = audio_file.read() - # agent.send_media(audio_data) - - agent.start_listening() - - # For async version: - # from deepgram import AsyncDeepgramClient - # async with client.agent.v1.connect() as agent: - # # ... same configuration ... - # await agent.send_settings(settings) - # await agent.start_listening() - -except Exception as e: - print(f"Error: {e}") diff --git a/examples/31-voice-agent-interaction.py b/examples/31-voice-agent-interaction.py new file mode 100644 index 00000000..f9ad8a01 --- /dev/null +++ b/examples/31-voice-agent-interaction.py @@ -0,0 +1,180 @@ +""" +Example: Voice Agent (Agent V1) + +This example shows how to set up a voice agent that can listen, think, and speak. +This example simulates a two-way conversation by sending user messages and receiving +agent responses with audio output. +""" + +import threading +from pathlib import Path +from typing import Union + +from dotenv import load_dotenv + +load_dotenv() + +from deepgram import DeepgramClient +from deepgram.agent.v1.types import ( + AgentV1InjectUserMessage, + AgentV1Settings, + AgentV1SettingsAgent, + AgentV1SettingsAgentListen, + AgentV1SettingsAgentListenProvider, + AgentV1SettingsAgentSpeakOneItem, + AgentV1SettingsAgentSpeakOneItemProvider_Deepgram, + AgentV1SettingsAgentThink, + AgentV1SettingsAgentThinkProvider_Anthropic, + AgentV1SettingsAudio, +) +from deepgram.core.events import EventType + +AgentV1SocketClientResponse = Union[str, bytes] + +client = DeepgramClient() + +# Create output file path for audio data +output_path = Path("output.raw").resolve() + +try: + with open(output_path, "wb") as audio_file: + with client.agent.v1.connect() as agent: + # Step 1: Connect (already done by entering context manager) + + # Step 2: Register all listeners first + welcome_received = threading.Event() + settings_applied = threading.Event() + + def on_message(message: AgentV1SocketClientResponse) -> None: + if isinstance(message, bytes): + # Step 7: Save binary message data to output file + print("Received audio data") + audio_file.write(message) + else: + msg_type = getattr(message, "type", "Unknown") + print(f"Received {msg_type} event") + # Print event details for debugging + if hasattr(message, "__dict__"): + print(f" Event details: {message.__dict__}") + + # Step 3: Wait for Welcome event + if msg_type == "Welcome": + print("Welcome event received") + welcome_received.set() + + # Step 5: Wait for SettingsApplied event + if msg_type == "SettingsApplied": + print("Settings applied event received") + settings_applied.set() + + def on_open(_) -> None: + print("Connection opened") + + def on_close(_) -> None: + print("Connection closed") + + def on_error(error) -> None: + print(f"Error: {error}") + + # Register event handlers + agent.on(EventType.OPEN, on_open) + agent.on(EventType.MESSAGE, on_message) + agent.on(EventType.CLOSE, on_close) + agent.on(EventType.ERROR, on_error) + + # Start listening in a background thread so we can wait for events + def listen(): + agent.start_listening() + + listen_thread = threading.Thread(target=listen, daemon=True) + listen_thread.start() + + # Step 3: Wait for Welcome event + print("Waiting for Welcome event...") + welcome_received.wait(timeout=10) + if not welcome_received.is_set(): + raise TimeoutError("Did not receive Welcome event") + + # Step 4: Send settings + print("Sending agent settings...") + settings = AgentV1Settings( + # Audio input defaults to encoding=linear16 and sample_rate=24000 if omitted + audio=AgentV1SettingsAudio(), + agent=AgentV1SettingsAgent( + listen=AgentV1SettingsAgentListen( + provider={ + "version": "v1", + "type": "deepgram", + "model": "nova-3", + "smart_format": True, + } + ), + think=AgentV1SettingsAgentThink( + provider=AgentV1SettingsAgentThinkProvider_Anthropic( + type="anthropic", + model="claude-sonnet-4-20250514", + temperature=0.7, + ), + prompt="You are a helpful AI assistant.", + ), + speak=[ + AgentV1SettingsAgentSpeakOneItem( + provider=AgentV1SettingsAgentSpeakOneItemProvider_Deepgram( + type="deepgram", + model="aura-2-asteria-en", + ) + ) + ], + ), + ) + agent.send_settings(settings) + + # Step 5: Wait for SettingsApplied event + print("Waiting for SettingsApplied event...") + settings_applied.wait(timeout=10) + if not settings_applied.is_set(): + raise TimeoutError("Did not receive SettingsApplied event") + + # --- Now ready to be an agent --- + print("Agent is ready!") + + # Step 6: Insert user message + print("Sending user message...") + user_message = AgentV1InjectUserMessage(content="Hello! Can you tell me a fun fact about space?") + agent.send_inject_user_message(user_message) + + # Wait for the connection to close (or timeout after reasonable time) + listen_thread.join(timeout=30) + + print(f"Audio saved to {output_path}") + + # For async version: + # import asyncio + # from deepgram import AsyncDeepgramClient + # async with client.agent.v1.connect() as agent: + # # Register handlers + # welcome_received = asyncio.Event() + # settings_applied = asyncio.Event() + # + # async def on_message(message): + # if isinstance(message, bytes): + # audio_file.write(message) + # elif getattr(message, "type", None) == "Welcome": + # welcome_received.set() + # elif getattr(message, "type", None) == "SettingsApplied": + # settings_applied.set() + # + # agent.on(EventType.MESSAGE, on_message) + # + # listen_task = asyncio.create_task(agent.start_listening()) + # await welcome_received.wait() + # await agent.send_settings(settings) + # await settings_applied.wait() + # await agent.send_inject_user_message(user_message) + # await listen_task + +except Exception as e: + import traceback + + print(f"Error: {e}") + traceback.print_exc() From e562da75d211619e22ec72a34dfd2970092d76c5 Mon Sep 17 00:00:00 2001 From: Luke Oliff Date: Tue, 20 Jan 2026 15:05:22 +0000 Subject: [PATCH 6/8] refactor(examples): improve TTS examples consistency - Use Path.resolve() for output paths in TTS examples - Add minor formatting improvements and comments --- examples/20-text-to-speech-single.py | 12 ++- examples/21-text-to-speech-streaming.py | 106 +++++++++++++++--------- 2 files changed, 78 insertions(+), 40 deletions(-) diff --git a/examples/20-text-to-speech-single.py b/examples/20-text-to-speech-single.py index 43a458ea..33a8881d 100644 --- a/examples/20-text-to-speech-single.py +++ b/examples/20-text-to-speech-single.py @@ -6,6 +6,8 @@ as they arrive from the API, allowing you to process audio incrementally. """ +from pathlib import Path + from dotenv import load_dotenv load_dotenv() @@ -26,7 +28,7 @@ ) # Process chunks as they arrive (streaming) - output_path = "output.mp3" + output_path = Path("output.mp3").resolve() chunk_count = 0 with open(output_path, "wb") as audio_file: for chunk in audio_chunks: @@ -39,9 +41,15 @@ # For async version: # from deepgram import AsyncDeepgramClient # client = AsyncDeepgramClient() - # async for chunk in await client.speak.v1.audio.generate(...): + # async for chunk in await client.speak.v1.audio.generate( + # text="Hello, this is a sample text to speech conversion.", + # model="aura-2-asteria-en", + # ): # # Process chunks as they arrive # audio_file.write(chunk) + # With access token: + # client = DeepgramClient(access_token="your-access-token") + except Exception as e: print(f"Error: {e}") diff --git a/examples/21-text-to-speech-streaming.py b/examples/21-text-to-speech-streaming.py index 23d43638..0aa60ec8 100644 --- a/examples/21-text-to-speech-streaming.py +++ b/examples/21-text-to-speech-streaming.py @@ -2,8 +2,10 @@ Example: Text-to-Speech Streaming with WebSocket This example shows how to stream text-to-speech conversion using WebSocket. +In production, you would send text dynamically as it becomes available. """ +from pathlib import Path from typing import Union from dotenv import load_dotenv @@ -18,50 +20,78 @@ client = DeepgramClient() +# Create output file path +output_path = Path("output.raw").resolve() + try: - with client.speak.v1.connect(model="aura-2-asteria-en", encoding="linear16", sample_rate=24000) as connection: - - def on_message(message: SpeakV1SocketClientResponse) -> None: - if isinstance(message, bytes): - print("Received audio data") - # In production, you would write this audio data to a file or play it - # with open("output.raw", "ab") as audio_file: - # audio_file.write(message) - else: - msg_type = getattr(message, "type", "Unknown") - print(f"Received {msg_type} event") - - connection.on(EventType.OPEN, lambda _: print("Connection opened")) - connection.on(EventType.MESSAGE, on_message) - connection.on(EventType.CLOSE, lambda _: print("Connection closed")) - connection.on(EventType.ERROR, lambda error: print(f"Error: {error}")) - - # For sync version: Send messages before starting to listen - # Note: start_listening() blocks, so send all messages first - # For better control with bidirectional communication, use the async version - text_message = SpeakV1Text(text="Hello, this is a text to speech example.") - connection.send_text(text_message) - - # Flush to ensure all text is processed - flush_message = SpeakV1Flush() - connection.send_flush(flush_message) - - # Close the connection when done - close_message = SpeakV1Close() - connection.send_close(close_message) - - # Start listening - this blocks until the connection closes - # All messages should be sent before calling this in sync mode - connection.start_listening() + with open(output_path, "wb") as audio_file: + with client.speak.v1.connect(model="aura-2-asteria-en", encoding="linear16", sample_rate=24000) as connection: + + def on_message(message: SpeakV1SocketClientResponse) -> None: + if isinstance(message, bytes): + print("Received audio data") + audio_file.write(message) + else: + msg_type = getattr(message, "type", "Unknown") + print(f"Received {msg_type} event") + + def on_open(_) -> None: + print("Connection opened") + + def on_close(_) -> None: + print("Connection closed") + + def on_error(error) -> None: + print(f"Error: {error}") + + # Register event handlers + connection.on(EventType.OPEN, on_open) + connection.on(EventType.MESSAGE, on_message) + connection.on(EventType.CLOSE, on_close) + connection.on(EventType.ERROR, on_error) + + # For sync version: Send messages before starting to listen + # Note: start_listening() blocks, so send all messages first + # For better control with bidirectional communication, use the async version + text_message = SpeakV1Text(text="Hello, this is a text to speech example.") + connection.send_text(text_message) + + # Flush to ensure all text is processed + flush_message = SpeakV1Flush(type="Flush") + connection.send_flush(flush_message) + + # Close the connection when done + close_message = SpeakV1Close(type="Close") + connection.send_close(close_message) + + # Start listening - this blocks until the connection closes + # All messages should be sent before calling this in sync mode + connection.start_listening() + + print(f"Audio saved to {output_path}") # For async version: + # import asyncio + # from pathlib import Path # from deepgram import AsyncDeepgramClient - # async with client.speak.v1.connect(...) as connection: + # output_path = Path("output.raw").resolve() + # async with client.speak.v1.connect(model="aura-2-asteria-en", encoding="linear16", sample_rate=24000) as connection: + # async def on_message(message): + # if isinstance(message, bytes): + # with open(output_path, "ab") as audio_file: + # audio_file.write(message) + # else: + # msg_type = getattr(message, "type", "Unknown") + # print(f"Received {msg_type} event") + # + # connection.on(EventType.MESSAGE, on_message) + # # listen_task = asyncio.create_task(connection.start_listening()) - # await connection.send_text(SpeakV1Text(text="...")) - # await connection.send_flush(SpeakV1Flush()) - # await connection.send_close(SpeakV1Close()) + # await connection.send_text(SpeakV1Text(text="Hello, this is a text to speech example.")) + # await connection.send_flush(SpeakV1Flush(type="Flush")) + # await connection.send_close(SpeakV1Close(type="Close")) # await listen_task + # print(f"Audio saved to {output_path}") except Exception as e: print(f"Error: {e}") From 006dd77a779cdc15f610c2fc7eccb162ff15d72e Mon Sep 17 00:00:00 2001 From: Luke Oliff Date: Tue, 20 Jan 2026 15:09:20 +0000 Subject: [PATCH 7/8] fix(examples): update text intelligence example to use correct response structure - Access results from response.results object instead of direct attributes - Properly access sentiments, summary, topics, and intents from nested structure - Add example output for segments and improve formatting --- examples/40-text-intelligence.py | 54 +++++++++++++++++++++++++++----- 1 file changed, 46 insertions(+), 8 deletions(-) diff --git a/examples/40-text-intelligence.py b/examples/40-text-intelligence.py index 18d1807b..7a0698a5 100644 --- a/examples/40-text-intelligence.py +++ b/examples/40-text-intelligence.py @@ -24,18 +24,56 @@ ) print("Analysis received:") - print(f"Sentiment: {response.sentiment}") - if response.summary: - print(f"Summary: {response.summary}") - if response.topics: - print(f"Topics: {response.topics}") - if response.intents: - print(f"Intents: {response.intents}") + + # Access results from the response.results object + if response.results.sentiments: + if response.results.sentiments.average: + print(f"Average Sentiment: {response.results.sentiments.average.sentiment} (score: {response.results.sentiments.average.sentiment_score})") + if response.results.sentiments.segments: + print(f"Sentiment segments: {len(response.results.sentiments.segments)} found") + + if response.results.summary: + if response.results.summary.results and response.results.summary.results.summary: + print(f"Summary: {response.results.summary.results.summary.summary}") + + if response.results.topics: + if response.results.topics.results and response.results.topics.results.topics: + segments = response.results.topics.results.topics.segments + if segments: + print(f"Topics found: {len(segments)} segments") + # Access topics from segments + for segment in segments[:3]: # Show first 3 segments + if segment.topics: + topic_names = [topic.topic for topic in segment.topics if hasattr(topic, 'topic')] + if topic_names: + print(f" - {', '.join(topic_names)}") + + if response.results.intents: + if response.results.intents.results and response.results.intents.results.intents: + segments = response.results.intents.results.intents.segments + if segments: + print(f"Intents found: {len(segments)} segments") + # Access intents from segments + for segment in segments[:3]: # Show first 3 segments + if segment.intents: + intent_names = [intent.intent for intent in segment.intents if hasattr(intent, 'intent')] + if intent_names: + print(f" - {', '.join(intent_names)}") # For async version: # from deepgram import AsyncDeepgramClient # client = AsyncDeepgramClient() - # response = await client.read.v1.text.analyze(...) + # response = await client.read.v1.text.analyze( + # request={"text": "Hello, world! This is a sample text for analysis."}, + # language="en", + # sentiment=True, + # summarize=True, + # topics=True, + # intents=True, + # ) + + # With access token: + # client = DeepgramClient(access_token="your-access-token") except Exception as e: print(f"Error: {e}") From 1c49957e77fca0cfa0a60ea999228184bd5c57a1 Mon Sep 17 00:00:00 2001 From: Luke Oliff Date: Thu, 22 Jan 2026 09:46:50 +0000 Subject: [PATCH 8/8] feat(examples): add WebSocket connection options example Demonstrates how to pass custom request options (headers) when connecting to Deepgram WebSocket API. --- examples/72-ws-connect-options.py | 120 ++++++++++++++++++++++++++++++ 1 file changed, 120 insertions(+) create mode 100644 examples/72-ws-connect-options.py diff --git a/examples/72-ws-connect-options.py b/examples/72-ws-connect-options.py new file mode 100644 index 00000000..39ce407e --- /dev/null +++ b/examples/72-ws-connect-options.py @@ -0,0 +1,120 @@ +""" +Example: WebSocket Request Options - Additional Headers and Query Parameters + +This example shows how to use request_options with WebSocket connections +to add additional headers and query parameters to the connection request. +""" + +import os +import threading +import time + +from dotenv import load_dotenv + +load_dotenv() + +from deepgram import DeepgramClient +from deepgram.core.events import EventType +from deepgram.listen.v1.types import ListenV1Results + +# Audio streaming configuration +CHUNK_SIZE = 8192 +SAMPLE_RATE = 44100 +SAMPLE_WIDTH = 2 +CHANNELS = 1 +CHUNK_DELAY = CHUNK_SIZE / (SAMPLE_RATE * SAMPLE_WIDTH * CHANNELS) + +client = DeepgramClient() + +try: + print("Connecting to Deepgram WebSocket with custom request options...") + + # Connect with additional headers and query parameters + with client.listen.v1.connect( + model="nova-3", + language="en", + smart_format=True, + request_options={ + "additional_headers": { + "X-Custom-Header": "custom-value", + "X-Request-ID": "example-request-123", + }, + # Note: additional_query_parameters are currently not working + # for WebSocket connections, but the structure is shown here + # for future compatibility + "additional_query_parameters": { + "detect_language": ["en", "es"], + }, + }, + ) as connection: + + print("Connected successfully with custom headers!") + + # Register event handlers + def on_open(_): + print("Connection opened") + + def on_message(message): + if isinstance(message, ListenV1Results): + if message.channel and message.channel.alternatives: + transcript = message.channel.alternatives[0].transcript + if transcript: + print(f"Transcript: {transcript}") + + def on_error(error): + print(f"Error: {error}") + + def on_close(_): + print("Connection closed") + + connection.on(EventType.OPEN, on_open) + connection.on(EventType.MESSAGE, on_message) + connection.on(EventType.ERROR, on_error) + connection.on(EventType.CLOSE, on_close) + + # Define a function to send audio in a background thread + def send_audio(): + audio_path = os.path.join(os.path.dirname(__file__), "fixtures", "audio.wav") + + with open(audio_path, "rb") as audio_file: + print(f"Streaming audio from {audio_path}") + + while True: + chunk = audio_file.read(CHUNK_SIZE) + if not chunk: + break + + connection.send_media(chunk) + time.sleep(CHUNK_DELAY) + + print("Finished sending audio") + + # Start sending audio in a background thread + threading.Thread(target=send_audio, daemon=True).start() + + # Start listening - this blocks until the connection closes or times out + connection.start_listening() + + # Additional request_options that can be used: + # with client.listen.v1.connect( + # model="nova-3", + # language="en", + # smart_format=True, + # request_options={ + # "additional_headers": { + # "X-Custom-Header": "custom-value", + # "X-Request-ID": "example-request-123", + # "X-Client-Version": "1.0.0", + # }, + # "additional_query_parameters": { + # "detect_language": ["en", "es"], + # # Note: Additional query parameters for WebSocket are + # # currently not working, but may be supported in the future + # }, + # "timeout_in_seconds": 30, + # } + # ) as connection: + # # ... register handlers and start listening + +except Exception as e: + print(f"Error: {e}")