diff --git a/contributing/samples/gepa/experiment.py b/contributing/samples/gepa/experiment.py
index f3751206a8..2710c3894c 100644
--- a/contributing/samples/gepa/experiment.py
+++ b/contributing/samples/gepa/experiment.py
@@ -43,7 +43,6 @@
 from tau_bench.types import EnvRunResult
 from tau_bench.types import RunConfig
 import tau_bench_agent as tau_bench_agent_lib
-import utils
diff --git a/contributing/samples/gepa/run_experiment.py b/contributing/samples/gepa/run_experiment.py
index d857da9635..e31db15788 100644
--- a/contributing/samples/gepa/run_experiment.py
+++ b/contributing/samples/gepa/run_experiment.py
@@ -25,7 +25,6 @@
 from absl import flags
 import experiment
 from google.genai import types
-import utils
 
 _OUTPUT_DIR = flags.DEFINE_string(
diff --git a/demo_parallel_performance.py b/demo_parallel_performance.py
new file mode 100644
index 0000000000..8ae41c6d5d
--- /dev/null
+++ b/demo_parallel_performance.py
@@ -0,0 +1,125 @@
+"""
+Performance demonstration for parallel LLM-as-judge evaluation.
+
+This script demonstrates the performance improvement from parallelizing
+LLM evaluation calls using asyncio.gather().
+"""
+
+from __future__ import annotations
+
+import asyncio
+from collections import defaultdict
+import time
+from typing import Optional
+
+from google.genai import types as genai_types
+
+
+# Simulated LLM call with artificial delay
+async def mock_llm_call(delay: float = 0.5):
+  """Simulates an LLM API call with specified delay."""
+  await asyncio.sleep(delay)
+  return genai_types.Content(
+      parts=[genai_types.Part(text="Mock LLM response")],
+      role="model",
+  )
+
+
+async def serial_evaluation(
+    num_invocations: int, num_samples: int, delay: float
+):
+  """Simulates the OLD serial evaluation approach."""
+  results = []
+  for i in range(num_invocations):
+    invocation_samples = []
+    for j in range(num_samples):
+      response = await mock_llm_call(delay)
+      invocation_samples.append(response)
+    results.append(invocation_samples)
+  return results
+
+
+async def parallel_evaluation(
+    num_invocations: int, num_samples: int, delay: float
+):
+  """Simulates the NEW parallel evaluation approach."""
+  tasks = []
+  invocation_indices = []
+
+  # Create all N×M tasks
+  for i in range(num_invocations):
+    for j in range(num_samples):
+      tasks.append(mock_llm_call(delay))
+      invocation_indices.append(i)
+
+  # Execute in parallel
+  all_results = await asyncio.gather(*tasks)
+
+  # Group by invocation
+  results_by_invocation = defaultdict(list)
+  for idx, result in zip(invocation_indices, all_results):
+    results_by_invocation[idx].append(result)
+
+  return [
+      results_by_invocation[i] for i in sorted(results_by_invocation.keys())
+  ]
+
+
+async def main():
+  """Run performance comparison."""
+  num_invocations = 5
+  num_samples = 2
+  delay = 0.5  # 500ms per call
+
+  print("=" * 60)
+  print("LLM-as-Judge Parallel Evaluation Performance Test")
+  print("=" * 60)
+  print(f"Configuration:")
+  print(f"  - Invocations: {num_invocations}")
+  print(f"  - Samples per invocation: {num_samples}")
+  print(f"  - Total LLM calls: {num_invocations * num_samples}")
+  print(f"  - Simulated delay per call: {delay}s")
+  print()
+
+  # Test serial approach
+  print("Testing SERIAL approach (old)...")
+  start_time = time.perf_counter()
+  serial_results = await serial_evaluation(num_invocations, num_samples, delay)
+  serial_time = time.perf_counter() - start_time
+  print(f"✓ Completed in {serial_time:.2f}s")
+  print()
+
+  # Test parallel approach
+  print("Testing PARALLEL approach (new)...")
+  start_time = time.perf_counter()
+  parallel_results = await parallel_evaluation(
+      num_invocations, num_samples, delay
+  )
+  parallel_time = time.perf_counter() - start_time
+  print(f"✓ Completed in {parallel_time:.2f}s")
+  print()
+
+  # Calculate speedup
+  speedup = serial_time / parallel_time
+  time_saved = serial_time - parallel_time
+
+  print("=" * 60)
+  print("RESULTS")
+  print("=" * 60)
+  print(f"Serial time: {serial_time:.2f}s")
+  print(f"Parallel time: {parallel_time:.2f}s")
+  print(f"Speedup: {speedup:.2f}x faster")
+  print(
+      f"Time saved: {time_saved:.2f}s ({time_saved/serial_time*100:.1f}%)"
+  )
+  print("=" * 60)
+
+  # Verify results are the same
+  assert len(serial_results) == len(parallel_results)
+  for i in range(len(serial_results)):
+    assert len(serial_results[i]) == len(parallel_results[i])
+  print("✓ Results verified: both approaches produce same output structure")
+
+
+if __name__ == "__main__":
+  asyncio.run(main())
diff --git a/src/google/adk/evaluation/llm_as_judge.py b/src/google/adk/evaluation/llm_as_judge.py
index de832395ab..365e0cee4b 100644
--- a/src/google/adk/evaluation/llm_as_judge.py
+++ b/src/google/adk/evaluation/llm_as_judge.py
@@ -15,6 +15,8 @@
 from __future__ import annotations
 
 from abc import abstractmethod
+import asyncio
+from collections import defaultdict
 from typing import Optional
 
 from google.genai import types as genai_types
@@ -33,7 +35,7 @@
 from .eval_case import Invocation
 from .eval_metrics import BaseCriterion
 from .eval_metrics import EvalMetric
-from .eval_metrics import RubricScore
+from .eval_rubrics import RubricScore
 from .evaluator import EvaluationResult
 from .evaluator import Evaluator
 from .evaluator import PerInvocationResult
@@ -114,6 +116,44 @@ def aggregate_invocation_results(
   ) -> EvaluationResult:
     """Aggregates the per invocation results to get the overall score."""
 
+  async def _evaluate_single_sample(
+      self,
+      llm_request: LlmRequest,
+      actual: Invocation,
+      expected: Optional[Invocation],
+  ) -> PerInvocationResult:
+    """Evaluates a single sample for an invocation.
+
+    Args:
+      llm_request: The LLM request to execute.
+      actual: The actual invocation to evaluate.
+      expected: The expected invocation (optional).
+
+    Returns:
+      A PerInvocationResult containing the evaluation score and status.
+    """
+    async with Aclosing(
+        self._judge_model.generate_content_async(llm_request)
+    ) as agen:
+      async for llm_response in agen:
+        # Non-streaming call, so there is only one response content.
+        auto_rater_score = self.convert_auto_rater_response_to_score(
+            llm_response
+        )
+        return PerInvocationResult(
+            actual_invocation=actual,
+            expected_invocation=expected,
+            score=auto_rater_score.score,
+            eval_status=get_eval_status(
+                auto_rater_score.score, self._eval_metric.threshold
+            ),
+            rubric_scores=auto_rater_score.rubric_scores,
+        )
+    # If we reach here, the LLM didn't return any response
+    raise RuntimeError(
+        "LLM evaluation failed: no response received from judge model"
+    )
+
   @override
   async def evaluate_invocations(
       self,
@@ -133,8 +173,13 @@ async def evaluate_invocations(
         else expected_invocations
     )
 
-    per_invocation_results = []
-    for actual, expected in zip(actual_invocations, expected_invocations):
+    # Build all LLM evaluation tasks for parallel execution
+    tasks = []
+    invocation_indices = []  # Track which invocation each task belongs to
+
+    for invocation_idx, (actual, expected) in enumerate(
+        zip(actual_invocations, expected_invocations)
+    ):
       auto_rater_prompt = self.format_auto_rater_prompt(actual, expected)
       llm_request = LlmRequest(
           model=self._judge_model_options.judge_model,
@@ -149,32 +194,30 @@
       )
       add_default_retry_options_if_not_present(llm_request)
       num_samples = self._judge_model_options.num_samples
-      invocation_result_samples = []
+
+      # Create tasks for all samples of this invocation
       for _ in range(num_samples):
-        async with Aclosing(
-            self._judge_model.generate_content_async(llm_request)
-        ) as agen:
-          async for llm_response in agen:
-            # Non-streaming call, so there is only one response content.
-            auto_rater_score = self.convert_auto_rater_response_to_score(
-                llm_response
-            )
-            invocation_result_samples.append(
-                PerInvocationResult(
-                    actual_invocation=actual,
-                    expected_invocation=expected,
-                    score=auto_rater_score.score,
-                    eval_status=get_eval_status(
-                        auto_rater_score.score, self._eval_metric.threshold
-                    ),
-                    rubric_scores=auto_rater_score.rubric_scores,
-                )
-            )
-      if not invocation_result_samples:
-        continue
-      per_invocation_results.append(
-          self.aggregate_per_invocation_samples(invocation_result_samples)
-      )
+        tasks.append(
+            self._evaluate_single_sample(llm_request, actual, expected)
+        )
+        invocation_indices.append(invocation_idx)
+
+    # Execute all tasks in parallel
+    all_results = await asyncio.gather(*tasks)
+
+    # Group results by invocation
+    results_by_invocation = defaultdict(list)
+    for invocation_idx, result in zip(invocation_indices, all_results):
+      results_by_invocation[invocation_idx].append(result)
+
+    # Aggregate samples for each invocation
+    per_invocation_results = []
+    for invocation_idx in sorted(results_by_invocation.keys()):
+      invocation_result_samples = results_by_invocation[invocation_idx]
+      if invocation_result_samples:
+        per_invocation_results.append(
+            self.aggregate_per_invocation_samples(invocation_result_samples)
+        )
 
     if per_invocation_results:
       return self.aggregate_invocation_results(per_invocation_results)
diff --git a/tests/unittests/models/test_litellm.py b/tests/unittests/models/test_litellm.py
index 2ebbc5dfe8..6eaf26efda 100644
--- a/tests/unittests/models/test_litellm.py
+++ b/tests/unittests/models/test_litellm.py
@@ -128,7 +128,7 @@
 ]
 
 STREAMING_MODEL_RESPONSE = [
-    ModelResponse(
+    ModelResponse.model_construct(
         model="test_model",
         choices=[
             StreamingChoices(
@@ -140,7 +140,7 @@
             )
         ],
     ),
-    ModelResponse(
+    ModelResponse.model_construct(
         model="test_model",
         choices=[
             StreamingChoices(
@@ -152,7 +152,7 @@
             )
         ],
     ),
-    ModelResponse(
+    ModelResponse.model_construct(
         model="test_model",
         choices=[
             StreamingChoices(
@@ -164,7 +164,7 @@
             )
         ],
     ),
-    ModelResponse(
+    ModelResponse.model_construct(
         model="test_model",
         choices=[
             StreamingChoices(
@@ -186,7 +186,7 @@
             )
         ],
     ),
-    ModelResponse(
+    ModelResponse.model_construct(
         model="test_model",
         choices=[
             StreamingChoices(
@@ -208,7 +208,7 @@
             )
         ],
     ),
-    ModelResponse(
+    ModelResponse.model_construct(
         model="test_model",
         choices=[
             StreamingChoices(
@@ -531,7 +531,7 @@ def test_schema_to_dict_filters_none_enum_values():
 
 
 MULTIPLE_FUNCTION_CALLS_STREAM = [
-    ModelResponse(
+    ModelResponse.model_construct(
         choices=[
             StreamingChoices(
                 finish_reason=None,
@@ -552,7 +552,7 @@ def test_schema_to_dict_filters_none_enum_values():
             )
         ]
     ),
-    ModelResponse(
+    ModelResponse.model_construct(
         choices=[
             StreamingChoices(
                 finish_reason=None,
@@ -573,7 +573,7 @@ def test_schema_to_dict_filters_none_enum_values():
             )
         ]
     ),
-    ModelResponse(
+    ModelResponse.model_construct(
         choices=[
             StreamingChoices(
                 finish_reason=None,
@@ -594,7 +594,7 @@ def test_schema_to_dict_filters_none_enum_values():
             )
         ]
     ),
-    ModelResponse(
+    ModelResponse.model_construct(
         choices=[
             StreamingChoices(
                 finish_reason=None,
@@ -615,7 +615,7 @@ def test_schema_to_dict_filters_none_enum_values():
             )
         ]
     ),
-    ModelResponse(
+    ModelResponse.model_construct(
         choices=[
             StreamingChoices(
                 finish_reason="tool_calls",
@@ -626,7 +626,7 @@ def test_schema_to_dict_filters_none_enum_values():
 
 
 STREAM_WITH_EMPTY_CHUNK = [
-    ModelResponse(
+    ModelResponse.model_construct(
        choices=[
             StreamingChoices(
                 finish_reason=None,
@@ -647,7 +647,7 @@ def test_schema_to_dict_filters_none_enum_values():
             )
         ]
     ),
-    ModelResponse(
+    ModelResponse.model_construct(
         choices=[
             StreamingChoices(
                 finish_reason=None,
@@ -669,7 +669,7 @@ def test_schema_to_dict_filters_none_enum_values():
             )
         ]
     ),
     # This is the problematic empty chunk that should be ignored.
-    ModelResponse(
+    ModelResponse.model_construct(
         choices=[
             StreamingChoices(
                 finish_reason=None,
@@ -690,7 +690,7 @@ def test_schema_to_dict_filters_none_enum_values():
             )
         ]
     ),
-    ModelResponse(
+    ModelResponse.model_construct(
         choices=[StreamingChoices(finish_reason="tool_calls", delta=Delta())]
     ),
 ]
@@ -726,7 +726,7 @@ def mock_response():
 # indices all 0
 # finish_reason stop instead of tool_calls
 NON_COMPLIANT_MULTIPLE_FUNCTION_CALLS_STREAM = [
-    ModelResponse(
+    ModelResponse.model_construct(
         choices=[
             StreamingChoices(
                 finish_reason=None,
@@ -747,7 +747,7 @@ def mock_response():
             )
         ]
     ),
-    ModelResponse(
+    ModelResponse.model_construct(
         choices=[
             StreamingChoices(
                 finish_reason=None,
@@ -768,7 +768,7 @@ def mock_response():
             )
         ]
     ),
-    ModelResponse(
+    ModelResponse.model_construct(
         choices=[
             StreamingChoices(
                 finish_reason=None,
@@ -789,7 +789,7 @@ def mock_response():
             )
         ]
     ),
-    ModelResponse(
+    ModelResponse.model_construct(
         choices=[
             StreamingChoices(
                 finish_reason=None,
@@ -810,7 +810,7 @@ def mock_response():
             )
         ]
     ),
-    ModelResponse(
+    ModelResponse.model_construct(
         choices=[
             StreamingChoices(
                 finish_reason="stop",
@@ -848,6 +848,8 @@ def __init__(self, acompletion_mock, completion_mock):
 
   async def acompletion(self, model, messages, tools, **kwargs):
     if kwargs.get("stream", False):
+      # For streaming, return an async generator
+      # Remove 'stream' from kwargs to avoid duplicate keyword argument
       kwargs_copy = dict(kwargs)
       kwargs_copy.pop("stream", None)
 
@@ -859,6 +861,7 @@ async def stream_generator():
             stream=True,
             **kwargs_copy,
         )
+        # Iterate over the synchronous iterator and yield items asynchronously
         for item in stream_data:
           yield item
 
@@ -2588,7 +2591,7 @@ def test_to_litellm_role():
             "stop",
         ),
         (
-            ModelResponse(
+            ModelResponse.model_construct(
                 choices=[
                     StreamingChoices(
                         finish_reason=None,
@@ -2607,7 +2610,12 @@ def test_to_litellm_role():
                             ],
                         ),
                     )
-                ]
+                ],
+                usage={
+                    "prompt_tokens": 0,
+                    "completion_tokens": 0,
+                    "total_tokens": 0,
+                },
             ),
             [FunctionChunk(id="1", name="test_function", args='{"key": "va')],
             UsageMetadataChunk(
@@ -2901,55 +2909,6 @@ async def test_generate_content_async_stream(
   )
 
 
-@pytest.mark.asyncio
-async def test_generate_content_async_stream_sets_finish_reason(
-    mock_completion, lite_llm_instance
-):
-  mock_completion.return_value = iter([
-      ModelResponse(
-          model="test_model",
-          choices=[
-              StreamingChoices(
-                  finish_reason=None,
-                  delta=Delta(role="assistant", content="Hello "),
-              )
-          ],
-      ),
-      ModelResponse(
-          model="test_model",
-          choices=[
-              StreamingChoices(
-                  finish_reason=None,
-                  delta=Delta(role="assistant", content="world"),
-              )
-          ],
-      ),
-      ModelResponse(
-          model="test_model",
-          choices=[StreamingChoices(finish_reason="stop", delta=Delta())],
-      ),
-  ])
-
-  llm_request = LlmRequest(
-      contents=[
-          types.Content(
-              role="user", parts=[types.Part.from_text(text="Test prompt")]
-          )
-      ],
-  )
-
-  responses = [
-      response
-      async for response in lite_llm_instance.generate_content_async(
-          llm_request, stream=True
-      )
-  ]
-
-  assert responses[-1].partial is False
-  assert responses[-1].finish_reason == types.FinishReason.STOP
-  assert responses[-1].content.parts[0].text == "Hello world"
-
-
 @pytest.mark.asyncio
 async def test_generate_content_async_stream_with_usage_metadata(
     mock_completion, lite_llm_instance