Skip to content
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Fixed MACS real-data tests passing `{"environment_data": task.environment_data}` instead of `task.environment_data` directly, which caused `setup_state` to silently receive an empty tools list. (PR: #58)
- Benchmark reports from `Benchmark.run()` now have a consistent schema across every outcome. Setup failures, setup timeouts, and unexpected worker failures in parallel runs previously produced reports missing the `usage` and `task` keys (with empty `traces`/`config`). Every report now always includes `task_id`, `repeat_idx`, `status`, `error`, `traces`, `config`, `usage`, `eval`, and `task`, and `report["error"]` is always populated whenever `status` is not `SUCCESS`. (PR: #61)
- `fail_on_setup_error`, `fail_on_task_error`, and `fail_on_evaluation_error` now abort a parallel `Benchmark.run()` the same way they abort a sequential run. Previously a parallel run swallowed the failure into a degraded report and kept going. (PR: #61)
- Token usage and cost for LLM judges and other evaluator-owned models are now correctly captured in per-task reports (`report["usage"]`) and in `benchmark.usage` / `benchmark.usage_by_component`. Previously these entries showed zero tokens because the usage snapshot was taken before evaluators ran. Affects every benchmark that registers a model in `setup_evaluators` (e.g. ConVerse, MultiAgentBench). (PR: #63)

### Removed

Expand Down
19 changes: 11 additions & 8 deletions maseval/core/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -1254,12 +1254,11 @@ def _execute_task_repetition(

final_answers = None

# 3. Collect traces, configs, and usage (always attempt this)
# 3. Collect traces and configs (always attempt this)
execution_usage: Optional[Dict[str, Any]] = None
try:
execution_configs = self.collect_all_configs()
execution_traces = self.collect_all_traces()
execution_usage = self.collect_all_usage()
# Store in context for potential timeout errors
context.set_collected_traces(execution_traces)
except Exception as e:
Expand All @@ -1272,11 +1271,6 @@ def _execute_task_repetition(
"error": f"Failed to collect traces: {e}",
"error_type": type(e).__name__,
}
if execution_usage is None:
execution_usage = {
"error": f"Failed to collect usage: {e}",
"error_type": type(e).__name__,
}

# 4. Evaluate (skip if task execution failed)
if execution_status == TaskExecutionStatus.SUCCESS:
Expand Down Expand Up @@ -1311,7 +1305,16 @@ def _execute_task_repetition(
# Task execution failed, so skip evaluation
eval_results = None

# 5. Build report — all keys always present for consistent schema
# 5. Collect usage after evaluate() so judge/evaluator-owned model tokens are captured.
try:
execution_usage = self.collect_all_usage()
except Exception as e:
execution_usage = {
"error": f"Failed to collect usage: {e}",
"error_type": type(e).__name__,
}

# 6. Build report — all keys always present for consistent schema
report = self._build_report(
task,
repeat_idx,
Expand Down
153 changes: 153 additions & 0 deletions tests/test_core/test_benchmark/test_usage_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

import pytest
from maseval import TaskQueue
from maseval.core.exceptions import AgentError
from conftest import DummyBenchmark, DummyModelAdapter


@pytest.mark.core
Expand Down Expand Up @@ -96,3 +98,154 @@ def test_usage_property_returns_total(self):
total = benchmark.usage
assert total is not None
# cost may be None if DummyModelAdapter doesn't provide usage


# ---------------------------------------------------------------------------
# Regression tests for issue #60: judge (evaluator) token usage collection
# ---------------------------------------------------------------------------


class _JudgeEvaluator:
    """Stand-in evaluator that queries a judge model while scoring.

    This deliberately does not subclass ``Evaluator``. It only provides the
    duck-typed surface (``filter_traces`` and ``__call__``) that
    ``DummyBenchmark.evaluate`` iterates over (see ``tests/conftest.py``).
    """

    def __init__(self, model):
        # Keep a handle on the judge so ``__call__`` can query it later.
        self.model = model

    def filter_traces(self, traces):
        """Return ``traces`` untouched; no filtering is applied."""
        return traces

    def __call__(self, traces, final_answer=None):
        """Query the judge once and return a fixed passing verdict.

        The model call happens here, i.e. at evaluate-time rather than at
        setup. Neither ``traces`` nor ``final_answer`` influences the result.
        """
        judge_prompt = [dict(role="user", content="judge this")]
        self.model.chat(judge_prompt)
        verdict = dict(score=1.0, passed=True)
        return verdict


def _make_judge_benchmark(judge_usage):
    """Return a ``DummyBenchmark`` subclass instance that owns a judge model.

    ``setup_evaluators`` creates a ``DummyModelAdapter`` configured with
    ``judge_usage`` as its per-call usage dict, registers it under
    ``models`` / ``judge_model``, and wraps it in a ``_JudgeEvaluator``.
    Per the adapter's contract each model call appends one usage record, so
    a single evaluator invocation accounts for exactly one record of tokens.
    """

    class JudgeBenchmark(DummyBenchmark):
        def setup_evaluators(self, environment, task, agents, user, seed_generator):
            judge = DummyModelAdapter(model_id="judge", usage=judge_usage)
            # Registering the adapter is what exposes it to usage collection.
            self.register("models", "judge_model", judge)
            evaluator = _JudgeEvaluator(model=judge)
            return [evaluator]

    benchmark = JudgeBenchmark()
    return benchmark


@pytest.mark.core
class TestBenchmarkJudgeUsage:
    """Check that judge token usage lands in per-task reports and in ``benchmark.usage``."""

    def test_judge_model_usage_captured_in_report(self):
        """The judge model called during evaluate() reports its tokens under
        report['usage']['models']['judge_model']."""
        judge_usage = dict(input_tokens=100, output_tokens=50, total_tokens=150)
        task_queue = TaskQueue.from_list([{"query": "Test", "environment_data": {}}])
        benchmark = _make_judge_benchmark(judge_usage)

        first_report = benchmark.run(task_queue, agent_data={"model": "test"})[0]

        models = first_report["usage"]["models"]
        assert "judge_model" in models, f"judge_model not registered; got: {list(models)}"
        judge_entry = models["judge_model"]
        # Expected values are spelled out rather than read back from
        # ``judge_usage`` so the check does not depend on that dict staying intact.
        expected_counts = (
            ("input_tokens", 100),
            ("output_tokens", 50),
            ("total_tokens", 150),
        )
        for field, count in expected_counts:
            assert judge_entry[field] == count

    def test_judge_model_usage_aggregated_in_benchmark_total(self):
        """The benchmark-wide total includes the judge tokens, and the
        per-component view carries a ``models:judge_model`` entry with them."""
        judge_usage = dict(input_tokens=100, output_tokens=50, total_tokens=150)
        task_queue = TaskQueue.from_list([{"query": "Test", "environment_data": {}}])
        benchmark = _make_judge_benchmark(judge_usage)

        benchmark.run(task_queue, agent_data={"model": "test"})

        # Lower bounds only: other components may contribute to the total too.
        assert benchmark.usage.input_tokens >= 100
        assert benchmark.usage.output_tokens >= 50

        by_component = benchmark.usage_by_component
        assert "models:judge_model" in by_component, f"keys: {list(by_component)}"
        judge_component = by_component["models:judge_model"]
        assert judge_component.input_tokens == 100
        assert judge_component.output_tokens == 50

    def test_agent_usage_captured_when_evaluation_raises(self):
        """With fail_on_evaluation_error=False, an evaluator that raises must
        not cost the report its usage dict: usage is collected after the
        evaluation step, whether or not that step succeeded."""

        class RaisingEvaluator:
            def __init__(self, task, environment, user):
                # Arguments are accepted for signature compatibility only.
                _ = (task, environment, user)

            def filter_traces(self, traces):
                return traces

            def __call__(self, traces, final_answer=None):
                raise RuntimeError("boom — simulated evaluator failure")

        class RaisingJudgeBenchmark(DummyBenchmark):
            def setup_evaluators(self, environment, task, agents, user, seed_generator):
                # A model with known usage is registered here so the test can
                # assert that usage recorded before evaluation survives the failure.
                recorded_usage = dict(input_tokens=42, output_tokens=7, total_tokens=49)
                agent_model = DummyModelAdapter(model_id="agent_model", usage=recorded_usage)
                self.register("models", "agent_model", agent_model)
                # One call so the adapter holds a usage record.
                agent_model.chat([dict(role="user", content="hi")])
                failing = RaisingEvaluator(task, environment, user)
                return [failing]

        task_queue = TaskQueue.from_list([{"query": "Test", "environment_data": {}}])
        benchmark = RaisingJudgeBenchmark(fail_on_evaluation_error=False)

        report = benchmark.run(task_queue, agent_data={"model": "test"})[0]

        assert report["status"] == "evaluation_failed"
        usage = report["usage"]
        assert "error" not in usage, f"usage became an error dict: {usage}"
        agent_entry = usage["models"]["agent_model"]
        assert agent_entry["input_tokens"] == 42
        assert agent_entry["output_tokens"] == 7

    def test_agent_usage_captured_when_execution_raises(self):
        """With fail_on_task_error=False, a run_agents failure still yields a
        real usage dict in the report. Evaluation is skipped in that case,
        but usage collection is not."""

        class FailingAgentBenchmark(DummyBenchmark):
            def setup_agents(self, agent_data, environment, task, user, seed_generator):
                recorded_usage = dict(input_tokens=11, output_tokens=3, total_tokens=14)
                agent_model = DummyModelAdapter(model_id="agent_model", usage=recorded_usage)
                self.register("models", "agent_model", agent_model)
                # One call so the adapter holds a usage record.
                agent_model.chat([dict(role="user", content="hi")])
                return super().setup_agents(agent_data, environment, task, user, seed_generator)

            def run_agents(self, agents, task, environment, query):
                raise AgentError("simulated agent failure", component="agent")

        task_queue = TaskQueue.from_list([{"query": "Test", "environment_data": {}}])
        benchmark = FailingAgentBenchmark(fail_on_task_error=False)

        report = benchmark.run(task_queue, agent_data={"model": "test"})[0]

        assert report["status"] == "agent_error"
        usage = report["usage"]
        assert "error" not in usage, f"usage became an error dict: {usage}"
        agent_entry = usage["models"]["agent_model"]
        assert agent_entry["input_tokens"] == 11
        assert agent_entry["output_tokens"] == 3
Loading