Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,20 @@

"""
DESCRIPTION:
Generates an evaluation dataset from an agent's recent conversation
traces. The sample:
Generates an evaluation dataset from an agent's conversation traces.
The sample is fully self-contained:

1. Creates a `DataGenerationJob` (scenario=EVALUATION, type=traces) that
reads spans from Application Insights for an existing agent within a
time window and synthesizes question / answer pairs into a new
versioned Dataset.
2. Polls the job to completion and resolves the resulting `DatasetVersion`.
3. Cleans up the generated dataset and the data generation job.
1. Creates an agent and seeds trace spans by running one sample
conversation against it.
2. Waits for ingestion, then submits a `DataGenerationJob`
(scenario=EVALUATION, source=traces) that synthesizes Q/A pairs.
3. Polls the job and fetches the resulting `DatasetVersion`.
4. Cleans up the dataset, job, seeded conversation, and agent.

The Traces source consumes existing telemetry, so no `model_options` are
required — the service derives samples directly from the agent's traces.
The agent must have at least one trace recorded within the configured
look-back window or the job will succeed with zero generated samples.
Prerequisite: the project must have an Application Insights resource
connected so the agent emits server-side traces.

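To adapt the sample for an existing agent that already has recent traces,
skip step 1 and the ingestion wait, set `agent_name` to that agent's name,
and set `start_time` / `end_time` to a window that covers those traces.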

USAGE:
python sample_dataset_generation_job_traces_for_evaluation.py
Expand All @@ -29,25 +29,17 @@
pip install "azure-ai-projects>=2.2.0" azure-identity python-dotenv

Set these environment variables with your own values:
1) FOUNDRY_PROJECT_ENDPOINT - Required. The Azure AI Project endpoint, as found
in the overview page of your Microsoft Foundry project.
2) FOUNDRY_AGENT_NAME - Required. The name of an agent (Foundry Agent or
OpenTelemetry-instrumented third-party agent) that has recent
conversation traces in Application Insights.
3) DATASET_NAME - Optional. Name to assign to the generated output dataset.
Defaults to `traces-eval-sample`. The service caps the rendered output
name at 50 characters, so keep custom values short — the sample appends
a unique run id suffix.
4) FOUNDRY_TRACES_WINDOW_DAYS - Optional. How far back, in days, to look for
agent traces. Defaults to 7.
5) POLL_INTERVAL_SECONDS - Optional. Number of seconds to sleep between status
polls for the data generation job. Defaults to 10.
1) FOUNDRY_PROJECT_ENDPOINT - Required. The Azure AI Project endpoint, as
found in the overview page of your Microsoft Foundry project.
2) FOUNDRY_MODEL_NAME - Required. The Azure OpenAI deployment name used
to drive the agent during trace seeding.
"""

import os
import time
import uuid
from datetime import datetime, timedelta, timezone
from typing import Optional

from dotenv import load_dotenv

Expand All @@ -61,100 +53,172 @@
DatasetDataGenerationJobOutput,
DatasetVersion,
JobStatus,
PromptAgentDefinition,
TracesDataGenerationJobOptions,
TracesDataGenerationJobSource,
)

load_dotenv()


# The single question sent to the agent to produce one traced conversation
# turn. One turn suffices: max_samples caps the number of generated samples;
# it does not set a minimum number of input traces.
SEED_PROMPT = "What is your refund policy?"

# Instructions for the sample support-agent persona, assembled from a persona
# sentence and a refund-policy sentence separated by a single space.
AGENT_INSTRUCTIONS = " ".join(
    (
        "Widgets & Gizmos support agent. Be concise.",
        "Refunds: unopened 30 days; defective 90 days; 5-7 business days to process.",
    )
)


endpoint = os.environ["FOUNDRY_PROJECT_ENDPOINT"]
agent_name = os.environ["FOUNDRY_AGENT_NAME"]
dataset_name = os.environ.get("DATASET_NAME", "traces-eval-sample")
traces_window_days = int(os.environ.get("FOUNDRY_TRACES_WINDOW_DAYS", "7"))
poll_interval_seconds = int(os.environ.get("POLL_INTERVAL_SECONDS", "10"))
model_deployment = os.environ["FOUNDRY_MODEL_NAME"]
DATASET_NAME = "traces-eval-sample"
POLL_INTERVAL_SECONDS = 10
TRACE_INGESTION_WAIT_SECONDS = 180

# Unique per-run output dataset name so repeated runs do not collide.
# Output names are capped at 50 characters by the service.
# Per-run id keeps repeated runs from colliding; output names are capped at 50 chars.
run_id = f"{datetime.now(tz=timezone.utc).strftime('%y%m%d%H%M%S')}-{uuid.uuid4().hex[:4]}"
output_dataset_name = f"{dataset_name}-{run_id}"
if len(output_dataset_name) > 50:
raise ValueError(
f"Output dataset name `{output_dataset_name}` exceeds the 50-character service limit. "
f"Lower DATASET_NAME (currently `{dataset_name}`) so that `<DATASET_NAME>-<run id>` fits within 50 characters."
)

# Trace look-back window: now - `traces_window_days` ... now.
end_time = datetime.now(tz=timezone.utc)
start_time = end_time - timedelta(days=traces_window_days)
output_dataset_name = f"{DATASET_NAME}-{run_id}"
agent_name = f"{DATASET_NAME}-{run_id}"

# Job statuses that end the polling loop: once the data generation job reports
# one of these, the script stops polling and inspects the final status.
TERMINAL_STATUSES = {
    JobStatus.CANCELLED,
    JobStatus.FAILED,
    JobStatus.SUCCEEDED,
}


with (
DefaultAzureCredential() as credential,
AIProjectClient(endpoint=endpoint, credential=credential) as project_client,
):

# ------------------------------------------------------------------
# 1. Submit a data generation job that reads agent traces.
# ------------------------------------------------------------------
print(f"Create a data generation job from traces for agent `{agent_name}` (window: {traces_window_days} day(s)).")
job = DataGenerationJob(
inputs=DataGenerationJobInputs(
name=f"traces-eval-{run_id}",
scenario=DataGenerationJobScenario.EVALUATION,
sources=[
TracesDataGenerationJobSource(
description="Application Insights conversation traces for the Foundry agent.",
agent_name=agent_name,
start_time=start_time,
end_time=end_time,
created_agent = None
created_conversation_id: Optional[str] = None
submitted_job_id: Optional[str] = None
created_dataset: Optional[DatasetVersion] = None

try:
# 1. Create an agent and seed traces.
print(f"Create agent `{agent_name}` (model: `{model_deployment}`).")
created_agent = project_client.agents.create_version(
agent_name=agent_name,
definition=PromptAgentDefinition(model=model_deployment, instructions=AGENT_INSTRUCTIONS),
)
print(f"Agent created (id: {created_agent.id}, version: {created_agent.version}).")

seed_start = datetime.now(tz=timezone.utc)
print(f"Seed one conversation against the agent (prompt: {SEED_PROMPT!r}).")
with project_client.get_openai_client() as openai_client:
conversation = openai_client.conversations.create()
created_conversation_id = conversation.id
print(f" - conversation id: {conversation.id}")
openai_client.responses.create(
conversation=conversation.id,
input=SEED_PROMPT,
extra_body={
"agent_reference": {
"name": created_agent.name,
"id": created_agent.id,
"type": "agent_reference",
}
},
)

print(f"Wait {TRACE_INGESTION_WAIT_SECONDS}s for Application Insights to ingest the spans.", flush=True)
time.sleep(TRACE_INGESTION_WAIT_SECONDS)

# 2. Submit a data generation job that reads the agent's traces.
# Start the window a few minutes before seeding so the seeded spans fall inside it.
start_time = seed_start - timedelta(minutes=5)
end_time = datetime.now(tz=timezone.utc)

print(
f"Create a data generation job from traces for agent `{agent_name}` "
f"(window: {start_time.isoformat()} .. {end_time.isoformat()})."
)
job = project_client.beta.datasets.create_generation_job(
job=DataGenerationJob(
inputs=DataGenerationJobInputs(
name=f"traces-eval-{run_id}",
scenario=DataGenerationJobScenario.EVALUATION,
sources=[
TracesDataGenerationJobSource(
description="Application Insights conversation traces for the agent.",
agent_name=agent_name,
start_time=start_time,
end_time=end_time,
),
],
# Service requires max_samples in [15, 1000]. It's a cap on
# generated samples - one seeded trace turn is enough.
options=TracesDataGenerationJobOptions(max_samples=15),
output_options=DataGenerationJobOutputOptions(name=output_dataset_name),
),
],
options=TracesDataGenerationJobOptions(
# Service requires max_samples to be between 15 and 1000.
max_samples=15,
),
output_options=DataGenerationJobOutputOptions(name=output_dataset_name),
),
)
job = project_client.beta.datasets.create_generation_job(job=job)
print(f"Created data generation job `{job.id}` (status: `{job.status}`).")

print(f"Poll job `{job.id}` until it reaches a terminal state.", end="", flush=True)
while True:
job = project_client.beta.datasets.get_generation_job(job_id=job.id)
if job.status in TERMINAL_STATUSES:
break
time.sleep(poll_interval_seconds)
print(".", end="", flush=True)
print()
print(f"Final job status: `{job.status}`.")

if job.status != JobStatus.SUCCEEDED:
message = job.error.message if job.error is not None else "<no error message>"
raise RuntimeError(f"Job `{job.id}` ended with status `{job.status}`: {message}")

# Locate the Dataset output produced by the job.
output_name: str = ""
output_version: str = ""
for output in (job.result.outputs if job.result is not None else None) or []:
if isinstance(output, DatasetDataGenerationJobOutput):
output_name = output.name or ""
output_version = output.version or ""
break
if not output_name or not output_version:
raise RuntimeError(f"Job `{job.id}` did not produce a dataset output.")

dataset: DatasetVersion = project_client.datasets.get(name=output_name, version=output_version)
print(f"Generated dataset: name=`{dataset.name}` version=`{dataset.version}` id=`{dataset.id}`")
if job.result is not None and job.result.generated_samples is not None:
print(f"Generated samples: {job.result.generated_samples}")

# ------------------------------------------------------------------
# 2. Clean up.
# ------------------------------------------------------------------
print(f"Delete the generated dataset `{dataset.name}` v{dataset.version}.")
project_client.datasets.delete(name=dataset.name or "", version=dataset.version or "")

print(f"Delete the data generation job `{job.id}`.")
project_client.beta.datasets.delete_generation_job(job_id=job.id)
)
submitted_job_id = job.id
print(f"Created data generation job `{job.id}` (status: `{job.status}`).")

print(f"Poll job `{job.id}` until it reaches a terminal state.", end="", flush=True)
while job.status not in TERMINAL_STATUSES:
time.sleep(POLL_INTERVAL_SECONDS)
print(".", end="", flush=True)
job = project_client.beta.datasets.get_generation_job(job_id=job.id)
print()
print(f"Final job status: `{job.status}`.")

if job.status != JobStatus.SUCCEEDED:
message = job.error.message if job.error is not None else "<no error message>"
raise RuntimeError(f"Job `{job.id}` ended with status `{job.status}`: {message}")

# 3. Resolve the generated dataset.
outputs = (job.result.outputs if job.result is not None else None) or []
dataset_output = next(
(o for o in outputs if isinstance(o, DatasetDataGenerationJobOutput)), None
)
if dataset_output is None or not dataset_output.name or not dataset_output.version:
raise RuntimeError(f"Job `{job.id}` did not produce a dataset output.")

created_dataset = project_client.datasets.get(
name=dataset_output.name, version=dataset_output.version
)
print(
f"Generated dataset: name=`{created_dataset.name}` "
f"version=`{created_dataset.version}` id=`{created_dataset.id}`"
)
if job.result is not None and job.result.generated_samples is not None:
print(f"Generated samples: {job.result.generated_samples}")

finally:
# Best-effort cleanup, outputs -> producers (dataset, job, conversation, agent).
if created_dataset is not None:
try:
project_client.datasets.delete(
name=created_dataset.name or "",
version=created_dataset.version or "",
)
print(f"Deleted dataset `{created_dataset.name}` v{created_dataset.version}.")
except Exception as exc: # pylint: disable=broad-exception-caught
print(f" (warning) could not delete dataset: {exc}")

if submitted_job_id is not None:
try:
project_client.beta.datasets.delete_generation_job(job_id=submitted_job_id)
print(f"Deleted data generation job `{submitted_job_id}`.")
except Exception as exc: # pylint: disable=broad-exception-caught
print(f" (warning) could not delete job: {exc}")

if created_conversation_id is not None:
try:
with project_client.get_openai_client() as openai_client:
openai_client.conversations.delete(conversation_id=created_conversation_id)
print(f"Deleted seeded conversation `{created_conversation_id}`.")
except Exception as exc: # pylint: disable=broad-exception-caught
print(f" (warning) could not delete conversation: {exc}")

if created_agent is not None:
try:
project_client.agents.delete_version(
agent_name=created_agent.name,
agent_version=created_agent.version,
)
print(f"Deleted agent `{created_agent.name}` v{created_agent.version}.")
except Exception as exc: # pylint: disable=broad-exception-caught
print(f" (warning) could not delete agent: {exc}")