Skip to content

feat(assemblyai): new STT plugin for Universal-Streaming (v3)#1219

Merged
toubatbrian merged 3 commits intolivekit:mainfrom
chasef07:assemblyai-stt-plugin
Apr 13, 2026
Merged

feat(assemblyai): new STT plugin for Universal-Streaming (v3)#1219
toubatbrian merged 3 commits intolivekit:mainfrom
chasef07:assemblyai-stt-plugin

Conversation

@chasef07
Copy link
Copy Markdown
Contributor

@chasef07 chasef07 commented Apr 9, 2026

Summary

Adds a new @livekit/agents-plugin-assemblyai STT plugin for AssemblyAI's Universal-Streaming v3 WebSocket API. Ported line-by-line from the Python livekit-plugins-assemblyai plugin; every non-trivial block has a // Ref: python .../stt.py - N-M lines comment pointing back to the source so future syncs are straightforward.

Architecture mirrors the existing Deepgram STT plugin: retry loop around a single WebSocket lifetime, concurrent sendTask / listenTask / configTask inside #runWS, graceful close via {"type":"Terminate"}.

Features

  • All AssemblyAI v3 options exposed: speechModel (universal-streaming-english, universal-streaming-multilingual, u3-rt-pro), sampleRate, bufferSizeMs, min/maxTurnSilence, formatTurns, keytermsPrompt, prompt (u3-rt-pro only), vadThreshold, endOfTurnConfidenceThreshold, speakerLabels, maxSpeakers, domain, custom baseUrl (e.g. for EU region).
  • Live config updates via UpdateConfiguration messages over the active socket — no reconnect needed, unlike Deepgram's updateOptions. Backed by a small pending-queue + Future wake pattern.
  • forceEndpoint() to finalize the current turn on demand.
  • sessionId / expiresAt getters for debugging; sessionId is also propagated as requestId on RECOGNITION_USAGE events so metrics correlate back to a specific AssemblyAI connection.
  • Turn-event mapping: Turn message → INTERIM_TRANSCRIPT (cumulative words), PREFLIGHT_TRANSCRIPT (chunk-based from utterance), FINAL_TRANSCRIPT + END_OF_SPEECH on end_of_turn (respecting turn_is_formatted when formatTurns is set).
  • Word timestamps correctly converted from AssemblyAI's ms to createTimedString's seconds.
  • u3-prou3-rt-pro rename handled with a deprecation warning, matching Python.

Known framework gap (not blocking)

stt.SpeechData in @livekit/agents doesn't yet have a speakerId field, so per-word speaker labels from AssemblyAI's diarization are not surfaced on emitted events. The speakerLabels option is exposed and sent to the server; a follow-up PR can wire through speaker ids once the base type is extended. See the TSDoc on speakerLabels in plugins/assemblyai/src/stt.ts for the exact forwarding point.

Test plan

  • pnpm --filter @livekit/agents-plugin-assemblyai... build — clean
  • pnpm --filter @livekit/agents-plugin-assemblyai lint — clean
  • pnpm -w format:check — clean
  • pnpm lint (whole monorepo) — 24/24 green, no regressions
  • pnpm build (whole monorepo) — 24/24 green
  • pnpm vitest run plugins/assemblyai --testTimeout 30000 against the real AssemblyAI API — green, transcription matches reference transcript within the 20% Levenshtein threshold
  • Swap into a real voice-agent example (e.g. examples/src/basic_agent.ts) and verify turn detection behaves naturally

Notes for reviewers

  • The shared plugins/test/src/stt.ts helper has a default 5000ms vitest timeout; AssemblyAI's streaming happy path lands at 5.5-6.2s end-to-end, which is occasionally flaky near the boundary. Running with --testTimeout 30000 is reliable. This affects the shared helper, not the plugin itself — a follow-up could bump the helper's default.
  • Version is pinned to 1.2.4 to match the monorepo's fixed-versioning group (.changeset/config.json has "fixed": [["@livekit/agents", "@livekit/agents-plugin-*", ...]]). Changeset is patch to match recent precedent — a maintainer can bump to minor at release time if preferred.
  • turbo.json gains one line: ASSEMBLYAI_API_KEY in globalEnv, required so the new test file's env-var check doesn't trip turbo/no-undeclared-env-vars.

Adds a new `@livekit/agents-plugin-assemblyai` STT plugin that connects to
AssemblyAI's Universal-Streaming v3 WebSocket API. Ported line-by-line from
the Python `livekit-plugins-assemblyai` plugin; every non-trivial block has
a `// Ref: python .../stt.py - N-M lines` comment pointing back to the
source so future syncs are straightforward.

