From cf8ac613fc213dabfa5a9d78c30d68051527888d Mon Sep 17 00:00:00 2001 From: Andrew Kent Date: Mon, 1 Jun 2026 16:08:15 -0600 Subject: [PATCH] attachment processor --- lib/braintrust/config.rb | 18 +- lib/braintrust/trace.rb | 27 +- .../trace/attachment_processor/formats.rb | 295 ++++++++++++ .../trace/attachment_processor/processor.rb | 146 ++++++ .../trace/attachment_processor/reference.rb | 90 ++++ .../attachment_processor/transformed_span.rb | 52 +++ .../trace/attachment_processor/uploader.rb | 432 ++++++++++++++++++ lib/braintrust/trace/span_processor.rb | 92 +++- test/braintrust/config_test.rb | 23 +- .../attachment_processor/formats_test.rb | 193 ++++++++ .../attachment_processor/processor_test.rb | 160 +++++++ .../attachment_processor/reference_test.rb | 71 +++ .../transformed_span_test.rb | 59 +++ .../attachment_processor/uploader_test.rb | 248 ++++++++++ .../trace/span_processor_attachment_test.rb | 85 ++++ test/btx/btx_test.rb | 13 +- test/btx/span_converter.rb | 18 +- test/btx/spec_executor.rb | 19 +- 18 files changed, 2017 insertions(+), 24 deletions(-) create mode 100644 lib/braintrust/trace/attachment_processor/formats.rb create mode 100644 lib/braintrust/trace/attachment_processor/processor.rb create mode 100644 lib/braintrust/trace/attachment_processor/reference.rb create mode 100644 lib/braintrust/trace/attachment_processor/transformed_span.rb create mode 100644 lib/braintrust/trace/attachment_processor/uploader.rb create mode 100644 test/braintrust/trace/attachment_processor/formats_test.rb create mode 100644 test/braintrust/trace/attachment_processor/processor_test.rb create mode 100644 test/braintrust/trace/attachment_processor/reference_test.rb create mode 100644 test/braintrust/trace/attachment_processor/transformed_span_test.rb create mode 100644 test/braintrust/trace/attachment_processor/uploader_test.rb create mode 100644 test/braintrust/trace/span_processor_attachment_test.rb diff --git a/lib/braintrust/config.rb 
b/lib/braintrust/config.rb index 336f8459..af07211e 100644 --- a/lib/braintrust/config.rb +++ b/lib/braintrust/config.rb @@ -5,10 +5,10 @@ module Braintrust # and allows overriding with explicit options class Config attr_reader :api_key, :org_name, :default_project, :app_url, :api_url, - :filter_ai_spans, :span_filter_funcs + :filter_ai_spans, :span_filter_funcs, :auto_convert_ai_attachments def initialize(api_key: nil, org_name: nil, default_project: nil, app_url: nil, api_url: nil, - filter_ai_spans: nil, span_filter_funcs: nil) + filter_ai_spans: nil, span_filter_funcs: nil, auto_convert_ai_attachments: true) @api_key = api_key @org_name = org_name @default_project = default_project @@ -16,6 +16,7 @@ def initialize(api_key: nil, org_name: nil, default_project: nil, app_url: nil, @api_url = api_url @filter_ai_spans = filter_ai_spans @span_filter_funcs = span_filter_funcs || [] + @auto_convert_ai_attachments = auto_convert_ai_attachments end # Create a Config from environment variables, with option overrides @@ -29,7 +30,7 @@ def initialize(api_key: nil, org_name: nil, default_project: nil, app_url: nil, # @param span_filter_funcs [Array, nil] Custom span filter functions # @return [Config] the created config def self.from_env(api_key: nil, org_name: nil, default_project: nil, app_url: nil, api_url: nil, - filter_ai_spans: nil, span_filter_funcs: nil) + filter_ai_spans: nil, span_filter_funcs: nil, auto_convert_ai_attachments: nil) # Parse filter_ai_spans from ENV if not explicitly provided env_filter_ai_spans = ENV["BRAINTRUST_OTEL_FILTER_AI_SPANS"] filter_ai_spans_value = if filter_ai_spans.nil? @@ -38,6 +39,14 @@ def self.from_env(api_key: nil, org_name: nil, default_project: nil, app_url: ni filter_ai_spans end + # Attachment auto-conversion defaults to true; disabled only when the env + # var is explicitly "false" or an explicit false is passed. + auto_convert_value = if auto_convert_ai_attachments.nil? 
+ ENV["BRAINTRUST_AUTO_CONVERT_AI_ATTACHMENTS"] != "false" + else + auto_convert_ai_attachments + end + new( api_key: api_key || ((ENV["BRAINTRUST_API_KEY"] && ENV["BRAINTRUST_API_KEY"].empty?) ? nil : ENV["BRAINTRUST_API_KEY"]), org_name: org_name || ENV["BRAINTRUST_ORG_NAME"], @@ -45,7 +54,8 @@ def self.from_env(api_key: nil, org_name: nil, default_project: nil, app_url: ni app_url: app_url || ENV["BRAINTRUST_APP_URL"] || "https://www.braintrust.dev", api_url: api_url || ENV["BRAINTRUST_API_URL"] || "https://api.braintrust.dev", filter_ai_spans: filter_ai_spans_value, - span_filter_funcs: span_filter_funcs + span_filter_funcs: span_filter_funcs, + auto_convert_ai_attachments: auto_convert_value ) end end diff --git a/lib/braintrust/trace.rb b/lib/braintrust/trace.rb index 2c6563fe..e0d37b5d 100644 --- a/lib/braintrust/trace.rb +++ b/lib/braintrust/trace.rb @@ -5,6 +5,8 @@ require_relative "trace/span_processor" require_relative "trace/span_exporter" require_relative "trace/span_filter" +require_relative "trace/attachment_processor/processor" +require_relative "trace/attachment_processor/uploader" require_relative "internal/env" require_relative "logger" @@ -81,7 +83,7 @@ def self.flush_spans end end - def self.enable(tracer_provider, state: nil, exporter: nil, config: nil) + def self.enable(tracer_provider, state: nil, exporter: nil, config: nil, attachment_processor: :default) state ||= Braintrust.current_state raise Error, "No state available" unless state @@ -104,8 +106,11 @@ def self.enable(tracer_provider, state: nil, exporter: nil, config: nil) # Build filters array from config filters = build_filters(config) + # Build the attachment processor (unless explicitly overridden / disabled) + attachment_processor = build_attachment_processor(state, config) if attachment_processor == :default + # Wrap span processor in our custom span processor to add Braintrust attributes and filters - processor = SpanProcessor.new(span_processor, state, filters) + processor = 
SpanProcessor.new(span_processor, state, filters, attachment_processor: attachment_processor) # Register with tracer provider tracer_provider.add_span_processor(processor) @@ -139,6 +144,24 @@ def self.build_filters(config) filters end + # Build the attachment processor for a given state/config, or nil when + # attachment auto-conversion is disabled. + # @param state [State] + # @param config [Config, nil] + # @return [AttachmentProcessor::Processor, nil] + def self.build_attachment_processor(state, config) + enabled = config.nil? || config.auto_convert_ai_attachments + return nil unless enabled + + uploader = AttachmentProcessor::S3Uploader.new( + api_key: state.api_key, + api_url: state.api_url, + login_url: state.app_url, + org_id: (state.org_id if state.respond_to?(:org_id)) + ) + AttachmentProcessor::Processor.new(uploader: uploader, logger: Log) + end + # Generate a permalink URL for a span to view in the Braintrust UI # Returns an empty string if the permalink cannot be generated # @param span [OpenTelemetry::Trace::Span] The span to generate a permalink for diff --git a/lib/braintrust/trace/attachment_processor/formats.rb b/lib/braintrust/trace/attachment_processor/formats.rb new file mode 100644 index 00000000..ec997eac --- /dev/null +++ b/lib/braintrust/trace/attachment_processor/formats.rb @@ -0,0 +1,295 @@ +# frozen_string_literal: true + +require_relative "reference" +require_relative "../../internal/encoding" + +module Braintrust + module Trace + module AttachmentProcessor + # Vendor-specific base64 attachment formats. + # + # Each {Format} is a self-contained unit: a detection predicate plus a + # replacement function. Adding support for a new vendor is a matter of + # appending one entry to {Formats.all} — shared walk/upload logic does not + # change. + module Formats + # Minimum length of a base64 string to consider it a real attachment. + # Avoids false positives on short strings that happen to look base64-ish. 
+ MIN_BASE64_LEN = 20 + + # Matches "data:MIME_TYPE;base64,". + DATA_URI_PREFIX = 'data:([\w/\-.+]+);base64,' + + # Matches a base64 string of at least MIN_BASE64_LEN characters. + BASE64_STR = "([A-Za-z0-9+/=]{#{MIN_BASE64_LEN},})" + + # Compiled pattern for parsing an entire data URI. + DATA_URI_PATTERN = /#{DATA_URI_PREFIX}#{BASE64_STR}/ + + # Heuristic fragment matching a quoted data URI (OpenAI format). + DATA_URI_HEURISTIC = "\"#{DATA_URI_PREFIX}#{BASE64_STR}\"" + + # Heuristic fragment matching "bytes"/"data" keys with a base64 value + # (Bedrock/Anthropic/Gemini). + BYTE_VALUE_HEURISTIC = "\"(?:bytes|data)\"\\s*:\\s*\"#{BASE64_STR}\"" + + # A single vendor attachment format. + # + # @!attribute [r] name + # @return [String] human-readable label used for logging and test coverage tracking + # @!attribute [r] heuristic_fragment + # @return [String] regex fragment contributed to the combined fast-path heuristic + # @!attribute [r] match + # @return [Proc] +match.call(parent_key, node)+ -> Boolean + # @!attribute [r] replace + # @return [Proc] +replace.call(node, upload_fn)+ -> [replacement, ok] + Format = Struct.new(:name, :heuristic_fragment, :match, :replace, keyword_init: true) + + module_function + + # All supported vendor formats, checked in order during tree traversal. + # @return [Array] + def all + @all ||= [openai, bedrock, anthropic, gemini] + end + + # Build a single combined regex from every format's heuristic fragment. + # Identical fragments are de-duplicated. + # + # @param formats [Array] + # @return [Regexp] + def build_heuristic(formats = all) + fragments = formats.map(&:heuristic_fragment).uniq + Regexp.new(fragments.join("|")) + end + + # ── OpenAI ─────────────────────────────────────────────────────── + + # Data URIs that appear as an entire text node value. + # e.g. image_url.url = "data:image/png;base64,..." 
+ def openai + Format.new( + name: "openai", + heuristic_fragment: DATA_URI_HEURISTIC, + match: ->(_parent_key, node) { + node.is_a?(String) && entirely_data_uri?(node) && DATA_URI_PATTERN.match?(node) + }, + replace: ->(node, upload_fn) { + m = DATA_URI_PATTERN.match(node) + next [nil, false] unless m + + upload_and_create_ref(m[1], m[2], upload_fn) + } + ) + end + + # True when the trimmed value is entirely a data URI: starts with + # "data:" and contains no quotes, backslashes, or spaces. + def entirely_data_uri?(value) + t = value.strip + t.start_with?("data:") && + !t.include?('"') && + !t.include?("\\") && + !t.include?(" ") + end + + # ── Bedrock (Converse API) ─────────────────────────────────────── + + # Per-block-type format-to-MIME mappings. The same format string (e.g. + # "mp4") resolves to different MIME types depending on the parent block + # type (video/mp4 vs audio/mp4), so a single flat table is insufficient. + CONVERSE_BLOCK_TYPE_FORMATS = { + "image" => { + "gif" => "image/gif", + "jpeg" => "image/jpeg", + "png" => "image/png", + "webp" => "image/webp" + }, + "video" => { + "flv" => "video/x-flv", + "mkv" => "video/x-matroska", + "mov" => "video/quicktime", + "mp4" => "video/mp4", + "mpeg" => "video/mpeg", + "mpg" => "video/mpeg", + "three_gp" => "video/3gpp", + "webm" => "video/webm", + "wmv" => "video/x-ms-wmv" + }, + "audio" => { + "aac" => "audio/aac", + "flac" => "audio/flac", + "m4a" => "audio/mp4", + "mka" => "audio/x-matroska", + "mkv" => "audio/x-matroska", + "mp3" => "audio/mpeg", + "mp4" => "audio/mp4", + "mpeg" => "audio/mpeg", + "mpga" => "audio/mpeg", + "ogg" => "audio/ogg", + "opus" => "audio/opus", + "pcm" => "audio/pcm", + "wav" => "audio/wav", + "webm" => "audio/webm", + "x-aac" => "audio/aac" + }, + "document" => { + "csv" => "text/csv", + "doc" => "application/msword", + "docx" => "application/vnd.openxmlformats-officedocument.wordprocessingml.document", + "html" => "text/html", + "md" => "text/markdown", + "pdf" => 
"application/pdf", + "txt" => "text/plain", + "xls" => "application/vnd.ms-excel", + "xlsx" => "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" + } + }.freeze + + # Bedrock wraps attachments in a parent block keyed by type + # ("image"/"video"/"audio"/"document") containing + # {"format": "", "source": {"bytes": ""}}. + # The reference replaces source.bytes; surrounding fields are preserved. + def bedrock + Format.new( + name: "bedrock", + heuristic_fragment: BYTE_VALUE_HEURISTIC, + match: ->(_parent_key, node) { + node.is_a?(Hash) && !converse_block(node).nil? + }, + replace: ->(node, upload_fn) { + block = converse_block(node) + next [nil, false] unless block + + block_key, inner, format_map = block + fmt = inner["format"] + content_type = format_map[fmt.to_s.downcase] + next [nil, false] unless content_type + + source = inner["source"] + b64 = source["bytes"] + ref_val, ok = upload_and_create_ref(content_type, b64, upload_fn) + next [nil, false] unless ok + + # Copy all fields, swapping source.bytes in the matched block. + new_source = source.merge("bytes" => ref_val) + new_inner = inner.merge("source" => new_source) + [node.merge(block_key => new_inner), true] + } + ) + end + + # Returns [block_key, inner_hash, format_map] for the first recognized + # Bedrock block in +obj+, or nil. + def converse_block(obj) + CONVERSE_BLOCK_TYPE_FORMATS.each do |block_key, format_map| + inner = obj[block_key] + next unless inner.is_a?(Hash) + + fmt = inner["format"] + next unless fmt.is_a?(String) + next unless format_map.key?(fmt.downcase) + + source = inner["source"] + next unless source.is_a?(Hash) + + bytes = source["bytes"] + next unless bytes.is_a?(String) && bytes.length >= MIN_BASE64_LEN + + return [block_key, inner, format_map] + end + nil + end + + # ── Anthropic ──────────────────────────────────────────────────── + + # {"type":"base64","media_type":"","data":""} inside a + # "source" object. The entire source object is replaced with the ref. 
+ def anthropic + Format.new( + name: "anthropic", + heuristic_fragment: BYTE_VALUE_HEURISTIC, + match: ->(_parent_key, node) { + next false unless node.is_a?(Hash) + next false unless node["type"] == "base64" + + media_type = node["media_type"] + next false unless media_type.is_a?(String) && !media_type.empty? + + data = node["data"] + data.is_a?(String) && data.length >= MIN_BASE64_LEN + }, + replace: ->(node, upload_fn) { + upload_and_create_ref(node["media_type"], node["data"], upload_fn) + } + ) + end + + # ── Gemini ─────────────────────────────────────────────────────── + + # {"inlineData": {"mimeType":"MIME_TYPE","data":"BASE64"}}. + # Images become image_url: {url: ref}; non-images become + # file: {file_data: ref}. + def gemini + Format.new( + name: "gemini", + heuristic_fragment: BYTE_VALUE_HEURISTIC, + match: ->(_parent_key, node) { + next false unless node.is_a?(Hash) + + inline = node["inlineData"] + next false unless inline.is_a?(Hash) + + mime = inline["mimeType"] + next false unless mime.is_a?(String) && !mime.empty? + + data = inline["data"] + data.is_a?(String) && data.length >= MIN_BASE64_LEN + }, + replace: ->(node, upload_fn) { + inline = node["inlineData"] + content_type = inline["mimeType"] + ref_val, ok = upload_and_create_ref(content_type, inline["data"], upload_fn) + next [nil, false] unless ok + + wrapper = if content_type.start_with?("image/") + {"image_url" => {"url" => ref_val}} + else + {"file" => {"file_data" => ref_val}} + end + + # Copy all fields, swapping inlineData for the appropriate wrapper. + result = {} + node.each do |k, v| + if k == "inlineData" + result.merge!(wrapper) + else + result[k] = v + end + end + [result, true] + } + ) + end + + # ── Shared helpers ─────────────────────────────────────────────── + + # Decode base64 data, create a reference, and enqueue the upload. + # + # @return [Array(Hash, Boolean)] +[ref_hash, true]+ on success, or + # +[nil, false]+ when the data cannot be decoded or the upload was + # rejected. 
+ def upload_and_create_ref(content_type, b64_data, upload_fn) + data = Internal::Encoding::Base64.strict_decode64(b64_data) + ref = Reference.new(content_type) + return [nil, false] unless upload_fn.call(ref, data) + + [ref.to_h, true] + rescue ArgumentError + # Invalid base64 — skip this value (per-span error). + [nil, false] + end + end + end + end +end diff --git a/lib/braintrust/trace/attachment_processor/processor.rb b/lib/braintrust/trace/attachment_processor/processor.rb new file mode 100644 index 00000000..7725187f --- /dev/null +++ b/lib/braintrust/trace/attachment_processor/processor.rb @@ -0,0 +1,146 @@ +# frozen_string_literal: true + +require "json" +require_relative "formats" +require_relative "uploader" + +module Braintrust + module Trace + module AttachmentProcessor + # Scans JSON strings for base64 attachments across multiple LLM provider + # formats, uploads them, and returns modified JSON with attachment + # references in place of the inline base64 data. + class Processor + # Maximum JSON nesting depth to recurse into. Pathological deeply-nested + # input returns unchanged once this cap is hit, rather than exhausting + # the stack. + MAX_WALK_DEPTH = 128 + + # @return [#enqueue, #shutdown?, #force_flush] the background uploader + attr_reader :uploader + + # @param uploader [#enqueue, #shutdown?] background attachment uploader + # @param logger [#warn, #debug, nil] optional logger + # @param formats [Array] vendor formats to apply + # @param json_max_nesting [Integer, false] nesting limit passed to + # JSON.parse/JSON.generate. Defaults to MAX_WALK_DEPTH so the parser + # and the walker agree on the depth cap. 
+ def initialize(uploader:, logger: nil, formats: Formats.all, json_max_nesting: MAX_WALK_DEPTH) + @uploader = uploader + @logger = logger + @formats = formats + @heuristic = Formats.build_heuristic(formats) + @json_max_nesting = json_max_nesting + end + + # Scan +json_str+ for base64 attachments, upload them, and return the + # modified JSON. + # + # Returns the original string unchanged when: + # - it is nil/empty, or the uploader has shut down, + # - the fast-path heuristic finds nothing, + # - the JSON cannot be parsed (per-span error; does not shut anything down), + # - no attachment was found, or + # - any enqueue failed mid-walk (partial-replacement safety). + # + # @param json_str [String, nil] + # @return [String, nil] + def process_and_upload(json_str) + return json_str if json_str.nil? || json_str.empty? + return json_str if @uploader.shutdown? + return json_str unless @heuristic.match?(json_str) + + process_json(json_str) + rescue JSON::ParserError => e + # Per-span error: skip this span, do not poison the processor for + # other spans. The heuristic can match on non-LLM spans whose + # attributes merely look like base64. + @logger&.debug("attachment processing skipped for span: #{e.message}") + json_str + end + + private + + def process_json(json_str) + root = JSON.parse(json_str, max_nesting: @json_max_nesting) + + state = WalkState.new + result = walk_and_replace(root, "", state, 0) + + return json_str if state.failed || !state.modified + + JSON.generate(result, max_nesting: @json_max_nesting) + end + + # Mutable accumulator threaded through the walk. + class WalkState + attr_accessor :modified, :failed + + def initialize + @modified = false + @failed = false + end + end + + # Traverse the JSON tree. For each node, check all formats in order; the + # first whose matcher returns true handles the replacement and recursion + # stops for that subtree. Otherwise recurse into children. 
+ # + # If an enqueue fails, +state.failed+ is set and the partially-rewritten + # tree is discarded by the caller. + def walk_and_replace(node, parent_key, state, depth) + return node if depth >= MAX_WALK_DEPTH || state.failed + + upload_fn = ->(ref, data) { + ok = @uploader.enqueue(ref, data) + state.failed = true unless ok + ok + } + + @formats.each do |fmt| + next unless fmt.match.call(parent_key, node) + + replacement, ok = fmt.replace.call(node, upload_fn) + if ok + state.modified = true + return replacement + end + # Replace returned false (e.g. decode failure / rejected upload). + # state.failed is set by upload_fn when the rejection came from the + # uploader; otherwise treat as a skip and fall through to recursion. + return node if state.failed + end + + case node + when Hash + result = nil + node.each do |k, child| + new_child = walk_and_replace(child, k, state, depth + 1) + return node if state.failed + + if !new_child.equal?(child) + result ||= node.dup + result[k] = new_child + end + end + result || node + when Array + result = nil + node.each_with_index do |child, i| + new_child = walk_and_replace(child, "", state, depth + 1) + return node if state.failed + + if !new_child.equal?(child) + result ||= node.dup + result[i] = new_child + end + end + result || node + else + node + end + end + end + end + end +end diff --git a/lib/braintrust/trace/attachment_processor/reference.rb b/lib/braintrust/trace/attachment_processor/reference.rb new file mode 100644 index 00000000..6206719b --- /dev/null +++ b/lib/braintrust/trace/attachment_processor/reference.rb @@ -0,0 +1,90 @@ +# frozen_string_literal: true + +require "securerandom" + +module Braintrust + module Trace + module AttachmentProcessor + # Reference is the JSON-serializable object that replaces inline base64 + # attachment data on a span. Its shape is the cross-SDK Braintrust + # attachment reference format understood by the Braintrust collector and + # UI. 
+ # + # @example + # ref = Reference.new("image/png") + # ref.to_h + # # => {"type"=>"braintrust_attachment", "content_type"=>"image/png", + # # "filename"=>"attachment.png", "key"=>"GENERATED_UUID"} + class Reference + TYPE = "braintrust_attachment" + + # MIME type to file extension mapping. Used to derive the filename. + CONTENT_TYPE_EXTENSIONS = { + "image/png" => ".png", + "image/jpeg" => ".jpg", + "image/jpg" => ".jpg", + "image/gif" => ".gif", + "image/webp" => ".webp", + "image/svg+xml" => ".svg", + "application/pdf" => ".pdf", + "text/plain" => ".txt", + "text/csv" => ".csv", + "text/html" => ".html", + "text/markdown" => ".md", + "application/json" => ".json", + "application/msword" => ".doc", + "application/vnd.openxmlformats-officedocument.wordprocessingml.document" => ".docx", + "application/vnd.ms-excel" => ".xls", + "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" => ".xlsx", + "video/mp4" => ".mp4", + "video/webm" => ".webm", + "video/quicktime" => ".mov", + "audio/mpeg" => ".mp3", + "audio/mp3" => ".mp3", + "audio/wav" => ".wav" + }.freeze + + attr_reader :type, :content_type, :filename, :key + + # @param content_type [String] MIME type (e.g. "image/png") + # @param key [String, nil] Storage key; a fresh UUID is generated when omitted + def initialize(content_type, key: nil) + @type = TYPE + @content_type = content_type + @filename = "attachment#{self.class.content_type_to_extension(content_type)}" + @key = key || SecureRandom.uuid + end + + # @return [Hash] the JSON-embeddable reference object + def to_h + { + "type" => @type, + "content_type" => @content_type, + "filename" => @filename, + "key" => @key + } + end + + # Map a MIME type to a file extension (leading dot included). + # + # Falls back to the MIME subtype (stripped of parameters/suffixes) when + # the type is unknown, or an empty string when there is no subtype. 
+ # + # @param content_type [String] + # @return [String] + def self.content_type_to_extension(content_type) + known = CONTENT_TYPE_EXTENSIONS[content_type.to_s.downcase] + return known if known + + parts = content_type.to_s.split("/", 2) + return "" unless parts.length == 2 + + sub = parts[1] + idx = sub.index(/[;-]/) + sub = sub[0...idx] if idx + sub.empty? ? "" : ".#{sub}" + end + end + end + end +end diff --git a/lib/braintrust/trace/attachment_processor/transformed_span.rb b/lib/braintrust/trace/attachment_processor/transformed_span.rb new file mode 100644 index 00000000..58e96a67 --- /dev/null +++ b/lib/braintrust/trace/attachment_processor/transformed_span.rb @@ -0,0 +1,52 @@ +# frozen_string_literal: true + +module Braintrust + module Trace + module AttachmentProcessor + # Wraps an ended OTel span and overrides selected attributes. + # + # By the time a span reaches +SpanProcessor#on_finish+ it has already + # ended and its attributes are frozen, so they cannot be mutated in place. + # The downstream processors (Simple/Batch) export by calling + # +span.to_span_data+, so this wrapper overrides +to_span_data+ to return a + # +SpanData+ whose attributes carry the replacements. Every other method is + # delegated to the original span. + # + # This is the Ruby equivalent of the Go SDK's transformedSpan and Java's + # TransformedReadableSpan. + class TransformedSpan + # @param span the original ended span (responds to +to_span_data+) + # @param overrides [Hash{String=>String}] attribute key => new value + def initialize(span, overrides) + @span = span + @overrides = overrides + end + + # @return [OpenTelemetry::SDK::Trace::SpanData] span data with overridden attributes + def to_span_data + data = @span.to_span_data + attrs = (data.attributes || {}).merge(@overrides) + # SpanData is a Struct; copy it with the :attributes member replaced. + # Look the member up by name so we are robust to field ordering + # changes across OTel SDK versions. 
+ idx = data.members.index(:attributes) + values = data.to_a + values[idx] = attrs + data.class.new(*values) + end + + def respond_to_missing?(name, include_private = false) + @span.respond_to?(name, include_private) || super + end + + def method_missing(name, *args, **kwargs, &block) + if @span.respond_to?(name) + @span.send(name, *args, **kwargs, &block) + else + super + end + end + end + end + end +end diff --git a/lib/braintrust/trace/attachment_processor/uploader.rb b/lib/braintrust/trace/attachment_processor/uploader.rb new file mode 100644 index 00000000..0a148805 --- /dev/null +++ b/lib/braintrust/trace/attachment_processor/uploader.rb @@ -0,0 +1,432 @@ +# frozen_string_literal: true + +require "json" +require "net/http" +require "uri" +require_relative "../../internal/http" +require_relative "../../logger" + +module Braintrust + module Trace + module AttachmentProcessor + # Uploader that accepts all jobs but does nothing. Useful for testing the + # processor in isolation and for btx replay mode (where no real upload is + # performed but the reference rewriting still happens). + class NoopUploader + def initialize + @shutdown = false + @mutex = Mutex.new + end + + # @return [Boolean] true unless shut down + def enqueue(_ref, _data) + @mutex.synchronize { !@shutdown } + end + + # @return [Boolean] always true + def force_flush(_timeout = nil) + true + end + + def shutdown + @mutex.synchronize { @shutdown = true } + end + + # @return [Boolean] + def shutdown? + @mutex.synchronize { @shutdown } + end + end + + # Background uploader that pushes attachment data to Braintrust object + # storage via signed URLs. + # + # A single daemon thread pulls jobs from a bounded queue and runs the + # three-step upload flow (request signed URL, PUT data, report status). + # The actual uploads never block the app thread. 
+ # + # On any upload-pipeline failure the uploader shuts itself down so that + # subsequent spans skip attachment processing entirely (and are exported + # with inline base64) rather than producing references whose data never + # lands in storage. + class S3Uploader + DEFAULT_MAX_RETRIES = 8 + DEFAULT_INITIAL_BACKOFF = 0.5 + DEFAULT_QUEUE_SIZE = 1024 + DEFAULT_SHUTDOWN_TIMEOUT = 120.0 + + # @param api_key [String] + # @param api_url [String] base URL for the Braintrust API (e.g. https://api.braintrust.dev) + # @param login_url [String, nil] app URL for the login endpoint (defaults to api_url) + # @param org_id [String, nil] pre-resolved org id; resolved via login when omitted + # @param logger [#warn, #debug, nil] + # @param max_retries [Integer] + # @param initial_backoff [Float] seconds + # @param queue_size [Integer] + # @param shutdown_timeout [Float] seconds + def initialize(api_key:, api_url:, login_url: nil, org_id: nil, logger: nil, + max_retries: DEFAULT_MAX_RETRIES, initial_backoff: DEFAULT_INITIAL_BACKOFF, + queue_size: DEFAULT_QUEUE_SIZE, shutdown_timeout: DEFAULT_SHUTDOWN_TIMEOUT) + @api_key = api_key + @api_url = api_url.to_s.sub(%r{/+\z}, "") + @login_url = (login_url || api_url).to_s.sub(%r{/+\z}, "") + @org_id = org_id + @logger = logger || Braintrust::Log + @max_retries = max_retries + @initial_backoff = initial_backoff + @shutdown_timeout = shutdown_timeout + + @queue = SizedQueue.new(queue_size) + @mutex = Mutex.new + @reject_new_jobs = false + @worker_started = false + @worker = nil + @shutdown_done = false + + # Tracks in-flight jobs (queued but not yet completed) so force_flush + # can wait for quiescence. + @inflight_mutex = Mutex.new + @inflight_cond = ConditionVariable.new + @inflight = 0 + + # Signals the worker to stop and cancels any retry backoff sleep. + @stop_mutex = Mutex.new + @stop_cond = ConditionVariable.new + @stopping = false + + # Single-flight org-id resolution. 
+ @org_mutex = Mutex.new + @org_resolved = false + @org_error = nil + end + + # Enqueue an attachment for background upload. + # + # @return [Boolean] false if shut down or the queue is full + def enqueue(ref, data) + @mutex.synchronize do + return false if @reject_new_jobs + + ensure_worker_started + + # Bump in-flight before pushing so force_flush can't observe an + # idle state between push and the worker picking it up. + @inflight_mutex.synchronize { @inflight += 1 } + + begin + @queue.push({ref: ref, data: data}, true) # non-blocking + true + rescue ThreadError + # Queue full. + @inflight_mutex.synchronize do + @inflight -= 1 + @inflight_cond.broadcast + end + false + end + end + end + + # Block until all currently-enqueued uploads complete or +timeout+ + # seconds elapse. + # + # @param timeout [Float, nil] + # @return [Boolean] true if drained, false on timeout + def force_flush(timeout = nil) + deadline = timeout ? monotonic_now + timeout : nil + @inflight_mutex.synchronize do + while @inflight > 0 + if deadline + remaining = deadline - monotonic_now + return false if remaining <= 0 + + @inflight_cond.wait(@inflight_mutex, remaining) + else + @inflight_cond.wait(@inflight_mutex) + end + end + end + true + end + + # Stop the uploader, draining remaining jobs. Idempotent. + def shutdown + worker = nil + @mutex.synchronize do + return if @shutdown_done + + @shutdown_done = true + @reject_new_jobs = true + worker = @worker + end + + # Cancel any in-progress retry backoff and tell the worker to drain. + @stop_mutex.synchronize do + @stopping = true + @stop_cond.broadcast + end + + if worker + unless worker.join(@shutdown_timeout) + @logger.warn("Braintrust: attachment uploader shutdown timed out") + end + end + end + + # @return [Boolean] + def shutdown? + @mutex.synchronize { @reject_new_jobs } + end + + private + + # Must be called with @mutex held. 
# Lazily starts the background worker thread; later calls are no-ops.
# NOTE(review): the started-flag check is not synchronized here —
# presumably the caller serializes calls; confirm against the enqueue path.
def ensure_worker_started
  return if @worker_started

  @worker_started = true
  @worker = Thread.new { worker_loop }
  @worker.name = "braintrust-attachment-uploader"
