feat(assemblyai): new STT plugin for Universal-Streaming (v3) #1219
toubatbrian merged 3 commits into livekit:main
Adds a new `@livekit/agents-plugin-assemblyai` STT plugin that connects to
AssemblyAI's Universal-Streaming v3 WebSocket API. Ported line-by-line from
the Python `livekit-plugins-assemblyai` plugin; every non-trivial block has
a `// Ref: python .../stt.py - N-M lines` comment pointing back to the
source so future syncs are straightforward.
Architecture follows the existing Deepgram STT plugin: retry loop around a
single WebSocket lifetime, concurrent sendTask/listenTask/configTask under
`#runWS`, graceful close via `{"type":"Terminate"}`.
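The loop described above can be sketched roughly as follows. All names here (`sessionLoop`, `connect`, `run`, `FakeWS`) are illustrative stand-ins, not the plugin's actual API; the real `#runWS` layers the send/listen/config tasks inside the run step.

```typescript
// Minimal sketch of the reconnect loop: one WebSocket lifetime per
// iteration, a bounded retry budget, and a graceful Terminate on close.
interface FakeWS {
  send(msg: string): void;
  close(): void;
}

const MAX_RETRY = 32;

async function sessionLoop(
  connect: () => Promise<FakeWS>,
  run: (ws: FakeWS) => Promise<void>,
  closed: () => boolean,
): Promise<void> {
  let retries = 0;
  while (!closed()) {
    let ws: FakeWS;
    try {
      ws = await connect();
    } catch (err) {
      if (retries >= MAX_RETRY) throw new Error('failed to connect');
      retries += 1;
      continue;
    }
    try {
      // one WebSocket lifetime: send/listen/config tasks run here
      await run(ws);
    } finally {
      // graceful close, mirroring the {"type":"Terminate"} message
      ws.send(JSON.stringify({ type: 'Terminate' }));
      ws.close();
    }
  }
}
```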
Key features:
- All AssemblyAI v3 options exposed: `speechModel` (`universal-streaming-english`, `universal-streaming-multilingual`, `u3-rt-pro`), `sampleRate`, `bufferSizeMs`, min/maxTurnSilence, `formatTurns`, `keytermsPrompt`, `prompt` (u3-rt-pro only), `vadThreshold`, `endOfTurnConfidenceThreshold`, `speakerLabels`, `maxSpeakers`, `domain`, custom `baseUrl` (e.g. for the EU region).
- Live config updates via `UpdateConfiguration` messages over the active socket (no reconnect needed, unlike Deepgram's `updateOptions`), backed by a small pending-queue + `Future` wake pattern.
- `forceEndpoint()` to finalize the current turn on demand.
- `sessionId` / `expiresAt` getters for debugging; `sessionId` is also
propagated as `requestId` on RECOGNITION_USAGE events.
- Turn-event mapping: `Turn` message → INTERIM_TRANSCRIPT (cumulative),
PREFLIGHT_TRANSCRIPT (chunk-based from `utterance`), FINAL_TRANSCRIPT +
END_OF_SPEECH on `end_of_turn` (with `turn_is_formatted` check when
`formatTurns` is enabled).
- Word timestamps converted from ms → seconds for `createTimedString`.
- `u3-pro` → `u3-rt-pro` rename handled with deprecation warning (matches
Python).
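The turn-event mapping in the list above can be illustrated as a small pure function. The message shape and event names below are paraphrased from this description (with `PREFLIGHT_TRANSCRIPT` handling omitted for brevity), not copied from the plugin.

```typescript
// Sketch: map an AssemblyAI-style Turn message to emitted event names.
interface TurnMsg {
  transcript: string;        // cumulative transcript for the turn
  end_of_turn: boolean;
  turn_is_formatted: boolean;
}

type EventName = 'INTERIM_TRANSCRIPT' | 'FINAL_TRANSCRIPT' | 'END_OF_SPEECH';

function mapTurn(msg: TurnMsg, formatTurns: boolean): EventName[] {
  // final only when the turn ended, and (if formatting was requested)
  // only once the formatted variant of the turn has arrived
  if (msg.end_of_turn && (!formatTurns || msg.turn_is_formatted)) {
    return ['FINAL_TRANSCRIPT', 'END_OF_SPEECH'];
  }
  return ['INTERIM_TRANSCRIPT'];
}
```

Note how enabling `formatTurns` holds back the final event until the formatted turn arrives; the unformatted `end_of_turn` message is still surfaced as an interim.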
Known framework gap: `stt.SpeechData` in `@livekit/agents` doesn't yet have a `speakerId` field, so per-word speaker labels from AssemblyAI's diarization are not surfaced on emitted events. The `speakerLabels` option is exposed and sent to the server; a follow-up PR can wire through speaker ids once the base type is extended. See the TSDoc on `speakerLabels` in `plugins/assemblyai/src/stt.ts` for the exact forwarding point.
Smoke-tested end-to-end against the AssemblyAI API using the shared
`@livekit/agents-plugins-test` STT harness; transcription matches the
reference transcript within the 20% Levenshtein threshold.
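The 20% threshold is a normalized edit-distance check. A self-contained sketch of the idea (the shared harness's actual implementation may differ):

```typescript
// Single-row Levenshtein distance between two strings.
function levenshtein(a: string, b: string): number {
  const dp = Array.from({ length: a.length + 1 }, (_, i) => i);
  for (let j = 1; j <= b.length; j++) {
    let prev = dp[0]; // D[i-1][j-1] as we sweep i
    dp[0] = j;
    for (let i = 1; i <= a.length; i++) {
      const tmp = dp[i];
      dp[i] = Math.min(
        dp[i] + 1,     // insertion
        dp[i - 1] + 1, // deletion
        prev + (a[i - 1] === b[j - 1] ? 0 : 1), // substitution
      );
      prev = tmp;
    }
  }
  return dp[a.length];
}

// Pass if the edit distance is within maxRatio of the reference length.
function withinThreshold(hyp: string, ref: string, maxRatio = 0.2): boolean {
  return levenshtein(hyp, ref) / Math.max(ref.length, 1) <= maxRatio;
}
```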
Also adds `ASSEMBLYAI_API_KEY` to `turbo.json` globalEnv (required for
lint to pass).
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
🦋 Changeset detected. Latest commit: 0925809. The changes in this PR will be included in the next version bump. This PR includes changesets to release 23 packages.
@codex review and check the plugin package format against others. Review the WebSocket patterns compared with other STT plugins and check for alignment or discrepancies.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: f374b42b21
```ts
} else if (data.sampleRate === this.#opts.sampleRate || data.channels === 1) {
  // Matches the Deepgram plugin's permissive check — the base class
```
Reject mismatched channel count before writing PCM frames
The guard here accepts frames when either sample rate matches or channels are mono, so a stereo frame at sampleRate === this.#opts.sampleRate is treated as valid. Because AudioByteStream is created as 1-channel, interleaved stereo bytes are reinterpreted as mono PCM, which can distort transcription and inflate timing/usage metrics. This is reachable when upstream audio is already at target sample rate but not downmixed (the base SpeechStream only resamples sample rate). Require channels === 1 in addition to the sample-rate tolerance before calling audioStream.write.
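A sketch of the stricter guard this comment proposes, with an assumed minimal frame shape (the plugin's actual frame type has more fields):

```typescript
// Only accept a frame for direct PCM writing when both the sample rate
// matches the configured rate AND the audio is already mono; stereo
// interleaved bytes must not be reinterpreted as 1-channel PCM.
interface Frame {
  sampleRate: number;
  channels: number;
}

function acceptFrame(frame: Frame, targetRate: number): boolean {
  return frame.sampleRate === targetRate && frame.channels === 1;
}
```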
```ts
while (!this.input.closed && !this.closed) {
  try {
    const ws = await this.#connectWS();
    await this.#runWS(ws);
```
Reset reconnect retry counter after successful WS sessions
The retry budget is only incremented on disconnect errors and never reset after a successful #runWS cycle, so long-lived streams can fail permanently after 32 reconnects even if each reconnect succeeds. AssemblyAI sessions are time-bounded (expires_at is tracked), so periodic socket closes are expected in production; without resetting retries after a healthy session, normal lifecycle reconnects eventually trip the fatal failed to connect path.
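The suggested fix can be sketched like this (names are illustrative, not the plugin's API): zero the retry budget after every healthy socket lifetime, so only consecutive failures count against it.

```typescript
// Reconnect loop that resets the retry counter after each successful
// session, so expected periodic session expiries don't exhaust the budget.
async function loopWithReset(
  connect: () => Promise<void>,
  run: () => Promise<void>,
  closed: () => boolean,
  maxRetry = 32,
): Promise<void> {
  let retries = 0;
  while (!closed()) {
    try {
      await connect();
      await run();
      retries = 0; // healthy session completed; reset the budget
    } catch (err) {
      if (retries >= maxRetry) throw err;
      retries += 1;
    }
  }
}
```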
Addressed the two review items (the channel-count guard and the reconnect retry-counter reset).
```ts
try {
  await Promise.all([sendTask(), listenTask.result, wsMonitor.result]);
} finally {
  closing = true;
  listenTask.cancel();
  configTask.cancel();
  if (messageHandler) ws.off('message', messageHandler);
  try {
    ws.close();
  } catch {
    // ignore
  }
}
```
🔴 Orphaned sendTask after WebSocket reconnection causes audio frame loss
When the WebSocket closes unexpectedly, wsMonitor.result rejects causing Promise.all at line 441 to reject. The finally block (lines 442-452) cancels listenTask and configTask, but sendTask() is a plain async function — not a Task — so it is not cancelled. It remains blocked on await Promise.race([this.input.next(), abortPromise]) (line 360). The abortPromise only resolves when the entire SpeechStream is closed (not on reconnect), and this.closed is also not set during reconnect.
When run() catches the error and loops back to create a new WebSocket session, a new sendTask starts, also calling this.input.next(). Now two concurrent readers compete for frames from the same AsyncIterableQueue. Audio frames consumed by the orphaned old sendTask are sent to the old (closed) WebSocket via ws.send() (silently dropped by the ws library), while only the remaining frames reach the new session. This causes intermittent audio data loss during reconnection, degrading transcription.
Comparison with Deepgram plugin pattern
Deepgram's stt.ts has a similar structure but the issue manifests differently there because Deepgram passes the raw wsMonitor Task object (not wsMonitor.result) to Promise.all — meaning the Task resolves immediately as a non-thenable, so Promise.all effectively only waits for sendTask and listenTask. In AssemblyAI, wsMonitor.result is correctly awaited, which means the wsMonitor rejection properly triggers the cleanup path — but that cleanup path doesn't cancel the sendTask.
Prompt for agents
In #runWS (stt.ts), the sendTask is a plain async function passed directly to Promise.all. When Promise.all rejects (e.g. wsMonitor detects unexpected WebSocket close), the sendTask continues running in the background because there is no mechanism to cancel it. The finally block cancels listenTask and configTask (which are Task instances with cancel() support) but not sendTask.
The fix is to give sendTask its own AbortController (or reuse an existing signal) so it can be cancelled. Options:
1. Convert sendTask to use Task.from() like listenTask and configTask, and cancel it in the finally block.
2. Create a local AbortController for the WS session. Pass its signal to sendTask (in addition to the stream-level abortSignal). In the finally block, abort it.
3. Add a check inside sendTask's loop that tests the local `closing` variable. Since `closing` is set in the finally block, the sendTask would exit on its next iteration. However, this requires the sendTask to be awaiting something that yields — the `this.input.next()` await may not return promptly.
Option 1 or 2 are preferred because they provide immediate cancellation. With option 2, the sendTask's Promise.race would include the session-scoped abort promise, ensuring it exits promptly when the session ends.
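Option 2 can be sketched as below. All names (`onAbort`, `sendTask`, `nextFrame`) are illustrative, not the plugin's actual API: the session's cleanup would call `controller.abort()` alongside cancelling the other tasks, so only one reader ever touches the shared input queue at a time.

```typescript
// A session-scoped abort signal raced against input reads, so the send
// loop exits promptly when the session is torn down.
const ABORTED = Symbol('session-aborted');

function onAbort(signal: AbortSignal): Promise<typeof ABORTED> {
  return new Promise((resolve) => {
    if (signal.aborted) return resolve(ABORTED);
    signal.addEventListener('abort', () => resolve(ABORTED), { once: true });
  });
}

async function sendTask(
  nextFrame: () => Promise<string>,
  send: (frame: string) => void,
  session: AbortSignal,
): Promise<number> {
  let sent = 0;
  for (;;) {
    // the race guarantees we stop waiting on the shared input queue
    // as soon as this session's signal fires
    const item = await Promise.race([nextFrame(), onAbort(session)]);
    if (item === ABORTED) return sent;
    send(item);
    sent += 1;
  }
}
```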
Addressed the follow-up reconnect issue; this prevents an orphaned sendTask from surviving into the next WebSocket session and competing for input frames.
```ts
min_turn_silence: minSilence,
max_turn_silence: maxSilence,
```
🔴 Silence parameters sent in milliseconds to AssemblyAI API which expects seconds
The minTurnSilence and maxTurnSilence options are stored in milliseconds (per the repo's time-unit convention, as documented in CLAUDE.md and the JSDoc comments at lines 59-62), but they are sent directly to the AssemblyAI API without converting to seconds. The Python plugin this was ported from sends 0.1 (seconds) for 100ms of silence. The JS code sends 100 (the ms value) which the API interprets as 100 seconds, making end-of-turn detection essentially non-functional.
This affects the initial WebSocket connection parameters at lines 289-290, the u3-rt-pro model defaults at line 272, and the updateOptions runtime config messages at lines 208-209.
Prompt for agents
The minTurnSilence and maxTurnSilence values are stored in milliseconds (per the JS repo convention documented in CLAUDE.md), but AssemblyAI's API expects seconds (the Python plugin sends 0.1 for 100ms). The fix needs to convert ms to seconds in three places:
1. In #connectWS (stt.ts lines 289-290): divide minSilence and maxSilence by 1000 before putting them in liveConfig, e.g. min_turn_silence: minSilence !== undefined ? minSilence / 1000 : undefined
2. In SpeechStream.updateOptions (stt.ts lines 208-209): divide opts.maxTurnSilence and opts.minTurnSilence by 1000 before assigning to configMsg.max_turn_silence and configMsg.min_turn_silence
Note that the default fallback values in #connectWS at line 272 (minSilence = 100) are correct as internal ms values - they just need the /1000 conversion when serialized to the API.
```ts
if (opts.maxTurnSilence !== undefined) configMsg.max_turn_silence = opts.maxTurnSilence;
if (opts.minTurnSilence !== undefined) configMsg.min_turn_silence = opts.minTurnSilence;
```
🔴 updateOptions sends silence parameters in milliseconds to AssemblyAI API
Same root cause as the #connectWS bug: opts.maxTurnSilence and opts.minTurnSilence (milliseconds) are sent directly to AssemblyAI's UpdateConfiguration message as max_turn_silence and min_turn_silence, but the API expects seconds. For example, calling updateOptions({ minTurnSilence: 200 }) would send min_turn_silence: 200 to the API, which interprets it as 200 seconds instead of 0.2 seconds.
Suggested change:
```diff
- if (opts.maxTurnSilence !== undefined) configMsg.max_turn_silence = opts.maxTurnSilence;
- if (opts.minTurnSilence !== undefined) configMsg.min_turn_silence = opts.minTurnSilence;
+ if (opts.maxTurnSilence !== undefined) configMsg.max_turn_silence = opts.maxTurnSilence / 1000;
+ if (opts.minTurnSilence !== undefined) configMsg.min_turn_silence = opts.minTurnSilence / 1000;
```
I checked the current AssemblyAI API docs before changing this, and I do not think the new silence-unit suggestion is correct. Both current docs describe `min_turn_silence` and `max_turn_silence` as millisecond values, so the current JS implementation keeping these values in ms and sending them through unchanged matches the published API. Dividing by 1000 would therefore be wrong. The earlier fixes in this PR stand.
Test plan:
- `pnpm --filter @livekit/agents-plugin-assemblyai... build`: clean
- `pnpm --filter @livekit/agents-plugin-assemblyai lint`: clean
- `pnpm -w format:check`: clean
- `pnpm lint` (whole monorepo): 24/24 green, no regressions
- `pnpm build` (whole monorepo): 24/24 green
- `pnpm vitest run plugins/assemblyai --testTimeout 30000` against the real AssemblyAI API: green, transcription matches the reference transcript within the 20% Levenshtein threshold
- Basic agent (`examples/src/basic_agent.ts`): verify turn detection behaves naturally

Notes for reviewers:
- The shared `plugins/test/src/stt.ts` helper has a default 5000ms vitest timeout; AssemblyAI's streaming happy path lands at 5.5-6.2s end-to-end, which is occasionally flaky near the boundary. Running with `--testTimeout 30000` is reliable. This affects the shared helper, not the plugin itself; a follow-up could bump the helper's default.
- The version starts at `1.2.4` to match the monorepo's fixed-versioning group (`.changeset/config.json` has `"fixed": [["@livekit/agents", "@livekit/agents-plugin-*", ...]]`). The changeset is `patch` to match recent precedent; a maintainer can bump to `minor` at release time if preferred.
- `turbo.json` gains one line: `ASSEMBLYAI_API_KEY` in `globalEnv`, required so the new test file's env-var check doesn't trip `turbo/no-undeclared-env-vars`.