From cae26411c8c915578ee9d3152a042bf9c093d4bc Mon Sep 17 00:00:00 2001
From: Richard Solomou
Date: Tue, 27 Jan 2026 13:53:46 +0200
Subject: [PATCH 1/4] feat: pass raw provider usage metadata for backend cost
 calculations

Add raw_usage field to TokenUsage type to capture raw provider usage
metadata (OpenAI, Anthropic, Gemini). This enables the backend to
extract modality-specific token counts (text vs image vs audio) for
accurate cost calculations.

- Add raw_usage field to TokenUsage TypedDict
- Update all provider converters to capture raw usage:
  - OpenAI: capture response.usage and chunk usage
  - Anthropic: capture usage from message_start and message_delta events
  - Gemini: capture usage_metadata from responses and chunks
- Pass raw usage as $ai_usage property in PostHog events
- Update merge_usage_stats to handle raw_usage in both modes
- Add tests verifying $ai_usage is captured for all providers

The backend will extract provider-specific details and delete $ai_usage
after processing to avoid bloating event properties.
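For illustration, the intended shape (a sketch with hypothetical values;
the keys inside raw_usage are whatever the provider SDK returns):

    usage = {
        "input_tokens": 20,
        "output_tokens": 10,
        # raw_usage is the provider usage object as-is, e.g. OpenAI's
        # response.usage; the backend reads modality-specific token
        # splits from it, then drops $ai_usage from the stored event.
        "raw_usage": {"prompt_tokens": 20, "completion_tokens": 10, "total_tokens": 30},
    }
    event_properties = {"$ai_usage": usage["raw_usage"]}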
Co-Authored-By: Claude Sonnet 4.5
---
 posthog/ai/anthropic/anthropic_converter.py |  8 ++++++++
 posthog/ai/gemini/gemini_converter.py       |  3 +++
 posthog/ai/openai/openai_converter.py       |  9 +++++++++
 posthog/ai/types.py                         |  1 +
 posthog/ai/utils.py                         | 20 ++++++++++++++++++++
 posthog/test/ai/anthropic/test_anthropic.py |  3 +++
 posthog/test/ai/gemini/test_gemini.py       |  3 +++
 posthog/test/ai/openai/test_openai.py       |  3 +++
 8 files changed, 50 insertions(+)

diff --git a/posthog/ai/anthropic/anthropic_converter.py b/posthog/ai/anthropic/anthropic_converter.py
index 24f20b9f..df19b1ab 100644
--- a/posthog/ai/anthropic/anthropic_converter.py
+++ b/posthog/ai/anthropic/anthropic_converter.py
@@ -221,6 +221,9 @@ def extract_anthropic_usage_from_response(response: Any) -> TokenUsage:
     if web_search_count > 0:
         result["web_search_count"] = web_search_count
 
+    # Capture raw usage metadata for backend processing
+    result["raw_usage"] = response.usage
+
     return result
 
 
@@ -247,6 +250,8 @@ def extract_anthropic_usage_from_event(event: Any) -> TokenUsage:
         usage["cache_read_input_tokens"] = getattr(
             event.message.usage, "cache_read_input_tokens", 0
         )
+        # Capture raw usage metadata for backend processing
+        usage["raw_usage"] = event.message.usage
 
     # Handle usage stats from message_delta event
     if hasattr(event, "usage") and event.usage:
@@ -262,6 +267,9 @@
         if web_search_count > 0:
             usage["web_search_count"] = web_search_count
 
+        # Capture raw usage metadata for backend processing
+        usage["raw_usage"] = event.usage
+
     return usage

diff --git a/posthog/ai/gemini/gemini_converter.py b/posthog/ai/gemini/gemini_converter.py
index 863b5899..a12dd96d 100644
--- a/posthog/ai/gemini/gemini_converter.py
+++ b/posthog/ai/gemini/gemini_converter.py
@@ -487,6 +487,9 @@ def _extract_usage_from_metadata(metadata: Any) -> TokenUsage:
     if reasoning_tokens and reasoning_tokens > 0:
         usage["reasoning_tokens"] = reasoning_tokens
 
+    # Capture raw usage metadata for backend processing
+    usage["raw_usage"] = metadata
+
     return usage

diff --git a/posthog/ai/openai/openai_converter.py b/posthog/ai/openai/openai_converter.py
index 5b7eac8c..b1992ef1 100644
--- a/posthog/ai/openai/openai_converter.py
+++ b/posthog/ai/openai/openai_converter.py
@@ -429,6 +429,9 @@ def extract_openai_usage_from_response(response: Any) -> TokenUsage:
     if web_search_count > 0:
         result["web_search_count"] = web_search_count
 
+    # Capture raw usage metadata for backend processing
+    result["raw_usage"] = response.usage
+
     return result
 
 
@@ -482,6 +485,9 @@ def extract_openai_usage_from_chunk(
                 chunk.usage.completion_tokens_details.reasoning_tokens
             )
 
+        # Capture raw usage metadata for backend processing
+        usage["raw_usage"] = chunk.usage
+
     elif provider_type == "responses":
         # For Responses API, usage is only in chunk.response.usage for completed events
         if hasattr(chunk, "type") and chunk.type == "response.completed":
@@ -516,6 +522,9 @@
             if web_search_count > 0:
                 usage["web_search_count"] = web_search_count
 
+            # Capture raw usage metadata for backend processing
+            usage["raw_usage"] = response_usage
+
     return usage

diff --git a/posthog/ai/types.py b/posthog/ai/types.py
index c549cadc..e2ffa0fd 100644
--- a/posthog/ai/types.py
+++ b/posthog/ai/types.py
@@ -64,6 +64,7 @@ class TokenUsage(TypedDict, total=False):
     cache_creation_input_tokens: Optional[int]
     reasoning_tokens: Optional[int]
     web_search_count: Optional[int]
+    raw_usage: Optional[Any]  # Raw provider usage metadata for backend processing
 
 
 class ProviderResponse(TypedDict, total=False):

diff --git a/posthog/ai/utils.py b/posthog/ai/utils.py
index de110146..77d7bf40 100644
--- a/posthog/ai/utils.py
+++ b/posthog/ai/utils.py
@@ -60,6 +60,11 @@ def merge_usage_stats(
         current = target.get("web_search_count") or 0
         target["web_search_count"] = max(current, source_web_search)
 
+        # Always replace raw_usage with latest (don't add or merge)
+        source_raw_usage = source.get("raw_usage")
+        if source_raw_usage is not None:
+            target["raw_usage"] = source_raw_usage
+
     elif mode == "cumulative":
         # Replace with latest values (already cumulative)
         if source.get("input_tokens") is not None:
@@ -76,6 +81,8 @@ def merge_usage_stats(
             target["reasoning_tokens"] = source["reasoning_tokens"]
         if source.get("web_search_count") is not None:
             target["web_search_count"] = source["web_search_count"]
+        if source.get("raw_usage") is not None:
+            target["raw_usage"] = source["raw_usage"]
     else:
         raise ValueError(f"Invalid mode: {mode}. Must be 'incremental' or 'cumulative'")
@@ -332,6 +339,10 @@ def call_llm_and_track_usage(
         if web_search_count is not None and web_search_count > 0:
             tag("$ai_web_search_count", web_search_count)
 
+        raw_usage = usage.get("raw_usage")
+        if raw_usage is not None:
+            tag("$ai_usage", raw_usage)
+
         if posthog_distinct_id is None:
             tag("$process_person_profile", False)
@@ -457,6 +468,10 @@ async def call_llm_and_track_usage_async(
         if web_search_count is not None and web_search_count > 0:
             tag("$ai_web_search_count", web_search_count)
 
+        raw_usage = usage.get("raw_usage")
+        if raw_usage is not None:
+            tag("$ai_usage", raw_usage)
+
         if posthog_distinct_id is None:
             tag("$process_person_profile", False)
@@ -594,6 +609,11 @@ def capture_streaming_event(
     ):
         event_properties["$ai_web_search_count"] = web_search_count
 
+    # Add raw usage metadata if present (all providers)
+    raw_usage = event_data["usage_stats"].get("raw_usage")
+    if raw_usage is not None:
+        event_properties["$ai_usage"] = raw_usage
+
     # Handle provider-specific fields
     if (
         event_data["provider"] == "openai"

diff --git a/posthog/test/ai/anthropic/test_anthropic.py b/posthog/test/ai/anthropic/test_anthropic.py
index 4617f801..50d6014d 100644
--- a/posthog/test/ai/anthropic/test_anthropic.py
+++ b/posthog/test/ai/anthropic/test_anthropic.py
@@ -306,6 +306,9 @@ def test_basic_completion(mock_client, mock_anthropic_response):
     assert props["$ai_http_status"] == 200
     assert props["foo"] == "bar"
     assert isinstance(props["$ai_latency"], float)
+    # Verify raw usage metadata is passed for backend processing
+    assert "$ai_usage" in props
+    assert props["$ai_usage"] is not None
 
 
 def test_groups(mock_client, mock_anthropic_response):

diff --git a/posthog/test/ai/gemini/test_gemini.py b/posthog/test/ai/gemini/test_gemini.py
index 0cf763ba..ee74fbdd 100644
--- a/posthog/test/ai/gemini/test_gemini.py
+++ b/posthog/test/ai/gemini/test_gemini.py
@@ -174,6 +174,9 @@ def test_new_client_basic_generation(
     assert props["foo"] == "bar"
     assert "$ai_trace_id" in props
     assert props["$ai_latency"] > 0
+    # Verify raw usage metadata is passed for backend processing
+    assert "$ai_usage" in props
+    assert props["$ai_usage"] is not None
 
 
 def test_new_client_streaming_with_generate_content_stream(

diff --git a/posthog/test/ai/openai/test_openai.py b/posthog/test/ai/openai/test_openai.py
index 116ba2d1..367b7316 100644
--- a/posthog/test/ai/openai/test_openai.py
+++ b/posthog/test/ai/openai/test_openai.py
@@ -496,6 +496,9 @@ def test_basic_completion(mock_client, mock_openai_response):
     assert props["$ai_http_status"] == 200
     assert props["foo"] == "bar"
     assert isinstance(props["$ai_latency"], float)
+    # Verify raw usage metadata is passed for backend processing
+    assert "$ai_usage" in props
+    assert props["$ai_usage"] is not None
 
 
 def test_embeddings(mock_client, mock_embedding_response):

From 7ea7f4284c35c0e6b5f6275f36b186774d8fb129 Mon Sep 17 00:00:00 2001
From: Richard Solomou
Date: Tue, 27 Jan 2026 19:02:13 +0200
Subject: [PATCH 2/4] fix: add serialize_raw_usage helper to ensure JSON
 serializability

Address PR review feedback from @andrewm4894:

1. **Serialization**: Add serialize_raw_usage() helper with fallback chain:
   - .model_dump() for Pydantic models (OpenAI/Anthropic)
   - .to_dict() for protobuf-like objects
   - vars() for simple objects
   - str() as last resort

   This ensures we never pass unserializable objects to the PostHog client.

2. **Data loss prevention**: Change from replacing to merging raw_usage
   in incremental mode. For Anthropic streaming, message_start has input
   token details and message_delta has output token details - merging
   preserves both instead of losing input data.

3. **Test coverage**: Enhanced tests to verify:
   - JSON serializability with json.dumps()
   - Expected structure of raw_usage dicts
   - Coverage for both non-streaming and streaming modes
   - Fixed Gemini test mocks to return proper dicts from model_dump()
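For reference, a minimal sketch of how the fallback chain behaves (the
classes below are stand-ins, not SDK types; the exact object address in
the last-resort output varies):

    from posthog.ai.utils import serialize_raw_usage

    class PydanticLike:
        def model_dump(self):
            return {"input_tokens": 15}

    class PlainObject:
        def __init__(self):
            self.total_tokens = 30

    print(serialize_raw_usage({"a": 1}))        # {'a': 1} - already a dict
    print(serialize_raw_usage(PydanticLike()))  # {'input_tokens': 15} via model_dump()
    print(serialize_raw_usage(PlainObject()))   # {'total_tokens': 30} via vars()
    print(serialize_raw_usage(object()))        # {'_raw': '<object object at 0x...>'} via str()
    print(serialize_raw_usage(None))            # None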
Co-Authored-By: Claude Sonnet 4.5
---
 posthog/ai/utils.py                         | 74 +++++++++++++++++++--
 posthog/test/ai/anthropic/test_anthropic.py | 17 +++++
 posthog/test/ai/gemini/test_gemini.py       | 52 +++++++++++++++
 posthog/test/ai/openai/test_openai.py       | 17 +++++
 4 files changed, 154 insertions(+), 6 deletions(-)

diff --git a/posthog/ai/utils.py b/posthog/ai/utils.py
index 77d7bf40..f3540747 100644
--- a/posthog/ai/utils.py
+++ b/posthog/ai/utils.py
@@ -13,6 +13,54 @@
 from posthog.client import Client as PostHogClient
 
 
+def serialize_raw_usage(raw_usage: Any) -> Optional[Dict[str, Any]]:
+    """
+    Convert raw provider usage objects to JSON-serializable dicts.
+
+    Handles Pydantic models (OpenAI/Anthropic) and protobuf-like objects (Gemini)
+    with a fallback chain to ensure we never pass unserializable objects to PostHog.
+
+    Args:
+        raw_usage: Raw usage object from provider SDK
+
+    Returns:
+        Plain dict or None if conversion fails
+    """
+    if raw_usage is None:
+        return None
+
+    # Already a dict
+    if isinstance(raw_usage, dict):
+        return raw_usage
+
+    # Try Pydantic model_dump() (OpenAI/Anthropic)
+    if hasattr(raw_usage, "model_dump") and callable(raw_usage.model_dump):
+        try:
+            return raw_usage.model_dump()
+        except Exception:
+            pass
+
+    # Try to_dict() (some protobuf objects)
+    if hasattr(raw_usage, "to_dict") and callable(raw_usage.to_dict):
+        try:
+            return raw_usage.to_dict()
+        except Exception:
+            pass
+
+    # Try __dict__ / vars() for simple objects
+    try:
+        return vars(raw_usage)
+    except Exception:
+        pass
+
+    # Last resort: convert to string representation
+    # This ensures we always return something rather than failing
+    try:
+        return {"_raw": str(raw_usage)}
+    except Exception:
+        return None
+
+
 def merge_usage_stats(
     target: TokenUsage, source: TokenUsage, mode: str = "incremental"
 ) -> None:
@@ -60,10 +108,16 @@ def merge_usage_stats(
         current = target.get("web_search_count") or 0
         target["web_search_count"] = max(current, source_web_search)
 
-        # Always replace raw_usage with latest (don't add or merge)
+        # Merge raw_usage to avoid losing data from earlier events
+        # For Anthropic streaming: message_start has input tokens, message_delta has output
         source_raw_usage = source.get("raw_usage")
         if source_raw_usage is not None:
-            target["raw_usage"] = source_raw_usage
+            serialized = serialize_raw_usage(source_raw_usage)
+            if serialized:
+                current_raw = target.get("raw_usage", {})
+                if not isinstance(current_raw, dict):
+                    current_raw = {}
+                target["raw_usage"] = {**current_raw, **serialized}
 
     elif mode == "cumulative":
         # Replace with latest values (already cumulative)
@@ -82,7 +136,9 @@ def merge_usage_stats(
             target["reasoning_tokens"] = source["reasoning_tokens"]
         if source.get("web_search_count") is not None:
             target["web_search_count"] = source["web_search_count"]
         if source.get("raw_usage") is not None:
-            target["raw_usage"] = source["raw_usage"]
+            serialized = serialize_raw_usage(source["raw_usage"])
+            if serialized:
+                target["raw_usage"] = serialized
     else:
         raise ValueError(f"Invalid mode: {mode}. Must be 'incremental' or 'cumulative'")
@@ -341,7 +397,9 @@ def call_llm_and_track_usage(
 
         raw_usage = usage.get("raw_usage")
         if raw_usage is not None:
-            tag("$ai_usage", raw_usage)
+            serialized = serialize_raw_usage(raw_usage)
+            if serialized:
+                tag("$ai_usage", serialized)
 
         if posthog_distinct_id is None:
             tag("$process_person_profile", False)
@@ -470,7 +528,9 @@ async def call_llm_and_track_usage_async(
 
         raw_usage = usage.get("raw_usage")
         if raw_usage is not None:
-            tag("$ai_usage", raw_usage)
+            serialized = serialize_raw_usage(raw_usage)
+            if serialized:
+                tag("$ai_usage", serialized)
 
         if posthog_distinct_id is None:
             tag("$process_person_profile", False)
@@ -612,7 +672,9 @@ def capture_streaming_event(
     # Add raw usage metadata if present (all providers)
     raw_usage = event_data["usage_stats"].get("raw_usage")
     if raw_usage is not None:
-        event_properties["$ai_usage"] = raw_usage
+        serialized = serialize_raw_usage(raw_usage)
+        if serialized:
+            event_properties["$ai_usage"] = serialized
 
     # Handle provider-specific fields
     if (

diff --git a/posthog/test/ai/anthropic/test_anthropic.py b/posthog/test/ai/anthropic/test_anthropic.py
index 50d6014d..384f4761 100644
--- a/posthog/test/ai/anthropic/test_anthropic.py
+++ b/posthog/test/ai/anthropic/test_anthropic.py
@@ -1,3 +1,4 @@
+import json
 from unittest.mock import patch
 
 import pytest
@@ -309,6 +310,12 @@ def test_basic_completion(mock_client, mock_anthropic_response):
     # Verify raw usage metadata is passed for backend processing
     assert "$ai_usage" in props
     assert props["$ai_usage"] is not None
+    # Verify it's JSON-serializable
+    json.dumps(props["$ai_usage"])
+    # Verify it has expected structure
+    assert isinstance(props["$ai_usage"], dict)
+    assert "input_tokens" in props["$ai_usage"]
+    assert "output_tokens" in props["$ai_usage"]
 
 
 def test_groups(mock_client, mock_anthropic_response):
@@ -921,6 +928,16 @@ def test_streaming_with_tool_calls(mock_client, mock_anthropic_stream_with_tools
     assert props["$ai_cache_read_input_tokens"] == 5
     assert props["$ai_cache_creation_input_tokens"] == 0
 
+    # Verify raw usage is captured in streaming mode (merged from events)
+    assert "$ai_usage" in props
+    assert props["$ai_usage"] is not None
+    # Verify it's JSON-serializable
+    json.dumps(props["$ai_usage"])
+    # Verify it has expected structure (merged from message_start and message_delta)
+    assert isinstance(props["$ai_usage"], dict)
+    assert "input_tokens" in props["$ai_usage"]
+    assert "output_tokens" in props["$ai_usage"]
+
 
 def test_async_streaming_with_tool_calls(mock_client, mock_anthropic_stream_with_tools):
     """Test that tool calls are properly captured in async streaming mode."""

diff --git a/posthog/test/ai/gemini/test_gemini.py b/posthog/test/ai/gemini/test_gemini.py
index ee74fbdd..0adaa151 100644
--- a/posthog/test/ai/gemini/test_gemini.py
+++ b/posthog/test/ai/gemini/test_gemini.py
@@ -1,3 +1,4 @@
+import json
 from unittest.mock import MagicMock, patch
 
 import pytest
@@ -34,6 +35,13 @@ def mock_gemini_response():
     # Ensure cache and reasoning tokens are not present (not MagicMock)
     mock_usage.cached_content_token_count = 0
     mock_usage.thoughts_token_count = 0
+    # Make model_dump() return a proper dict for serialization
+    mock_usage.model_dump.return_value = {
+        "prompt_token_count": 20,
+        "candidates_token_count": 10,
+        "cached_content_token_count": 0,
+        "thoughts_token_count": 0,
+    }
     mock_response.usage_metadata = mock_usage
 
     mock_candidate = MagicMock()
@@ -69,6 +77,13 @@ def mock_gemini_response_with_function_calls():
     mock_usage.candidates_token_count = 15
     mock_usage.cached_content_token_count = 0
     mock_usage.thoughts_token_count = 0
+    # Make model_dump() return a proper dict for serialization
+    mock_usage.model_dump.return_value = {
+        "prompt_token_count": 25,
+        "candidates_token_count": 15,
+        "cached_content_token_count": 0,
+        "thoughts_token_count": 0,
+    }
     mock_response.usage_metadata = mock_usage
 
     # Mock function call
@@ -117,6 +132,13 @@ def mock_gemini_response_function_calls_only():
     mock_usage.candidates_token_count = 12
     mock_usage.cached_content_token_count = 0
     mock_usage.thoughts_token_count = 0
+    # Make model_dump() return a proper dict for serialization
+    mock_usage.model_dump.return_value = {
+        "prompt_token_count": 30,
+        "candidates_token_count": 12,
+        "cached_content_token_count": 0,
+        "thoughts_token_count": 0,
+    }
     mock_response.usage_metadata = mock_usage
 
     # Mock function call
@@ -177,6 +199,12 @@ def test_new_client_basic_generation(
     # Verify raw usage metadata is passed for backend processing
     assert "$ai_usage" in props
     assert props["$ai_usage"] is not None
+    # Verify it's JSON-serializable
+    json.dumps(props["$ai_usage"])
+    # Verify it has expected structure
+    assert isinstance(props["$ai_usage"], dict)
+    assert "prompt_token_count" in props["$ai_usage"]
+    assert "candidates_token_count" in props["$ai_usage"]
 
 
 def test_new_client_streaming_with_generate_content_stream(
@@ -813,6 +841,13 @@ def test_streaming_cache_and_reasoning_tokens(mock_client, mock_google_genai_cli
     chunk1_usage.candidates_token_count = 5
     chunk1_usage.cached_content_token_count = 30  # Cache tokens
     chunk1_usage.thoughts_token_count = 0
+    # Make model_dump() return a proper dict for serialization
+    chunk1_usage.model_dump.return_value = {
+        "prompt_token_count": 100,
+        "candidates_token_count": 5,
+        "cached_content_token_count": 30,
+        "thoughts_token_count": 0,
+    }
     chunk1.usage_metadata = chunk1_usage
 
     chunk2 = MagicMock()
@@ -822,6 +857,13 @@ def test_streaming_cache_and_reasoning_tokens(mock_client, mock_google_genai_cli
     chunk2_usage.candidates_token_count = 10
     chunk2_usage.cached_content_token_count = 30  # Same cache tokens
     chunk2_usage.thoughts_token_count = 5  # Reasoning tokens
+    # Make model_dump() return a proper dict for serialization
+    chunk2_usage.model_dump.return_value = {
+        "prompt_token_count": 100,
+        "candidates_token_count": 10,
+        "cached_content_token_count": 30,
+        "thoughts_token_count": 5,
+    }
     chunk2.usage_metadata = chunk2_usage
 
     mock_stream = iter([chunk1, chunk2])
@@ -851,6 +893,16 @@ def test_streaming_cache_and_reasoning_tokens(mock_client, mock_google_genai_cli
     assert props["$ai_cache_read_input_tokens"] == 30
     assert props["$ai_reasoning_tokens"] == 5
 
+    # Verify raw usage is captured in streaming mode (merged from chunks)
+    assert "$ai_usage" in props
+    assert props["$ai_usage"] is not None
+    # Verify it's JSON-serializable
+    json.dumps(props["$ai_usage"])
+    # Verify it has expected structure
+    assert isinstance(props["$ai_usage"], dict)
+    assert "prompt_token_count" in props["$ai_usage"]
+    assert "candidates_token_count" in props["$ai_usage"]
+
 
 def test_web_search_grounding(mock_client, mock_google_genai_client):
     """Test web search detection via grounding_metadata."""

diff --git a/posthog/test/ai/openai/test_openai.py b/posthog/test/ai/openai/test_openai.py
index 367b7316..0fd216b0 100644
--- a/posthog/test/ai/openai/test_openai.py
+++ b/posthog/test/ai/openai/test_openai.py
@@ -1,3 +1,4 @@
+import json
 import time
 from unittest.mock import AsyncMock, patch
 
@@ -499,6 +500,12 @@ def test_basic_completion(mock_client, mock_openai_response):
     # Verify raw usage metadata is passed for backend processing
     assert "$ai_usage" in props
     assert props["$ai_usage"] is not None
+    # Verify it's JSON-serializable
+    json.dumps(props["$ai_usage"])
+    # Verify it has expected structure
+    assert isinstance(props["$ai_usage"], dict)
+    assert "prompt_tokens" in props["$ai_usage"]
+    assert "completion_tokens" in props["$ai_usage"]
 
 
 def test_embeddings(mock_client, mock_embedding_response):
@@ -925,6 +932,16 @@ def test_streaming_with_tool_calls(mock_client, streaming_tool_call_chunks):
     assert props["$ai_input_tokens"] == 20
     assert props["$ai_output_tokens"] == 15
 
+    # Verify raw usage is captured in streaming mode
+    assert "$ai_usage" in props
+    assert props["$ai_usage"] is not None
+    # Verify it's JSON-serializable
+    json.dumps(props["$ai_usage"])
+    # Verify it has expected structure (merged from chunks)
+    assert isinstance(props["$ai_usage"], dict)
+    assert "prompt_tokens" in props["$ai_usage"]
+    assert "completion_tokens" in props["$ai_usage"]
+
 
 # test responses api
 def test_responses_api(mock_client, mock_openai_response_with_responses_api):
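A quick sketch of the merge behavior this patch introduces, using plain
dicts as stand-ins for the usage extracted from message_start and
message_delta events (assumes the incremental mode shown above):

    from posthog.ai.utils import merge_usage_stats

    totals = {"input_tokens": 10, "raw_usage": {"input_tokens": 10}}  # message_start
    delta = {"output_tokens": 5, "raw_usage": {"output_tokens": 5}}   # message_delta
    merge_usage_stats(totals, delta, mode="incremental")
    # raw_usage now keeps both sides instead of only the latest event:
    assert totals["raw_usage"] == {"input_tokens": 10, "output_tokens": 5}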
From dbdc5cd0d0794e43f635c95d5b2f68b288205868 Mon Sep 17 00:00:00 2001
From: Richard Solomou
Date: Tue, 27 Jan 2026 19:28:20 +0200
Subject: [PATCH 3/4] refactor: move raw_usage serialization from utils to
 converters
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Address PR feedback from @andrewm4894 - serialize in converters, not utils.

**Problem:** Utils was receiving raw Pydantic/protobuf objects and
serializing them, which meant provider-specific knowledge leaked into
generic code.

**Solution:** Move serialization into converters, where the provider
context exists:

Converters (NEW):
- OpenAI: serialize_raw_usage(response.usage) → dict
- Anthropic: serialize_raw_usage(event.usage) → dict
- Gemini: serialize_raw_usage(metadata) → dict

Utils (SIMPLIFIED):
- Just passes dicts through, no serialization needed
- Merge operations work with dicts only

**Benefits:**
1. Type correctness: raw_usage is always Dict[str, Any]
2. Separation of concerns: converters handle provider formats
3. Fail fast: serialization errors surface in the converters, with
   provider context
4. Cleaner abstraction: utils doesn't know about Pydantic/protobuf

**Flow:** Provider object → Converter serializes → dict → Utils → PostHog
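A sketch of the resulting boundary (FakeProviderUsage is a hypothetical
stand-in for an SDK usage object; serialize_raw_usage is the helper added
in PATCH 2):

    from posthog.ai.utils import serialize_raw_usage

    class FakeProviderUsage:
        def model_dump(self):
            return {"prompt_tokens": 20, "completion_tokens": 10}

    # Converter side: serialize at the edge, store only plain dicts.
    usage = {}
    serialized = serialize_raw_usage(FakeProviderUsage())
    if serialized:
        usage["raw_usage"] = serialized

    # Utils side: no provider knowledge needed, just a dict pass-through.
    event_properties = {"$ai_usage": usage.get("raw_usage")}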
Co-Authored-By: Claude Sonnet 4.5
---
 posthog/ai/anthropic/anthropic_converter.py | 16 ++++++++--
 posthog/ai/gemini/gemini_converter.py       |  6 +++-
 posthog/ai/openai/openai_converter.py       | 16 ++++++++--
 posthog/ai/utils.py                         | 33 +++++++++------------
 4 files changed, 45 insertions(+), 26 deletions(-)

diff --git a/posthog/ai/anthropic/anthropic_converter.py b/posthog/ai/anthropic/anthropic_converter.py
index df19b1ab..011f3437 100644
--- a/posthog/ai/anthropic/anthropic_converter.py
+++ b/posthog/ai/anthropic/anthropic_converter.py
@@ -17,6 +17,7 @@
     TokenUsage,
     ToolInProgress,
 )
+from posthog.ai.utils import serialize_raw_usage
 
 
 def format_anthropic_response(response: Any) -> List[FormattedMessage]:
@@ -222,7 +223,10 @@ def extract_anthropic_usage_from_response(response: Any) -> TokenUsage:
         result["web_search_count"] = web_search_count
 
     # Capture raw usage metadata for backend processing
-    result["raw_usage"] = response.usage
+    # Serialize to dict here in the converter (not in utils)
+    serialized = serialize_raw_usage(response.usage)
+    if serialized:
+        result["raw_usage"] = serialized
 
     return result
@@ -251,7 +255,10 @@ def extract_anthropic_usage_from_event(event: Any) -> TokenUsage:
         usage["cache_read_input_tokens"] = getattr(
             event.message.usage, "cache_read_input_tokens", 0
         )
         # Capture raw usage metadata for backend processing
-        usage["raw_usage"] = event.message.usage
+        # Serialize to dict here in the converter (not in utils)
+        serialized = serialize_raw_usage(event.message.usage)
+        if serialized:
+            usage["raw_usage"] = serialized
 
     # Handle usage stats from message_delta event
     if hasattr(event, "usage") and event.usage:
@@ -268,7 +275,10 @@ def extract_anthropic_usage_from_event(event: Any) -> TokenUsage:
             usage["web_search_count"] = web_search_count
 
         # Capture raw usage metadata for backend processing
-        usage["raw_usage"] = event.usage
+        # Serialize to dict here in the converter (not in utils)
+        serialized = serialize_raw_usage(event.usage)
+        if serialized:
+            usage["raw_usage"] = serialized
 
     return usage

diff --git a/posthog/ai/gemini/gemini_converter.py b/posthog/ai/gemini/gemini_converter.py
index a12dd96d..1d521105 100644
--- a/posthog/ai/gemini/gemini_converter.py
+++ b/posthog/ai/gemini/gemini_converter.py
@@ -12,6 +12,7 @@
     FormattedMessage,
     TokenUsage,
 )
+from posthog.ai.utils import serialize_raw_usage
 
 
 class GeminiPart(TypedDict, total=False):
@@ -488,7 +489,10 @@ def _extract_usage_from_metadata(metadata: Any) -> TokenUsage:
         usage["reasoning_tokens"] = reasoning_tokens
 
     # Capture raw usage metadata for backend processing
-    usage["raw_usage"] = metadata
+    # Serialize to dict here in the converter (not in utils)
+    serialized = serialize_raw_usage(metadata)
+    if serialized:
+        usage["raw_usage"] = serialized
 
     return usage

diff --git a/posthog/ai/openai/openai_converter.py b/posthog/ai/openai/openai_converter.py
index b1992ef1..4d8bbe22 100644
--- a/posthog/ai/openai/openai_converter.py
+++ b/posthog/ai/openai/openai_converter.py
@@ -16,6 +16,7 @@
     FormattedTextContent,
     TokenUsage,
 )
+from posthog.ai.utils import serialize_raw_usage
 
 
 def format_openai_response(response: Any) -> List[FormattedMessage]:
@@ -430,7 +431,10 @@ def extract_openai_usage_from_response(response: Any) -> TokenUsage:
         result["web_search_count"] = web_search_count
 
     # Capture raw usage metadata for backend processing
-    result["raw_usage"] = response.usage
+    # Serialize to dict here in the converter (not in utils)
+    serialized = serialize_raw_usage(response.usage)
+    if serialized:
+        result["raw_usage"] = serialized
 
     return result
@@ -486,7 +490,10 @@ def extract_openai_usage_from_chunk(
         )
 
         # Capture raw usage metadata for backend processing
-        usage["raw_usage"] = chunk.usage
+        # Serialize to dict here in the converter (not in utils)
+        serialized = serialize_raw_usage(chunk.usage)
+        if serialized:
+            usage["raw_usage"] = serialized
 
     elif provider_type == "responses":
         # For Responses API, usage is only in chunk.response.usage for completed events
@@ -523,7 +530,10 @@ def extract_openai_usage_from_chunk(
             usage["web_search_count"] = web_search_count
 
             # Capture raw usage metadata for backend processing
-            usage["raw_usage"] = response_usage
+            # Serialize to dict here in the converter (not in utils)
+            serialized = serialize_raw_usage(response_usage)
+            if serialized:
+                usage["raw_usage"] = serialized
 
     return usage

diff --git a/posthog/ai/utils.py b/posthog/ai/utils.py
index f3540747..3e5ae829 100644
--- a/posthog/ai/utils.py
+++ b/posthog/ai/utils.py
@@ -110,14 +110,13 @@ def merge_usage_stats(
 
         # Merge raw_usage to avoid losing data from earlier events
         # For Anthropic streaming: message_start has input tokens, message_delta has output
+        # Note: raw_usage is already serialized by converters, so it's a dict
         source_raw_usage = source.get("raw_usage")
-        if source_raw_usage is not None:
-            serialized = serialize_raw_usage(source_raw_usage)
-            if serialized:
-                current_raw = target.get("raw_usage", {})
-                if not isinstance(current_raw, dict):
-                    current_raw = {}
-                target["raw_usage"] = {**current_raw, **serialized}
+        if source_raw_usage is not None and isinstance(source_raw_usage, dict):
+            current_raw = target.get("raw_usage", {})
+            if not isinstance(current_raw, dict):
+                current_raw = {}
+            target["raw_usage"] = {**current_raw, **source_raw_usage}
 
     elif mode == "cumulative":
         # Replace with latest values (already cumulative)
@@ -135,10 +134,9 @@ def merge_usage_stats(
             target["reasoning_tokens"] = source["reasoning_tokens"]
         if source.get("web_search_count") is not None:
             target["web_search_count"] = source["web_search_count"]
+        # Note: raw_usage is already serialized by converters, so it's a dict
         if source.get("raw_usage") is not None:
-            serialized = serialize_raw_usage(source["raw_usage"])
-            if serialized:
-                target["raw_usage"] = serialized
+            target["raw_usage"] = source["raw_usage"]
     else:
         raise ValueError(f"Invalid mode: {mode}. Must be 'incremental' or 'cumulative'")
@@ -397,9 +395,8 @@ def call_llm_and_track_usage(
 
         raw_usage = usage.get("raw_usage")
         if raw_usage is not None:
-            serialized = serialize_raw_usage(raw_usage)
-            if serialized:
-                tag("$ai_usage", serialized)
+            # Already serialized by converters
+            tag("$ai_usage", raw_usage)
 
         if posthog_distinct_id is None:
             tag("$process_person_profile", False)
@@ -528,9 +525,8 @@ async def call_llm_and_track_usage_async(
 
         raw_usage = usage.get("raw_usage")
         if raw_usage is not None:
-            serialized = serialize_raw_usage(raw_usage)
-            if serialized:
-                tag("$ai_usage", serialized)
+            # Already serialized by converters
+            tag("$ai_usage", raw_usage)
 
         if posthog_distinct_id is None:
             tag("$process_person_profile", False)
@@ -672,9 +668,8 @@ def capture_streaming_event(
     # Add raw usage metadata if present (all providers)
    raw_usage = event_data["usage_stats"].get("raw_usage")
     if raw_usage is not None:
-        serialized = serialize_raw_usage(raw_usage)
-        if serialized:
-            event_properties["$ai_usage"] = serialized
+        # Already serialized by converters
+        event_properties["$ai_usage"] = raw_usage
 
     # Handle provider-specific fields
     if (
From e880de3c2c2347d2a791ed4866d526a88e463ab5 Mon Sep 17 00:00:00 2001
From: Richard Solomou
Date: Tue, 27 Jan 2026 20:36:42 +0200
Subject: [PATCH 4/4] fix: add type annotation for current_raw to satisfy mypy

Fix mypy error: "Need type annotation for 'current_raw'"

Extract the value first, then apply an explicit type annotation with a
ternary conditional to satisfy mypy's type checker.
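The pattern in isolation (a minimal, self-contained sketch of why the
annotation satisfies mypy):

    from typing import Any, Dict

    target: Dict[str, Any] = {}

    # Before: mypy cannot infer one type for current_raw across branches.
    # After: annotate once, resolve the branch with a conditional expression.
    current_raw_value = target.get("raw_usage")
    current_raw: Dict[str, Any] = (
        current_raw_value if isinstance(current_raw_value, dict) else {}
    )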
Co-Authored-By: Claude Sonnet 4.5
---
 posthog/ai/utils.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/posthog/ai/utils.py b/posthog/ai/utils.py
index 3e5ae829..3c4afe05 100644
--- a/posthog/ai/utils.py
+++ b/posthog/ai/utils.py
@@ -113,9 +113,10 @@ def merge_usage_stats(
         # Note: raw_usage is already serialized by converters, so it's a dict
         source_raw_usage = source.get("raw_usage")
         if source_raw_usage is not None and isinstance(source_raw_usage, dict):
-            current_raw = target.get("raw_usage", {})
-            if not isinstance(current_raw, dict):
-                current_raw = {}
+            current_raw_value = target.get("raw_usage")
+            current_raw: Dict[str, Any] = (
+                current_raw_value if isinstance(current_raw_value, dict) else {}
+            )
             target["raw_usage"] = {**current_raw, **source_raw_usage}
 
     elif mode == "cumulative":