end

# Body of the worker thread: processes jobs until next_job reports (via
# nil) that the uploader is stopping and the queue is drained.
def worker_loop
  @logger.debug("Braintrust: attachment uploader worker started")
  until (job = next_job).nil?
    process_job(job)
  end
  @logger.debug("Braintrust: attachment uploader worker stopped")
end

# Returns the next job, or nil when the uploader is stopping and the
# queue is drained.
#
# @return [Hash, nil]
def next_job
  loop do
    begin
      return @queue.pop(true) # non-blocking
    rescue ThreadError
      # Queue empty.
    end

    return nil if stopping?

    # Wait briefly for either a new job or a stop signal.
    @stop_mutex.synchronize do
      @stop_cond.wait(@stop_mutex, 0.05) unless @stopping
    end
  end
end

# Run a single upload with rescue so an unexpected exception does not
# kill the worker permanently. Treat any crash as an upload failure.
# The in-flight counter is always decremented and waiters are woken,
# whether the upload succeeded, failed or crashed.
def process_job(job)
  upload(job)
rescue => e
  @logger.error("Braintrust: attachment upload crashed: #{e.class}: #{e.message}")
  fail_and_reject
ensure
  @inflight_mutex.synchronize do
    @inflight -= 1
    @inflight_cond.broadcast
  end
