diff --git a/src/openlayer/lib/data/_upload.py b/src/openlayer/lib/data/_upload.py index 69333ff5..84b26ec8 100644 --- a/src/openlayer/lib/data/_upload.py +++ b/src/openlayer/lib/data/_upload.py @@ -4,9 +4,10 @@ different storage backends. """ +import io import os from enum import Enum -from typing import Optional +from typing import BinaryIO, Dict, Optional, Union import requests from requests.adapters import Response @@ -35,6 +36,135 @@ class StorageType(Enum): VERIFY_REQUESTS = True +# ----- Low-level upload functions (work with bytes or file-like objects) ---- # +def upload_bytes( + storage: StorageType, + url: str, + data: Union[bytes, BinaryIO], + object_name: str, + content_type: str, + fields: Optional[Dict] = None, +) -> Response: + """Upload data to the appropriate storage backend. + + This is a convenience function that routes to the correct upload method + based on the storage type. + + Args: + storage: The storage backend type. + url: The presigned URL to upload to. + data: The data to upload (bytes or file-like object). + object_name: The object name (used for multipart uploads). + content_type: The MIME type of the data. + fields: Additional fields for multipart uploads (S3 policy fields). + + Returns: + The response from the upload request. + """ + if storage == StorageType.AWS: + return upload_bytes_multipart( + url=url, + data=data, + object_name=object_name, + content_type=content_type, + fields=fields, + ) + elif storage == StorageType.GCP: + return upload_bytes_put( + url=url, + data=data, + content_type=content_type, + ) + elif storage == StorageType.AZURE: + return upload_bytes_put( + url=url, + data=data, + content_type=content_type, + extra_headers={"x-ms-blob-type": "BlockBlob"}, + ) + else: + # Local storage uses multipart POST (no extra fields) + return upload_bytes_multipart( + url=url, + data=data, + object_name=object_name, + content_type=content_type, + ) + + +def upload_bytes_multipart( + url: str, + data: Union[bytes, BinaryIO], + object_name: str, + content_type: str, + fields: Optional[Dict] = None, +) -> Response: + """Upload data using multipart POST (for S3 and local storage). + + Args: + url: The presigned URL to upload to. + data: The data to upload (bytes or file-like object). + object_name: The object name for the file field. + content_type: The MIME type of the data. + fields: Additional fields to include in the multipart form (e.g., S3 policy fields). + + Returns: + The response from the upload request. + """ + # Convert bytes to file-like object if needed + if isinstance(data, bytes): + data = io.BytesIO(data) + + upload_fields = dict(fields) if fields else {} + upload_fields["file"] = (object_name, data, content_type) + + encoder = MultipartEncoder(fields=upload_fields) + headers = {"Content-Type": encoder.content_type} + + response = requests.post( + url, + data=encoder, + headers=headers, + verify=VERIFY_REQUESTS, + timeout=REQUESTS_TIMEOUT, + ) + response.raise_for_status() + return response + + +def upload_bytes_put( + url: str, + data: Union[bytes, BinaryIO], + content_type: str, + extra_headers: Optional[Dict[str, str]] = None, +) -> Response: + """Upload data using PUT request (for GCS and Azure). + + Args: + url: The presigned URL to upload to. + data: The data to upload (bytes or file-like object). + content_type: The MIME type of the data. + extra_headers: Additional headers (e.g., x-ms-blob-type for Azure). + + Returns: + The response from the upload request. 
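+
+    Example (illustrative sketch; assumes ``url`` is a valid presigned PUT URL):
+        >>> upload_bytes_put(url, b"hello", content_type="text/plain")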
+ """ + headers = {"Content-Type": content_type} + if extra_headers: + headers.update(extra_headers) + + response = requests.put( + url, + data=data, + headers=headers, + verify=VERIFY_REQUESTS, + timeout=REQUESTS_TIMEOUT, + ) + response.raise_for_status() + return response + + +# --- High-level Uploader class (file-based uploads with progress tracking) -- # class Uploader: """Internal class to handle http requests""" @@ -105,7 +235,9 @@ def upload_blob_s3( fields = presigned_url_response.fields fields["file"] = (object_name, f, "application/x-tar") e = MultipartEncoder(fields=fields) - m = MultipartEncoderMonitor(e, lambda monitor: t.update(min(t.total, monitor.bytes_read) - t.n)) + m = MultipartEncoderMonitor( + e, lambda monitor: t.update(min(t.total, monitor.bytes_read) - t.n) + ) headers = {"Content-Type": m.content_type} res = requests.post( presigned_url_response.url, @@ -116,7 +248,9 @@ def upload_blob_s3( ) return res - def upload_blob_gcs(self, file_path: str, presigned_url_response: PresignedURLCreateResponse): + def upload_blob_gcs( + self, file_path: str, presigned_url_response: PresignedURLCreateResponse + ): """Generic method to upload data to Google Cloud Storage and create the appropriate resource in the backend. """ @@ -137,7 +271,9 @@ def upload_blob_gcs(self, file_path: str, presigned_url_response: PresignedURLCr ) return res - def upload_blob_azure(self, file_path: str, presigned_url_response: PresignedURLCreateResponse): + def upload_blob_azure( + self, file_path: str, presigned_url_response: PresignedURLCreateResponse + ): """Generic method to upload data to Azure Blob Storage and create the appropriate resource in the backend. """ @@ -180,7 +316,9 @@ def upload_blob_local( with open(file_path, "rb") as f: fields = {"file": (object_name, f, "application/x-tar")} e = MultipartEncoder(fields=fields) - m = MultipartEncoderMonitor(e, lambda monitor: t.update(min(t.total, monitor.bytes_read) - t.n)) + m = MultipartEncoderMonitor( + e, lambda monitor: t.update(min(t.total, monitor.bytes_read) - t.n) + ) headers = {"Content-Type": m.content_type} res = requests.post( presigned_url_response.url, diff --git a/src/openlayer/lib/integrations/openai_tracer.py b/src/openlayer/lib/integrations/openai_tracer.py index 8230c53f..27f120fc 100644 --- a/src/openlayer/lib/integrations/openai_tracer.py +++ b/src/openlayer/lib/integrations/openai_tracer.py @@ -2,9 +2,11 @@ import json import logging +import mimetypes +import re import time from functools import wraps -from typing import Any, Dict, Iterator, List, Optional, Union, TYPE_CHECKING +from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Union try: import openai @@ -17,6 +19,14 @@ import openai from ..tracing import tracer +from ..tracing.attachments import Attachment +from ..tracing.content import ( + AudioContent, + ContentItem, + FileContent, + ImageContent, + TextContent, +) logger = logging.getLogger(__name__) @@ -55,7 +65,9 @@ def trace_openai( The patched OpenAI client. """ if not HAVE_OPENAI: - raise ImportError("OpenAI library is not installed. Please install it with: pip install openai") + raise ImportError( + "OpenAI library is not installed. 
Please install it with: pip install openai" + ) is_azure_openai = isinstance(client, openai.AzureOpenAI) @@ -74,7 +86,6 @@ def traced_chat_create_func(*args, **kwargs): create_func=chat_create_func, inference_id=inference_id, is_azure_openai=is_azure_openai, - api_type="chat_completions", ) return handle_non_streaming_create( *args, @@ -82,13 +93,12 @@ def traced_chat_create_func(*args, **kwargs): create_func=chat_create_func, inference_id=inference_id, is_azure_openai=is_azure_openai, - api_type="chat_completions", ) client.chat.completions.create = traced_chat_create_func # Patch parse method if it exists - if hasattr(client.chat.completions, 'parse'): + if hasattr(client.chat.completions, "parse"): parse_func = client.chat.completions.parse @wraps(parse_func) @@ -151,7 +161,6 @@ def handle_streaming_create( *args, is_azure_openai: bool = False, inference_id: Optional[str] = None, - api_type: str = "chat_completions", **kwargs, ) -> Iterator[Any]: """Handles the create method when streaming is enabled. @@ -214,12 +223,16 @@ def stream_chunks( if delta.function_call.name: collected_function_call["name"] += delta.function_call.name if delta.function_call.arguments: - collected_function_call["arguments"] += delta.function_call.arguments + collected_function_call[ + "arguments" + ] += delta.function_call.arguments elif delta.tool_calls: if delta.tool_calls[0].function.name: collected_function_call["name"] += delta.tool_calls[0].function.name if delta.tool_calls[0].function.arguments: - collected_function_call["arguments"] += delta.tool_calls[0].function.arguments + collected_function_call["arguments"] += delta.tool_calls[ + 0 + ].function.arguments yield chunk end_time = time.time() @@ -230,16 +243,21 @@ def stream_chunks( finally: # Try to add step to the trace try: - collected_output_data = [message for message in collected_output_data if message is not None] + collected_output_data = [ + message for message in collected_output_data if message is not None + ] if collected_output_data: output_data = "".join(collected_output_data) else: - collected_function_call["arguments"] = json.loads(collected_function_call["arguments"]) + collected_function_call["arguments"] = json.loads( + collected_function_call["arguments"] + ) output_data = collected_function_call + processed_messages = extract_chat_completion_messages(kwargs["messages"]) trace_args = create_trace_args( end_time=end_time, - inputs={"prompt": kwargs["messages"]}, + inputs={"prompt": processed_messages}, output=output_data, latency=latency, tokens=num_of_completion_tokens, @@ -249,7 +267,13 @@ def stream_chunks( model_parameters=get_model_parameters(kwargs), raw_output=raw_outputs, id=inference_id, - metadata={"timeToFirstToken": ((first_token_time - start_time) * 1000 if first_token_time else None)}, + metadata={ + "timeToFirstToken": ( + (first_token_time - start_time) * 1000 + if first_token_time + else None + ) + }, ) add_to_trace( **trace_args, @@ -314,7 +338,9 @@ def create_trace_args( return trace_args -def add_to_trace(is_azure_openai: bool = False, api_type: str = "chat_completions", **kwargs) -> None: +def add_to_trace( + is_azure_openai: bool = False, api_type: str = "chat_completions", **kwargs +) -> None: """Add a chat completion or responses step to the trace.""" # Remove api_type from kwargs to avoid passing it to the tracer kwargs.pop("api_type", None) @@ -322,15 +348,23 @@ def add_to_trace(is_azure_openai: bool = False, api_type: str = "chat_completion if api_type == "responses": # Handle Responses API tracing if 
is_azure_openai: - tracer.add_chat_completion_step_to_trace(**kwargs, name="Azure OpenAI Response", provider="Azure") + tracer.add_chat_completion_step_to_trace( + **kwargs, name="Azure OpenAI Response", provider="Azure" + ) else: - tracer.add_chat_completion_step_to_trace(**kwargs, name="OpenAI Response", provider="OpenAI") + tracer.add_chat_completion_step_to_trace( + **kwargs, name="OpenAI Response", provider="OpenAI" + ) else: # Handle Chat Completions API tracing (default behavior) if is_azure_openai: - tracer.add_chat_completion_step_to_trace(**kwargs, name="Azure OpenAI Chat Completion", provider="Azure") + tracer.add_chat_completion_step_to_trace( + **kwargs, name="Azure OpenAI Chat Completion", provider="Azure" + ) else: - tracer.add_chat_completion_step_to_trace(**kwargs, name="OpenAI Chat Completion", provider="OpenAI") + tracer.add_chat_completion_step_to_trace( + **kwargs, name="OpenAI Chat Completion", provider="OpenAI" + ) def handle_non_streaming_create( @@ -338,7 +372,6 @@ def handle_non_streaming_create( *args, is_azure_openai: bool = False, inference_id: Optional[str] = None, - api_type: str = "chat_completions", **kwargs, ) -> Union["openai.types.chat.chat_completion.ChatCompletion", Any]: """Handles the create method when streaming is disabled. @@ -364,9 +397,10 @@ def handle_non_streaming_create( # Try to add step to the trace try: output_data = parse_non_streaming_output_data(response) + processed_messages = extract_chat_completion_messages(kwargs["messages"]) trace_args = create_trace_args( end_time=end_time, - inputs={"prompt": kwargs["messages"]}, + inputs={"prompt": processed_messages}, output=output_data, latency=(end_time - start_time) * 1000, tokens=response.usage.total_tokens, @@ -384,11 +418,242 @@ def handle_non_streaming_create( ) # pylint: disable=broad-except except Exception as e: - logger.error("Failed to trace the create chat completion request with Openlayer. %s", e) + logger.error( + "Failed to trace the create chat completion request with Openlayer. %s", e + ) return response +def extract_chat_completion_messages( + messages: List[Dict[str, Any]], +) -> List[Dict[str, Any]]: + """Extract and normalize messages from Chat Completions API format. + + Converts OpenAI message format to Openlayer format, extracting media content + (images, audio, files) into Attachment objects wrapped in Content classes. + + OpenAI content types supported: + - "text" -> TextContent + - "image_url" -> ImageContent (URL or base64 data URL) + - "input_audio" -> AudioContent (base64 encoded) + - "file" -> FileContent + + Args: + messages: List of messages in OpenAI Chat Completions format + + Returns: + List of messages with content normalized to Openlayer format. + String content is preserved as-is for backwards compatibility. + List content is converted to ContentItem objects. 
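+
+    Example (illustrative; reprs abbreviated):
+        >>> extract_chat_completion_messages([
+        ...     {"role": "user", "content": [
+        ...         {"type": "text", "text": "Describe this image."},
+        ...         {"type": "image_url",
+        ...          "image_url": {"url": "https://example.com/cat.png"}},
+        ...     ]}
+        ... ])
+        [{'role': 'user', 'content': [TextContent(...), ImageContent(...)]}]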
+    """
+    processed_messages: List[Dict[str, Any]] = []
+
+    for message in messages:
+        role = message.get("role", "user")
+        content = message.get("content")
+        name = message.get("name")  # For function/tool messages
+
+        # If content is a string, keep it as-is (backwards compatible)
+        if isinstance(content, str):
+            processed_msg: Dict[str, Any] = {"role": role, "content": content}
+            if name:
+                processed_msg["name"] = name
+            processed_messages.append(processed_msg)
+            continue
+
+        # If content is None (e.g., assistant message with tool_calls only)
+        if content is None:
+            processed_messages.append(message)
+            continue
+
+        # If content is a list, process each item
+        if isinstance(content, list):
+            normalized_content: List[ContentItem] = []
+            for item in content:
+                content_item = _normalize_content_item(item)
+                normalized_content.append(content_item)
+
+            processed_msg = {"role": role, "content": normalized_content}
+            if name:
+                processed_msg["name"] = name
+            processed_messages.append(processed_msg)
+        else:
+            # Unknown content type, keep as-is
+            processed_messages.append(message)
+
+    return processed_messages
+
+
+def _normalize_content_item(item: Dict[str, Any]) -> ContentItem:
+    """Normalize a single content item from OpenAI format to Openlayer format.
+
+    Supports both Chat Completions API and Responses API formats.
+
+    Chat Completions API types:
+    - "text" -> TextContent
+    - "image_url" -> ImageContent (URL or base64 data URL)
+    - "input_audio" -> AudioContent (base64 encoded)
+    - "file" -> FileContent
+
+    Responses API types:
+    - "input_text" -> TextContent
+    - "input_image" -> ImageContent (URL, base64 data URL, or file_id)
+
+    Args:
+        item: A content item dict from OpenAI message content array
+
+    Returns:
+        A ContentItem object (TextContent, ImageContent, etc.). Unknown item
+        types fall back to a TextContent holding the item's string
+        representation.
+    """
+    item_type = item.get("type", "")
+
+    # Text content (both APIs)
+    if item_type in ("text", "input_text"):
+        text = item.get("text", "")
+        return TextContent(text=text)
+
+    # Image content (both APIs - different structures)
+    elif item_type in ("image_url", "input_image"):
+        return _normalize_image_content(item, item_type)
+
+    # Audio content (Chat Completions only)
+    elif item_type == "input_audio":
+        audio_data = item.get("input_audio", {})
+        data = audio_data.get("data", "")
+        audio_format = audio_data.get("format", "wav")
+
+        media_type = f"audio/{audio_format}"
+        attachment = Attachment.from_base64(
+            data_base64=data,
+            name=f"audio.{audio_format}",
+            media_type=media_type,
+        )
+        return AudioContent(attachment=attachment)
+
+    # File content (Chat Completions only)
+    elif item_type == "file":
+        file_data = item.get("file", {})
+        file_id = file_data.get("file_id")
+        file_data_b64 = file_data.get("file_data")
+        filename = file_data.get("filename", "file")
+
+        if file_data_b64:
+            attachment = Attachment.from_base64(
+                data_base64=file_data_b64,
+                name=filename,
+                media_type="application/octet-stream",
+            )
+        elif file_id:
+            # Just reference the file ID (can't download without API call)
+            attachment = Attachment(name=filename)
+            attachment.metadata["openai_file_id"] = file_id
+        else:
+            attachment = Attachment(name=filename)
+
+        return FileContent(attachment=attachment)
+
+    else:
+        # Unknown type, return as TextContent with string representation
+        logger.debug("Unknown content item type: %s", item_type)
+        return TextContent(text=str(item))
+
+
+def _normalize_image_content(item: Dict[str, Any], item_type: str) -> ImageContent:
+    """Normalize image content from both Chat Completions and Responses API.
+ + Chat Completions: {"type": "image_url", "image_url": {"url": "..."}} + Responses API: {"type": "input_image", "image_url": "..."} or + {"type": "input_image", "file_id": "..."} + + Args: + item: The content item dict + item_type: The type string ("image_url" or "input_image") + + Returns: + An ImageContent object with the appropriate attachment + """ + if item_type == "input_image": + # Responses API format - flat structure + image_url = item.get("image_url") + file_id = item.get("file_id") + + if file_id: + # File ID reference - just store the reference + attachment = Attachment(name="image") + attachment.metadata["openai_file_id"] = file_id + elif image_url: + attachment = _create_image_attachment_from_url(image_url) + else: + # Fallback - empty attachment + attachment = Attachment(name="image") + else: + # Chat Completions API format - nested structure + image_url_data = item.get("image_url", {}) + url = image_url_data.get("url", "") + attachment = _create_image_attachment_from_url(url) + + return ImageContent(attachment=attachment) + + +def _create_image_attachment_from_url(url: str) -> Attachment: + """Create an Attachment from a URL (handles both regular URLs and data URLs). + + Args: + url: The image URL (can be a regular URL or a data: URL with base64 content) + + Returns: + An Attachment object + """ + if url.startswith("data:"): + # Base64 data URL + return _parse_data_url_to_attachment(url, default_type="image") + else: + # External URL - infer media type from URL or default to image/jpeg + media_type = mimetypes.guess_type(url)[0] + if media_type is None or not media_type.startswith("image/"): + media_type = "image/jpeg" + return Attachment.from_url(url, name="image", media_type=media_type) + + +def _parse_data_url_to_attachment( + data_url: str, default_type: str = "image" +) -> Attachment: + """Parse a data URL (data:image/jpeg;base64,...) into an Attachment. + + Args: + data_url: The data URL string (e.g., "data:image/png;base64,iVBORw0KGgo...") + default_type: Default type if parsing fails ("image", "audio", "file") + + Returns: + An Attachment object + """ + # Format: data:image/png;base64,iVBORw0KGgo... 
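+    # e.g. group(1) captures the media type ("image/png") and group(2) the
+    # base64 payload after the comma; re.DOTALL lets the payload span newlines.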
+ match = re.match(r"data:([^;]+);base64,(.+)", data_url, re.DOTALL) + if match: + media_type = match.group(1) + base64_data = match.group(2) + + # Infer extension from media type + extension = media_type.split("/")[-1] + extension_map = {"jpeg": "jpg", "x-wav": "wav", "mpeg": "mp3"} + extension = extension_map.get(extension, extension) + + return Attachment.from_base64( + data_base64=base64_data, + name=f"{default_type}.{extension}", + media_type=media_type, + ) + + # Fallback - couldn't parse, treat as plain base64 + logger.warning("Could not parse data URL format, treating as raw base64") + return Attachment.from_base64( + data_base64=data_url, + name=default_type, + media_type=f"{default_type}/unknown", + ) + + # -------------------------------- Responses API Handlers -------------------------------- # @@ -446,7 +711,9 @@ def stream_responses_chunks( try: i = 0 for i, chunk in enumerate(chunks): - raw_outputs.append(chunk.model_dump() if hasattr(chunk, "model_dump") else str(chunk)) + raw_outputs.append( + chunk.model_dump() if hasattr(chunk, "model_dump") else str(chunk) + ) if i == 0: first_token_time = time.time() if i > 0: @@ -474,13 +741,17 @@ def stream_responses_chunks( finally: # Try to add step to the trace try: - collected_output_data = [message for message in collected_output_data if message is not None] + collected_output_data = [ + message for message in collected_output_data if message is not None + ] if collected_output_data: output_data = "".join(collected_output_data) else: if collected_function_call["arguments"]: try: - collected_function_call["arguments"] = json.loads(collected_function_call["arguments"]) + collected_function_call["arguments"] = json.loads( + collected_function_call["arguments"] + ) except json.JSONDecodeError: # Keep as string if not valid JSON pass @@ -499,7 +770,11 @@ def stream_responses_chunks( raw_output=raw_outputs, id=inference_id, metadata={ - "timeToFirstToken": ((first_token_time - start_time) * 1000 if first_token_time else None) + "timeToFirstToken": ( + (first_token_time - start_time) * 1000 + if first_token_time + else None + ) }, ) add_to_trace( @@ -558,7 +833,11 @@ def handle_responses_non_streaming_create( completion_tokens=usage_data.get("completion_tokens", 0), model=getattr(response, "model", kwargs.get("model", "unknown")), model_parameters=get_responses_model_parameters(kwargs), - raw_output=response.model_dump() if hasattr(response, "model_dump") else str(response), + raw_output=( + response.model_dump() + if hasattr(response, "model_dump") + else str(response) + ), id=inference_id, ) @@ -613,9 +892,15 @@ def extract_responses_chunk_data(chunk: Any) -> Dict[str, Any]: result["content"] = delta.content elif hasattr(delta, "function_call"): func_call = {} - if hasattr(delta.function_call, "name") and delta.function_call.name: + if ( + hasattr(delta.function_call, "name") + and delta.function_call.name + ): func_call["name"] = delta.function_call.name - if hasattr(delta.function_call, "arguments") and delta.function_call.arguments: + if ( + hasattr(delta.function_call, "arguments") + and delta.function_call.arguments + ): func_call["arguments"] = delta.function_call.arguments if func_call: result["function_call"] = func_call @@ -632,101 +917,161 @@ def extract_responses_inputs(kwargs: Dict[str, Any]) -> Dict[str, Any]: Formats the input as a messages array similar to Chat Completions API format: {"prompt": [{"role": "user", "content": "..."}]} + Handles multimodal inputs (text + images) by normalizing content items + using the 
same ContentItem format as Chat Completions API. + + Responses API input formats supported: + - Simple string: input="What is 2+2?" + - List of messages: input=[{"role": "user", "content": "..."}] + - Multimodal content: input=[{"role": "user", "content": [ + {"type": "input_text", "text": "What's in this image?"}, + {"type": "input_image", "image_url": "https://..."} + ]}] + Args: kwargs: The parameters passed to the Responses API Returns: Dictionary with prompt as a messages array """ - messages = [] + messages: List[Dict[str, Any]] = [] # Handle different input formats for Responses API - if "conversation" in kwargs: - # Conversation is already in messages format - conversation = kwargs["conversation"] - if isinstance(conversation, list): - messages = conversation - else: - # Single message, wrap it - messages = [{"role": "user", "content": str(conversation)}] - else: - # Build messages array from available parameters - if "instructions" in kwargs: - messages.append({"role": "system", "content": kwargs["instructions"]}) - - if "input" in kwargs: - messages.append({"role": "user", "content": kwargs["input"]}) - elif "prompt" in kwargs: + if "input" in kwargs: + input_value = kwargs["input"] + + if isinstance(input_value, str): + # Simple string input + messages.append({"role": "user", "content": input_value}) + elif isinstance(input_value, list): + # List of messages - process each one for multimodal content + for msg in input_value: + if isinstance(msg, dict): + processed_msg = _process_responses_message(msg) + messages.append(processed_msg) + else: + # Non-dict item, wrap as user message + messages.append({"role": "user", "content": str(msg)}) + + # Add instructions as system message (if present) + if "instructions" in kwargs: + messages.insert(0, {"role": "system", "content": kwargs["instructions"]}) + + # Handle legacy/alternative input parameters + if not messages: + if "prompt" in kwargs: messages.append({"role": "user", "content": kwargs["prompt"]}) - - # If no user message was added, create a fallback - if not any(msg.get("role") == "user" for msg in messages): - if messages: - # Only system message, add empty user message - messages.append({"role": "user", "content": ""}) - else: - # No messages at all, add placeholder - messages.append({"role": "user", "content": "No input provided"}) + + # If no user message was added, create a fallback + if not any(msg.get("role") == "user" for msg in messages): + if messages: + # Only system message, add empty user message + messages.append({"role": "user", "content": ""}) + else: + # No messages at all, add placeholder + messages.append({"role": "user", "content": "No input provided"}) return {"prompt": messages} -def parse_responses_output_data(response: Any) -> Union[str, Dict[str, Any], None]: +def _process_responses_message(message: Dict[str, Any]) -> Dict[str, Any]: + """Process a single message from Responses API input. + + Normalizes multimodal content items (text, images) to Openlayer format. + String content is preserved as-is for backwards compatibility. 
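+    For example (illustrative), {"role": "user", "content": [{"type":
+    "input_text", "text": "hi"}]} becomes {"role": "user", "content":
+    [TextContent(text="hi")]}.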
+ + Args: + message: A message dict with 'role' and 'content' keys + + Returns: + Processed message with normalized content + """ + role = message.get("role", "user") + content = message.get("content") + + # String content - keep as-is (backwards compatible) + if isinstance(content, str): + return {"role": role, "content": content} + + # List content - normalize each item + if isinstance(content, list): + normalized_content: List[ContentItem] = [] + for item in content: + if isinstance(item, dict): + content_item = _normalize_content_item(item) + normalized_content.append(content_item) + else: + # Non-dict item, convert to TextContent + normalized_content.append(TextContent(text=str(item))) + return {"role": role, "content": normalized_content} + + # None or other type - return as-is + return message + + +def parse_responses_output_data( + response: Any, +) -> Union[str, List[ContentItem], Dict[str, Any], None]: """Parses the output data from a Responses API response. + Handles text and image generation outputs. For multimodal outputs + (e.g., text + generated images), returns a list of ContentItem objects. + Args: response: The Response object from the Responses API Returns: - The parsed output data + The parsed output data: + - str: For text-only responses (backwards compatible) + - List[ContentItem]: For multimodal responses (text + images) + - Dict: For function/tool call responses + - None: If no output data """ try: - # Handle Response object structure - check for output first (Responses API structure) - if hasattr(response, "output") and response.output: - if isinstance(response.output, list) and response.output: - # Handle list of output messages - first_output = response.output[0] - if hasattr(first_output, "content") and first_output.content: - # Extract text from content list - if isinstance(first_output.content, list) and first_output.content: - text_content = first_output.content[0] - if hasattr(text_content, "text"): - return text_content.text.strip() - elif hasattr(first_output.content, "text"): - return first_output.content.text.strip() - else: - return str(first_output.content).strip() - elif hasattr(first_output, "text"): - return first_output.text.strip() - elif hasattr(response.output, "text"): + if not hasattr(response, "output") or not response.output: + return None + + if not isinstance(response.output, list): + # Handle non-list output + if hasattr(response.output, "text"): return response.output.text.strip() - elif hasattr(response.output, "content"): - return str(response.output.content).strip() - - # Handle Chat Completions style structure (fallback) - if hasattr(response, "choices") and response.choices: - choice = response.choices[0] - if hasattr(choice, "message"): - message = choice.message - if hasattr(message, "content") and message.content: - return message.content.strip() - elif hasattr(message, "function_call"): - return { - "name": message.function_call.name, - "arguments": json.loads(message.function_call.arguments) - if message.function_call.arguments - else {}, - } - elif hasattr(message, "tool_calls") and message.tool_calls: - tool_call = message.tool_calls[0] - return { - "name": tool_call.function.name, - "arguments": json.loads(tool_call.function.arguments) if tool_call.function.arguments else {}, - } - - # Handle direct text response - if hasattr(response, "text") and response.text: - return response.text.strip() + return str(response.output).strip() + + content_items: List[ContentItem] = [] + text_content: Optional[str] = None + + for 
output_item in response.output: + output_type = getattr(output_item, "type", None) + + if output_type == "message": + # Text message output + extracted_text = _extract_text_from_message_output(output_item) + if extracted_text: + text_content = extracted_text + content_items.append(TextContent(text=extracted_text)) + + elif output_type == "image_generation_call": + # Image generation output - result contains base64 image + image_data = getattr(output_item, "result", None) + if image_data: + attachment = Attachment.from_base64( + data_base64=image_data, + name="generated_image.png", + media_type="image/png", + ) + content_items.append(ImageContent(attachment=attachment)) + + # Return appropriate format based on content + if len(content_items) > 1: + # Multimodal output - return list of ContentItems + return content_items + elif len(content_items) == 1: + if isinstance(content_items[0], TextContent): + # Text-only output - return string (backwards compatible) + return text_content + else: + # Single non-text content item - return as list + return content_items except Exception as e: logger.debug("Could not parse Responses API output data: %s", e) @@ -734,6 +1079,27 @@ def parse_responses_output_data(response: Any) -> Union[str, Dict[str, Any], Non return None +def _extract_text_from_message_output(output_item: Any) -> Optional[str]: + """Extract text content from a message output item. + + Args: + output_item: A ResponseOutputMessage object + + Returns: + The extracted text or None + """ + if hasattr(output_item, "content") and output_item.content: + if isinstance(output_item.content, list) and output_item.content: + first_content = output_item.content[0] + if hasattr(first_content, "text"): + return first_content.text.strip() + elif hasattr(output_item.content, "text"): + return output_item.content.text.strip() + elif hasattr(output_item, "text"): + return output_item.text.strip() + return None + + def extract_responses_usage(response: Any) -> Dict[str, int]: """Extract token usage from a Responses API response. 
@@ -751,15 +1117,23 @@ def extract_responses_usage(response: Any) -> Dict[str, int]: # Handle ResponseUsage object with different attribute names usage["total_tokens"] = getattr(usage_obj, "total_tokens", 0) # ResponseUsage uses 'input_tokens' instead of 'prompt_tokens' - usage["prompt_tokens"] = getattr(usage_obj, "input_tokens", getattr(usage_obj, "prompt_tokens", 0)) + usage["prompt_tokens"] = getattr( + usage_obj, "input_tokens", getattr(usage_obj, "prompt_tokens", 0) + ) # ResponseUsage uses 'output_tokens' instead of 'completion_tokens' - usage["completion_tokens"] = getattr(usage_obj, "output_tokens", getattr(usage_obj, "completion_tokens", 0)) + usage["completion_tokens"] = getattr( + usage_obj, "output_tokens", getattr(usage_obj, "completion_tokens", 0) + ) elif hasattr(response, "token_usage"): # Alternative usage attribute name usage_obj = response.token_usage usage["total_tokens"] = getattr(usage_obj, "total_tokens", 0) - usage["prompt_tokens"] = getattr(usage_obj, "input_tokens", getattr(usage_obj, "prompt_tokens", 0)) - usage["completion_tokens"] = getattr(usage_obj, "output_tokens", getattr(usage_obj, "completion_tokens", 0)) + usage["prompt_tokens"] = getattr( + usage_obj, "input_tokens", getattr(usage_obj, "prompt_tokens", 0) + ) + usage["completion_tokens"] = getattr( + usage_obj, "output_tokens", getattr(usage_obj, "completion_tokens", 0) + ) except Exception as e: logger.debug("Could not extract token usage from Responses API response: %s", e) @@ -783,38 +1157,81 @@ def get_responses_model_parameters(kwargs: Dict[str, Any]) -> Dict[str, Any]: def parse_non_streaming_output_data( response: "openai.types.chat.chat_completion.ChatCompletion", -) -> Union[str, Dict[str, Any], None]: +) -> Union[str, List[ContentItem], Dict[str, Any], None]: """Parses the output data from a non-streaming completion. + Handles text, audio, and function call outputs. For multimodal outputs + (e.g., audio responses), returns a list of ContentItem objects. + Parameters ---------- response : openai.types.chat.chat_completion.ChatCompletion The chat completion response. + Returns ------- - Union[str, Dict[str, Any], None] - The parsed output data. 
+ Union[str, List[ContentItem], Dict[str, Any], None] + The parsed output data: + - str: For text-only responses (backwards compatible) + - List[ContentItem]: For multimodal responses (text + audio) + - Dict: For function/tool call responses + - None: If no output data """ - output_content = response.choices[0].message.content - output_function_call = response.choices[0].message.function_call - output_tool_calls = response.choices[0].message.tool_calls + message = response.choices[0].message + output_content = message.content + output_audio = getattr(message, "audio", None) + output_function_call = message.function_call + output_tool_calls = message.tool_calls + + # Check for audio output (multimodal response) + if output_audio is not None: + content_items: List[ContentItem] = [] + + # Add text content (transcript) if available + transcript = getattr(output_audio, "transcript", None) + if transcript: + content_items.append(TextContent(text=transcript)) + + # Add audio content + audio_data = getattr(output_audio, "data", None) + if audio_data: + # Create attachment from base64 audio data + attachment = Attachment.from_base64( + data_base64=audio_data, + name="output_audio.wav", + media_type="audio/wav", + ) + # Store additional audio metadata + audio_id = getattr(output_audio, "id", None) + if audio_id: + attachment.metadata["openai_audio_id"] = audio_id + expires_at = getattr(output_audio, "expires_at", None) + if expires_at: + attachment.metadata["expires_at"] = expires_at + + content_items.append(AudioContent(attachment=attachment)) + + if content_items: + return content_items + + # Text-only response (backwards compatible) if output_content: - output_data = output_content.strip() - elif output_function_call or output_tool_calls: + return output_content.strip() + + # Function/tool call response + if output_function_call or output_tool_calls: if output_function_call: - function_call = { + return { "name": output_function_call.name, "arguments": json.loads(output_function_call.arguments), } else: - function_call = { + return { "name": output_tool_calls[0].function.name, "arguments": json.loads(output_tool_calls[0].function.arguments), } - output_data = function_call - else: - output_data = None - return output_data + + return None def handle_streaming_parse( @@ -915,9 +1332,10 @@ def stream_parse_chunks( ) output_data = collected_function_call + processed_messages = extract_chat_completion_messages(kwargs["messages"]) trace_args = create_trace_args( end_time=end_time, - inputs={"prompt": kwargs["messages"]}, + inputs={"prompt": processed_messages}, output=output_data, latency=latency, tokens=num_of_completion_tokens, @@ -980,9 +1398,10 @@ def handle_non_streaming_parse( # Try to add step to the trace try: output_data = parse_structured_output_data(response) + processed_messages = extract_chat_completion_messages(kwargs["messages"]) trace_args = create_trace_args( end_time=end_time, - inputs={"prompt": kwargs["messages"]}, + inputs={"prompt": processed_messages}, output=output_data, latency=(end_time - start_time) * 1000, tokens=response.usage.total_tokens, @@ -1018,7 +1437,7 @@ def parse_structured_output_data(response: Any) -> Union[str, Dict[str, Any], No ---------- response : Any The parse method completion response. 
- + Returns ------- Union[str, Dict[str, Any], None] @@ -1026,20 +1445,20 @@ def parse_structured_output_data(response: Any) -> Union[str, Dict[str, Any], No """ try: # Check if response has parsed structured data - if hasattr(response, 'parsed') and response.parsed is not None: + if hasattr(response, "parsed") and response.parsed is not None: # Handle Pydantic models - if hasattr(response.parsed, 'model_dump'): + if hasattr(response.parsed, "model_dump"): return response.parsed.model_dump() - # Handle dict-like objects - elif hasattr(response.parsed, '__dict__'): + # Handle dict-like objects + elif hasattr(response.parsed, "__dict__"): return response.parsed.__dict__ # Handle other structured formats else: return response.parsed - + # Fallback to regular message content parsing return parse_non_streaming_output_data(response) - + except Exception as e: logger.error("Failed to parse structured output data: %s", e) # Final fallback to regular parsing @@ -1047,13 +1466,17 @@ def parse_structured_output_data(response: Any) -> Union[str, Dict[str, Any], No # --------------------------- OpenAI Assistants API -------------------------- # -def trace_openai_assistant_thread_run(client: "openai.OpenAI", run: "openai.types.beta.threads.run.Run") -> None: +def trace_openai_assistant_thread_run( + client: "openai.OpenAI", run: "openai.types.beta.threads.run.Run" +) -> None: """Trace a run from an OpenAI assistant. Once the run is completed, the thread data is published to Openlayer, along with the latency, and number of tokens used.""" if not HAVE_OPENAI: - raise ImportError("OpenAI library is not installed. Please install it with: pip install openai") + raise ImportError( + "OpenAI library is not installed. Please install it with: pip install openai" + ) _type_check_run(run) @@ -1067,7 +1490,9 @@ def trace_openai_assistant_thread_run(client: "openai.OpenAI", run: "openai.type metadata = _extract_run_metadata(run) # Convert thread to prompt - messages = client.beta.threads.messages.list(thread_id=run.thread_id, order="asc") + messages = client.beta.threads.messages.list( + thread_id=run.thread_id, order="asc" + ) prompt = _thread_messages_to_prompt(messages) # Add step to the trace diff --git a/src/openlayer/lib/tracing/__init__.py b/src/openlayer/lib/tracing/__init__.py index ffa19284..c63a5b11 100644 --- a/src/openlayer/lib/tracing/__init__.py +++ b/src/openlayer/lib/tracing/__init__.py @@ -1,30 +1,32 @@ """Openlayer tracing module.""" +from .attachments import Attachment from .tracer import ( + configure, + create_step, + get_current_step, + get_current_trace, + log_attachment, + log_context, + log_output, trace, trace_async, - update_current_trace, update_current_step, - log_context, - log_output, - configure, - get_current_trace, - get_current_step, - create_step, + update_current_trace, ) - __all__ = [ # Core tracing functions "trace", - "trace_async", + "trace_async", "update_current_trace", "update_current_step", "log_context", "log_output", "configure", "get_current_trace", - "get_current_step", + "get_current_step", "create_step", + "Attachment", + "log_attachment", ] - diff --git a/src/openlayer/lib/tracing/attachment_uploader.py b/src/openlayer/lib/tracing/attachment_uploader.py new file mode 100644 index 00000000..c13398d2 --- /dev/null +++ b/src/openlayer/lib/tracing/attachment_uploader.py @@ -0,0 +1,300 @@ +"""Attachment upload handling for traces. 
+ +This module provides functionality to upload attachments to Openlayer storage +using the existing upload infrastructure from openlayer.lib.data._upload. +""" + +import logging +import uuid +from typing import TYPE_CHECKING, Any, Dict, List, Optional + +if TYPE_CHECKING: + from ..._client import Openlayer + from .steps import Step + from .traces import Trace + +from .attachments import Attachment +from ..data._upload import STORAGE, StorageType, upload_bytes + +logger = logging.getLogger(__name__) + + +def find_attachments(data: Any) -> List[Attachment]: + """Recursively find all Attachment objects in a data structure. + + This function traverses dicts, lists, tuples, and objects with + 'attachment' attributes to find any Attachment objects embedded within. + + Args: + data: Any data structure that may contain Attachment objects. + + Returns: + A list of all Attachment objects found. + """ + if isinstance(data, Attachment): + return [data] + elif isinstance(data, dict): + result = [] + for value in data.values(): + result.extend(find_attachments(value)) + return result + elif isinstance(data, (list, tuple)): + result = [] + for item in data: + result.extend(find_attachments(item)) + return result + elif hasattr(data, "attachment"): + # Handle ContentItem objects (ImageContent, AudioContent, etc.) + attachment = getattr(data, "attachment") + if isinstance(attachment, Attachment): + return [attachment] + return [] + + +class AttachmentUploader: + """Handles uploading attachments to Openlayer storage. + + This class manages the upload of attachment data to Openlayer's storage + backend using the same infrastructure as other data uploads. It supports + S3, GCS, Azure, and local storage backends. + """ + + def __init__(self, client: "Openlayer", storage: Optional[StorageType] = None): + """Initialize the attachment uploader. + + Args: + client: The Openlayer client instance. + storage: Storage type override. Defaults to the global STORAGE setting. + """ + self._client = client + self._storage = storage or STORAGE + self._upload_cache: Dict[str, str] = {} # checksum -> storage_uri + + def upload_attachment(self, attachment: "Attachment") -> "Attachment": + """Upload a single attachment if needed. + + If the attachment already has a storage_uri or external URL, + it is returned as-is. Otherwise, the attachment data is uploaded + to Openlayer storage and the storage_uri is set. + + Args: + attachment: The attachment to upload. + + Returns: + The attachment with storage_uri populated (if upload was needed). 
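+
+        Example (illustrative; assumes a configured Openlayer client):
+            >>> uploader = AttachmentUploader(client)
+            >>> att = Attachment.from_bytes(
+            ...     b"...", name="img.png", media_type="image/png"
+            ... )
+            >>> uploader.upload_attachment(att).storage_uri  # set on success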
+ """ + # Skip if already uploaded + if attachment.is_uploaded(): + logger.debug("Attachment %s already uploaded", attachment.name) + return attachment + + # Skip if it has an external URL (no upload needed) + if attachment.url: + logger.debug( + "Attachment %s has external URL, skipping upload", attachment.name + ) + return attachment + + # Check if we have data to upload + if not attachment.has_data(): + logger.warning( + "Attachment %s has no data available for upload", + attachment.name, + ) + return attachment + + # Check cache by checksum for deduplication + if attachment.checksum_md5 and attachment.checksum_md5 in self._upload_cache: + attachment.storage_uri = self._upload_cache[attachment.checksum_md5] + logger.debug( + "Using cached storage_uri for attachment %s (checksum: %s)", + attachment.name, + attachment.checksum_md5, + ) + return attachment + + try: + # Generate a unique object name for storage + object_name = self._generate_object_name(attachment) + + # Get presigned URL from Openlayer API + presigned_response = self._client.storage.presigned_url.create( + object_name=object_name, + ) + + # Get the bytes to upload + file_bytes = attachment.get_bytes() + if file_bytes is None: + raise ValueError(f"No data available for attachment {attachment.name}") + + # Upload using the shared upload function + upload_bytes( + storage=self._storage, + url=presigned_response.url, + data=file_bytes, + object_name=object_name, + content_type=attachment.media_type, + fields=( + dict(presigned_response.fields) + if presigned_response.fields + else None + ), + ) + + # Set the storage URI on the attachment + attachment.storage_uri = presigned_response.storage_uri + + # Cache for deduplication + if attachment.checksum_md5: + self._upload_cache[attachment.checksum_md5] = attachment.storage_uri + + # Clear data after upload (no longer needed, avoid duplicating in JSON) + attachment._pending_bytes = None + attachment.data_base64 = None + + logger.debug( + "Uploaded attachment %s to %s", + attachment.name, + attachment.storage_uri, + ) + + except Exception as e: + logger.error( + "Failed to upload attachment %s: %s", + attachment.name, + e, + ) + + return attachment + + def _generate_object_name(self, attachment: "Attachment") -> str: + """Generate a unique object name for storage. + + Args: + attachment: The attachment to generate a name for. + + Returns: + A unique object name for storage. + """ + # Use checksum if available for deduplication, otherwise UUID + unique_id = attachment.checksum_md5 or str(uuid.uuid4()) + + # Extract extension from name or media type + extension = "" + if "." in attachment.name: + extension = attachment.name.rsplit(".", 1)[-1] + elif "/" in attachment.media_type: + # Try to get extension from media type (e.g., "image/png" -> "png") + subtype = attachment.media_type.split("/")[-1] + # Handle special cases + extension_map = { + "mpeg": "mp3", + "jpeg": "jpg", + "x-wav": "wav", + "x-m4a": "m4a", + } + extension = extension_map.get(subtype, subtype) + + if extension: + return f"attachments/{unique_id}.{extension}" + return f"attachments/{unique_id}" + + def upload_trace_attachments(self, trace: "Trace") -> int: + """Upload all attachments in a trace. + + Recursively processes all steps in the trace and uploads any + attachments that have data available. This includes attachments + in the step's attachments list, as well as any Attachment objects + embedded in the step's inputs or outputs. + + Args: + trace: The trace containing steps with attachments. 
+ + Returns: + The number of attachments uploaded. + """ + upload_count = 0 + seen_ids: set = set() + + def process_step(step: "Step") -> None: + nonlocal upload_count + + # Collect attachments from all sources + all_attachments: List[Attachment] = list(step.attachments) + all_attachments.extend(find_attachments(step.inputs)) + all_attachments.extend(find_attachments(step.output)) + + # Process each attachment (deduplicate by ID) + for attachment in all_attachments: + if attachment.id in seen_ids: + continue + seen_ids.add(attachment.id) + + if not attachment.is_uploaded() and attachment.has_data(): + self.upload_attachment(attachment) + if attachment.is_uploaded(): + upload_count += 1 + + # Process nested steps recursively + for nested_step in step.steps: + process_step(nested_step) + + for step in trace.steps: + process_step(step) + + if upload_count > 0: + logger.info("Uploaded %d attachment(s) for trace", upload_count) + + return upload_count + + +# Module-level uploader instance (lazy initialized) +_uploader: Optional[AttachmentUploader] = None + + +def get_uploader() -> Optional[AttachmentUploader]: + """Get or create the attachment uploader. + + Returns: + The AttachmentUploader instance if uploads are enabled, None otherwise. + """ + global _uploader + from . import tracer + + if not tracer._configured_attachment_upload_enabled: + return None + + if _uploader is None: + client = tracer._get_client() + if client: + _uploader = AttachmentUploader(client) + + return _uploader + + +def reset_uploader() -> None: + """Reset the uploader instance. + + This is called when tracer.configure() is called to ensure + the uploader is recreated with new settings. + """ + global _uploader + _uploader = None + + +def upload_trace_attachments(trace: "Trace") -> int: + """Upload all attachments in a trace. + + This is a convenience function that gets the uploader and + uploads all attachments in the trace. + + Args: + trace: The trace to upload attachments for. + + Returns: + The number of attachments uploaded, or 0 if uploads are disabled. + """ + uploader = get_uploader() + if uploader is None: + return 0 + return uploader.upload_trace_attachments(trace) diff --git a/src/openlayer/lib/tracing/attachments.py b/src/openlayer/lib/tracing/attachments.py new file mode 100644 index 00000000..50cf580b --- /dev/null +++ b/src/openlayer/lib/tracing/attachments.py @@ -0,0 +1,374 @@ +"""Attachment abstraction for unstructured data in traces. + +This module provides the Attachment class for representing binary/media content +(images, audio, video, PDFs, etc.) that is associated with trace steps but +stored separately from structured trace data. +""" + +import base64 +import hashlib +import logging +import mimetypes +import uuid +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Dict, Optional, Union + +logger = logging.getLogger(__name__) + + +@dataclass +class Attachment: + """Unstructured data attached to a trace step. + + Attachments represent binary/media content that needs special handling + for storage and display. The attachment holds references to where the + data is stored, not the raw data itself (except for small inline data). + + Examples of attachments: + - Audio files (input to STT, output from TTS) + - Images (input to vision models, output from image generation) + - PDFs and documents + - Debug artifacts and screenshots + + Attributes: + id: Unique identifier for this attachment. + name: Human-readable name (e.g., "input_audio.wav"). 
+ media_type: MIME type (e.g., "audio/wav", "image/png"). + storage_uri: Openlayer storage reference (set after upload). + url: External URL reference. + file_path: Local file path (for development/debugging). + data_base64: Inline base64 data (for small attachments). + size_bytes: Size of the attachment in bytes. + checksum_md5: MD5 checksum for integrity/deduplication. + metadata: Extensible metadata dict (duration, dimensions, etc.). + """ + + # Identity + id: str = field(default_factory=lambda: str(uuid.uuid4())) + name: str = "" + + # Content type + media_type: str = "application/octet-stream" + + # Storage references (priority: storage_uri > url > file_path) + storage_uri: Optional[str] = None # Openlayer managed storage + url: Optional[str] = None # External URL + file_path: Optional[str] = None # Local file path + + # Inline data (for small attachments when configured) + data_base64: Optional[str] = None + + # Size and integrity + size_bytes: Optional[int] = None + checksum_md5: Optional[str] = None + + # Extensible metadata + # For audio: duration_seconds, sample_rate, channels + # For images: width, height + # For documents: page_count + metadata: Dict[str, Any] = field(default_factory=dict) + + # Internal: holds pending bytes for upload (not serialized) + _pending_bytes: Optional[bytes] = field(default=None, repr=False, compare=False) + + def to_dict(self) -> Dict[str, Any]: + """Serialize attachment for JSON transport. + + Returns: + Dictionary representation suitable for JSON serialization. + """ + result = { + "id": self.id, + "name": self.name, + "mediaType": self.media_type, + } + + # Add references (only non-None) + if self.storage_uri: + result["storageUri"] = self.storage_uri + if self.url: + result["url"] = self.url + if self.file_path: + result["filePath"] = self.file_path + if self.data_base64: + result["dataBase64"] = self.data_base64 + + # Add optional fields + if self.size_bytes is not None: + result["sizeBytes"] = self.size_bytes + if self.checksum_md5: + result["checksumMd5"] = self.checksum_md5 + if self.metadata: + result["metadata"] = self.metadata + + return result + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "Attachment": + """Deserialize attachment from JSON. + + Args: + data: Dictionary representation of an attachment. + + Returns: + Attachment instance. + """ + return cls( + id=data.get("id", str(uuid.uuid4())), + name=data.get("name", ""), + media_type=data.get("mediaType", "application/octet-stream"), + storage_uri=data.get("storageUri"), + url=data.get("url"), + file_path=data.get("filePath"), + data_base64=data.get("dataBase64"), + size_bytes=data.get("sizeBytes"), + checksum_md5=data.get("checksumMd5"), + metadata=data.get("metadata", {}), + ) + + def is_uploaded(self) -> bool: + """Check if attachment has been uploaded to Openlayer storage. + + Returns: + True if storage_uri is set. + """ + return self.storage_uri is not None + + def has_data(self) -> bool: + """Check if attachment has data available for upload. + + Returns: + True if data is available via file_path, data_base64, or _pending_bytes. + """ + if self._pending_bytes is not None: + return True + if self.data_base64 is not None: + return True + if self.file_path: + return Path(self.file_path).exists() + return False + + def is_valid(self) -> bool: + """Check if attachment is valid and should be included in trace data. + + An attachment is valid if it has been uploaded, has an external URL, + or has data available for upload. 
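+        For example (illustrative), an attachment created via
+        ``Attachment.from_url(...)`` is valid even though no local bytes exist.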
+ + Returns: + True if the attachment should be serialized. + """ + return self.is_uploaded() or self.url is not None or self.has_data() + + def get_reference(self) -> Optional[str]: + """Get the best available reference URI. + + Returns: + The most reliable reference to this attachment's data. + """ + return self.storage_uri or self.url or self.file_path + + # ----- Factory methods ----- + + @classmethod + def from_file( + cls, + file_path: Union[str, Path], + name: Optional[str] = None, + media_type: Optional[str] = None, + extract_metadata: bool = True, + ) -> "Attachment": + """Create an attachment from a local file. + + Args: + file_path: Path to the file. + name: Display name (defaults to filename). + media_type: MIME type (auto-detected if not provided). + extract_metadata: Whether to extract metadata like duration/dimensions. + + Returns: + Attachment instance with file reference. + """ + path = Path(file_path).expanduser() + + if not path.exists(): + logger.warning( + "Attachment file does not exist: %s. " + "The attachment will be ignored.", + path, + ) + + if media_type is None: + media_type = ( + mimetypes.guess_type(str(path))[0] or "application/octet-stream" + ) + + attachment = cls( + name=name or path.name, + media_type=media_type, + file_path=str(path.absolute()), + ) + + if path.exists(): + stat = path.stat() + attachment.size_bytes = stat.st_size + + # Compute checksum for deduplication + try: + with open(path, "rb") as f: + attachment.checksum_md5 = hashlib.md5(f.read()).hexdigest() + except Exception as e: + logger.debug("Could not compute checksum for %s: %s", path, e) + + if extract_metadata: + attachment._extract_metadata_from_file(path) + + return attachment + + @classmethod + def from_url( + cls, + url: str, + name: Optional[str] = None, + media_type: Optional[str] = None, + ) -> "Attachment": + """Create an attachment from an external URL. + + Args: + url: The URL pointing to the media. + name: Display name (extracted from URL if not provided). + media_type: MIME type (guessed from URL if not provided). + + Returns: + Attachment instance with URL reference. + """ + if media_type is None: + media_type = mimetypes.guess_type(url)[0] or "application/octet-stream" + + # Extract filename from URL if name not provided + if name is None: + from urllib.parse import urlparse + + parsed = urlparse(url) + name = Path(parsed.path).name or "attachment" + + return cls( + name=name, + media_type=media_type, + url=url, + ) + + @classmethod + def from_bytes( + cls, + data: bytes, + name: str, + media_type: str, + inline: bool = False, + ) -> "Attachment": + """Create an attachment from raw bytes. + + Args: + data: The binary data. + name: Display name for the attachment. + media_type: MIME type of the data. + inline: If True, store data as base64 inline (for small attachments). + + Returns: + Attachment instance with data ready for upload. + """ + attachment = cls( + name=name, + media_type=media_type, + size_bytes=len(data), + checksum_md5=hashlib.md5(data).hexdigest(), + ) + + if inline: + attachment.data_base64 = base64.b64encode(data).decode("utf-8") + else: + # Store bytes for later upload + attachment._pending_bytes = data + + return attachment + + @classmethod + def from_base64( + cls, + data_base64: str, + name: str, + media_type: str, + ) -> "Attachment": + """Create an attachment from base64-encoded data. + + Args: + data_base64: Base64-encoded binary data. + name: Display name for the attachment. + media_type: MIME type of the data. 
+ + Returns: + Attachment instance with inline data. + """ + # Decode to get size and checksum + try: + decoded = base64.b64decode(data_base64) + size_bytes = len(decoded) + checksum_md5 = hashlib.md5(decoded).hexdigest() + except Exception: + size_bytes = None + checksum_md5 = None + + return cls( + name=name, + media_type=media_type, + data_base64=data_base64, + size_bytes=size_bytes, + checksum_md5=checksum_md5, + ) + + def get_bytes(self) -> Optional[bytes]: + """Get the binary data for this attachment. + + Returns: + The attachment data as bytes, or None if not available locally. + """ + # Check for pending bytes first + if self._pending_bytes is not None: + return self._pending_bytes + + # Check for inline data + if self.data_base64: + try: + return base64.b64decode(self.data_base64) + except Exception as e: + logger.error("Failed to decode base64 data: %s", e) + return None + + # Read from file + if self.file_path: + path = Path(self.file_path) + if path.exists(): + try: + return path.read_bytes() + except Exception as e: + logger.error("Failed to read file %s: %s", path, e) + return None + + return None + + def _extract_metadata_from_file(self, path: Path) -> None: + """Extract metadata from file based on media type. + + Args: + path: Path to the file. + """ + # TODO: Implement metadata extraction from file + pass + + def __repr__(self) -> str: + """String representation of the attachment.""" + ref = self.storage_uri or self.url or self.file_path or "(no reference)" + return ( + f"Attachment(name={self.name!r}, " + f"media_type={self.media_type!r}, ref={ref!r})" + ) diff --git a/src/openlayer/lib/tracing/content.py b/src/openlayer/lib/tracing/content.py new file mode 100644 index 00000000..b6caa952 --- /dev/null +++ b/src/openlayer/lib/tracing/content.py @@ -0,0 +1,56 @@ +"""Module with the different Content classes that can be used in the input/output +of a chat completion step (multimodal).""" + +from dataclasses import dataclass, field +from typing import Any, Dict, Union + +from .attachments import Attachment +from .enums import ContentType + + +@dataclass +class TextContent: + """Text content item.""" + + text: str + type: ContentType = field(default=ContentType.TEXT, init=False) + + def to_dict(self) -> Dict[str, Any]: + return {"type": self.type.value, "text": self.text} + + +@dataclass +class ImageContent: + """Image content item.""" + + attachment: Attachment + type: ContentType = field(default=ContentType.IMAGE, init=False) + + def to_dict(self) -> Dict[str, Any]: + return {"type": self.type.value, "attachment": self.attachment.to_dict()} + + +@dataclass +class AudioContent: + """Audio content item.""" + + attachment: Attachment + type: ContentType = field(default=ContentType.AUDIO, init=False) + + def to_dict(self) -> Dict[str, Any]: + return {"type": self.type.value, "attachment": self.attachment.to_dict()} + + +@dataclass +class FileContent: + """File content item (PDFs, documents, etc.).""" + + attachment: Attachment + type: ContentType = field(default=ContentType.FILE, init=False) + + def to_dict(self) -> Dict[str, Any]: + return {"type": self.type.value, "attachment": self.attachment.to_dict()} + + +# Union type for type hints +ContentItem = Union[TextContent, ImageContent, AudioContent, FileContent] diff --git a/src/openlayer/lib/tracing/enums.py b/src/openlayer/lib/tracing/enums.py index a188f4ff..ee483b7a 100644 --- a/src/openlayer/lib/tracing/enums.py +++ b/src/openlayer/lib/tracing/enums.py @@ -4,6 +4,8 @@ class StepType(enum.Enum): + """Types of steps in a 
trace.""" + AGENT = "agent" CHAT_COMPLETION = "chat_completion" GUARDRAIL = "guardrail" @@ -11,3 +13,12 @@ class StepType(enum.Enum): RETRIEVER = "retriever" TOOL = "tool" USER_CALL = "user_call" + + +class ContentType(enum.Enum): + """Types of content in multimodal messages.""" + + TEXT = "text" + IMAGE = "image" + AUDIO = "audio" + FILE = "file" diff --git a/src/openlayer/lib/tracing/steps.py b/src/openlayer/lib/tracing/steps.py index 4afe13f4..08237dfb 100644 --- a/src/openlayer/lib/tracing/steps.py +++ b/src/openlayer/lib/tracing/steps.py @@ -2,10 +2,12 @@ import time import uuid -from typing import Any, Dict, Optional, List +from pathlib import Path +from typing import Any, BinaryIO, Dict, List, Optional, Union from .. import utils from . import enums +from .attachments import Attachment class Step: @@ -37,20 +39,85 @@ def __init__( self.steps = [] + # Attachments: unstructured data (images, audio, PDFs, etc.) + self.attachments: List["Attachment"] = [] + def add_nested_step(self, nested_step: "Step") -> None: """Adds a nested step to the current step.""" self.steps.append(nested_step) + def attach( + self, + data: Union[bytes, str, Path, BinaryIO, "Attachment"], + name: Optional[str] = None, + media_type: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + ) -> "Attachment": + """Attach unstructured data to this step. + + This method allows attaching binary content (images, audio, documents, etc.) + to a trace step. The attachment will be uploaded to Openlayer storage + when the trace is completed (if upload is enabled). + + Args: + data: The data to attach. Can be: + - bytes: Raw binary data + - str/Path: File path to read from + - File-like object: Will be read + - Attachment: An existing Attachment object + name: Display name for the attachment. If not provided, will be + inferred from the file path or set to "attachment". + media_type: MIME type (e.g., "image/png", "audio/wav"). + Auto-detected for file paths if not provided. + metadata: Additional metadata dict (e.g., duration, dimensions). + + Returns: + The created or added Attachment. 
+ + Examples: + >>> step.attach("/path/to/audio.wav") + >>> step.attach(image_bytes, name="screenshot.png", media_type="image/png") + >>> step.attach(pdf_file, name="document.pdf", media_type="application/pdf") + """ + if isinstance(data, Attachment): + attachment = data + elif isinstance(data, bytes): + attachment = Attachment.from_bytes( + data=data, + name=name or "attachment", + media_type=media_type or "application/octet-stream", + ) + elif isinstance(data, (str, Path)): + attachment = Attachment.from_file( + file_path=data, + name=name, + media_type=media_type, + ) + else: + # File-like object + file_bytes = data.read() + inferred_name = name or getattr(data, "name", None) or "attachment" + attachment = Attachment.from_bytes( + data=file_bytes, + name=inferred_name, + media_type=media_type or "application/octet-stream", + ) + + if metadata: + attachment.metadata.update(metadata) + + self.attachments.append(attachment) + return attachment + def log(self, **kwargs: Any) -> None: """Logs step data.""" - kwargs = utils.json_serialize(kwargs) for key, value in kwargs.items(): if hasattr(self, key): setattr(self, key, value) def to_dict(self) -> Dict[str, Any]: """Dictionary representation of the Step.""" - return { + result = { "name": self.name, "id": str(self.id), "type": self.step_type.value, @@ -64,6 +131,18 @@ def to_dict(self) -> Dict[str, Any]: "endTime": self.end_time, } + # Include valid attachments only (filter out ones with no data/reference) + if self.attachments: + valid_attachments = [ + attachment.to_dict() + for attachment in self.attachments + if attachment.is_valid() + ] + if valid_attachments: + result["attachments"] = valid_attachments + + return result + class UserCallStep(Step): """User call step represents a generic user call in the trace.""" diff --git a/src/openlayer/lib/tracing/tracer.py b/src/openlayer/lib/tracing/tracer.py index 9d3f3b6b..03748fb2 100644 --- a/src/openlayer/lib/tracing/tracer.py +++ b/src/openlayer/lib/tracing/tracer.py @@ -13,14 +13,24 @@ from contextlib import contextmanager from functools import wraps from pathlib import Path -from typing import Any, Awaitable, Callable, Dict, Generator, List, Optional, Tuple, Union +from typing import ( + Any, + Awaitable, + Callable, + Dict, + Generator, + List, + Optional, + Tuple, + Union, +) from ..._base_client import DefaultHttpxClient from ..._client import Openlayer from ...types.inference_pipelines.data_stream_params import ConfigLlmData from .. import utils +from ..guardrails.base import GuardrailAction, GuardrailResult from . 
import enums, steps, traces -from ..guardrails.base import GuardrailResult, GuardrailAction from .context import UserSessionContext # Type aliases for callback functions @@ -35,7 +45,9 @@ TRUE_LIST = ["true", "on", "1"] _publish = utils.get_env_variable("OPENLAYER_DISABLE_PUBLISH") not in TRUE_LIST -_verify_ssl = (utils.get_env_variable("OPENLAYER_VERIFY_SSL") or "true").lower() in TRUE_LIST +_verify_ssl = ( + utils.get_env_variable("OPENLAYER_VERIFY_SSL") or "true" +).lower() in TRUE_LIST _client = None # Configuration variables for programmatic setup @@ -51,6 +63,9 @@ _configured_offline_buffer_path: Optional[str] = None _configured_max_buffer_size: Optional[int] = None +# Attachment upload configuration +_configured_attachment_upload_enabled: bool = False + def configure( api_key: Optional[str] = None, @@ -62,6 +77,7 @@ def configure( offline_buffer_enabled: bool = False, offline_buffer_path: Optional[str] = None, max_buffer_size: Optional[int] = None, + attachment_upload_enabled: bool = False, ) -> None: """Configure the Openlayer tracer with custom settings. @@ -82,6 +98,9 @@ def configure( offline_buffer_enabled: Enable offline buffering of failed traces. Defaults to False. offline_buffer_path: Directory path for storing buffered traces. Defaults to ~/.openlayer/buffer. max_buffer_size: Maximum number of trace files to store in buffer. Defaults to 1000. + attachment_upload_enabled: Enable uploading of attachments (images, audio, etc.) to + Openlayer storage. When enabled, attachments on steps will be uploaded during + trace completion. Defaults to False. Examples: >>> import openlayer.lib.tracing.tracer as tracer @@ -99,24 +118,20 @@ def configure( ... offline_buffer_path="/tmp/openlayer_buffer", ... max_buffer_size=500, ... ) + >>> # Configure with attachment uploads enabled + >>> tracer.configure( + ... api_key="your_api_key_here", + ... inference_pipeline_id="your_pipeline_id_here", + ... attachment_upload_enabled=True, + ... ) >>> # Now use the decorators normally >>> @tracer.trace() >>> def my_function(): ... return "result" """ - global \ - _configured_api_key, \ - _configured_pipeline_id, \ - _configured_base_url, \ - _configured_timeout, \ - _configured_max_retries, \ - _client - global \ - _configured_on_flush_failure, \ - _configured_offline_buffer_enabled, \ - _configured_offline_buffer_path, \ - _configured_max_buffer_size, \ - _offline_buffer + global _configured_api_key, _configured_pipeline_id, _configured_base_url, _configured_timeout, _configured_max_retries, _client + global _configured_on_flush_failure, _configured_offline_buffer_enabled, _configured_offline_buffer_path, _configured_max_buffer_size, _offline_buffer + global _configured_attachment_upload_enabled _configured_api_key = api_key _configured_pipeline_id = inference_pipeline_id @@ -127,11 +142,17 @@ def configure( _configured_offline_buffer_enabled = offline_buffer_enabled _configured_offline_buffer_path = offline_buffer_path _configured_max_buffer_size = max_buffer_size + _configured_attachment_upload_enabled = attachment_upload_enabled # Reset the client and buffer so they get recreated with new configuration _client = None _offline_buffer = None + # Reset attachment uploader + from .attachment_uploader import reset_uploader + + reset_uploader() + def _get_client() -> Optional[Openlayer]: """Get or create the Openlayer client with lazy initialization.""" @@ -192,7 +213,9 @@ def __init__( max_buffer_size: Maximum number of trace files to store. Defaults to 1000. 
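(Editorial sketch of the OfflineBuffer surface defined here and just below;
the path, payloads, and pipeline ID are placeholders, and the assumption that
the returned bool signals success is mine, not the patch's.)

buf = OfflineBuffer(buffer_path="/tmp/openlayer_buffer", max_buffer_size=500)
stored = buf.store_trace(
    trace_data={"inferenceId": "abc-123"},    # placeholder payload
    config={"outputColumnName": "output"},    # placeholder config
    inference_pipeline_id="PIPELINE_ID",      # placeholder
)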
""" - self.buffer_path = Path(buffer_path or os.path.expanduser("~/.openlayer/buffer")) + self.buffer_path = Path( + buffer_path or os.path.expanduser("~/.openlayer/buffer") + ) self.max_buffer_size = max_buffer_size or 1000 self._lock = threading.RLock() @@ -201,7 +224,12 @@ def __init__( logger.debug("Initialized offline buffer at %s", self.buffer_path) - def store_trace(self, trace_data: Dict[str, Any], config: Dict[str, Any], inference_pipeline_id: str) -> bool: + def store_trace( + self, + trace_data: Dict[str, Any], + config: Dict[str, Any], + inference_pipeline_id: str, + ) -> bool: """Store a failed trace to the offline buffer. Args: @@ -262,7 +290,10 @@ def get_buffered_traces(self) -> List[Dict[str, Any]]: try: with self._lock: - trace_files = sorted(self.buffer_path.glob("trace_*.json"), key=lambda f: f.stat().st_mtime) + trace_files = sorted( + self.buffer_path.glob("trace_*.json"), + key=lambda f: f.stat().st_mtime, + ) for file_path in trace_files: try: @@ -271,7 +302,9 @@ def get_buffered_traces(self) -> List[Dict[str, Any]]: payload["_file_path"] = str(file_path) traces.append(payload) except Exception as e: - logger.error("Failed to read buffered trace %s: %s", file_path, e) + logger.error( + "Failed to read buffered trace %s: %s", file_path, e + ) except Exception as e: logger.error("Failed to get buffered traces: %s", e) @@ -312,8 +345,16 @@ def get_buffer_status(self) -> Dict[str, Any]: "total_traces": len(trace_files), "max_buffer_size": self.max_buffer_size, "total_size_bytes": total_size, - "oldest_trace": (min(trace_files, key=lambda f: f.stat().st_mtime).name if trace_files else None), - "newest_trace": (max(trace_files, key=lambda f: f.stat().st_mtime).name if trace_files else None), + "oldest_trace": ( + min(trace_files, key=lambda f: f.stat().st_mtime).name + if trace_files + else None + ), + "newest_trace": ( + max(trace_files, key=lambda f: f.stat().st_mtime).name + if trace_files + else None + ), } except Exception as e: logger.error("Failed to get buffer status: %s", e) @@ -409,7 +450,7 @@ def create_step( # This can occur when async generators cross context boundaries if "was created in a different Context" not in str(e): raise - + _handle_trace_completion( is_root_step=is_root_step, step_name=name, @@ -496,7 +537,9 @@ def __init__(self): self._token = None self._output_chunks = [] self._trace_initialized = False - self._captured_context = None # Capture context for ASGI compatibility + self._captured_context = ( + None # Capture context for ASGI compatibility + ) def __iter__(self): return self @@ -505,12 +548,14 @@ def __next__(self): # Initialize tracing on first iteration only if not self._trace_initialized: self._original_gen = func(*func_args, **func_kwargs) - self._step, self._is_root_step, self._token = _create_and_initialize_step( - step_name=step_name, - step_type=enums.StepType.USER_CALL, - inputs=None, - output=None, - metadata=None, + self._step, self._is_root_step, self._token = ( + _create_and_initialize_step( + step_name=step_name, + step_type=enums.StepType.USER_CALL, + inputs=None, + output=None, + metadata=None, + ) ) self._inputs = _extract_function_inputs( func_signature=func_signature, @@ -614,16 +659,19 @@ def wrapper(*func_args, **func_kwargs): ) # Apply input guardrails - modified_inputs, input_guardrail_metadata = _apply_input_guardrails( - guardrails or [], - original_inputs, + modified_inputs, input_guardrail_metadata = ( + _apply_input_guardrails( + guardrails or [], + original_inputs, + ) ) 
guardrail_metadata.update(input_guardrail_metadata) # Check if function execution should be skipped if ( hasattr(modified_inputs, "__class__") - and modified_inputs.__class__.__name__ == "SkipFunctionExecution" + and modified_inputs.__class__.__name__ + == "SkipFunctionExecution" ): # Function execution was blocked with SKIP_FUNCTION strategy output = None @@ -653,17 +701,20 @@ def wrapper(*func_args, **func_kwargs): # Apply output guardrails (skip if function was skipped) if ( hasattr(modified_inputs, "__class__") - and modified_inputs.__class__.__name__ == "SkipFunctionExecution" + and modified_inputs.__class__.__name__ + == "SkipFunctionExecution" ): final_output, output_guardrail_metadata = output, {} # Use original inputs for logging since modified_inputs # is a special marker modified_inputs = original_inputs else: - final_output, output_guardrail_metadata = _apply_output_guardrails( - guardrails or [], - output, - modified_inputs or original_inputs, + final_output, output_guardrail_metadata = ( + _apply_output_guardrails( + guardrails or [], + output, + modified_inputs or original_inputs, + ) ) guardrail_metadata.update(output_guardrail_metadata) @@ -765,12 +816,14 @@ async def __anext__(self): # Initialize tracing on first iteration only if not self._trace_initialized: self._original_gen = func(*func_args, **func_kwargs) - self._step, self._is_root_step, self._token = _create_and_initialize_step( - step_name=step_name, - step_type=enums.StepType.USER_CALL, - inputs=None, - output=None, - metadata=None, + self._step, self._is_root_step, self._token = ( + _create_and_initialize_step( + step_name=step_name, + step_type=enums.StepType.USER_CALL, + inputs=None, + output=None, + metadata=None, + ) ) self._inputs = _extract_function_inputs( func_signature=func_signature, @@ -842,16 +895,20 @@ async def async_function_wrapper(*func_args, **func_kwargs): ) # Process inputs through guardrails - modified_inputs, input_metadata = _apply_input_guardrails( - guardrails, - inputs, + modified_inputs, input_metadata = ( + _apply_input_guardrails( + guardrails, + inputs, + ) ) guardrail_metadata.update(input_metadata) # Execute function with potentially modified inputs if modified_inputs != inputs: # Reconstruct function arguments from modified inputs - bound = func_signature.bind(*func_args, **func_kwargs) + bound = func_signature.bind( + *func_args, **func_kwargs + ) bound.apply_defaults() # Update bound arguments with modified values @@ -860,7 +917,9 @@ async def async_function_wrapper(*func_args, **func_kwargs): modified_value, ) in modified_inputs.items(): if param_name in bound.arguments: - bound.arguments[param_name] = modified_value + bound.arguments[param_name] = ( + modified_value + ) output = await func(*bound.args, **bound.kwargs) else: @@ -879,15 +938,17 @@ async def async_function_wrapper(*func_args, **func_kwargs): # Apply output guardrails if provided if guardrails and output is not None: try: - final_output, output_metadata = _apply_output_guardrails( - guardrails, - output, - _extract_function_inputs( - func_signature=func_signature, - func_args=func_args, - func_kwargs=func_kwargs, - context_kwarg=context_kwarg, - ), + final_output, output_metadata = ( + _apply_output_guardrails( + guardrails, + output, + _extract_function_inputs( + func_signature=func_signature, + func_args=func_args, + func_kwargs=func_kwargs, + context_kwarg=context_kwarg, + ), + ) ) guardrail_metadata.update(output_metadata) @@ -934,16 +995,20 @@ def sync_wrapper(*func_args, **func_kwargs): ) # Process inputs 
through guardrails - modified_inputs, input_metadata = _apply_input_guardrails( - guardrails, - inputs, + modified_inputs, input_metadata = ( + _apply_input_guardrails( + guardrails, + inputs, + ) ) guardrail_metadata.update(input_metadata) # Execute function with potentially modified inputs if modified_inputs != inputs: # Reconstruct function arguments from modified inputs - bound = func_signature.bind(*func_args, **func_kwargs) + bound = func_signature.bind( + *func_args, **func_kwargs + ) bound.apply_defaults() # Update bound arguments with modified values @@ -1039,6 +1104,48 @@ def log_context(context: List[str]) -> None: logger.warning("No current step found to log context.") +def log_attachment( + data: Union[bytes, str, Path, Any], + name: Optional[str] = None, + media_type: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, +) -> Optional["Attachment"]: + """Attaches unstructured data (image, audio, PDF, etc.) to the current step. + + This function allows you to attach binary content to the currently active + trace step. The attachment will be uploaded to Openlayer storage when the + trace completes (if attachment uploads are enabled via configure()). + """ + from .attachments import Attachment + + current_step = get_current_step() + if current_step is None: + logger.warning( + "log_attachment() called without an active step. " + "Make sure to call this function within a traced context " + "(e.g., inside a function decorated with @trace)." + ) + return None + + # If it's already an Attachment, just add it to the step + if isinstance(data, Attachment): + if metadata: + data.metadata.update(metadata) + current_step.attachments.append(data) + logger.debug("Attached existing Attachment %s to current step", data.name) + return data + + # Use the step's attach method for other data types + attachment = current_step.attach( + data=data, + name=name, + media_type=media_type, + metadata=metadata, + ) + logger.debug("Attached %s to current step", attachment.name) + return attachment + + def update_current_trace(**kwargs) -> None: """Updates the current trace metadata with the provided values. 
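(Editorial sketch of log_attachment in use; the function, path, and MIME type
are illustrative.)

from openlayer.lib.tracing import tracer

@tracer.trace()
def transcribe(audio_path: str) -> str:
    # Attaches the file to the current step; the actual upload happens at
    # trace completion, and only when configure(attachment_upload_enabled=True).
    tracer.log_attachment(audio_path, media_type="audio/wav")
    return "transcript..."

# Outside a traced context, log_attachment logs a warning and returns None.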
@@ -1229,7 +1336,9 @@ def replay_buffered_traces( try: on_replay_success(trace_data, config) except Exception as callback_err: - logger.error("Error in replay success callback: %s", callback_err) + logger.error( + "Error in replay success callback: %s", callback_err + ) break @@ -1258,7 +1367,9 @@ def replay_buffered_traces( try: on_replay_failure(trace_data, config, err) except Exception as callback_err: - logger.error("Error in replay failure callback: %s", callback_err) + logger.error( + "Error in replay failure callback: %s", callback_err + ) result = { "total_traces": total_traces, @@ -1387,6 +1498,17 @@ def _handle_trace_completion( ) return + # Upload attachments before processing trace data + if _configured_attachment_upload_enabled: + try: + from .attachment_uploader import upload_trace_attachments + + upload_count = upload_trace_attachments(current_trace) + if upload_count > 0: + logger.debug("Uploaded %d attachments for trace", upload_count) + except Exception as e: + logger.error("Failed to upload trace attachments: %s", e) + trace_data, input_variable_names = post_process_trace(current_trace) config = dict( @@ -1590,15 +1712,20 @@ def _finalize_step_logging( # Add summary fields for easy filtering step_metadata["has_guardrails"] = True - step_metadata["guardrail_actions"] = [metadata.get("action") for metadata in guardrail_metadata.values()] + step_metadata["guardrail_actions"] = [ + metadata.get("action") for metadata in guardrail_metadata.values() + ] step_metadata["guardrail_names"] = [ - key.replace("input_", "").replace("output_", "") for key in guardrail_metadata.keys() + key.replace("input_", "").replace("output_", "") + for key in guardrail_metadata.keys() ] # Add flags for specific actions for easy filtering actions = step_metadata["guardrail_actions"] step_metadata["guardrail_blocked"] = "blocked" in actions - step_metadata["guardrail_modified"] = "redacted" in actions or "modified" in actions + step_metadata["guardrail_modified"] = ( + "redacted" in actions or "modified" in actions + ) step_metadata["guardrail_allowed"] = "allow" in actions else: step_metadata["has_guardrails"] = False @@ -1631,9 +1758,13 @@ def _finalize_sync_generator_step( # Context variable was created in a different context (e.g., different thread) # This can happen in async/multi-threaded environments like FastAPI/OpenWebUI # We can safely ignore this as the step finalization will still complete - logger.debug("Context variable reset failed - generator consumed in different context") + logger.debug( + "Context variable reset failed - generator consumed in different context" + ) - _finalize_step_logging(step=step, inputs=inputs, output=output, start_time=step.start_time) + _finalize_step_logging( + step=step, inputs=inputs, output=output, start_time=step.start_time + ) _handle_trace_completion( is_root_step=is_root_step, @@ -1655,7 +1786,9 @@ def _finalize_async_generator_step( ) -> None: """Finalize async generator step - called when generator is consumed.""" _current_step.reset(token) - _finalize_step_logging(step=step, inputs=inputs, output=output, start_time=step.start_time) + _finalize_step_logging( + step=step, inputs=inputs, output=output, start_time=step.start_time + ) _handle_trace_completion( is_root_step=is_root_step, step_name=step_name, @@ -1689,19 +1822,20 @@ def post_process_trace( This is done to ensure backward compatibility with data on Openlayer. 
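(Editorial note on the reworked post_process_trace below: variable names are
still taken from the raw root-step inputs, but every value written to the row
now comes from the serialized trace_obj.to_dict() output, so attachments and
other to_dict-able objects arrive JSON-safe. Illustrative row shape, with
made-up values:)

trace_data = {
    "inferenceTimestamp": 1716400000.0,
    "inferenceId": "3f1c0e9a",
    "output": "serialized output",  # processed_steps[0]["output"], not root_step.output
    "latency": 412.5,
    "cost": 0.0021,
    "tokens": 512,
    # ...plus one column per serialized input variable, e.g.:
    "user_question": "What is in this image?",
}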
""" root_step = trace_obj.steps[0] + processed_steps = trace_obj.to_dict() - input_variables = root_step.inputs - if input_variables: - input_variable_names = list(input_variables.keys()) + # Get input variable names from raw inputs, but use serialized values + raw_inputs = root_step.inputs + serialized_inputs = processed_steps[0].get("inputs", {}) + if raw_inputs: + input_variable_names = list(raw_inputs.keys()) else: input_variable_names = [] - processed_steps = trace_obj.to_dict() - trace_data = { "inferenceTimestamp": root_step.start_time, "inferenceId": trace_obj.inference_id or str(root_step.id), - "output": root_step.output, + "output": processed_steps[0].get("output"), "latency": root_step.latency, "cost": processed_steps[0].get("cost", 0), "tokens": processed_steps[0].get("tokens", 0), @@ -1725,8 +1859,8 @@ def post_process_trace( if root_step.ground_truth: trace_data["groundTruth"] = root_step.ground_truth - if input_variables: - trace_data.update(input_variables) + if serialized_inputs: + trace_data.update(serialized_inputs) context = get_rag_context() if context: @@ -1760,7 +1894,7 @@ def _apply_input_guardrails( for i, guardrail in enumerate(guardrails): try: # Import here to avoid circular imports - from ..guardrails.base import BaseGuardrail, GuardrailBlockedException + from ..guardrails.base import BaseGuardrail if not isinstance(guardrail, BaseGuardrail): logger.warning("Skipping invalid guardrail: %s", guardrail) @@ -1813,7 +1947,10 @@ def _apply_input_guardrails( guardrail_step.log(**step_log_data) return final_inputs, overall_metadata - elif result.action.value == "modify" and result.modified_data is not None: + elif ( + result.action.value == "modify" + and result.modified_data is not None + ): step_log_data["output"] = result.modified_data modified_inputs = result.modified_data logger.debug("Guardrail %s modified inputs", guardrail.name) @@ -1841,8 +1978,12 @@ def _apply_input_guardrails( raise else: # Log other exceptions but don't fail the trace - logger.error("Error applying input guardrail %s: %s", guardrail.name, e) - guardrail_key = f"input_{guardrail.name.lower().replace(' ', '_')}" + logger.error( + "Error applying input guardrail %s: %s", guardrail.name, e + ) + guardrail_key = ( + f"input_{guardrail.name.lower().replace(' ', '_')}" + ) overall_metadata[guardrail_key] = { "action": "error", "reason": str(e), @@ -1864,7 +2005,9 @@ def _apply_input_guardrails( return modified_inputs, overall_metadata -def _apply_output_guardrails(guardrails: List[Any], output: Any, inputs: Dict[str, Any]) -> Tuple[Any, Dict[str, Any]]: +def _apply_output_guardrails( + guardrails: List[Any], output: Any, inputs: Dict[str, Any] +) -> Tuple[Any, Dict[str, Any]]: """Apply guardrails to function output, creating guardrail steps. 
Args: @@ -1884,7 +2027,7 @@ def _apply_output_guardrails(guardrails: List[Any], output: Any, inputs: Dict[st for i, guardrail in enumerate(guardrails): try: # Import here to avoid circular imports - from ..guardrails.base import BaseGuardrail, GuardrailBlockedException + from ..guardrails.base import BaseGuardrail if not isinstance(guardrail, BaseGuardrail): logger.warning("Skipping invalid guardrail: %s", guardrail) @@ -1937,7 +2080,10 @@ def _apply_output_guardrails(guardrails: List[Any], output: Any, inputs: Dict[st guardrail_step.log(**step_log_data) return final_output, overall_metadata - elif result.action.value == "modify" and result.modified_data is not None: + elif ( + result.action.value == "modify" + and result.modified_data is not None + ): step_log_data["output"] = result.modified_data modified_output = result.modified_data logger.debug("Guardrail %s modified output", guardrail.name) @@ -1965,8 +2111,12 @@ def _apply_output_guardrails(guardrails: List[Any], output: Any, inputs: Dict[st raise else: # Log other exceptions but don't fail the trace - logger.error("Error applying output guardrail %s: %s", guardrail.name, e) - guardrail_key = f"output_{guardrail.name.lower().replace(' ', '_')}" + logger.error( + "Error applying output guardrail %s: %s", guardrail.name, e + ) + guardrail_key = ( + f"output_{guardrail.name.lower().replace(' ', '_')}" + ) overall_metadata[guardrail_key] = { "action": "error", "reason": str(e), @@ -2030,7 +2180,8 @@ def _handle_guardrail_block( # Original behavior - raise exception (breaks pipeline) raise GuardrailBlockedException( guardrail_name=guardrail.name, - reason=result.reason or f"{'Input' if is_input else 'Output'} blocked by guardrail", + reason=result.reason + or f"{'Input' if is_input else 'Output'} blocked by guardrail", metadata=result.metadata, ) @@ -2039,7 +2190,9 @@ def _handle_guardrail_block( if is_input: # For input blocking, return empty inputs empty_inputs = {key: "" for key in (modified_inputs or {})} - logger.info("Guardrail %s blocked input, returning empty inputs", guardrail.name) + logger.info( + "Guardrail %s blocked input, returning empty inputs", guardrail.name + ) return empty_inputs, guardrail_metadata or {} else: # For output blocking, return None @@ -2048,7 +2201,9 @@ def _handle_guardrail_block( elif strategy == BlockStrategy.RETURN_ERROR_MESSAGE: # Return error message (graceful) - error_msg = getattr(result, "error_message", "Request blocked due to policy violation") + error_msg = getattr( + result, "error_message", "Request blocked due to policy violation" + ) logger.info( "Guardrail %s blocked %s, returning error message", guardrail.name, @@ -2084,9 +2239,12 @@ class SkipFunctionExecution: else: # Fallback to raising exception - logger.warning("Unknown block strategy %s, falling back to raising exception", strategy) + logger.warning( + "Unknown block strategy %s, falling back to raising exception", strategy + ) raise GuardrailBlockedException( guardrail_name=guardrail.name, - reason=result.reason or f"{'Input' if is_input else 'Output'} blocked by guardrail", + reason=result.reason + or f"{'Input' if is_input else 'Output'} blocked by guardrail", metadata=result.metadata, ) diff --git a/src/openlayer/lib/utils.py b/src/openlayer/lib/utils.py index b5366c35..fb8691a4 100644 --- a/src/openlayer/lib/utils.py +++ b/src/openlayer/lib/utils.py @@ -51,6 +51,12 @@ def json_serialize(data): elif isinstance(data, type): # Handle model classes/metaclasses return str(data.__name__ if hasattr(data, "__name__") else data) + 
elif hasattr(data, "to_dict") and callable(getattr(data, "to_dict")): + # Handle objects with to_dict method (e.g., Attachment) + try: + return json_serialize(data.to_dict()) + except Exception: + return str(data) elif hasattr(data, "model_dump") and callable(getattr(data, "model_dump")): # Handle Pydantic model instances try: