diff --git a/src/typeagent/aitools/vectorbase.py b/src/typeagent/aitools/vectorbase.py index e22083c..805ae77 100644 --- a/src/typeagent/aitools/vectorbase.py +++ b/src/typeagent/aitools/vectorbase.py @@ -13,6 +13,25 @@ ) from .model_adapters import create_embedding_model +DEFAULT_MIN_SCORE = 0.25 + +# Empirical defaults for built-in OpenAI embedding models. +# These values come from repeated runs of the Adrian Tchaikovsky Episode 53 +# search benchmark in `tools/benchmark_embeddings.py`, with raw outputs stored +# under `benchmark_results/`. +# They reflect that narrow retrieval benchmark only. Separate end-to-end evals +# have performed better with a stricter 0.7 cutoff in the message-text query +# path, so these values are not an answer-quality recommendation. +MODEL_DEFAULT_MIN_SCORES: dict[str, float] = { + "text-embedding-3-large": 0.25, + "text-embedding-3-small": 0.25, + "text-embedding-ada-002": 0.25, +} + + +def get_default_min_score(model_name: str) -> float: + return MODEL_DEFAULT_MIN_SCORES.get(model_name, DEFAULT_MIN_SCORE) + @dataclass class ScoredInt: @@ -34,10 +53,12 @@ def __init__( max_matches: int | None = None, batch_size: int | None = None, ): - self.min_score = min_score if min_score is not None else 0.85 - self.max_matches = max_matches if max_matches and max_matches >= 1 else None - self.batch_size = batch_size if batch_size and batch_size >= 1 else 8 self.embedding_model = embedding_model or create_embedding_model() + model_name = getattr(self.embedding_model, "model_name", "") + default_min_score = get_default_min_score(model_name) + self.min_score = min_score if min_score is not None else default_min_score + self.max_matches = max_matches # None means no limit + self.batch_size = batch_size if batch_size and batch_size >= 1 else 8 class VectorBase: diff --git a/src/typeagent/knowpro/convsettings.py b/src/typeagent/knowpro/convsettings.py index 9dbf121..f7e3aaa 100644 --- a/src/typeagent/knowpro/convsettings.py +++ b/src/typeagent/knowpro/convsettings.py @@ -10,6 +10,9 @@ from ..aitools.vectorbase import TextEmbeddingIndexSettings from .interfaces import IKnowledgeExtractor, IStorageProvider +DEFAULT_RELATED_TERM_MIN_SCORE = 0.85 +DEFAULT_MESSAGE_TEXT_MIN_SCORE = 0.7 + @dataclass class MessageTextIndexSettings: @@ -45,13 +48,16 @@ def __init__( # All settings share the same model, so they share the embedding cache. model = model or create_embedding_model() self.embedding_model = model - min_score = 0.85 + min_score = DEFAULT_RELATED_TERM_MIN_SCORE self.related_term_index_settings = RelatedTermIndexSettings( TextEmbeddingIndexSettings(model, min_score=min_score, max_matches=50) ) self.thread_settings = TextEmbeddingIndexSettings(model, min_score=min_score) self.message_text_index_settings = MessageTextIndexSettings( - TextEmbeddingIndexSettings(model, min_score=0.7) + # True end-to-end evals have performed better with 0.7 here than + # with the generic low-level VectorBase default from the narrow + # retrieval benchmark. + TextEmbeddingIndexSettings(model, min_score=DEFAULT_MESSAGE_TEXT_MIN_SCORE) ) self.semantic_ref_index_settings = SemanticRefIndexSettings( batch_size=4, # Effectively max concurrency diff --git a/tests/test_benchmark_embeddings.py b/tests/test_benchmark_embeddings.py new file mode 100644 index 0000000..9416292 --- /dev/null +++ b/tests/test_benchmark_embeddings.py @@ -0,0 +1,85 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. 
+ +import importlib.util +from pathlib import Path + +import pytest + +from typeagent.aitools.model_adapters import create_test_embedding_model + +MODULE_PATH = ( + Path(__file__).resolve().parent.parent / "tools" / "benchmark_embeddings.py" +) +SPEC = importlib.util.spec_from_file_location("benchmark_embeddings", MODULE_PATH) +assert SPEC is not None +assert SPEC.loader is not None +benchmark_embeddings = importlib.util.module_from_spec(SPEC) +SPEC.loader.exec_module(benchmark_embeddings) + +AnswerBenchmarkRow = benchmark_embeddings.AnswerBenchmarkRow +AnswerMetrics = benchmark_embeddings.AnswerMetrics +parse_float_list = benchmark_embeddings.parse_float_list +parse_int_list = benchmark_embeddings.parse_int_list +score_answer_pair = benchmark_embeddings.score_answer_pair +select_best_answer_row = benchmark_embeddings.select_best_answer_row + + +def test_parse_float_list_default_and_custom() -> None: + assert parse_float_list(None) + assert parse_float_list("0.25, 0.7") == [0.25, 0.7] + + +def test_parse_int_list_validates_positive_values() -> None: + assert parse_int_list("5,10") == [5, 10] + + +@pytest.mark.asyncio +async def test_score_answer_pair_exact_match() -> None: + model = create_test_embedding_model() + score = await score_answer_pair(model, ("Python", True), ("Python", True)) + assert score == 1.0 + + +@pytest.mark.asyncio +async def test_score_answer_pair_expected_answer_missing() -> None: + model = create_test_embedding_model() + score = await score_answer_pair(model, ("Python", True), ("No answer", False)) + assert score == 0.0 + + +@pytest.mark.asyncio +async def test_score_answer_pair_expected_no_answer_match() -> None: + model = create_test_embedding_model() + score = await score_answer_pair( + model, + ("No relevant info", False), + ("Still none", False), + ) + assert score == 1.001 + + +def test_select_best_answer_row_prefers_true_eval_metrics() -> None: + weaker = AnswerBenchmarkRow( + min_score=0.25, + max_hits=20, + metrics=AnswerMetrics( + mean_score=0.82, + exact_or_near_rate=60.0, + zero_score_rate=12.0, + zero_score_count=6, + ), + ) + stronger = AnswerBenchmarkRow( + min_score=0.7, + max_hits=10, + metrics=AnswerMetrics( + mean_score=0.91, + exact_or_near_rate=75.0, + zero_score_rate=4.0, + zero_score_count=2, + ), + ) + + best = select_best_answer_row([weaker, stronger]) + assert best is stronger diff --git a/tests/test_convsettings.py b/tests/test_convsettings.py new file mode 100644 index 0000000..4b83505 --- /dev/null +++ b/tests/test_convsettings.py @@ -0,0 +1,21 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +from typeagent.aitools.model_adapters import create_test_embedding_model +from typeagent.knowpro.convsettings import ( + ConversationSettings, + DEFAULT_MESSAGE_TEXT_MIN_SCORE, + DEFAULT_RELATED_TERM_MIN_SCORE, +) + + +def test_conversation_settings_use_stricter_message_text_cutoff() -> None: + settings = ConversationSettings(model=create_test_embedding_model()) + + assert settings.related_term_index_settings.embedding_index_settings.min_score == ( + DEFAULT_RELATED_TERM_MIN_SCORE + ) + assert settings.thread_settings.min_score == DEFAULT_RELATED_TERM_MIN_SCORE + assert settings.message_text_index_settings.embedding_index_settings.min_score == ( + DEFAULT_MESSAGE_TEXT_MIN_SCORE + ) diff --git a/tools/benchmark_embeddings.py b/tools/benchmark_embeddings.py new file mode 100644 index 0000000..66dcf4a --- /dev/null +++ b/tools/benchmark_embeddings.py @@ -0,0 +1,633 @@ +# Copyright (c) Microsoft Corporation. 
+# Licensed under the MIT License. + +"""Benchmark embedding settings on retrieval-only or true end-to-end evals. + +This script evaluates combinations of `min_score` and `max_hits` for the +Episode 53 dataset in `tests/testdata/`. + +Two benchmark modes are supported: +- `answer` (default): run the full slow eval path used by `make eval` +- `retrieval`: run the narrower `messageMatches` retrieval benchmark + +The answer mode is the one to use when choosing settings for better final +answers. The retrieval mode is still useful for quick diagnostics, but it does +not prove that a row is best for end-to-end answer quality. + +Usage: + uv run python tools/benchmark_embeddings.py + uv run python tools/benchmark_embeddings.py --mode retrieval + uv run python tools/benchmark_embeddings.py --model openai:text-embedding-3-small +""" + +import argparse +import asyncio +from dataclasses import dataclass, replace +import json +from pathlib import Path +from statistics import mean +import time +from typing import Literal + +from dotenv import load_dotenv + +import typechat + +from typeagent.aitools import model_adapters, utils +from typeagent.aitools.embeddings import IEmbeddingModel, NormalizedEmbeddings +from typeagent.aitools.model_adapters import create_embedding_model +from typeagent.aitools.vectorbase import TextEmbeddingIndexSettings, VectorBase +from typeagent.knowpro import ( + answer_response_schema, + answers, + search_query_schema, + searchlang, + secindex, +) +from typeagent.knowpro.convsettings import ConversationSettings +from typeagent.podcasts.podcast import Podcast +from typeagent.storage.memory.convthreads import ConversationThreads +from typeagent.storage.utils import create_storage_provider + +DEFAULT_MIN_SCORES = [0.25, 0.30, 0.35, 0.40, 0.50, 0.60, 0.70, 0.75, 0.80, 0.85] +DEFAULT_MAX_HITS = [5, 10, 15, 20] +DATA_DIR = Path("tests") / "testdata" +INDEX_DATA_PATH = DATA_DIR / "Episode_53_AdrianTchaikovsky_index_data.json" +INDEX_PREFIX_PATH = DATA_DIR / "Episode_53_AdrianTchaikovsky_index" +SEARCH_RESULTS_PATH = DATA_DIR / "Episode_53_Search_results.json" +ANSWER_RESULTS_PATH = DATA_DIR / "Episode_53_Answer_results.json" +DEFAULT_SEARCH_OPTIONS = searchlang.LanguageSearchOptions( + compile_options=searchlang.LanguageQueryCompileOptions( + exact_scope=False, + verb_scope=True, + term_filter=None, + apply_scope=True, + ), + exact_match=False, + max_message_matches=25, +) +DEFAULT_ANSWER_OPTIONS = answers.AnswerContextOptions( + entities_top_k=50, + topics_top_k=50, + messages_top_k=None, + chunking=None, +) +type BenchmarkMode = Literal["answer", "retrieval"] + + +@dataclass +class SearchQueryCase: + query: str + expected_matches: list[int] + + +@dataclass +class AnswerQueryCase: + question: str + expected_answer: str + expected_success: bool + + +@dataclass +class SearchMetrics: + hit_rate: float + mean_reciprocal_rank: float + + +@dataclass +class AnswerMetrics: + mean_score: float + exact_or_near_rate: float + zero_score_rate: float + zero_score_count: int + + +@dataclass +class RetrievalBenchmarkRow: + min_score: float + max_hits: int + metrics: SearchMetrics + + +@dataclass +class AnswerBenchmarkRow: + min_score: float + max_hits: int + metrics: AnswerMetrics + + +@dataclass +class TrueEvalContext: + conversation: Podcast + embedding_model: IEmbeddingModel + query_translator: typechat.TypeChatJsonTranslator[search_query_schema.SearchQuery] + answer_translator: typechat.TypeChatJsonTranslator[ + answer_response_schema.AnswerResponse + ] + settings: ConversationSettings + + +def 
parse_float_list(raw: str | None) -> list[float]: + if raw is None: + return DEFAULT_MIN_SCORES + values = [float(item.strip()) for item in raw.split(",") if item.strip()] + if not values: + raise ValueError("--min-scores must contain at least one value") + return values + + +def parse_int_list(raw: str | None) -> list[int]: + if raw is None: + return DEFAULT_MAX_HITS + values = [int(item.strip()) for item in raw.split(",") if item.strip()] + if not values: + raise ValueError("--max-hits must contain at least one value") + if any(value <= 0 for value in values): + raise ValueError("--max-hits values must be positive integers") + return values + + +def load_message_texts(repo_root: Path) -> list[str]: + index_data = json.loads((repo_root / INDEX_DATA_PATH).read_text(encoding="utf-8")) + messages = index_data["messages"] + return [" ".join(message.get("textChunks", [])) for message in messages] + + +def load_search_queries(repo_root: Path) -> list[SearchQueryCase]: + search_data = json.loads( + (repo_root / SEARCH_RESULTS_PATH).read_text(encoding="utf-8") + ) + cases: list[SearchQueryCase] = [] + for item in search_data: + search_text = item.get("searchText") + results = item.get("results", []) + if not search_text or not results: + continue + expected_matches = results[0].get("messageMatches", []) + if not expected_matches: + continue + cases.append(SearchQueryCase(search_text, expected_matches)) + return cases + + +def load_answer_queries(repo_root: Path) -> list[AnswerQueryCase]: + answer_data = json.loads( + (repo_root / ANSWER_RESULTS_PATH).read_text(encoding="utf-8") + ) + cases: list[AnswerQueryCase] = [] + for item in answer_data: + question = item.get("question") + answer = item.get("answer") + has_no_answer = item.get("hasNoAnswer") + if question is None or answer is None or has_no_answer is None: + continue + cases.append( + AnswerQueryCase( + question=question, + expected_answer=answer, + expected_success=not has_no_answer, + ) + ) + return cases + + +async def build_vector_base( + model_spec: str | None, + message_texts: list[str], + batch_size: int, +) -> tuple[IEmbeddingModel, VectorBase]: + model = create_embedding_model(model_spec) + settings = TextEmbeddingIndexSettings( + embedding_model=model, + min_score=0.0, + max_matches=None, + batch_size=batch_size, + ) + vector_base = VectorBase(settings) + + for start in range(0, len(message_texts), batch_size): + batch = message_texts[start : start + batch_size] + await vector_base.add_keys(batch) + + return model, vector_base + + +def evaluate_search_queries( + vector_base: VectorBase, + query_cases: list[SearchQueryCase], + query_embeddings: NormalizedEmbeddings, + min_score: float, + max_hits: int, +) -> SearchMetrics: + hit_count = 0 + reciprocal_ranks: list[float] = [] + + for case, query_embedding in zip(query_cases, query_embeddings): + scored_results = vector_base.fuzzy_lookup_embedding( + query_embedding, + max_hits=max_hits, + min_score=min_score, + ) + rank = 0 + for result_index, scored_result in enumerate(scored_results, start=1): + if scored_result.item in case.expected_matches: + rank = result_index + break + if rank > 0: + hit_count += 1 + reciprocal_ranks.append(1.0 / rank) + else: + reciprocal_ranks.append(0.0) + + return SearchMetrics( + hit_rate=(hit_count / len(query_cases)) * 100, + mean_reciprocal_rank=mean(reciprocal_ranks), + ) + + +async def create_true_eval_context( + repo_root: Path, + model_spec: str | None, +) -> TrueEvalContext: + embedding_model = create_embedding_model(model_spec) + settings = 
ConversationSettings(model=embedding_model) + settings.storage_provider = await create_storage_provider( + settings.message_text_index_settings, + settings.related_term_index_settings, + message_type=None, + ) + + raw_data = Podcast._read_conversation_data_from_file( + str(repo_root / INDEX_PREFIX_PATH) + ) + raw_data.pop("messageIndexData", None) + raw_data.pop("relatedTermsIndexData", None) + + conversation = await Podcast.create(settings) + await conversation.deserialize(raw_data) + await secindex.build_secondary_indexes(conversation, settings) + + threads = ( + conversation.secondary_indexes.threads + if conversation.secondary_indexes is not None + else None + ) + if isinstance(threads, ConversationThreads) and threads.threads: + await threads.build_index() + + chat_model = model_adapters.create_chat_model() + query_translator = utils.create_translator( + chat_model, search_query_schema.SearchQuery + ) + answer_translator = utils.create_translator( + chat_model, + answer_response_schema.AnswerResponse, + ) + + return TrueEvalContext( + conversation=conversation, + embedding_model=embedding_model, + query_translator=query_translator, + answer_translator=answer_translator, + settings=settings, + ) + + +def answer_response_to_eval_tuple( + response: answer_response_schema.AnswerResponse, +) -> tuple[str, bool]: + match response.type: + case "Answered": + return response.answer or "", True + case "NoAnswer": + return response.why_no_answer or "", False + case _: + raise ValueError(f"Unexpected answer type: {response.type}") + + +async def score_answer_pair( + embedding_model: IEmbeddingModel, + expected: tuple[str, bool], + actual: tuple[str, bool], +) -> float: + expected_text, expected_success = expected + actual_text, actual_success = actual + + if expected_success != actual_success: + return 0.000 if expected_success else 0.001 + if not actual_success: + return 1.001 + if expected_text == actual_text: + return 1.000 + if expected_text.lower() == actual_text.lower(): + return 0.999 + + embeddings = await embedding_model.get_embeddings([expected_text, actual_text]) + assert embeddings.shape[0] == 2, "Expected two embeddings" + return float(embeddings[0] @ embeddings[1]) + + +async def evaluate_answer_queries( + context: TrueEvalContext, + query_cases: list[AnswerQueryCase], + min_score: float, + max_hits: int, +) -> AnswerMetrics: + context.settings.message_text_index_settings.embedding_index_settings.min_score = ( + min_score + ) + search_options = replace(DEFAULT_SEARCH_OPTIONS, max_message_matches=max_hits) + + scores: list[float] = [] + total = len(query_cases) + started_at = time.perf_counter() + for index, case in enumerate(query_cases, start=1): + if index == 1 or index % 5 == 0 or index == total: + elapsed = time.perf_counter() - started_at + print( + f" Question {index}/{total} " + f"(elapsed {elapsed:.1f}s): {case.question}", + flush=True, + ) + result = await searchlang.search_conversation_with_language( + context.conversation, + context.query_translator, + case.question, + search_options, + ) + if isinstance(result, typechat.Failure): + actual = (f"Search failed: {result.message}", False) + else: + _, combined_answer = await answers.generate_answers( + context.answer_translator, + result.value, + context.conversation, + case.question, + options=DEFAULT_ANSWER_OPTIONS, + ) + actual = answer_response_to_eval_tuple(combined_answer) + + expected = (case.expected_answer, case.expected_success) + scores.append( + await score_answer_pair(context.embedding_model, expected, actual) + 
) + + zero_score_count = sum(1 for score in scores if score <= 0.0) + exact_or_near_count = sum(1 for score in scores if score >= 0.97) + + return AnswerMetrics( + mean_score=mean(scores), + exact_or_near_rate=(exact_or_near_count / len(scores)) * 100, + zero_score_rate=(zero_score_count / len(scores)) * 100, + zero_score_count=zero_score_count, + ) + + +def select_best_retrieval_row( + rows: list[RetrievalBenchmarkRow], +) -> RetrievalBenchmarkRow: + return max( + rows, + key=lambda row: ( + row.metrics.mean_reciprocal_rank, + row.metrics.hit_rate, + -row.min_score, + -row.max_hits, + ), + ) + + +def select_best_answer_row(rows: list[AnswerBenchmarkRow]) -> AnswerBenchmarkRow: + return max( + rows, + key=lambda row: ( + row.metrics.mean_score, + -row.metrics.zero_score_count, + row.metrics.exact_or_near_rate, + -row.min_score, + -row.max_hits, + ), + ) + + +def print_retrieval_rows(rows: list[RetrievalBenchmarkRow]) -> None: + print("=" * 72) + print("RETRIEVAL BENCHMARK (Episode 53 messageMatches ground truth)") + print("=" * 72) + print(f"{'Min Score':<12} | {'Max Hits':<10} | {'Hit Rate (%)':<15} | {'MRR':<10}") + print("-" * 65) + for row in rows: + print( + f"{row.min_score:<12.2f} | {row.max_hits:<10d} | " + f"{row.metrics.hit_rate:<15.2f} | " + f"{row.metrics.mean_reciprocal_rank:<10.4f}" + ) + print("-" * 65) + + +def print_answer_rows(rows: list[AnswerBenchmarkRow]) -> None: + print("=" * 94) + print("TRUE EVAL BENCHMARK (Episode 53 full answer pipeline)") + print("=" * 94) + print( + f"{'Min Score':<12} | {'Max Hits':<10} | {'Mean Score':<12} | " + f"{'Exact/Near (%)':<15} | {'Zero Scores':<12} | {'Zero Rate (%)':<14}" + ) + print("-" * 94) + for row in rows: + print( + f"{row.min_score:<12.2f} | {row.max_hits:<10d} | " + f"{row.metrics.mean_score:<12.4f} | " + f"{row.metrics.exact_or_near_rate:<15.2f} | " + f"{row.metrics.zero_score_count:<12d} | " + f"{row.metrics.zero_score_rate:<14.2f}" + ) + print("-" * 94) + + +async def run_retrieval_benchmark( + repo_root: Path, + model_spec: str | None, + min_scores: list[float], + max_hits_values: list[int], + batch_size: int, +) -> None: + message_texts = load_message_texts(repo_root) + query_cases = load_search_queries(repo_root) + if not query_cases: + raise ValueError("No search queries with messageMatches found in the dataset") + model, vector_base = await build_vector_base(model_spec, message_texts, batch_size) + query_embeddings = await model.get_embeddings([case.query for case in query_cases]) + + rows: list[RetrievalBenchmarkRow] = [] + for min_score in min_scores: + for max_hits in max_hits_values: + metrics = evaluate_search_queries( + vector_base, + query_cases, + query_embeddings, + min_score, + max_hits, + ) + rows.append(RetrievalBenchmarkRow(min_score, max_hits, metrics)) + + print(f"Mode: retrieval") + print(f"Model: {model.model_name}") + print(f"Messages indexed: {len(message_texts)}") + print(f"Queries evaluated: {len(query_cases)}") + print() + print_retrieval_rows(rows) + + best_row = select_best_retrieval_row(rows) + print() + print("Best-scoring retrieval row:") + print(f" min_score={best_row.min_score:.2f}") + print(f" max_hits={best_row.max_hits}") + print(f" hit_rate={best_row.metrics.hit_rate:.2f}%") + print(f" mrr={best_row.metrics.mean_reciprocal_rank:.4f}") + + +async def run_answer_benchmark( + repo_root: Path, + model_spec: str | None, + min_scores: list[float], + max_hits_values: list[int], + limit: int, +) -> None: + query_cases = load_answer_queries(repo_root) + if not query_cases: + raise 
ValueError("No answer eval cases found in the dataset") + if limit > 0: + query_cases = query_cases[:limit] + + context = await create_true_eval_context(repo_root, model_spec) + + rows: list[AnswerBenchmarkRow] = [] + for min_score in min_scores: + for max_hits in max_hits_values: + row_started_at = time.perf_counter() + print( + f"Evaluating min_score={min_score:.2f}, max_hits={max_hits}...", + flush=True, + ) + metrics = await evaluate_answer_queries( + context, + query_cases, + min_score, + max_hits, + ) + rows.append(AnswerBenchmarkRow(min_score, max_hits, metrics)) + row_elapsed = time.perf_counter() - row_started_at + print( + " Completed row: " + f"mean_score={metrics.mean_score:.4f}, " + f"zero_scores={metrics.zero_score_count}, " + f"exact_or_near_rate={metrics.exact_or_near_rate:.2f}% " + f"in {row_elapsed:.1f}s", + flush=True, + ) + + print() + print(f"Mode: answer") + print(f"Model: {context.embedding_model.model_name}") + print(f"Queries evaluated: {len(query_cases)}") + print() + print_answer_rows(rows) + + best_row = select_best_answer_row(rows) + print() + print("Best-scoring true-eval row:") + print(f" min_score={best_row.min_score:.2f}") + print(f" max_hits={best_row.max_hits}") + print(f" mean_score={best_row.metrics.mean_score:.4f}") + print(f" exact_or_near_rate={best_row.metrics.exact_or_near_rate:.2f}%") + print(f" zero_score_count={best_row.metrics.zero_score_count}") + print(f" zero_score_rate={best_row.metrics.zero_score_rate:.2f}%") + + +async def run_benchmark( + mode: BenchmarkMode, + model_spec: str | None, + min_scores: list[float], + max_hits_values: list[int], + batch_size: int, + limit: int, +) -> None: + load_dotenv() + repo_root = Path(__file__).resolve().parent.parent + + if mode == "retrieval": + await run_retrieval_benchmark( + repo_root, + model_spec, + min_scores, + max_hits_values, + batch_size, + ) + else: + await run_answer_benchmark( + repo_root, + model_spec, + min_scores, + max_hits_values, + limit, + ) + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Benchmark retrieval settings for an embedding model." + ) + parser.add_argument( + "--mode", + type=str, + choices=["answer", "retrieval"], + default="answer", + help="Use 'answer' for the slow true eval path or 'retrieval' for the narrow messageMatches benchmark.", + ) + parser.add_argument( + "--model", + type=str, + default=None, + help="Provider and model name, e.g. 'openai:text-embedding-3-small'", + ) + parser.add_argument( + "--min-scores", + type=str, + default=None, + help="Comma-separated min_score values to test.", + ) + parser.add_argument( + "--max-hits", + type=str, + default=None, + help="Comma-separated max_hits values to test.", + ) + parser.add_argument( + "--batch-size", + type=int, + default=16, + help="Batch size used when building the retrieval-only benchmark index.", + ) + parser.add_argument( + "--limit", + type=int, + default=0, + help="Number of true-eval questions to run (default: all). 
Ignored in retrieval mode.", + ) + args = parser.parse_args() + + asyncio.run( + run_benchmark( + mode=args.mode, + model_spec=args.model, + min_scores=parse_float_list(args.min_scores), + max_hits_values=parse_int_list(args.max_hits), + batch_size=args.batch_size, + limit=args.limit, + ) + ) + + +if __name__ == "__main__": + main() diff --git a/tools/repeat_embedding_benchmarks.py b/tools/repeat_embedding_benchmarks.py new file mode 100644 index 0000000..48ca8d1 --- /dev/null +++ b/tools/repeat_embedding_benchmarks.py @@ -0,0 +1,322 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Run embedding benchmarks repeatedly and save raw/summary JSON results. + +This script runs `tools/benchmark_embeddings.py` logic multiple times for each +embedding model, stores every run as JSON, and writes aggregate summaries that +can be used to justify tuned defaults. + +Usage: + uv run python tools/repeat_embedding_benchmarks.py + uv run python tools/repeat_embedding_benchmarks.py --runs 30 + uv run python tools/repeat_embedding_benchmarks.py --models openai:text-embedding-3-small,openai:text-embedding-3-large,openai:text-embedding-ada-002 +""" + +import argparse +import asyncio +from dataclasses import asdict, dataclass +from datetime import datetime, UTC +import json +from pathlib import Path +from statistics import mean + +from dotenv import load_dotenv + +from benchmark_embeddings import ( + build_vector_base, + DEFAULT_MAX_HITS, + DEFAULT_MIN_SCORES, + evaluate_search_queries, + load_message_texts, + load_search_queries, + parse_float_list, + parse_int_list, + RetrievalBenchmarkRow, + select_best_retrieval_row, +) + +DEFAULT_MODELS = [ + "openai:text-embedding-3-small", + "openai:text-embedding-3-large", + "openai:text-embedding-ada-002", +] +DEFAULT_OUTPUT_DIR = Path("benchmark_results") + + +@dataclass +class RunRow: + min_score: float + max_hits: int + hit_rate: float + mean_reciprocal_rank: float + + +@dataclass +class RunResult: + run_index: int + model_spec: str + resolved_model_name: str + message_count: int + query_count: int + rows: list[RunRow] + best_row: RunRow + + +def sanitize_model_name(model_spec: str) -> str: + return model_spec.replace(":", "__").replace("/", "_").replace("\\", "_") + + +def benchmark_row_to_run_row(row: RetrievalBenchmarkRow) -> RunRow: + return RunRow( + min_score=row.min_score, + max_hits=row.max_hits, + hit_rate=row.metrics.hit_rate, + mean_reciprocal_rank=row.metrics.mean_reciprocal_rank, + ) + + +def summarize_runs(model_spec: str, runs: list[RunResult]) -> dict[str, object]: + summary_rows: dict[tuple[float, int], list[RunRow]] = {} + for run in runs: + for row in run.rows: + summary_rows.setdefault((row.min_score, row.max_hits), []).append(row) + + averaged_rows: list[dict[str, float | int]] = [] + for (min_score, max_hits), rows in sorted(summary_rows.items()): + averaged_rows.append( + { + "min_score": min_score, + "max_hits": max_hits, + "mean_hit_rate": mean(row.hit_rate for row in rows), + "mean_mrr": mean(row.mean_reciprocal_rank for row in rows), + } + ) + + best_rows = [run.best_row for run in runs] + best_min_score_counts: dict[str, int] = {} + best_max_hits_counts: dict[str, int] = {} + for row in best_rows: + best_min_score_counts[f"{row.min_score:.2f}"] = ( + best_min_score_counts.get(f"{row.min_score:.2f}", 0) + 1 + ) + best_max_hits_counts[str(row.max_hits)] = ( + best_max_hits_counts.get(str(row.max_hits), 0) + 1 + ) + + averaged_best_row = max( + averaged_rows, + key=lambda row: ( + float(row["mean_mrr"]), + 
float(row["mean_hit_rate"]), + -float(row["min_score"]), + -int(row["max_hits"]), + ), + ) + + return { + "model_spec": model_spec, + "resolved_model_name": runs[0].resolved_model_name, + "run_count": len(runs), + "message_count": runs[0].message_count, + "query_count": runs[0].query_count, + "candidate_rows": averaged_rows, + "recommended_row": averaged_best_row, + "best_min_score_counts": best_min_score_counts, + "best_max_hits_counts": best_max_hits_counts, + } + + +def write_json(path: Path, data: object) -> None: + path.write_text(json.dumps(data, indent=2), encoding="utf-8") + + +def write_markdown_summary(path: Path, summaries: list[dict[str, object]]) -> None: + lines = [ + "# Repeated Embedding Benchmark Summary", + "", + "| Model | Runs | Recommended min_score | Recommended max_hits | Mean hit rate | Mean MRR |", + "| --- | ---: | ---: | ---: | ---: | ---: |", + ] + for summary in summaries: + recommended_row = summary["recommended_row"] + assert isinstance(recommended_row, dict) + lines.append( + "| " + f"{summary['resolved_model_name']} | " + f"{summary['run_count']} | " + f"{recommended_row['min_score']:.2f} | " + f"{recommended_row['max_hits']} | " + f"{recommended_row['mean_hit_rate']:.2f} | " + f"{recommended_row['mean_mrr']:.4f} |" + ) + lines.append("") + path.write_text("\n".join(lines), encoding="utf-8") + + +async def run_single_model_benchmark( + model_spec: str, + runs: int, + min_scores: list[float], + max_hits_values: list[int], + batch_size: int, + output_dir: Path, +) -> dict[str, object]: + repo_root = Path(__file__).resolve().parent.parent + message_texts = load_message_texts(repo_root) + query_cases = load_search_queries(repo_root) + model_output_dir = output_dir / sanitize_model_name(model_spec) + model_output_dir.mkdir(parents=True, exist_ok=True) + + run_results: list[RunResult] = [] + for run_index in range(1, runs + 1): + model, vector_base = await build_vector_base( + model_spec, message_texts, batch_size + ) + query_embeddings = await model.get_embeddings( + [case.query for case in query_cases] + ) + benchmark_rows: list[RetrievalBenchmarkRow] = [] + for min_score in min_scores: + for max_hits in max_hits_values: + metrics = evaluate_search_queries( + vector_base, + query_cases, + query_embeddings, + min_score, + max_hits, + ) + benchmark_rows.append( + RetrievalBenchmarkRow(min_score, max_hits, metrics) + ) + + best_row = select_best_retrieval_row(benchmark_rows) + run_result = RunResult( + run_index=run_index, + model_spec=model_spec, + resolved_model_name=model.model_name, + message_count=len(message_texts), + query_count=len(query_cases), + rows=[benchmark_row_to_run_row(row) for row in benchmark_rows], + best_row=benchmark_row_to_run_row(best_row), + ) + run_results.append(run_result) + write_json(model_output_dir / f"run_{run_index:02d}.json", asdict(run_result)) + + summary = summarize_runs(model_spec, run_results) + write_json(model_output_dir / "summary.json", summary) + return summary + + +async def run_repeated_benchmarks( + models: list[str], + runs: int, + min_scores: list[float], + max_hits_values: list[int], + batch_size: int, + output_root: Path, +) -> Path: + timestamp = datetime.now(UTC).strftime("%Y%m%dT%H%M%SZ") + output_dir = output_root / timestamp + output_dir.mkdir(parents=True, exist_ok=True) + + metadata = { + "created_at_utc": timestamp, + "runs_per_model": runs, + "models": models, + "min_scores": min_scores, + "max_hits_values": max_hits_values, + "batch_size": batch_size, + } + write_json(output_dir / "metadata.json", 
metadata) + + summaries: list[dict[str, object]] = [] + for model_spec in models: + print(f"Running {runs} benchmark iterations for {model_spec}...") + summary = await run_single_model_benchmark( + model_spec=model_spec, + runs=runs, + min_scores=min_scores, + max_hits_values=max_hits_values, + batch_size=batch_size, + output_dir=output_dir, + ) + summaries.append(summary) + + write_json(output_dir / "summary.json", summaries) + write_markdown_summary(output_dir / "summary.md", summaries) + return output_dir + + +def parse_models(raw: str | None) -> list[str]: + if raw is None: + return DEFAULT_MODELS + models = [item.strip() for item in raw.split(",") if item.strip()] + if not models: + raise ValueError("--models must contain at least one model") + return models + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Run embedding benchmarks repeatedly and save JSON results." + ) + parser.add_argument( + "--models", + type=str, + default=None, + help="Comma-separated model specs to benchmark.", + ) + parser.add_argument( + "--runs", + type=int, + default=30, + help="Number of repeated runs per model.", + ) + parser.add_argument( + "--min-scores", + type=str, + default=",".join(f"{score:.2f}" for score in DEFAULT_MIN_SCORES), + help="Comma-separated min_score values to test.", + ) + parser.add_argument( + "--max-hits", + type=str, + default=",".join(str(value) for value in DEFAULT_MAX_HITS), + help="Comma-separated max_hits values to test.", + ) + parser.add_argument( + "--batch-size", + type=int, + default=16, + help="Batch size used when building the index.", + ) + parser.add_argument( + "--output-dir", + type=str, + default=str(DEFAULT_OUTPUT_DIR), + help="Directory where benchmark results will be written.", + ) + args = parser.parse_args() + + if args.runs <= 0: + raise ValueError("--runs must be a positive integer") + if args.batch_size <= 0: + raise ValueError("--batch-size must be a positive integer") + + load_dotenv() + output_dir = asyncio.run( + run_repeated_benchmarks( + models=parse_models(args.models), + runs=args.runs, + min_scores=parse_float_list(args.min_scores), + max_hits_values=parse_int_list(args.max_hits), + batch_size=args.batch_size, + output_root=Path(args.output_dir), + ) + ) + print(f"Wrote benchmark results to {output_dir}") + + +if __name__ == "__main__": + main()
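
To make the min_score resolution order introduced in vectorbase.py and convsettings.py concrete, here is a minimal standalone sketch. It mirrors get_default_min_score() and the TextEmbeddingIndexSettings constructor rather than importing typeagent; resolve_min_score is an illustrative helper name, not part of the diff.

# Sketch only: reproduces the lookup logic from the diff for illustration.
DEFAULT_MIN_SCORE = 0.25
MODEL_DEFAULT_MIN_SCORES: dict[str, float] = {
    "text-embedding-3-large": 0.25,
    "text-embedding-3-small": 0.25,
    "text-embedding-ada-002": 0.25,
}


def resolve_min_score(model_name: str, explicit_min_score: float | None) -> float:
    # 1. An explicit min_score always wins; ConversationSettings passes 0.85
    #    for related terms/threads and 0.7 for the message text index, so the
    #    per-model defaults never override those end-to-end-tuned values.
    if explicit_min_score is not None:
        return explicit_min_score
    # 2. Otherwise fall back to the per-model benchmark default, then to the
    #    generic DEFAULT_MIN_SCORE for models not in the table.
    return MODEL_DEFAULT_MIN_SCORES.get(model_name, DEFAULT_MIN_SCORE)


assert resolve_min_score("text-embedding-3-small", None) == 0.25
assert resolve_min_score("some-unknown-model", None) == DEFAULT_MIN_SCORE
assert resolve_min_score("text-embedding-3-small", 0.7) == 0.7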
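
The answer-mode scoring in tools/benchmark_embeddings.py uses sentinel values rather than a pure similarity score, so here is a rough sketch of that decision table and how evaluate_answer_queries() interprets it. The similarity() function below is a placeholder for the embedding dot product used in the real script; sketch_score is an illustrative name.

def similarity(expected_text: str, actual_text: str) -> float:
    return 0.9  # placeholder; score_answer_pair embeds both texts and dots them


def sketch_score(expected: tuple[str, bool], actual: tuple[str, bool]) -> float:
    expected_text, expected_success = expected
    actual_text, actual_success = actual
    if expected_success != actual_success:
        # 0.000: an answer was expected but none was produced;
        # 0.001: no answer was expected but one was produced anyway.
        return 0.000 if expected_success else 0.001
    if not actual_success:
        return 1.001  # both sides agree there is no answer
    if expected_text == actual_text:
        return 1.000
    if expected_text.lower() == actual_text.lower():
        return 0.999
    return similarity(expected_text, actual_text)

# evaluate_answer_queries() then counts score <= 0.0 as a zero-score miss and
# score >= 0.97 as exact-or-near, so the 1.001, 1.000, and 0.999 sentinels all
# land in the exact-or-near bucket, while 0.001 counts toward neither.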