end

# Performs the upload flow for one job: resolve the org id, request a
# signed URL, PUT the bytes, then report the final status. Any failed step
# flips the uploader into reject mode via fail_and_reject.
#
# @param job [Hash] with :ref (responds to key/filename/content_type) and
#   :data (the bytes to upload)
def upload(job)
  ref = job[:ref]
  data = job[:data]

  org_id = resolve_org_id
  unless org_id
    # The status endpoint itself needs the org id, so report_status would
    # silently do nothing here; log the dropped attachment instead.
    reason = @org_error ? ": #{@org_error.message}" : ""
    @logger.warn("Braintrust: cannot upload attachment #{ref.key}; org id unavailable#{reason}")
    fail_and_reject
    return
  end

  signed_url, headers = request_upload_url(org_id, ref)
  unless signed_url
    report_status(ref.key, "error", "failed to request upload URL")
    fail_and_reject
    return
  end

  if upload_to_signed_url(signed_url, headers, ref.content_type, data)
    report_status(ref.key, "done")
  else
    report_status(ref.key, "error", "failed to upload to signed URL")
    fail_and_reject
  end
end

# Marks the uploader as rejecting new jobs. The warning is logged only by
# the call that actually flips the flag, so repeated failures stay quiet.
def fail_and_reject
  already_rejecting = @mutex.synchronize do
    previous = @reject_new_jobs
    @reject_new_jobs = true
    previous
  end
  return if already_rejecting

  @logger.warn("Braintrust: attachment uploader shutting down due to upload failure; " \
    "subsequent spans will be exported with inline base64")
