Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
a70c3b7
feat: emit OTEL metrics for turn latencies and usage counters
theomonnom Feb 18, 2026
136111c
use ModelUsageCollector, fix observability_url, add otel_metrics export
theomonnom Apr 2, 2026
3e001f1
add on_user_turn_completed_delay histogram and interruption usage cou…
theomonnom Apr 2, 2026
22980e6
add websocket connection time histogram to ConnectionPool
theomonnom Apr 2, 2026
b5dae8b
add acquire_time/connection_reused to metrics types and OTEL histogram
theomonnom Apr 2, 2026
1cd1389
unify model attr extraction, keep connection_reused on histogram
theomonnom Apr 2, 2026
f0c5f65
wire up acquire_time/connection_reused across all plugins
theomonnom Apr 2, 2026
854453d
fix elevenlabs connection_reused detection
theomonnom Apr 2, 2026
a91fe83
clean up elevenlabs: track acquire_time/connection_reused in _current…
theomonnom Apr 2, 2026
76d5ddb
fix: reset usage collector after flush to prevent double-counting acr…
theomonnom Apr 2, 2026
d49dd25
remove flush_usage, add counters directly in collect_usage
theomonnom Apr 2, 2026
e16a7cd
remove getattr, use ev.metadata directly in each isinstance branch
theomonnom Apr 2, 2026
19198fe
compact repr for metrics types: skip default fields
theomonnom Apr 2, 2026
76c7b9b
rename interruption counter to match field name: num_requests
theomonnom Apr 2, 2026
07222ad
add model metadata to turn latency histograms
theomonnom Apr 2, 2026
c2b2ffc
add llm_metadata to all realtime model MetricsReport paths
theomonnom Apr 2, 2026
49ac0ac
revert realtime model metadata — needs different naming, skip for now
theomonnom Apr 2, 2026
eba4cb1
fix STT double-recording of acquire_time in histogram
theomonnom Apr 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions livekit-agents/livekit/agents/inference/tts.py
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,8 @@ async def _recv_task(ws: aiohttp.ClientWebSocketResponse) -> None:

