diff --git a/examples/350-asterisk-freeswitch-deepgram-stt-python/.env.example b/examples/350-asterisk-freeswitch-deepgram-stt-python/.env.example
new file mode 100644
index 0000000..99314a3
--- /dev/null
+++ b/examples/350-asterisk-freeswitch-deepgram-stt-python/.env.example
@@ -0,0 +1,2 @@
+# Deepgram — https://console.deepgram.com/
+DEEPGRAM_API_KEY=
diff --git a/examples/350-asterisk-freeswitch-deepgram-stt-python/README.md b/examples/350-asterisk-freeswitch-deepgram-stt-python/README.md
new file mode 100644
index 0000000..8512450
--- /dev/null
+++ b/examples/350-asterisk-freeswitch-deepgram-stt-python/README.md
@@ -0,0 +1,113 @@
+# Asterisk / FreeSWITCH PBX to Deepgram Streaming STT
+
+Bridge real-time phone call audio from Asterisk or FreeSWITCH into Deepgram's live speech-to-text API. This example shows how to capture RTP/PCM audio from a PBX and stream it over a WebSocket to Deepgram for real-time transcription.
+
+## What you'll build
+
+A Python WebSocket server that acts as a bridge between your PBX and Deepgram. Incoming calls on Asterisk (via AudioSocket) or FreeSWITCH (via mod_audio_stream) send their audio to this server, which forwards it to Deepgram's streaming STT API and prints live transcripts to the console.
+
+## Prerequisites
+
+- Python 3.11+
+- Deepgram account — [get a free API key](https://console.deepgram.com/)
+- Asterisk 16+ with `app_audiosocket` module, **or** FreeSWITCH with `mod_audio_stream`
+
+## Environment variables
+
+| Variable | Where to find it |
+|----------|-----------------|
+| `DEEPGRAM_API_KEY` | [Deepgram console](https://console.deepgram.com/) |
+
+Copy `.env.example` to `.env` and fill in your values.
+
+## Install and run
+
+```bash
+cd examples/350-asterisk-freeswitch-deepgram-stt-python
+
+pip install -r requirements.txt
+
+cp .env.example .env
+# Edit .env and add your DEEPGRAM_API_KEY
+
+python src/bridge.py
+```
+
+The bridge listens on `ws://0.0.0.0:8765` by default. Use `--port` to change it.
+ +### Asterisk dialplan configuration + +Add to your Asterisk dialplan (`extensions.conf`) to route call audio to the bridge: + +```ini +[transcribe] +exten => _X.,1,Answer() + same => n,AudioSocket(ws://bridge-host:8765/asterisk) + same => n,Hangup() +``` + +Asterisk AudioSocket sends signed-linear 16-bit PCM at 8 kHz mono by default. The bridge parses AudioSocket's TLV framing (type-length-value) to extract audio frames. + +### FreeSWITCH dialplan configuration + +Add to your FreeSWITCH dialplan to stream call audio to the bridge: + +```xml + + +``` + +FreeSWITCH `mod_audio_stream` sends raw PCM frames directly — no framing protocol, just binary audio on the WebSocket. + +## Key parameters + +| Parameter | Value | Description | +|-----------|-------|-------------| +| `model` | `nova-3-phonecall` | Deepgram model optimised for telephony audio (8/16 kHz) | +| `encoding` | `linear16` | Signed 16-bit little-endian PCM — the native format of both PBX platforms | +| `sample_rate` | `8000` / `16000` | 8 kHz for Asterisk default, 16 kHz for FreeSWITCH (higher = better accuracy) | +| `smart_format` | `True` | Adds punctuation, capitalisation, and number formatting | +| `interim_results` | `True` | Returns partial transcripts while the caller is still speaking | +| `utterance_end_ms` | `1000` | Fires an utterance-end event after 1 second of silence | + +## How it works + +1. **PBX receives a call** — Asterisk or FreeSWITCH answers and is configured to stream audio to this bridge +2. **Audio reaches the bridge** — Asterisk sends AudioSocket TLV frames to `/asterisk`; FreeSWITCH sends raw PCM to `/freeswitch` +3. **Bridge opens a Deepgram connection** — using the Python SDK's `client.listen.v1.connect()` with telephony-optimised settings +4. **Audio is forwarded** — each PCM chunk is sent to Deepgram via `connection.send_media()` +5. 
**Transcripts arrive** — Deepgram fires `EventType.MESSAGE` callbacks with interim and final transcripts, which the bridge logs to the console +6. **Call ends** — the PBX closes the WebSocket; the bridge sends `close_stream` to Deepgram + +## Architecture + +``` +Phone Call + | + | RTP audio + v +Asterisk / FreeSWITCH PBX + | + | WebSocket (AudioSocket TLV or raw PCM) + v +bridge.py (this server) + | + | Deepgram Python SDK (WebSocket) + v +Deepgram Live STT (nova-3-phonecall) + | + | transcript events + v +Console output (or your application) +``` + +## Related + +- [Deepgram FreeSWITCH integration](https://developers.deepgram.com/docs/freeswitch) +- [Deepgram Live STT docs](https://developers.deepgram.com/docs/getting-started-with-live-streaming-audio) +- [Asterisk AudioSocket](https://docs.asterisk.org/Asterisk_16_Documentation/API_Documentation/Dialplan_Applications/AudioSocket/) +- [FreeSWITCH mod_audio_stream](https://developer.signalwire.com/freeswitch/FreeSWITCH-Explained/Modules/mod_audio_stream/) + +## Starter templates + +If you want a ready-to-run base for your own project, check the [deepgram-starters](https://github.com/orgs/deepgram-starters/repositories) org — there are starter repos for every language and every Deepgram product. 
diff --git a/examples/350-asterisk-freeswitch-deepgram-stt-python/requirements.txt b/examples/350-asterisk-freeswitch-deepgram-stt-python/requirements.txt
new file mode 100644
index 0000000..2217528
--- /dev/null
+++ b/examples/350-asterisk-freeswitch-deepgram-stt-python/requirements.txt
@@ -0,0 +1,3 @@
+deepgram-sdk==6.1.1
+websockets>=15.0
+python-dotenv>=1.0.0
diff --git a/examples/350-asterisk-freeswitch-deepgram-stt-python/src/__init__.py b/examples/350-asterisk-freeswitch-deepgram-stt-python/src/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/examples/350-asterisk-freeswitch-deepgram-stt-python/src/bridge.py b/examples/350-asterisk-freeswitch-deepgram-stt-python/src/bridge.py
new file mode 100644
index 0000000..a451a45
--- /dev/null
+++ b/examples/350-asterisk-freeswitch-deepgram-stt-python/src/bridge.py
@@ -0,0 +1,260 @@
+"""WebSocket bridge: PBX audio (Asterisk / FreeSWITCH) -> Deepgram Live STT.
+
+Asterisk (via ARI external media or AudioSocket) and FreeSWITCH
+(via mod_audio_stream) can both send real-time call audio to a WebSocket
+endpoint. This server accepts that audio and forwards it to Deepgram's
+streaming STT API, printing live transcripts to the console.
+
+Usage:
+    python src/bridge.py              # default: ws://0.0.0.0:8765
+    python src/bridge.py --port 9000  # custom port
+
+Asterisk dialplan (AudioSocket):
+    exten => _X.,1,Answer()
+     same => n,AudioSocket(ws://bridge-host:8765/asterisk)
+
+FreeSWITCH dialplan (mod_audio_stream):
+    stream call audio to ws://bridge-host:8765/freeswitch (see README)
+"""
+
+import argparse
+import asyncio
+import json
+import logging
+import os
+import struct
+import sys
+
+from dotenv import load_dotenv
+
+load_dotenv()
+
+import websockets
+import websockets.asyncio.server
+from deepgram import AsyncDeepgramClient
+from deepgram.core.events import EventType
+from deepgram.listen.v1.types import ListenV1Results
+
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s [%(levelname)s] %(message)s",
+)
+log = logging.getLogger(__name__)
+
+# Asterisk AudioSocket protocol constants — AudioSocket wraps audio in
+# simple TLV (type-length-value) frames so we can distinguish audio data
+# from control messages (hang-up, UUID) on the same TCP/WS connection.
+AUDIOSOCKET_TYPE_UUID = 0x01
+AUDIOSOCKET_TYPE_AUDIO = 0x10
+AUDIOSOCKET_TYPE_HANGUP = 0x00
+AUDIOSOCKET_HEADER_SIZE = 3  # 1 byte type + 2 bytes length (big-endian)
+
+
+def parse_audiosocket_frame(data: bytes) -> tuple[int, bytes]:
+    """Parse an Asterisk AudioSocket frame into (type, payload).
+
+    AudioSocket frames are: [1 byte type][2 bytes big-endian length][payload].
+    Returns (frame_type, payload_bytes). Raises ValueError on malformed
+    (short or truncated) frames.
+    """
+    if len(data) < AUDIOSOCKET_HEADER_SIZE:
+        raise ValueError(f"Frame too short: {len(data)} bytes")
+    frame_type = data[0]
+    payload_len = struct.unpack(">H", data[1:3])[0]
+    payload = data[3 : 3 + payload_len]
+    # The declared length must match the bytes actually present — otherwise
+    # we would silently forward a truncated audio chunk to Deepgram.
+    if len(payload) != payload_len:
+        raise ValueError(
+            f"Truncated frame: declared {payload_len} bytes, got {len(payload)}"
+        )
+    return frame_type, payload
+
+
+async def handle_deepgram_events(dg_connection, call_id: str) -> None:
+    """Register event handlers for Deepgram transcript and error events."""
+
+    async def on_message(message) -> None:
+        if isinstance(message, ListenV1Results):
+            transcript = message.channel.alternatives[0].transcript
+            if transcript.strip():
+                tag = "final" if message.is_final else "interim"
+                log.info("[%s] [%s] %s", call_id, tag, transcript)
+
+    async def on_error(error) -> None:
+        log.error("[%s] Deepgram error: %s", call_id, error)
+
+    dg_connection.on(EventType.MESSAGE, on_message)
+    dg_connection.on(EventType.ERROR, on_error)
+
+
+async def open_deepgram_connection(
+    encoding: str = "linear16",
+    sample_rate: int = 8000,
+    channels: int = 1,
+):
+    """Open a Deepgram live STT connection with the given audio parameters.
+
+    Returns (dg_connection, listener_task). Caller must cancel the task and
+    call send_close_stream() when done.
+    """
+    client = AsyncDeepgramClient(api_key=os.environ["DEEPGRAM_API_KEY"])
+    # ← connect() opens a persistent WebSocket to Deepgram's STT API
+    dg_connection = await client.listen.v1.connect(
+        model="nova-3-phonecall",  # ← optimised for telephony audio (8/16 kHz)
+        encoding=encoding,
+        sample_rate=sample_rate,
+        channels=channels,
+        smart_format=True,
+        interim_results=True,
+        utterance_end_ms=1000,
+        tag="deepgram-examples",  # ← REQUIRED: tags traffic in the Deepgram console
+    )
+    listener_task = asyncio.create_task(dg_connection.start_listening())
+    return dg_connection, listener_task
+
+
+async def handle_asterisk(websocket) -> None:
+    """Handle an Asterisk AudioSocket connection.
+
+    Asterisk AudioSocket sends TLV-framed messages: a UUID frame at
+    connection start, then audio frames (signed-linear 16-bit, 8 kHz mono
+    by default), and a hangup frame when the call ends.
+    """
+    call_id = "asterisk-unknown"
+    dg_connection = None
+    listener_task = None
+
+    try:
+        # Asterisk AudioSocket default: signed linear 16-bit, 8 kHz, mono.
+        # If your Asterisk is configured for 16 kHz (codec_slin16), change
+        # sample_rate to 16000 for better accuracy.
+        dg_connection, listener_task = await open_deepgram_connection(
+            encoding="linear16", sample_rate=8000, channels=1
+        )
+        await handle_deepgram_events(dg_connection, call_id)
+        log.info("[%s] Deepgram connection opened", call_id)
+
+        async for raw in websocket:
+            if isinstance(raw, str):
+                continue
+
+            # A malformed frame should not kill the whole call — log it,
+            # drop it, and keep reading.
+            try:
+                frame_type, payload = parse_audiosocket_frame(raw)
+            except ValueError as exc:
+                log.warning("[%s] Dropping malformed frame: %s", call_id, exc)
+                continue
+
+            if frame_type == AUDIOSOCKET_TYPE_UUID:
+                call_id = payload.decode("utf-8", errors="replace").strip("\x00")
+                log.info("[%s] Asterisk call connected", call_id)
+
+            elif frame_type == AUDIOSOCKET_TYPE_AUDIO:
+                if payload:
+                    await dg_connection.send_media(payload)
+
+            elif frame_type == AUDIOSOCKET_TYPE_HANGUP:
+                log.info("[%s] Asterisk call hung up", call_id)
+                break
+
+    except websockets.exceptions.ConnectionClosed:
+        log.info("[%s] Asterisk WebSocket closed", call_id)
+    finally:
+        if dg_connection:
+            try:
+                await dg_connection.send_close_stream()
+            except Exception:
+                pass
+        if listener_task:
+            listener_task.cancel()
+
+
+async def handle_freeswitch(websocket) -> None:
+    """Handle a FreeSWITCH mod_audio_stream connection.
+
+    FreeSWITCH mod_audio_stream sends raw PCM audio as binary WebSocket
+    frames — no framing protocol, no JSON metadata. The audio format is
+    set in the dialplan action (typically L16 at 16 kHz mono).
+    """
+    call_id = "freeswitch-unknown"
+    dg_connection = None
+    listener_task = None
+
+    try:
+        # FreeSWITCH mod_audio_stream default when configured with "16000 mono L16"
+        dg_connection, listener_task = await open_deepgram_connection(
+            encoding="linear16", sample_rate=16000, channels=1
+        )
+        await handle_deepgram_events(dg_connection, call_id)
+        log.info("[%s] Deepgram connection opened", call_id)
+
+        async for raw in websocket:
+            if isinstance(raw, str):
+                # mod_audio_stream may send a JSON metadata frame at start
+                try:
+                    meta = json.loads(raw)
+                    call_id = meta.get("uuid", call_id)
+                    log.info("[%s] FreeSWITCH stream metadata: %s", call_id, meta)
+                except json.JSONDecodeError:
+                    pass
+                continue
+
+            if raw:
+                await dg_connection.send_media(raw)
+
+    except websockets.exceptions.ConnectionClosed:
+        log.info("[%s] FreeSWITCH WebSocket closed", call_id)
+    finally:
+        if dg_connection:
+            try:
+                await dg_connection.send_close_stream()
+            except Exception:
+                pass
+        if listener_task:
+            listener_task.cancel()
+
+
+async def router(websocket) -> None:
+    """Route incoming WebSocket connections based on the URL path.
+
+    /asterisk   -> Asterisk AudioSocket handler (TLV-framed audio)
+    /freeswitch -> FreeSWITCH mod_audio_stream handler (raw PCM)
+    """
+    path = websocket.request.path if hasattr(websocket, "request") else "/"
+    log.info("New connection on %s from %s", path, websocket.remote_address)
+
+    if path.startswith("/asterisk"):
+        await handle_asterisk(websocket)
+    elif path.startswith("/freeswitch"):
+        await handle_freeswitch(websocket)
+    else:
+        log.warning("Unknown path %s — closing. Use /asterisk or /freeswitch", path)
+        await websocket.close(1008, "Use /asterisk or /freeswitch path")
+
+
+async def serve(host: str = "0.0.0.0", port: int = 8765) -> None:
+    """Start the WebSocket bridge server."""
+    if not os.environ.get("DEEPGRAM_API_KEY"):
+        log.error("DEEPGRAM_API_KEY not set. Copy .env.example to .env and add your key.")
+        sys.exit(1)
+
+    log.info("PBX-to-Deepgram bridge listening on ws://%s:%d", host, port)
+    log.info("  /asterisk   — Asterisk AudioSocket endpoint")
+    log.info("  /freeswitch — FreeSWITCH mod_audio_stream endpoint")
+
+    async with websockets.asyncio.server.serve(router, host, port):
+        await asyncio.Future()  # run forever
+
+
+def main():
+    parser = argparse.ArgumentParser(description="PBX to Deepgram STT bridge")
+    parser.add_argument("--host", default="0.0.0.0")
+    parser.add_argument("--port", type=int, default=8765)
+    args = parser.parse_args()
+    asyncio.run(serve(args.host, args.port))
+
+
+if __name__ == "__main__":
+    main()
diff --git a/examples/350-asterisk-freeswitch-deepgram-stt-python/tests/test_example.py b/examples/350-asterisk-freeswitch-deepgram-stt-python/tests/test_example.py
new file mode 100644
index 0000000..11bbd88
--- /dev/null
+++ b/examples/350-asterisk-freeswitch-deepgram-stt-python/tests/test_example.py
@@ -0,0 +1,109 @@
+import os
+import struct
+import sys
+from pathlib import Path
+
+# ── Credential check ────────────────────────────────────────────────────────
+# Exit code convention across all examples in this repo:
+#   0 = all tests passed
+#   1 = real test failure (code bug, assertion error, unexpected API response)
+#   2 = missing credentials (expected in CI until secrets are configured)
+env_example = Path(__file__).parent.parent / ".env.example"
+required = [
+    line.split("=")[0].strip()
+    for line in env_example.read_text().splitlines()
+    if line and not line.startswith("#") and "=" in line and line[0].isupper()
+]
+missing = [k for k in required if not os.environ.get(k)]
+if missing:
+    print(f"MISSING_CREDENTIALS: {','.join(missing)}", file=sys.stderr)
+    sys.exit(2)
+# ────────────────────────────────────────────────────────────────────────────
+
+sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
+
+from bridge import (
+    AUDIOSOCKET_TYPE_AUDIO,
+    AUDIOSOCKET_TYPE_HANGUP,
+    AUDIOSOCKET_TYPE_UUID,
+    parse_audiosocket_frame,
+)
+from deepgram import DeepgramClient
+
+
+def test_parse_audiosocket_uuid_frame():
+    """Verify AudioSocket UUID frame parsing."""
+    uuid_bytes = b"test-call-uuid-1234\x00"
+    frame = struct.pack(">BH", AUDIOSOCKET_TYPE_UUID, len(uuid_bytes)) + uuid_bytes
+    frame_type, payload = parse_audiosocket_frame(frame)
+    assert frame_type == AUDIOSOCKET_TYPE_UUID
+    assert b"test-call-uuid-1234" in payload
+    print("OK parse_audiosocket_uuid_frame")
+
+
+def test_parse_audiosocket_audio_frame():
+    """Verify AudioSocket audio frame parsing with PCM data."""
+    pcm_data = b"\x00\x01" * 160  # 320 bytes = 20ms of 8kHz 16-bit mono
+    frame = struct.pack(">BH", AUDIOSOCKET_TYPE_AUDIO, len(pcm_data)) + pcm_data
+    frame_type, payload = parse_audiosocket_frame(frame)
+    assert frame_type == AUDIOSOCKET_TYPE_AUDIO
+    assert payload == pcm_data
+    print("OK parse_audiosocket_audio_frame")
+
+
+def test_parse_audiosocket_hangup_frame():
+    """Verify AudioSocket hangup frame parsing."""
+    frame = struct.pack(">BH", AUDIOSOCKET_TYPE_HANGUP, 0)
+    frame_type, payload = parse_audiosocket_frame(frame)
+    assert frame_type == AUDIOSOCKET_TYPE_HANGUP
+    assert payload == b""
+    print("OK parse_audiosocket_hangup_frame")
+
+
+def test_parse_audiosocket_malformed():
+    """Verify malformed frames raise ValueError."""
+    try:
+        parse_audiosocket_frame(b"\x00")
+        assert False, "Should have raised ValueError"
+    except ValueError:
+        pass
+    # Truncated payload: header declares 10 bytes but only 2 follow.
+    try:
+        parse_audiosocket_frame(struct.pack(">BH", AUDIOSOCKET_TYPE_AUDIO, 10) + b"\x01\x02")
+        assert False, "Should have raised ValueError"
+    except ValueError:
+        pass
+    print("OK parse_audiosocket_malformed")
+
+
+def test_deepgram_prerecorded_stt():
+    """Verify the Deepgram API key works by running a pre-recorded transcription.
+
+    We use a pre-recorded call instead of a live WebSocket to keep the test
+    fast and deterministic — the same SDK client and key are used for both.
+    """
+    client = DeepgramClient()
+    response = client.listen.v1.media.transcribe_url(
+        url="https://dpgr.am/spacewalk.wav",
+        model="nova-3",
+        smart_format=True,
+        tag="deepgram-examples",
+    )
+    transcript = response.results.channels[0].alternatives[0].transcript
+    assert len(transcript) > 10, f"Transcript too short: {transcript}"
+
+    lower = transcript.lower()
+    expected = ["spacewalk", "astronaut", "nasa"]
+    found = [w for w in expected if w in lower]
+    assert len(found) > 0, f"Expected keywords not found in: {transcript[:200]}"
+
+    print("OK deepgram_prerecorded_stt")
+    print(f"  Transcript preview: '{transcript[:80]}...'")
+
+
+if __name__ == "__main__":
+    test_parse_audiosocket_uuid_frame()
+    test_parse_audiosocket_audio_frame()
+    test_parse_audiosocket_hangup_frame()
+    test_parse_audiosocket_malformed()
+    test_deepgram_prerecorded_stt()