end

# ── org id resolution (single-flight) ────────────────────────────

# Resolves the org id at most once. The outcome (including a failure) is
# cached, so a failed login is not retried for every job.
#
# @return [Object, nil] the org id, or nil when it could not be resolved
def resolve_org_id
  @org_mutex.synchronize do
    return @org_id if @org_resolved || @org_id

    begin
      @org_id = fetch_org_id
    rescue => e
      @org_error = e
      @logger.warn("Braintrust: failed to resolve org id for attachment upload: #{e.message}")
    ensure
      @org_resolved = true
    end
    @org_id
  end
end

# Logs in with the API key and returns the id of the first org listed.
#
# @return [Object] the org id (never nil or empty)
# @raise [Error] when the login fails or the response carries no usable id
def fetch_org_id
  uri = URI("#{@login_url}/api/apikey/login")
  request = Net::HTTP::Post.new(uri)
  request["Authorization"] = "Bearer #{@api_key}"
  request["Content-Type"] = "application/json"

  response = Braintrust::Internal::Http.with_redirects(uri, request)
  unless response.is_a?(Net::HTTPSuccess)
    raise Error, "login returned status #{response.code}"
  end

  body = JSON.parse(response.body)
  org_info = body.is_a?(Hash) ? body["org_info"] : nil
  raise Error, "no org info returned from login" unless org_info.is_a?(Array) && !org_info.empty?

  # Validate the entry instead of failing with an opaque TypeError or
  # silently caching a nil org id.
  first_org = org_info.first
  org_id = first_org.is_a?(Hash) ? first_org["id"] : nil
  raise Error, "no org id returned from login" if org_id.nil? || org_id.to_s.empty?

  org_id
end

# ── three-step upload flow ───────────────────────────────────────

# Step 1: ask the API for a signed upload URL.
#
# @return [Array] [signed_url, headers] on success, [nil, nil] when the
#   request failed or the response was not the expected JSON object
def request_upload_url(org_id, ref)
  uri = URI("#{@api_url}/attachment")
  payload = {
    key: ref.key,
    filename: ref.filename,
    content_type: ref.content_type,
    org_id: org_id
  }
  body = do_with_retry(:post, uri, JSON.generate(payload), content_type: "application/json", auth: true)
  return [nil, nil] unless body

  # Valid JSON of the wrong shape (array, scalar, non-string URL) is a
  # failed request, not a crash.
  parsed = JSON.parse(body)
  return [nil, nil] unless parsed.is_a?(Hash)

  signed_url = parsed["signedUrl"]
  return [nil, nil] unless signed_url.is_a?(String) && !signed_url.empty?

  headers = parsed["headers"]
  headers = {} unless headers.is_a?(Hash)
  [signed_url, headers]
rescue JSON::ParserError
  [nil, nil]
end

# Step 2: PUT the attachment bytes to the signed URL.
#
# @return [Boolean] true only when the storage backend answered 2xx
def upload_to_signed_url(signed_url, headers, content_type, data)
  begin
    uri = URI(signed_url)
    request = Net::HTTP::Put.new(uri)
  rescue URI::InvalidURIError, ArgumentError => e
    # Log only the error class: the message echoes the URL, which carries
    # the signature.
    @logger.warn("Braintrust: signed upload URL is not a usable HTTP URI (#{e.class})")
    return false
  end

  request.body = data
  request["Content-Type"] = content_type
  headers.each { |k, v| request[k] = v }
  add_provider_specific_headers(uri, request)

  do_request_with_retry(uri, request).is_a?(Net::HTTPSuccess)
end

# Step 3: tell the API how the upload ended. Best-effort: does nothing
# when the org id is unavailable, and any error is logged, never raised.
def report_status(key, status, error_message = nil)
  org_id = resolve_org_id
  return unless org_id

  status_map = {upload_status: status}
  status_map[:error_message] = error_message if error_message && !error_message.empty?

  payload = {key: key, org_id: org_id, status: status_map}
  uri = URI("#{@api_url}/attachment/status")
  do_with_retry(:post, uri, JSON.generate(payload), content_type: "application/json", auth: true)
rescue => e
  @logger.warn("Braintrust: failed to report attachment status (#{status}): #{e.message}")
end

# ── HTTP helpers ─────────────────────────────────────────────────

# Builds a POST (for :post) or GET (any other method) request and sends it
# through do_request_with_retry.
#
# @return [String, nil] response body on success, nil on failure
def do_with_retry(method, uri, body, content_type:, auth:)
  klass = (method == :post) ? Net::HTTP::Post : Net::HTTP::Get
  request = klass.new(uri)
  request.body = body if body
  request["Content-Type"] = content_type
  request["Authorization"] = "Bearer #{@api_key}" if auth

  response = do_request_with_retry(uri, request)
  return nil unless response.is_a?(Net::HTTPSuccess)

  response.body
end