try:
async with self._tts._pool.connection(timeout=self._conn_options.timeout) as ws:
self._acquire_time = self._tts._pool.last_acquire_time
self._connection_reused = self._tts._pool.last_connection_reused
tasks = [
asyncio.create_task(_input_task()),
asyncio.create_task(_sentence_stream_task(ws)),
Expand Down
4 changes: 3 additions & 1 deletion livekit-agents/livekit/agents/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

from .log import logger
from .observability import Tagger
from .telemetry import _upload_session_report
from .telemetry import _upload_session_report, otel_metrics
from .telemetry.traces import _BufferingHandler, _setup_cloud_tracer, _shutdown_telemetry
from .types import NotGivenOr
from .utils import http_context, is_given, wait_for_participant
Expand Down Expand Up @@ -248,6 +248,8 @@ async def _on_session_end(self) -> None:
if not (session := self._primary_agent_session):
return

otel_metrics.flush_turn_metrics(session.history)

c = AgentsConsole.get_instance()
report = self.make_session_report(session)

Expand Down
9 changes: 9 additions & 0 deletions livekit-agents/livekit/agents/llm/chat_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,11 @@ class AudioContent(BaseModel):

# The metrics are stored in a dict, since some fields may not be relevant
# in certain context (e.g., text-only mode or when using a speech-to-speech model).
class MetricsMetadata(TypedDict, total=False):
    """Model identification attached to a MetricsReport entry.

    All keys are optional (``total=False``); an absent key means the
    information was not available for that pipeline stage.
    """

    # Identifier of the concrete model used for this stage
    # (producer-defined by the emitting plugin).
    model_name: str
    # Vendor/provider of the model, e.g. the plugin's provider string.
    model_provider: str


class MetricsReport(TypedDict, total=False):
started_speaking_at: float
stopped_speaking_at: float
Expand Down Expand Up @@ -263,6 +268,10 @@ class MetricsReport(TypedDict, total=False):
Assistant `ChatMessage` only
"""

llm_metadata: MetricsMetadata
tts_metadata: MetricsMetadata
stt_metadata: MetricsMetadata


class ChatMessage(BaseModel):
id: str = Field(default_factory=lambda: utils.shortuuid("item_"))
Expand Down
21 changes: 21 additions & 0 deletions livekit-agents/livekit/agents/llm/realtime.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import asyncio
import time
from abc import ABC, abstractmethod
from collections.abc import AsyncIterable, Awaitable
from dataclasses import dataclass
Expand Down Expand Up @@ -148,6 +149,26 @@ def __init__(self, realtime_model: RealtimeModel) -> None:
super().__init__()
self._realtime_model = realtime_model

def _report_connection_acquired(self, acquire_time: float, connection_reused: bool = False) -> None:
    """Report connection timing as a RealtimeModelMetrics event with zero usage.

    Emits a ``metrics_collected`` event whose token details are all zero, so
    downstream consumers (e.g. ``collect_usage``) only record the connection
    acquire-time histogram for it.

    Args:
        acquire_time: Time in seconds spent acquiring the connection.
        connection_reused: Whether the connection came from a pool. Defaults
            to ``False`` so existing callers keep their previous behavior;
            previously this was hardcoded even though ``RealtimeModelMetrics``
            carries the field (the STT equivalent already reports it).
    """
    # Local import to avoid a circular dependency between llm and metrics.
    from ..metrics.base import Metadata, RealtimeModelMetrics

    self.emit(
        "metrics_collected",
        RealtimeModelMetrics(
            request_id="",
            timestamp=time.time(),
            acquire_time=acquire_time,
            connection_reused=connection_reused,
            input_token_details=RealtimeModelMetrics.InputTokenDetails(),
            output_token_details=RealtimeModelMetrics.OutputTokenDetails(),
            metadata=Metadata(
                model_name=self._realtime_model.model,
                model_provider=self._realtime_model.provider,
            ),
        ),
    )

@property
def realtime_model(self) -> RealtimeModel:
return self._realtime_model
Expand Down
33 changes: 26 additions & 7 deletions livekit-agents/livekit/agents/metrics/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,14 @@ class Metadata(BaseModel):
model_provider: str | None = None


class LLMMetrics(BaseModel):
class _BaseMetrics(BaseModel):
    """Shared base for all metric models: a compact repr that omits fields
    still holding their default values."""

    def __repr__(self) -> str:
        non_default = self.model_dump(exclude_defaults=True)
        body = ", ".join(f"{name}={value!r}" for name, value in non_default.items())
        return f"{type(self).__name__}({body})"


class LLMMetrics(_BaseMetrics):
type: Literal["llm_metrics"] = "llm_metrics"
label: str
request_id: str
Expand All @@ -27,7 +34,7 @@ class LLMMetrics(BaseModel):
metadata: Metadata | None = None


class STTMetrics(BaseModel):
class STTMetrics(_BaseMetrics):
type: Literal["stt_metrics"] = "stt_metrics"
label: str
request_id: str
Expand All @@ -42,10 +49,14 @@ class STTMetrics(BaseModel):
"""Output text tokens (for token-based billing)."""
streamed: bool
"""Whether the STT is streaming (e.g using websocket)."""
acquire_time: float = 0.0
"""Time in seconds to acquire the connection. (WebSocket only)"""
connection_reused: bool = False
"""Whether the connection was reused from a pool. (WebSocket only)"""
metadata: Metadata | None = None


class TTSMetrics(BaseModel):
class TTSMetrics(_BaseMetrics):
type: Literal["tts_metrics"] = "tts_metrics"
label: str
request_id: str
Expand All @@ -61,12 +72,16 @@ class TTSMetrics(BaseModel):
output_tokens: int = 0
"""Output audio tokens (for token-based billing, e.g., OpenAI TTS)."""
streamed: bool
acquire_time: float = 0.0
"""Time in seconds to acquire the connection. (WebSocket only)"""
connection_reused: bool = False
"""Whether the connection was reused from a pool. (WebSocket only)"""
segment_id: str | None = None
speech_id: str | None = None
metadata: Metadata | None = None


class VADMetrics(BaseModel):
class VADMetrics(_BaseMetrics):
type: Literal["vad_metrics"] = "vad_metrics"
label: str
timestamp: float
Expand All @@ -76,7 +91,7 @@ class VADMetrics(BaseModel):
metadata: Metadata | None = None


class EOUMetrics(BaseModel):
class EOUMetrics(_BaseMetrics):
type: Literal["eou_metrics"] = "eou_metrics"
timestamp: float
end_of_utterance_delay: float
Expand All @@ -97,7 +112,7 @@ class EOUMetrics(BaseModel):
metadata: Metadata | None = None


class RealtimeModelMetrics(BaseModel):
class RealtimeModelMetrics(_BaseMetrics):
class CachedTokenDetails(BaseModel):
audio_tokens: int = 0
text_tokens: int = 0
Expand Down Expand Up @@ -141,10 +156,14 @@ class OutputTokenDetails(BaseModel):
"""Details about the input tokens used in the Response."""
output_token_details: OutputTokenDetails
"""Details about the output tokens used in the Response."""
acquire_time: float = 0.0
"""Time in seconds to acquire the connection. (WebSocket only)"""
connection_reused: bool = False
"""Whether the connection was reused from a pool. (WebSocket only)"""
metadata: Metadata | None = None


class InterruptionMetrics(BaseModel):
class InterruptionMetrics(_BaseMetrics):
type: Literal["interruption_metrics"] = "interruption_metrics"
timestamp: float
total_duration: float
Expand Down
17 changes: 17 additions & 0 deletions livekit-agents/livekit/agents/stt/stt.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,23 @@ def start_time_offset(self, value: float) -> None:
raise ValueError("start_time_offset must be non-negative")
self._start_time_offset = value

def _report_connection_acquired(self, acquire_time: float, connection_reused: bool) -> None:
    """Report connection timing as an STTMetrics event with zero usage.

    The emitted event carries no audio duration or request timing, only the
    WebSocket acquire time, so usage counters stay untouched while the
    connection histogram is still fed.
    """
    event = STTMetrics(
        request_id="",
        timestamp=time.time(),
        duration=0.0,
        label=self._stt._label,
        audio_duration=0.0,
        streamed=True,
        acquire_time=acquire_time,
        connection_reused=connection_reused,
        metadata=Metadata(model_name=self._stt.model, model_provider=self._stt.provider),
    )
    self._stt.emit("metrics_collected", event)
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.

@abstractmethod
async def _run(self) -> None: ...

Expand Down
3 changes: 2 additions & 1 deletion livekit-agents/livekit/agents/telemetry/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from . import http_server, metrics, trace_types, utils
from . import http_server, metrics, otel_metrics, trace_types, utils
from .traces import (
_chat_ctx_to_otel_events,
_setup_cloud_tracer,
Expand All @@ -10,6 +10,7 @@
__all__ = [
"tracer",
"metrics",
"otel_metrics",
"trace_types",
"http_server",
"set_tracer_provider",
Expand Down
183 changes: 183 additions & 0 deletions livekit-agents/livekit/agents/telemetry/otel_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from opentelemetry import metrics as metrics_api

from ..metrics.base import (
AgentMetrics,
InterruptionMetrics,
LLMMetrics,
Metadata,
RealtimeModelMetrics,
STTMetrics,
TTSMetrics,
)

if TYPE_CHECKING:
from ..llm.chat_context import ChatContext, MetricsMetadata, MetricsReport

_meter = metrics_api.get_meter("livekit-agents")

# -- Per-turn latency histograms --
_turn_e2e_latency = _meter.create_histogram(
"lk.agents.turn.e2e_latency",
unit="s",
description="End-to-end turn latency",
)
_turn_llm_ttft = _meter.create_histogram(
"lk.agents.turn.llm_ttft",
unit="s",
description="Pipeline-level LLM time to first token",
)
_turn_tts_ttfb = _meter.create_histogram(
"lk.agents.turn.tts_ttfb",
unit="s",
description="Pipeline-level TTS time to first byte",
)
_turn_transcription_delay = _meter.create_histogram(
"lk.agents.turn.transcription_delay",
unit="s",
description="Time from end of speech to transcript available",
)
_turn_end_of_turn_delay = _meter.create_histogram(
"lk.agents.turn.end_of_turn_delay",
unit="s",
description="Time from end of speech to turn decision",
)
_turn_on_user_turn_completed_delay = _meter.create_histogram(
"lk.agents.turn.on_user_turn_completed_delay",
unit="s",
description="Time to invoke the on_user_turn_completed callback",
)

# -- Usage counters --
_llm_input_tokens = _meter.create_counter("lk.agents.usage.llm_input_tokens")
_llm_input_cached_tokens = _meter.create_counter("lk.agents.usage.llm_input_cached_tokens")
_llm_output_tokens = _meter.create_counter("lk.agents.usage.llm_output_tokens")
_llm_input_audio_tokens = _meter.create_counter("lk.agents.usage.llm_input_audio_tokens")
_llm_input_text_tokens = _meter.create_counter("lk.agents.usage.llm_input_text_tokens")
_llm_output_audio_tokens = _meter.create_counter("lk.agents.usage.llm_output_audio_tokens")
_llm_output_text_tokens = _meter.create_counter("lk.agents.usage.llm_output_text_tokens")
_llm_session_duration = _meter.create_counter(
"lk.agents.usage.llm_session_duration",
unit="s",
)
_tts_characters = _meter.create_counter("lk.agents.usage.tts_characters")
_tts_audio_duration = _meter.create_counter(
"lk.agents.usage.tts_audio_duration",
unit="s",
)
_stt_audio_duration = _meter.create_counter(
"lk.agents.usage.stt_audio_duration",
unit="s",
)
_interruption_num_requests = _meter.create_counter("lk.agents.usage.interruption_num_requests")

# -- Connection metrics --
_connection_acquire_time = _meter.create_histogram(
"lk.agents.connection.acquire_time",
unit="s",
description="Time to acquire a connection (WebSocket only)",
)


def _model_attrs(metadata: Metadata | None) -> dict[str, str]:
attrs: dict[str, str] = {}
if metadata:
if metadata.model_provider:
attrs["model_provider"] = metadata.model_provider
if metadata.model_name:
attrs["model_name"] = metadata.model_name
return attrs


def flush_turn_metrics(chat_ctx: ChatContext) -> None:
    """Emit per-turn latency histograms from the chat history. Called at session end."""
    # NOTE(review): assumes every item from chat_ctx.messages() exposes a
    # `.metrics` MetricsReport — confirm against ChatMessage's definition.
    for message in chat_ctx.messages():
        _record_turn_metrics(message.metrics)


def _metadata_to_attrs(metadata: MetricsMetadata) -> dict[str, str]:
attrs: dict[str, str] = {}
if "model_name" in metadata:
attrs["model_name"] = metadata["model_name"]
if "model_provider" in metadata:
attrs["model_provider"] = metadata["model_provider"]
return attrs


def _record_turn_metrics(report: MetricsReport) -> None:
    """Record one turn's latency fields into the OTEL histograms.

    Each latency is tagged with the metadata of the pipeline stage that
    produced it (LLM, TTS, or STT); missing metadata yields an unlabeled
    datapoint rather than dropping the sample.
    """

    def attrs_for(meta_key: str) -> dict[str, str]:
        return _metadata_to_attrs(report[meta_key]) if meta_key in report else {}

    llm_attrs = attrs_for("llm_metadata")
    tts_attrs = attrs_for("tts_metadata")
    stt_attrs = attrs_for("stt_metadata")

    # (report field, histogram, attribute set) — order mirrors the turn timeline.
    recorders = [
        ("e2e_latency", _turn_e2e_latency, llm_attrs),
        ("llm_node_ttft", _turn_llm_ttft, llm_attrs),
        ("tts_node_ttfb", _turn_tts_ttfb, tts_attrs),
        ("transcription_delay", _turn_transcription_delay, stt_attrs),
        ("end_of_turn_delay", _turn_end_of_turn_delay, stt_attrs),
        ("on_user_turn_completed_delay", _turn_on_user_turn_completed_delay, stt_attrs),
    ]
    for field, histogram, attributes in recorders:
        if field in report:
            histogram.record(report[field], attributes=attributes)


def collect_usage(ev: AgentMetrics) -> None:
    """Record usage counters directly from each metrics event.

    Dispatches on the concrete metrics type and adds each non-zero usage
    field to the matching OTEL counter, tagged with model attributes derived
    from ``ev.metadata``. Zero/empty values are skipped so that events that
    carry no usage (e.g. connection-timing-only reports) do not emit counter
    datapoints.
    """
    # Standard pipeline LLM: prompt/completion token counts.
    if isinstance(ev, LLMMetrics):
        attrs = _model_attrs(ev.metadata)
        if ev.prompt_tokens:
            _llm_input_tokens.add(ev.prompt_tokens, attributes=attrs)
        if ev.prompt_cached_tokens:
            _llm_input_cached_tokens.add(ev.prompt_cached_tokens, attributes=attrs)
        if ev.completion_tokens:
            _llm_output_tokens.add(ev.completion_tokens, attributes=attrs)

    # Realtime (speech-to-speech) model: totals plus audio/text breakdowns.
    elif isinstance(ev, RealtimeModelMetrics):
        attrs = _model_attrs(ev.metadata)
        if ev.input_tokens:
            _llm_input_tokens.add(ev.input_tokens, attributes=attrs)
        if ev.input_token_details.cached_tokens:
            _llm_input_cached_tokens.add(ev.input_token_details.cached_tokens, attributes=attrs)
        if ev.output_tokens:
            _llm_output_tokens.add(ev.output_tokens, attributes=attrs)
        if ev.input_token_details.audio_tokens:
            _llm_input_audio_tokens.add(ev.input_token_details.audio_tokens, attributes=attrs)
        if ev.input_token_details.text_tokens:
            _llm_input_text_tokens.add(ev.input_token_details.text_tokens, attributes=attrs)
        if ev.output_token_details.audio_tokens:
            _llm_output_audio_tokens.add(ev.output_token_details.audio_tokens, attributes=attrs)
        if ev.output_token_details.text_tokens:
            _llm_output_text_tokens.add(ev.output_token_details.text_tokens, attributes=attrs)
        # Counter unit is "s"; presumably total connected session time — TODO
        # confirm against RealtimeModelMetrics' field definition.
        if ev.session_duration:
            _llm_session_duration.add(ev.session_duration, attributes=attrs)

    # TTS: billing-relevant character count plus synthesized audio duration.
    elif isinstance(ev, TTSMetrics):
        attrs = _model_attrs(ev.metadata)
        if ev.characters_count:
            _tts_characters.add(ev.characters_count, attributes=attrs)
        if ev.audio_duration:
            _tts_audio_duration.add(ev.audio_duration, attributes=attrs)

    # STT: transcribed audio duration.
    elif isinstance(ev, STTMetrics):
        attrs = _model_attrs(ev.metadata)
        if ev.audio_duration:
            _stt_audio_duration.add(ev.audio_duration, attributes=attrs)

    elif isinstance(ev, InterruptionMetrics):
        attrs = _model_attrs(ev.metadata)
        if ev.num_requests:
            _interruption_num_requests.add(ev.num_requests, attributes=attrs)

    # Connection timing
    # Deliberately NOT an elif: for STT/TTS/Realtime events the acquire-time
    # histogram is recorded *in addition to* the usage counters above.
    if isinstance(ev, (STTMetrics, TTSMetrics, RealtimeModelMetrics)):
        if ev.acquire_time > 0:
            conn_attrs = _model_attrs(ev.metadata)
            # Stringified bool ("true"/"false") to keep attribute types uniform.
            conn_attrs["connection_reused"] = str(ev.connection_reused).lower()
            _connection_acquire_time.record(ev.acquire_time, attributes=conn_attrs)
Loading
Loading