Architecture follows the existing Deepgram STT plugin: retry loop around a
single WebSocket lifetime, concurrent sendTask/listenTask/configTask under
`#runWS`, graceful close via `{"type":"Terminate"}`.

Key features:
- All AssemblyAI v3 options exposed: speechModel (universal-streaming-*,
  u3-rt-pro), sampleRate, bufferSizeMs, min/maxTurnSilence, formatTurns,
  keytermsPrompt, prompt (u3-rt-pro only), vadThreshold, endOfTurnConfidenceThreshold,
  speakerLabels, maxSpeakers, domain, custom baseUrl (for EU region).
- Live config updates via `UpdateConfiguration` messages over the active
  socket — no reconnect needed, unlike Deepgram's updateOptions.
- `forceEndpoint()` to finalize the current turn on demand.
- `sessionId` / `expiresAt` getters for debugging; `sessionId` is also
  propagated as `requestId` on RECOGNITION_USAGE events.
- Turn-event mapping: `Turn` message → INTERIM_TRANSCRIPT (cumulative),
  PREFLIGHT_TRANSCRIPT (chunk-based from `utterance`), FINAL_TRANSCRIPT +
  END_OF_SPEECH on `end_of_turn` (with `turn_is_formatted` check when
  `formatTurns` is enabled).
- Word timestamps converted from ms → seconds for `createTimedString`.
- `u3-pro` → `u3-rt-pro` rename handled with deprecation warning (matches
  Python).

Known framework gap: `stt.SpeechData` in `@livekit/agents` doesn't yet have
a `speakerId` field, so per-word speaker labels from AssemblyAI's
diarization are not surfaced on emitted events. The `speakerLabels` option
is exposed and sent to the server; a follow-up PR can wire through speaker
ids once the base type is extended. See the TSDoc on `speakerLabels` for
the exact forwarding point.

Smoke-tested end-to-end against the AssemblyAI API using the shared
`@livekit/agents-plugins-test` STT harness; transcription matches the
reference transcript within the 20% Levenshtein threshold.

Also adds `ASSEMBLYAI_API_KEY` to `turbo.json` globalEnv (required for
lint to pass).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@changeset-bot
Copy link
Copy Markdown

changeset-bot bot commented Apr 9, 2026

🦋 Changeset detected

Latest commit: 0925809

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 23 packages
Name Type
@livekit/agents-plugin-assemblyai Patch
@livekit/agents Patch
@livekit/agents-plugin-anam Patch
@livekit/agents-plugin-baseten Patch
@livekit/agents-plugin-bey Patch
@livekit/agents-plugin-cartesia Patch
@livekit/agents-plugin-deepgram Patch
@livekit/agents-plugin-elevenlabs Patch
@livekit/agents-plugin-google Patch
@livekit/agents-plugin-hedra Patch
@livekit/agents-plugin-inworld Patch
@livekit/agents-plugin-lemonslice Patch
@livekit/agents-plugin-livekit Patch
@livekit/agents-plugin-neuphonic Patch
@livekit/agents-plugin-openai Patch
@livekit/agents-plugin-phonic Patch
@livekit/agents-plugin-resemble Patch
@livekit/agents-plugin-rime Patch
@livekit/agents-plugin-sarvam Patch
@livekit/agents-plugin-silero Patch
@livekit/agents-plugin-trugen Patch
@livekit/agents-plugin-xai Patch
@livekit/agents-plugins-test Patch

Not sure what this means? Click here to learn what changesets are.

Click here if you're a maintainer who wants to add another changeset to this PR

Copy link
Copy Markdown
Contributor

@devin-ai-integration devin-ai-integration bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ Devin Review: No Issues Found

Devin Review analyzed this PR and found no potential bugs to report.

View in Devin Review to see 5 additional findings.

Open in Devin Review

@toubatbrian
Copy link
Copy Markdown
Contributor