# Execute a request with exponential backoff. Retries on 5xx and network
# errors; never retries 4xx. The backoff sleep is cancellable by
# shutdown. Returns the final response, or nil if all attempts failed.
+ def do_request_with_retry(uri, request) + backoff = @initial_backoff + last_response = nil + + (0..@max_retries).each do |attempt| + if attempt > 0 + return nil if sleep_or_cancel(backoff) + + backoff *= 2 + end + + begin + response = Braintrust::Internal::Http.with_redirects(uri, request) + rescue => e + @logger.debug("Braintrust: attachment request error (attempt #{attempt}): #{e.message}") + next + end + + return response if response.code.to_i < 500 + + last_response = response + end + + last_response + end + + # Sleep up to +seconds+, returning true early if shutdown was requested. + def sleep_or_cancel(seconds) + @stop_mutex.synchronize do + return true if @stopping + + @stop_cond.wait(@stop_mutex, seconds) + @stopping + end + end + + def add_provider_specific_headers(uri, request) + # Azure Blob Storage requires this header on PUT uploads. + if uri.host&.end_with?(".blob.core.windows.net") + request["x-ms-blob-type"] = "BlockBlob" + end + end + + def stopping? + @stop_mutex.synchronize { @stopping } + end + + def monotonic_now + Process.clock_gettime(Process::CLOCK_MONOTONIC) + end + end + end + end +end diff --git a/lib/braintrust/trace/span_processor.rb b/lib/braintrust/trace/span_processor.rb index cf2ba8a9..f3ac30a3 100644 --- a/lib/braintrust/trace/span_processor.rb +++ b/lib/braintrust/trace/span_processor.rb @@ -1,20 +1,34 @@ # frozen_string_literal: true require "opentelemetry/sdk" +require_relative "attachment_processor/transformed_span" module Braintrust module Trace - # Custom span processor that adds Braintrust-specific attributes to spans - # and optionally filters spans based on custom filter functions. + # Custom span processor that adds Braintrust-specific attributes to spans, + # optionally filters spans based on custom filter functions, and (when + # enabled) replaces inline base64 attachments with uploaded references. 
class SpanProcessor PARENT_ATTR_KEY = "braintrust.parent" ORG_ATTR_KEY = "braintrust.org" APP_URL_ATTR_KEY = "braintrust.app_url" - - def initialize(wrapped_processor, state, filters = []) + INPUT_JSON_ATTR_KEY = "braintrust.input_json" + OUTPUT_JSON_ATTR_KEY = "braintrust.output_json" + + # Default time budget for draining/shutting down the uploader when the + # caller does not provide one. + DEFAULT_FLUSH_TIMEOUT = 30.0 + + # @param wrapped_processor the delegate span processor (Simple/Batch) + # @param state [State] + # @param filters [Array] + # @param attachment_processor [AttachmentProcessor::Processor, nil] when + # present, scans/rewrites attachment data in onEnd + def initialize(wrapped_processor, state, filters = [], attachment_processor: nil) @wrapped = wrapped_processor @state = state @filters = filters || [] + @attachment_processor = attachment_processor end def on_start(span, parent_context) @@ -35,24 +49,80 @@ def on_start(span, parent_context) @wrapped.on_start(span, parent_context) end - # Called when a span ends - apply filters before forwarding + # Called when a span ends - apply filters, process attachments, then forward. def on_finish(span) - # Only forward span if it passes filters - @wrapped.on_finish(span) if should_forward_span?(span) + return unless should_forward_span?(span) + + span = process_attachments(span) if @attachment_processor + @wrapped.on_finish(span) end - # Shutdown the processor + # Shutdown the processor. + # + # The wrapped exporter is flushed/shut down first so that spans carrying + # attachment references reach the collector, then the uploader is drained + # so the referenced binary data is available in object storage. def shutdown(timeout: nil) - @wrapped.shutdown(timeout: timeout) + result = @wrapped.shutdown(timeout: timeout) + shutdown_uploader(timeout) + result end - # Force flush any buffered spans + # Force flush any buffered spans, then drain the uploader. 
def force_flush(timeout: nil) - @wrapped.force_flush(timeout: timeout) + result = @wrapped.force_flush(timeout: timeout) + @attachment_processor&.uploader&.force_flush(timeout || DEFAULT_FLUSH_TIMEOUT) + result end private + # Scan input/output JSON attributes for base64 attachments. If either was + # rewritten, forward a transformed span carrying the new values; otherwise + # forward the original span unchanged. + def process_attachments(span) + attrs = span.respond_to?(:attributes) ? span.attributes : nil + return span unless attrs + + input_json = attrs[INPUT_JSON_ATTR_KEY] + output_json = attrs[OUTPUT_JSON_ATTR_KEY] + + new_input = @attachment_processor.process_and_upload(input_json) + new_output = @attachment_processor.process_and_upload(output_json) + + overrides = {} + overrides[INPUT_JSON_ATTR_KEY] = new_input unless new_input.equal?(input_json) + overrides[OUTPUT_JSON_ATTR_KEY] = new_output unless new_output.equal?(output_json) + return span if overrides.empty? + + AttachmentProcessor::TransformedSpan.new(span, overrides) + rescue => e + # Attachment processing must never prevent a span from being exported. + Braintrust::Log.debug("Braintrust: attachment processing error: #{e.message}") + span + end + + # Drain/shut down the uploader, honoring the caller's deadline. If the + # uploader cannot finish in time, abandon the wait rather than blocking + # the caller past their budget (the worker keeps running in the + # background until process exit). + def shutdown_uploader(timeout) + uploader = @attachment_processor&.uploader + return unless uploader + + if timeout + # Run shutdown in the background and wait up to the caller's deadline. + # If it doesn't finish, abandon the wait and let the worker continue + # draining in the background rather than blocking past the budget. 
+ thread = Thread.new { uploader.shutdown } + unless thread.join(timeout) + Braintrust::Log.debug("Braintrust: attachment uploader shutdown exceeded caller deadline; continuing in background") + end + else + uploader.shutdown + end + end + def default_parent # If default_project is set, format it as "project_name:value" # The default_project should be a plain project name (e.g., "my-project") diff --git a/test/braintrust/config_test.rb b/test/braintrust/config_test.rb index 0c9edbef..98c9b886 100644 --- a/test/braintrust/config_test.rb +++ b/test/braintrust/config_test.rb @@ -6,7 +6,8 @@ "BRAINTRUST_API_KEY" => ENV["BRAINTRUST_API_KEY"], "BRAINTRUST_ORG_NAME" => ENV["BRAINTRUST_ORG_NAME"], "BRAINTRUST_APP_URL" => ENV["BRAINTRUST_APP_URL"], - "BRAINTRUST_API_URL" => ENV["BRAINTRUST_API_URL"] + "BRAINTRUST_API_URL" => ENV["BRAINTRUST_API_URL"], + "BRAINTRUST_AUTO_CONVERT_AI_ATTACHMENTS" => ENV["BRAINTRUST_AUTO_CONVERT_AI_ATTACHMENTS"] }.freeze class Braintrust::ConfigTest < Minitest::Test @@ -61,4 +62,24 @@ def test_env_vars_override_defaults assert_equal "https://custom.braintrust.dev", config.app_url end + + def test_auto_convert_ai_attachments_defaults_to_true + assert_equal true, Braintrust::Config.from_env.auto_convert_ai_attachments + end + + def test_auto_convert_ai_attachments_disabled_via_env + ENV["BRAINTRUST_AUTO_CONVERT_AI_ATTACHMENTS"] = "false" + assert_equal false, Braintrust::Config.from_env.auto_convert_ai_attachments + end + + def test_auto_convert_ai_attachments_explicit_overrides_env + ENV["BRAINTRUST_AUTO_CONVERT_AI_ATTACHMENTS"] = "false" + config = Braintrust::Config.from_env(auto_convert_ai_attachments: true) + assert_equal true, config.auto_convert_ai_attachments + end + + def test_auto_convert_ai_attachments_truthy_env_keeps_enabled + ENV["BRAINTRUST_AUTO_CONVERT_AI_ATTACHMENTS"] = "true" + assert_equal true, Braintrust::Config.from_env.auto_convert_ai_attachments + end end diff --git 
a/test/braintrust/trace/attachment_processor/formats_test.rb b/test/braintrust/trace/attachment_processor/formats_test.rb new file mode 100644 index 00000000..4cb2b8bf --- /dev/null +++ b/test/braintrust/trace/attachment_processor/formats_test.rb @@ -0,0 +1,193 @@ +# frozen_string_literal: true + +require "test_helper" +require "json" +require "braintrust/trace/attachment_processor/formats" +require "braintrust/trace/attachment_processor/processor" + +module Braintrust + module Trace + module AttachmentProcessor + # Data-driven coverage of every vendor format. The TestAllFormatsHaveTestCases + # test below fails if a format is added to Formats.all without a + # corresponding test case here. + class FormatsTest < Minitest::Test + # 1x1 red PNG pixel, valid base64. + BASE64_PNG = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8z8DwHwAFBQIAX8jx0gAAAABJRU5ErkJggg==" + # Single blank-page PDF, valid base64. + BASE64_PDF = "JVBERi0xLjAKMSAwIG9iago8PCAvVHlwZSAvQ2F0YWxvZyAvUGFnZXMgMiAwIFIgPj4KZW5kb2JqCjIgMCBvYmoKPDwgL1R5cGUgL1BhZ2VzIC9LaWRzIFszIDAgUl0gL0NvdW50IDEgPj4KZW5kb2JqCjMgMCBvYmoKPDwgL1R5cGUgL1BhZ2UgL1BhcmVudCAyIDAgUiAvTWVkaWFCb3ggWzAgMCA2MTIgNzkyXSA+PgplbmRvYmoKeHJlZgowIDQKMDAwMDAwMDAwMCA2NTUzNSBmIAowMDAwMDAwMDA5IDAwMDAwIG4gCjAwMDAwMDAwNTggMDAwMDAgbiAKMDAwMDAwMDExNSAwMDAwMCBuIAp0cmFpbGVyCjw8IC9TaXplIDQgL1Jvb3QgMSAwIFIgPj4Kc3RhcnR4cmVmCjE5MAolJUVPRgo=" + + # Each entry: format name (must match Format#name), input JSON, and an + # assertion block run against the parsed, processed result. 
+ CASES = [ + { + name: "openai-image", + format: "openai", + input: {"role" => "user", "content" => [ + {"type" => "text", "text" => "describe"}, + {"type" => "image_url", "image_url" => {"url" => "data:image/png;base64,#{BASE64_PNG}"}} + ]}, + assert: ->(t, root) { + part = root["content"][1] + t.assert_attachment_ref(part["image_url"]["url"], "image/png") + } + }, + { + name: "openai-file", + format: "openai", + input: {"role" => "user", "content" => [ + {"type" => "file", "file" => {"filename" => "blank.pdf", "file_data" => "data:application/pdf;base64,#{BASE64_PDF}"}} + ]}, + assert: ->(t, root) { + file = root["content"][0]["file"] + t.assert_equal "blank.pdf", file["filename"] + t.assert_attachment_ref(file["file_data"], "application/pdf") + } + }, + { + name: "bedrock-image", + format: "bedrock", + input: {"role" => "user", "content" => [ + {"image" => {"format" => "png", "source" => {"bytes" => BASE64_PNG}}} + ]}, + assert: ->(t, root) { + image = root["content"][0]["image"] + t.assert_equal "png", image["format"] + t.assert_attachment_ref(image["source"]["bytes"], "image/png") + } + }, + { + name: "bedrock-document", + format: "bedrock", + input: {"role" => "user", "content" => [ + {"document" => {"format" => "pdf", "name" => "blank", "source" => {"bytes" => BASE64_PDF}}} + ]}, + assert: ->(t, root) { + doc = root["content"][0]["document"] + t.assert_equal "pdf", doc["format"] + t.assert_equal "blank", doc["name"] + t.assert_attachment_ref(doc["source"]["bytes"], "application/pdf") + } + }, + { + # Ambiguous "mp4" under an audio block must resolve to audio/mp4, + # not video/mp4. 
+ name: "bedrock-audio-mp4", + format: "bedrock", + input: {"role" => "user", "content" => [ + {"audio" => {"format" => "mp4", "source" => {"bytes" => BASE64_PDF}}} + ]}, + assert: ->(t, root) { + audio = root["content"][0]["audio"] + t.assert_equal "mp4", audio["format"] + t.assert_attachment_ref(audio["source"]["bytes"], "audio/mp4") + } + }, + { + name: "anthropic-image", + format: "anthropic", + input: {"role" => "user", "content" => [ + {"type" => "image", "source" => {"type" => "base64", "media_type" => "image/png", "data" => BASE64_PNG}} + ]}, + assert: ->(t, root) { + part = root["content"][0] + t.assert_equal "image", part["type"] + t.assert_attachment_ref(part["source"], "image/png") + } + }, + { + name: "anthropic-document", + format: "anthropic", + input: {"role" => "user", "content" => [ + {"type" => "document", "source" => {"type" => "base64", "media_type" => "application/pdf", "data" => BASE64_PDF}} + ]}, + assert: ->(t, root) { + part = root["content"][0] + t.assert_equal "document", part["type"] + t.assert_attachment_ref(part["source"], "application/pdf") + } + }, + { + name: "gemini-image", + format: "gemini", + input: {"contents" => [ + {"role" => "user", "parts" => [ + {"inlineData" => {"mimeType" => "image/png", "data" => BASE64_PNG}} + ]} + ]}, + assert: ->(t, root) { + part = root["contents"][0]["parts"][0] + t.assert_nil part["inlineData"], "inlineData should be removed" + t.assert_attachment_ref(part["image_url"]["url"], "image/png") + } + }, + { + name: "gemini-document", + format: "gemini", + input: {"contents" => [ + {"role" => "user", "parts" => [ + {"inlineData" => {"mimeType" => "application/pdf", "data" => BASE64_PDF}} + ]} + ]}, + assert: ->(t, root) { + part = root["contents"][0]["parts"][0] + t.assert_nil part["inlineData"], "inlineData should be removed" + t.assert_nil part["image_url"], "non-image should not use image_url" + t.assert_attachment_ref(part["file"]["file_data"], "application/pdf") + } + } + ].freeze + + CASES.each 
do |tc| + define_method("test_#{tc[:name].tr("-", "_")}") do + input_json = JSON.generate(tc[:input]) + + # The combined heuristic must match this format's test data. + assert build_heuristic.match?(input_json), + "heuristic should match test data for #{tc[:name]}" + + processor = Processor.new(uploader: NoopUploader.new) + result = processor.process_and_upload(input_json) + refute_equal input_json, result, "base64 should have been replaced for #{tc[:name]}" + + root = JSON.parse(result) + instance_exec(self, root, &tc[:assert]) + end + end + + # Ensures adding a new format without test data causes a failure. + def test_all_formats_have_test_cases + covered = CASES.map { |tc| tc[:format] }.uniq + Formats.all.each do |fmt| + assert_includes covered, fmt.name, + "format #{fmt.name.inspect} has no test cases in CASES — add at least one" + end + end + + def test_entirely_data_uri + assert Formats.entirely_data_uri?("data:image/png;base64,abc123") + assert Formats.entirely_data_uri?(" data:image/png;base64,abc123 ") + refute Formats.entirely_data_uri?("Check this: data:image/png;base64,abc123 please") + refute Formats.entirely_data_uri?('"data:image/png;base64,abc123"') + refute Formats.entirely_data_uri?('data:image/png;base64,abc\\n123') + refute Formats.entirely_data_uri?("not-a-data-uri") + end + + # Test assertion helper exposed to case blocks. 
+ def assert_attachment_ref(node, expected_content_type) + assert node.is_a?(Hash), "attachment ref should be a Hash, got #{node.class}" + assert_equal "braintrust_attachment", node["type"] + assert_equal expected_content_type, node["content_type"] + refute_empty node["filename"] + refute_empty node["key"] + end + + private + + def build_heuristic + @build_heuristic ||= Formats.build_heuristic + end + end + end + end +end diff --git a/test/braintrust/trace/attachment_processor/processor_test.rb b/test/braintrust/trace/attachment_processor/processor_test.rb new file mode 100644 index 00000000..f6840f7f --- /dev/null +++ b/test/braintrust/trace/attachment_processor/processor_test.rb @@ -0,0 +1,160 @@ +# frozen_string_literal: true + +require "test_helper" +require "json" +require "braintrust/trace/attachment_processor/processor" + +module Braintrust + module Trace + module AttachmentProcessor + class ProcessorTest < Minitest::Test + BASE64_PNG = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8z8DwHwAFBQIAX8jx0gAAAABJRU5ErkJggg==" + + # Uploader that rejects every enqueue. + class RejectingUploader < NoopUploader + def enqueue(_ref, _data) + false + end + end + + # Uploader that accepts the first N enqueues, then rejects. 
+ class LimitedUploader < NoopUploader + def initialize(remaining) + super() + @remaining = remaining + end + + def enqueue(_ref, _data) + return false if @remaining <= 0 + + @remaining -= 1 + true + end + end + + def processor(uploader = NoopUploader.new) + Processor.new(uploader: uploader) + end + + def openai_image_json + JSON.generate([{"role" => "user", "content" => [ + {"type" => "image_url", "image_url" => {"url" => "data:image/png;base64,#{BASE64_PNG}"}} + ]}]) + end + + def test_non_attachment_input_unchanged + input = JSON.generate([{"role" => "user", "content" => "Hello, how are you?"}]) + assert_equal input, processor.process_and_upload(input) + end + + def test_partial_data_uri_in_text_not_replaced + input = JSON.generate([{"role" => "user", + "content" => "Check this: data:image/png;base64,#{BASE64_PNG} please"}]) + assert_equal input, processor.process_and_upload(input) + end + + def test_short_base64_not_replaced + input = JSON.generate([{"role" => "user", "content" => [ + {"type" => "image", "source" => {"type" => "base64", "media_type" => "image/png", "data" => "abc123"}} + ]}]) + assert_equal input, processor.process_and_upload(input) + end + + def test_empty_input_returns_empty + assert_equal "", processor.process_and_upload("") + assert_nil processor.process_and_upload(nil) + end + + def test_heuristic_skips_plain_text + input = JSON.generate({"messages" => [{"role" => "user", "content" => "just text"}]}) + assert_equal input, processor.process_and_upload(input) + end + + def test_malformed_json_does_not_kill_processor + uploader = NoopUploader.new + p = processor(uploader) + + # Passes the heuristic but fails to parse. + bad = %({"data":"#{BASE64_PNG}" INVALID) + assert_equal bad, p.process_and_upload(bad), "should return original on parse error" + refute uploader.shutdown?, "uploader must not shut down on a parse error" + + # A subsequent valid span still gets processed. 
+ good = p.process_and_upload(openai_image_json) + refute_equal openai_image_json, good + assert_includes good, "braintrust_attachment" + end + + def test_uploader_shutdown_skips_processing + uploader = NoopUploader.new + uploader.shutdown + assert_equal openai_image_json, processor(uploader).process_and_upload(openai_image_json) + end + + def test_rejecting_uploader_returns_original + assert_equal openai_image_json, processor(RejectingUploader.new).process_and_upload(openai_image_json) + end + + def test_partial_enqueue_failure_returns_original + # Two attachments, limit of 1: the second enqueue fails mid-walk and + # the whole rewrite must be abandoned (partial-replacement safety). + input = JSON.generate([{"role" => "user", "content" => [ + {"type" => "image_url", "image_url" => {"url" => "data:image/png;base64,#{BASE64_PNG}"}}, + {"type" => "image_url", "image_url" => {"url" => "data:image/png;base64,#{BASE64_PNG}"}} + ]}]) + assert_equal input, processor(LimitedUploader.new(1)).process_and_upload(input) + end + + def test_multiple_attachments_all_replaced + input = JSON.generate([{"role" => "user", "content" => [ + {"type" => "image_url", "image_url" => {"url" => "data:image/png;base64,#{BASE64_PNG}"}}, + {"type" => "image_url", "image_url" => {"url" => "data:image/png;base64,#{BASE64_PNG}"}} + ]}]) + result = processor.process_and_upload(input) + parsed = JSON.parse(result) + content = parsed[0]["content"] + assert_equal "braintrust_attachment", content[0]["image_url"]["url"]["type"] + assert_equal "braintrust_attachment", content[1]["image_url"]["url"]["type"] + end + + def test_deeply_nested_input_is_returned_unchanged + # Build nesting deeper than MAX_WALK_DEPTH with an attachment at the + # bottom; the walker must stop at the cap and leave it unchanged + # (and must not exhaust the stack). 
+ node = {"type" => "image_url", "image_url" => {"url" => "data:image/png;base64,#{BASE64_PNG}"}} + (Processor::MAX_WALK_DEPTH + 10).times { node = {"nested" => node} } + input = JSON.generate(node, max_nesting: false) + + # Use a processor that parses with a generous nesting limit so we + # exercise the walker's own depth cap rather than the JSON parser's. + uploader = NoopUploader.new + p = Processor.new(uploader: uploader, json_max_nesting: false) + result = p.process_and_upload(input) + assert_equal input, result + refute uploader.shutdown?, "depth cap must not be treated as an upload failure" + end + + def test_input_exceeding_json_parse_depth_is_skipped + # Real input deeper than the JSON parser's default nesting limit is + # treated as a per-span parse error and returned unchanged. + node = {"type" => "image_url", "image_url" => {"url" => "data:image/png;base64,#{BASE64_PNG}"}} + 200.times { node = {"nested" => node} } + input = JSON.generate(node, max_nesting: false) + uploader = NoopUploader.new + assert_equal input, processor(uploader).process_and_upload(input) + refute uploader.shutdown? 
+ end + + def test_unrelated_fields_preserved + input = JSON.generate([{"role" => "user", "extra" => "keep", "content" => [ + {"type" => "text", "text" => "hi"}, + {"type" => "image_url", "image_url" => {"url" => "data:image/png;base64,#{BASE64_PNG}"}} + ]}]) + parsed = JSON.parse(processor.process_and_upload(input)) + assert_equal "keep", parsed[0]["extra"] + assert_equal "hi", parsed[0]["content"][0]["text"] + end + end + end + end +end diff --git a/test/braintrust/trace/attachment_processor/reference_test.rb b/test/braintrust/trace/attachment_processor/reference_test.rb new file mode 100644 index 00000000..00e7df5b --- /dev/null +++ b/test/braintrust/trace/attachment_processor/reference_test.rb @@ -0,0 +1,71 @@ +# frozen_string_literal: true + +require "test_helper" +require "braintrust/trace/attachment_processor/reference" + +module Braintrust + module Trace + module AttachmentProcessor + class ReferenceTest < Minitest::Test + def test_builds_braintrust_attachment_reference + ref = Reference.new("image/png") + + assert_equal "braintrust_attachment", ref.type + assert_equal "image/png", ref.content_type + assert_equal "attachment.png", ref.filename + refute_empty ref.key + end + + def test_generates_unique_keys + assert_match(/\A[0-9a-f-]{36}\z/, Reference.new("image/png").key) + refute_equal Reference.new("image/png").key, Reference.new("image/png").key + end + + def test_accepts_explicit_key + ref = Reference.new("image/png", key: "fixed-key") + assert_equal "fixed-key", ref.key + end + + def test_to_h_shape + ref = Reference.new("application/pdf", key: "k") + assert_equal({ + "type" => "braintrust_attachment", + "content_type" => "application/pdf", + "filename" => "attachment.pdf", + "key" => "k" + }, ref.to_h) + end + + def test_content_type_to_extension_known_types + { + "image/png" => ".png", + "image/jpeg" => ".jpg", + "application/pdf" => ".pdf", + "video/mp4" => ".mp4", + "audio/mpeg" => ".mp3", + "audio/wav" => ".wav" + }.each do |mime, ext| + 
assert_equal ext, Reference.content_type_to_extension(mime), mime + end + end + + def test_content_type_to_extension_unknown_falls_back_to_subtype + # Strips at the first "-" or ";" like the Go reference impl. + assert_equal ".octet", Reference.content_type_to_extension("application/octet-stream") + end + + def test_content_type_to_extension_strips_parameters + assert_equal ".plain", Reference.content_type_to_extension("text/plain;charset=utf-8") + end + + def test_content_type_to_extension_no_subtype + assert_equal "", Reference.content_type_to_extension("weird") + end + + def test_content_type_to_extension_is_case_insensitive + assert_equal ".png", Reference.content_type_to_extension("IMAGE/PNG") + end + end + end + end +end diff --git a/test/braintrust/trace/attachment_processor/transformed_span_test.rb b/test/braintrust/trace/attachment_processor/transformed_span_test.rb new file mode 100644 index 00000000..b60de8e4 --- /dev/null +++ b/test/braintrust/trace/attachment_processor/transformed_span_test.rb @@ -0,0 +1,59 @@ +# frozen_string_literal: true + +require "test_helper" +require "opentelemetry/sdk" +require "braintrust/trace/attachment_processor/transformed_span" + +module Braintrust + module Trace + module AttachmentProcessor + class TransformedSpanTest < Minitest::Test + include ::Test::Support::TracingHelper + + def make_span + rig = setup_otel_test_rig + tracer = rig.tracer + span = tracer.start_span("test") + span.set_attribute("braintrust.input_json", %([{"role":"user"}])) + span.set_attribute("keep", "original") + span.finish + span + end + + def test_overrides_attribute_in_span_data + span = make_span + wrapped = TransformedSpan.new(span, {"braintrust.input_json" => "REPLACED"}) + data = wrapped.to_span_data + + assert_equal "REPLACED", data.attributes["braintrust.input_json"] + assert_equal "original", data.attributes["keep"], "non-overridden attributes preserved" + end + + def test_does_not_mutate_original_span + span = make_span + 
TransformedSpan.new(span, {"braintrust.input_json" => "REPLACED"}).to_span_data + + assert_equal %([{"role":"user"}]), span.to_span_data.attributes["braintrust.input_json"] + end + + def test_delegates_other_methods + span = make_span + wrapped = TransformedSpan.new(span, {}) + assert_equal span.to_span_data.name, wrapped.to_span_data.name + assert wrapped.respond_to?(:to_span_data) + end + + def test_preserves_other_span_data_fields + span = make_span + original = span.to_span_data + data = TransformedSpan.new(span, {"keep" => "changed"}).to_span_data + + assert_equal original.name, data.name + assert_equal original.trace_id, data.trace_id + assert_equal original.span_id, data.span_id + assert_equal original.kind, data.kind + end + end + end + end +end diff --git a/test/braintrust/trace/attachment_processor/uploader_test.rb b/test/braintrust/trace/attachment_processor/uploader_test.rb new file mode 100644 index 00000000..c6e8da83 --- /dev/null +++ b/test/braintrust/trace/attachment_processor/uploader_test.rb @@ -0,0 +1,248 @@ +# frozen_string_literal: true + +require "test_helper" +require "json" +require "braintrust/trace/attachment_processor/uploader" +require "braintrust/trace/attachment_processor/reference" + +module Braintrust + module Trace + module AttachmentProcessor + class UploaderTest < Minitest::Test + API_URL = "https://api.test.local" + SIGNED_URL = "https://storage.test.local/upload/abc" + + # Swallows log output so expected failure-path warnings don't pollute + # test output. 
+ class SilentLogger + def warn(*) = nil + + def debug(*) = nil + + def error(*) = nil + + def info(*) = nil + end + + def setup + @ref = Reference.new("image/png", key: "test-key") + @data = "binary-png-bytes" + end + + def build_uploader(**opts) + S3Uploader.new( + api_key: "test-api-key", + api_url: API_URL, + org_id: "org-123", + initial_backoff: 0.01, + shutdown_timeout: 5.0, + logger: SilentLogger.new, + **opts + ) + end + + def stub_signed_url(headers: {}) + stub_request(:post, "#{API_URL}/attachment") + .to_return(status: 200, body: JSON.generate({"signedUrl" => SIGNED_URL, "headers" => headers})) + end + + def stub_upload(url: SIGNED_URL, status: 200) + stub_request(:put, url).to_return(status: status) + end + + def stub_status + stub_request(:post, "#{API_URL}/attachment/status").to_return(status: 200, body: "{}") + end + + def test_successful_upload_runs_full_three_step_flow + stub_signed_url + stub_upload + stub_status + + uploader = build_uploader + assert uploader.enqueue(@ref, @data) + assert uploader.force_flush(5.0) + + refute uploader.shutdown?, "successful upload should not flag the uploader as failed" + uploader.shutdown + + assert_requested(:post, "#{API_URL}/attachment") + assert_requested(:put, SIGNED_URL) + assert_requested(:post, "#{API_URL}/attachment/status") + end + + def test_signed_url_request_sends_correct_payload + stub_signed_url + stub_upload + stub_status + + uploader = build_uploader + uploader.enqueue(@ref, @data) + uploader.force_flush(5.0) + uploader.shutdown + + assert_requested(:post, "#{API_URL}/attachment") do |req| + body = JSON.parse(req.body) + body["key"] == "test-key" && + body["filename"] == "attachment.png" && + body["content_type"] == "image/png" && + body["org_id"] == "org-123" && + req.headers["Authorization"] == "Bearer test-api-key" + end + end + + def test_upload_uses_content_type_and_signed_headers + stub_signed_url(headers: {"x-custom" => "yes"}) + stub_upload + stub_status + + uploader = build_uploader + 
uploader.enqueue(@ref, @data) + uploader.force_flush(5.0) + uploader.shutdown + + assert_requested(:put, SIGNED_URL) do |req| + req.headers["Content-Type"] == "image/png" && + req.headers["X-Custom"] == "yes" && + req.body == @data + end + end + + def test_azure_blob_adds_block_blob_header + azure_url = "https://acct.blob.core.windows.net/c/blob" + stub_request(:post, "#{API_URL}/attachment") + .to_return(status: 200, body: JSON.generate({"signedUrl" => azure_url, "headers" => {}})) + stub_upload(url: azure_url) + stub_status + + uploader = build_uploader + uploader.enqueue(@ref, @data) + uploader.force_flush(5.0) + uploader.shutdown + + assert_requested(:put, azure_url) { |req| req.headers["X-Ms-Blob-Type"] == "BlockBlob" } + end + + def test_retries_on_5xx_then_succeeds + stub_request(:post, "#{API_URL}/attachment") + .to_return(status: 500).times(2).then + .to_return(status: 200, body: JSON.generate({"signedUrl" => SIGNED_URL, "headers" => {}})) + stub_upload + stub_status + + uploader = build_uploader + uploader.enqueue(@ref, @data) + uploader.force_flush(5.0) + + refute uploader.shutdown?, "should recover after retries" + uploader.shutdown + assert_requested(:post, "#{API_URL}/attachment", times: 3) + end + + def test_does_not_retry_4xx_and_fails + stub_request(:post, "#{API_URL}/attachment").to_return(status: 400) + stub_status + + uploader = build_uploader + uploader.enqueue(@ref, @data) + uploader.force_flush(5.0) + + assert uploader.shutdown?, "4xx on signed URL should fail the uploader" + uploader.shutdown + assert_requested(:post, "#{API_URL}/attachment", times: 1) + end + + def test_upload_failure_rejects_subsequent_enqueues + stub_request(:post, "#{API_URL}/attachment").to_return(status: 400) + stub_status + + uploader = build_uploader + assert uploader.enqueue(@ref, @data) + uploader.force_flush(5.0) + + refute uploader.enqueue(@ref, @data), "after a failure, new jobs must be rejected" + uploader.shutdown + end + + def 
test_reports_error_status_on_failure + stub_request(:post, "#{API_URL}/attachment").to_return(status: 400) + stub_status + + uploader = build_uploader + uploader.enqueue(@ref, @data) + uploader.force_flush(5.0) + uploader.shutdown + + assert_requested(:post, "#{API_URL}/attachment/status") do |req| + body = JSON.parse(req.body) + body["key"] == "test-key" && body.dig("status", "upload_status") == "error" + end + end + + def test_shutdown_is_idempotent + stub_signed_url + stub_upload + stub_status + + uploader = build_uploader + uploader.enqueue(@ref, @data) + uploader.shutdown + uploader.shutdown # must not raise or hang + assert uploader.shutdown? + end + + def test_enqueue_rejected_after_shutdown + uploader = build_uploader + uploader.shutdown + refute uploader.enqueue(@ref, @data) + end + + def test_resolves_org_id_via_login_when_not_provided + stub_request(:post, "#{API_URL}/api/apikey/login") + .to_return(status: 200, body: JSON.generate({"org_info" => [{"id" => "resolved-org"}]})) + stub_signed_url + stub_upload + stub_status + + uploader = S3Uploader.new(api_key: "test-api-key", api_url: API_URL, initial_backoff: 0.01) + uploader.enqueue(@ref, @data) + uploader.force_flush(5.0) + uploader.shutdown + + assert_requested(:post, "#{API_URL}/api/apikey/login") + assert_requested(:post, "#{API_URL}/attachment") { |req| JSON.parse(req.body)["org_id"] == "resolved-org" } + end + + def test_login_resolved_once_for_multiple_uploads + stub_request(:post, "#{API_URL}/api/apikey/login") + .to_return(status: 200, body: JSON.generate({"org_info" => [{"id" => "resolved-org"}]})) + stub_signed_url + stub_upload + stub_status + + uploader = S3Uploader.new(api_key: "test-api-key", api_url: API_URL, initial_backoff: 0.01) + 3.times { uploader.enqueue(@ref, @data) } + uploader.force_flush(5.0) + uploader.shutdown + + assert_requested(:post, "#{API_URL}/api/apikey/login", times: 1) + end + + def test_force_flush_times_out_when_not_drained + # Signed URL request blocks long enough 
to exceed the flush timeout. + stub_request(:post, "#{API_URL}/attachment").to_return do + sleep 0.5 + {status: 200, body: JSON.generate({"signedUrl" => SIGNED_URL, "headers" => {}})} + end + stub_upload + stub_status + + uploader = build_uploader + uploader.enqueue(@ref, @data) + refute uploader.force_flush(0.05), "force_flush should report timeout" + uploader.shutdown + end + end + end + end +end diff --git a/test/braintrust/trace/span_processor_attachment_test.rb b/test/braintrust/trace/span_processor_attachment_test.rb new file mode 100644 index 00000000..ffdd56eb --- /dev/null +++ b/test/braintrust/trace/span_processor_attachment_test.rb @@ -0,0 +1,85 @@ +# frozen_string_literal: true + +require "test_helper" +require "json" +require "braintrust/trace/attachment_processor/processor" +require "braintrust/trace/attachment_processor/uploader" + +module Braintrust + module Trace + # End-to-end coverage of attachment processing through the SpanProcessor + # using the in-memory OTel test rig. + class SpanProcessorAttachmentTest < Minitest::Test + include ::Test::Support::TracingHelper + + BASE64_PNG = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8z8DwHwAFBQIAX8jx0gAAAABJRU5ErkJggg==" + + # Builds a rig whose Braintrust SpanProcessor has an attachment processor + # backed by the given uploader. 
+ def build_rig(uploader) + require "opentelemetry/sdk" + exporter = OpenTelemetry::SDK::Trace::Export::InMemorySpanExporter.new + tracer_provider = OpenTelemetry::SDK::Trace::TracerProvider.new + state = get_unit_test_state + simple = OpenTelemetry::SDK::Trace::Export::SimpleSpanProcessor.new(exporter) + processor = AttachmentProcessor::Processor.new(uploader: uploader) + bt = SpanProcessor.new(simple, state, [], attachment_processor: processor) + tracer_provider.add_span_processor(bt) + [tracer_provider, exporter] + end + + def emit_span(tracer_provider, input:) + tracer = tracer_provider.tracer("test") + span = tracer.start_span("Chat Completion") + span.set_attribute("braintrust.input_json", input) + span.finish + tracer_provider.force_flush + end + + def openai_input + JSON.generate([{"role" => "user", "content" => [ + {"type" => "image_url", "image_url" => {"url" => "data:image/png;base64,#{BASE64_PNG}"}} + ]}]) + end + + def test_replaces_attachment_in_exported_span + uploader = AttachmentProcessor::NoopUploader.new + tracer_provider, exporter = build_rig(uploader) + + emit_span(tracer_provider, input: openai_input) + + span = exporter.finished_spans.first + refute_nil span + parsed = JSON.parse(span.attributes["braintrust.input_json"]) + ref = parsed[0]["content"][0]["image_url"]["url"] + assert_equal "braintrust_attachment", ref["type"] + assert_equal "image/png", ref["content_type"] + end + + def test_passes_through_span_without_attachments + uploader = AttachmentProcessor::NoopUploader.new + tracer_provider, exporter = build_rig(uploader) + + plain = JSON.generate([{"role" => "user", "content" => "hello"}]) + emit_span(tracer_provider, input: plain) + + span = exporter.finished_spans.first + assert_equal plain, span.attributes["braintrust.input_json"] + end + + def test_falls_back_to_inline_when_uploader_rejects + # A rejecting uploader means the span must be exported with inline base64. 
+ rejecting = Class.new(AttachmentProcessor::NoopUploader) do + def enqueue(_ref, _data) = false + end.new + tracer_provider, exporter = build_rig(rejecting) + + emit_span(tracer_provider, input: openai_input) + + span = exporter.finished_spans.first + assert_equal openai_input, span.attributes["braintrust.input_json"], + "rejected upload should leave inline base64 untouched" + end + end + end +end diff --git a/test/btx/btx_test.rb b/test/btx/btx_test.rb index 6b471f61..20fe2843 100644 --- a/test/btx/btx_test.rb +++ b/test/btx/btx_test.rb @@ -74,7 +74,18 @@ def run_spec(spec) result = with_cassette(spec) { executor.execute(spec) } # The in-memory OTel spans are converted to brainstore format in every mode. - converted = Braintrust::BTX::SpanConverter.to_brainstore_spans(result.otel_spans) + # + # Attachment handling differs by mode: + # - live: the SDK attachment processor is OFF and the backend converts + # data URIs to references after ingestion, so the converter replicates + # that transform to match what the backend stores. + # - replay: the SDK attachment processor runs in-process and is expected + # to produce the references itself, so the converter must NOT transform. + # This makes the attachment specs a genuine end-to-end check of the + # processor — they fail if it is disabled or broken. + converted = Braintrust::BTX::SpanConverter.to_brainstore_spans( + result.otel_spans, transform_attachments: live + ) if live run_spec_live(spec, result, state, converted) diff --git a/test/btx/span_converter.rb b/test/btx/span_converter.rb index bcdf72d4..f64841b8 100644 --- a/test/btx/span_converter.rb +++ b/test/btx/span_converter.rb @@ -30,11 +30,20 @@ module SpanConverter # Convert a list of exported OTel SpanData into brainstore-format hashes. # # @param otel_spans [Array] + # @param transform_attachments [Boolean] when true, replicate the backend's + # data-URI -> braintrust_attachment transform on the input. 
This is the + # behavior for live mode, where the SDK attachment processor is disabled + # and the backend performs the conversion after ingestion. + # + # In replay mode the SDK attachment processor runs in-process and is + # expected to produce the references itself, so the converter must NOT + # re-transform — otherwise it would mask a broken/disabled processor and + # the attachment specs would pass even when the SDK did nothing. # @return [Array] brainstore spans, in input order - def to_brainstore_spans(otel_spans) + def to_brainstore_spans(otel_spans, transform_attachments: true) otel_spans .select { |span| llm_instrumentation_span?(span) } - .map { |span| to_single_brainstore_span(span) } + .map { |span| to_single_brainstore_span(span, transform_attachments: transform_attachments) } end def llm_instrumentation_span?(span) @@ -42,12 +51,13 @@ def llm_instrumentation_span?(span) !attrs["braintrust.span_attributes"].nil? end - def to_single_brainstore_span(span) + def to_single_brainstore_span(span, transform_attachments: true) result = {} result["name"] = span.name result["metrics"] = parse_json_map(span, "braintrust.metrics") result["metadata"] = parse_json_map(span, "braintrust.metadata") - result["input"] = transform_input(parse_json_value(span, "braintrust.input_json")) + input = parse_json_value(span, "braintrust.input_json") + result["input"] = transform_attachments ? 
transform_input(input) : input result["output"] = parse_json_value(span, "braintrust.output_json") span_attrs = parse_json_map(span, "braintrust.span_attributes") || {} diff --git a/test/btx/spec_executor.rb b/test/btx/spec_executor.rb index a9bfe427..4df11940 100644 --- a/test/btx/spec_executor.rb +++ b/test/btx/spec_executor.rb @@ -49,7 +49,9 @@ def execute(spec) tracer_provider = OpenTelemetry::SDK::Trace::TracerProvider.new simple_processor = OpenTelemetry::SDK::Trace::Export::SimpleSpanProcessor.new(exporter) - bt_processor = Braintrust::Trace::SpanProcessor.new(simple_processor, @state) + bt_processor = Braintrust::Trace::SpanProcessor.new( + simple_processor, @state, attachment_processor: attachment_processor + ) tracer_provider.add_span_processor(bt_processor) # Live mode: also ship spans to the Braintrust backend via OTLP so they @@ -83,6 +85,21 @@ def execute(spec) private + # In replay/record mode we exercise the SDK's attachment processor so BTX + # also validates attachment-conversion correctness against the same specs + # that define the backend behavior. A NoopUploader performs the reference + # rewriting without making real upload HTTP calls. In live mode we leave it + # nil and let the Braintrust backend perform the conversion (authoritative). + def attachment_processor + return nil if @live + + require "braintrust/trace/attachment_processor/processor" + require "braintrust/trace/attachment_processor/uploader" + Braintrust::Trace::AttachmentProcessor::Processor.new( + uploader: Braintrust::Trace::AttachmentProcessor::NoopUploader.new + ) + end + def instrument!(provider) case provider when "openai"