@codex review and check the plugin package format against others. Review for web-socket patterns compared with other STT plugins and check for alignment or discrepancies

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: f374b42b21

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread plugins/assemblyai/src/stt.ts Outdated
Comment on lines +369 to +370
} else if (data.sampleRate === this.#opts.sampleRate || data.channels === 1) {
// Matches the Deepgram plugin's permissive check — the base class
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Reject mismatched channel count before writing PCM frames

The guard here accepts frames when either sample rate matches or channels are mono, so a stereo frame at sampleRate === this.#opts.sampleRate is treated as valid. Because AudioByteStream is created as 1-channel, interleaved stereo bytes are reinterpreted as mono PCM, which can distort transcription and inflate timing/usage metrics. This is reachable when upstream audio is already at target sample rate but not downmixed (the base SpeechStream only resamples sample rate). Require channels === 1 in addition to the sample-rate tolerance before calling audioStream.write.

Useful? React with 👍 / 👎.

while (!this.input.closed && !this.closed) {
try {
const ws = await this.#connectWS();
await this.#runWS(ws);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Reset reconnect retry counter after successful WS sessions

The retry budget is only incremented on disconnect errors and never reset after a successful #runWS cycle, so long-lived streams can fail permanently after 32 reconnects even if each reconnect succeeds. AssemblyAI sessions are time-bounded (expires_at is tracked), so periodic socket closes are expected in production; without resetting retries after a healthy session, normal lifecycle reconnects eventually trip the fatal failed to connect path.

Useful? React with 👍 / 👎.

@chatgpt-codex-connector
Copy link
Copy Markdown

To use Codex here, create an environment for this repo.

2 similar comments
@chatgpt-codex-connector
Copy link
Copy Markdown

To use Codex here, create an environment for this repo.

@chatgpt-codex-connector
Copy link
Copy Markdown

To use Codex here, create an environment for this repo.

Copy link
Copy Markdown
Contributor Author

Addressed the two review items in 2436513c.

Changes:

  • Reset retries after a successful #runWS() session so expected reconnects do not accumulate toward the fatal retry cap.
  • Tightened the AssemblyAI PCM guard to require both data.sampleRate === this.#opts.sampleRate and data.channels === 1 before writing into the mono AudioByteStream, which rejects stereo frames instead of misinterpreting them as mono.

Validation:

  • pnpm --filter @livekit/agents-plugin-assemblyai lint
  • pnpm --filter @livekit/agents-plugin-assemblyai build

Copy link
Copy Markdown
Contributor

@devin-ai-integration devin-ai-integration bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Devin Review found 1 new potential issue.

View 9 additional findings in Devin Review.

Open in Devin Review

Comment on lines +440 to +452
try {
await Promise.all([sendTask(), listenTask.result, wsMonitor.result]);
} finally {
closing = true;
listenTask.cancel();
configTask.cancel();
if (messageHandler) ws.off('message', messageHandler);
try {
ws.close();
} catch {
// ignore
}
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Orphaned sendTask after WebSocket reconnection causes audio frame loss

When the WebSocket closes unexpectedly, wsMonitor.result rejects causing Promise.all at line 441 to reject. The finally block (lines 442-452) cancels listenTask and configTask, but sendTask() is a plain async function — not a Task — so it is not cancelled. It remains blocked on await Promise.race([this.input.next(), abortPromise]) (line 360). The abortPromise only resolves when the entire SpeechStream is closed (not on reconnect), and this.closed is also not set during reconnect.

When run() catches the error and loops back to create a new WebSocket session, a new sendTask starts, also calling this.input.next(). Now two concurrent readers compete for frames from the same AsyncIterableQueue. Audio frames consumed by the orphaned old sendTask are sent to the old (closed) WebSocket via ws.send() (silently dropped by the ws library), while only the remaining frames reach the new session. This causes intermittent audio data loss during reconnection, degrading transcription.

Comparison with Deepgram plugin pattern

Deepgram's stt.ts has a similar structure but the issue manifests differently there because Deepgram passes the raw wsMonitor Task object (not wsMonitor.result) to Promise.all — meaning the Task resolves immediately as a non-thenable, so Promise.all effectively only waits for sendTask and listenTask. In AssemblyAI, wsMonitor.result is correctly awaited, which means the wsMonitor rejection properly triggers the cleanup path — but that cleanup path doesn't cancel the sendTask.

Prompt for agents
In #runWS (stt.ts), the sendTask is a plain async function passed directly to Promise.all. When Promise.all rejects (e.g. wsMonitor detects unexpected WebSocket close), the sendTask continues running in the background because there is no mechanism to cancel it. The finally block cancels listenTask and configTask (which are Task instances with cancel() support) but not sendTask.

The fix is to give sendTask its own AbortController (or reuse an existing signal) so it can be cancelled. Options:

1. Convert sendTask to use Task.from() like listenTask and configTask, and cancel it in the finally block.
2. Create a local AbortController for the WS session. Pass its signal to sendTask (in addition to the stream-level abortSignal). In the finally block, abort it.
3. Add a check inside sendTask's loop that tests the local `closing` variable. Since `closing` is set in the finally block, the sendTask would exit on its next iteration. However, this requires the sendTask to be awaiting something that yields — the `this.input.next()` await may not return promptly.

Option 1 or 2 are preferred because they provide immediate cancellation. With option 2, the sendTask's Promise.race would include the session-scoped abort promise, ensuring it exits promptly when the session ends.
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Copy link
Copy Markdown
Contributor Author

Addressed the follow-up reconnect issue in 0925809b.

Changes:

  • Added a session-scoped AbortController inside #runWS().
  • Updated sendTask to race on both the stream abort and the current-session abort.
  • Abort the session controller in the outer finally block so an old sendTask exits immediately when that WebSocket session tears down.

This prevents an orphaned sendTask from surviving a reconnect and competing for this.input with the next session.

Validation:

  • pnpm --filter @livekit/agents-plugin-assemblyai lint
  • pnpm --filter @livekit/agents-plugin-assemblyai build

Copy link
Copy Markdown
Contributor

@devin-ai-integration devin-ai-integration bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Devin Review found 2 new potential issues.

View 11 additional findings in Devin Review.

Open in Devin Review

Comment on lines +289 to +290
min_turn_silence: minSilence,
max_turn_silence: maxSilence,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Silence parameters sent in milliseconds to AssemblyAI API which expects seconds

The minTurnSilence and maxTurnSilence options are stored in milliseconds (per the repo's time-unit convention, as documented in CLAUDE.md and the JSDoc comments at lines 59-62), but they are sent directly to the AssemblyAI API without converting to seconds. The Python plugin this was ported from sends 0.1 (seconds) for 100ms of silence. The JS code sends 100 (the ms value) which the API interprets as 100 seconds, making end-of-turn detection essentially non-functional.

This affects the initial WebSocket connection parameters at lines 289-290, the u3-rt-pro model defaults at line 272, and the updateOptions runtime config messages at lines 208-209.

Prompt for agents
The minTurnSilence and maxTurnSilence values are stored in milliseconds (per the JS repo convention documented in CLAUDE.md), but AssemblyAI's API expects seconds (the Python plugin sends 0.1 for 100ms). The fix needs to convert ms to seconds in three places:

1. In #connectWS (stt.ts lines 289-290): divide minSilence and maxSilence by 1000 before putting them in liveConfig, e.g. min_turn_silence: minSilence !== undefined ? minSilence / 1000 : undefined

2. In SpeechStream.updateOptions (stt.ts lines 208-209): divide opts.maxTurnSilence and opts.minTurnSilence by 1000 before assigning to configMsg.max_turn_silence and configMsg.min_turn_silence

Note that the default fallback values in #connectWS at line 272 (minSilence = 100) are correct as internal ms values - they just need the /1000 conversion when serialized to the API.
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Comment on lines +208 to +209
if (opts.maxTurnSilence !== undefined) configMsg.max_turn_silence = opts.maxTurnSilence;
if (opts.minTurnSilence !== undefined) configMsg.min_turn_silence = opts.minTurnSilence;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 updateOptions sends silence parameters in milliseconds to AssemblyAI API

Same root cause as the #connectWS bug: opts.maxTurnSilence and opts.minTurnSilence (milliseconds) are sent directly to AssemblyAI's UpdateConfiguration message as max_turn_silence and min_turn_silence, but the API expects seconds. For example, calling updateOptions({ minTurnSilence: 200 }) would send min_turn_silence: 200 to the API, which interprets it as 200 seconds instead of 0.2 seconds.

Suggested change
if (opts.maxTurnSilence !== undefined) configMsg.max_turn_silence = opts.maxTurnSilence;
if (opts.minTurnSilence !== undefined) configMsg.min_turn_silence = opts.minTurnSilence;
if (opts.maxTurnSilence !== undefined) configMsg.max_turn_silence = opts.maxTurnSilence / 1000;
if (opts.minTurnSilence !== undefined) configMsg.min_turn_silence = opts.minTurnSilence / 1000;
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Copy link
Copy Markdown
Contributor Author

I checked the current AssemblyAI API docs before changing this, and I do not think the new silence-unit suggestion is correct.

Both current docs describe min_turn_silence / max_turn_silence as integer values in milliseconds, not seconds:

  • Universal-Streaming: max_turn_silence defaults to 1280 and min_turn_silence defaults to 400, both described as milliseconds.
  • Universal-3 Pro Streaming: max_turn_silence defaults to 1000 and min_turn_silence defaults to 100, both described as milliseconds.

So the current JS implementation keeping these values in ms and sending them through unchanged matches the published API. Dividing by 1000 here would likely under-shoot turn detection rather than fix it.

The earlier fixes in 2436513c and 0925809b were still valid:

  • reject non-mono PCM before writing into the mono AudioByteStream
  • reset reconnect retries after a healthy session
  • abort the old sendTask on WS teardown so reconnects do not leave concurrent readers on this.input

Docs I checked:

@toubatbrian toubatbrian merged commit cc89536 into livekit:main Apr 13, 2026
3 of 6 checks passed
@github-actions github-actions bot mentioned this pull request Apr 13, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants