diff --git a/sdk/ai/azure-ai-projects/samples/datasets/sample_dataset_generation_job_traces_for_evaluation.py b/sdk/ai/azure-ai-projects/samples/datasets/sample_dataset_generation_job_traces_for_evaluation.py index 9b2ca86a89bf..498a2e9bdca2 100644 --- a/sdk/ai/azure-ai-projects/samples/datasets/sample_dataset_generation_job_traces_for_evaluation.py +++ b/sdk/ai/azure-ai-projects/samples/datasets/sample_dataset_generation_job_traces_for_evaluation.py @@ -6,20 +6,20 @@ """ DESCRIPTION: - Generates an evaluation dataset from an agent's recent conversation - traces. The sample: + Generates an evaluation dataset from an agent's conversation traces. + The sample is fully self-contained: - 1. Creates a `DataGenerationJob` (scenario=EVALUATION, type=traces) that - reads spans from Application Insights for an existing agent within a - time window and synthesizes question / answer pairs into a new - versioned Dataset. - 2. Polls the job to completion and resolves the resulting `DatasetVersion`. - 3. Cleans up the generated dataset and the data generation job. + 1. Creates an agent and seeds traces with one sample conversation. + 2. Waits for ingestion, then submits a `DataGenerationJob` + (scenario=EVALUATION, source=traces) that synthesizes Q/A pairs. + 3. Polls the job and fetches the resulting `DatasetVersion`. + 4. Cleans up the dataset, job, seeded conversation, and agent. - The Traces source consumes existing telemetry, so no `model_options` are - required — the service derives samples directly from the agent's traces. - The agent must have at least one trace recorded within the configured - look-back window or the job will succeed with zero generated samples. + Prerequisite: the project must have an Application Insights resource + connected so the agent emits server-side traces. + + To adapt for an existing agent, set `agent_name` to its name instead of + running step 1, skip the ingestion wait, and start the window earlier. 
USAGE: python sample_dataset_generation_job_traces_for_evaluation.py @@ -29,25 +29,17 @@ pip install "azure-ai-projects>=2.2.0" azure-identity python-dotenv Set these environment variables with your own values: - 1) FOUNDRY_PROJECT_ENDPOINT - Required. The Azure AI Project endpoint, as found - in the overview page of your Microsoft Foundry project. - 2) FOUNDRY_AGENT_NAME - Required. The name of an agent (Foundry Agent or - OpenTelemetry-instrumented third-party agent) that has recent - conversation traces in Application Insights. - 3) DATASET_NAME - Optional. Name to assign to the generated output dataset. - Defaults to `traces-eval-sample`. The service caps the rendered output - name at 50 characters, so keep custom values short — the sample appends - a unique run id suffix. - 4) FOUNDRY_TRACES_WINDOW_DAYS - Optional. How far back, in days, to look for - agent traces. Defaults to 7. - 5) POLL_INTERVAL_SECONDS - Optional. Number of seconds to sleep between status - polls for the data generation job. Defaults to 10. + 1) FOUNDRY_PROJECT_ENDPOINT - Required. The Azure AI Project endpoint, as + found in the overview page of your Microsoft Foundry project. + 2) FOUNDRY_MODEL_NAME - Required. The Azure OpenAI deployment name used + to drive the agent during trace seeding. """ import os import time import uuid from datetime import datetime, timedelta, timezone +from typing import Optional from dotenv import load_dotenv @@ -61,100 +53,172 @@ DatasetDataGenerationJobOutput, DatasetVersion, JobStatus, + PromptAgentDefinition, TracesDataGenerationJobOptions, TracesDataGenerationJobSource, ) load_dotenv() + +# Minimal persona + prompt; one seeded turn is enough for the job to succeed +# (max_samples is the cap on generated samples, not a floor on input traces). +AGENT_INSTRUCTIONS = ( + "Widgets & Gizmos support agent. Be concise. " + "Refunds: unopened 30 days; defective 90 days; 5-7 business days to process." +) +SEED_PROMPT = "What is your refund policy?" 
+ + endpoint = os.environ["FOUNDRY_PROJECT_ENDPOINT"] -agent_name = os.environ["FOUNDRY_AGENT_NAME"] -dataset_name = os.environ.get("DATASET_NAME", "traces-eval-sample") -traces_window_days = int(os.environ.get("FOUNDRY_TRACES_WINDOW_DAYS", "7")) -poll_interval_seconds = int(os.environ.get("POLL_INTERVAL_SECONDS", "10")) +model_deployment = os.environ["FOUNDRY_MODEL_NAME"] +DATASET_NAME = "traces-eval-sample" +POLL_INTERVAL_SECONDS = 10 +TRACE_INGESTION_WAIT_SECONDS = 180 -# Unique per-run output dataset name so repeated runs do not collide. -# Output names are capped at 50 characters by the service. +# Per-run id keeps repeated runs from colliding; output names are capped at 50 chars. run_id = f"{datetime.now(tz=timezone.utc).strftime('%y%m%d%H%M%S')}-{uuid.uuid4().hex[:4]}" -output_dataset_name = f"{dataset_name}-{run_id}" -if len(output_dataset_name) > 50: - raise ValueError( - f"Output dataset name `{output_dataset_name}` exceeds the 50-character service limit. " - f"Lower DATASET_NAME (currently `{dataset_name}`) so that `-` fits within 50 characters." - ) - -# Trace look-back window: now - `traces_window_days` ... now. -end_time = datetime.now(tz=timezone.utc) -start_time = end_time - timedelta(days=traces_window_days) +output_dataset_name = f"{DATASET_NAME}-{run_id}" +agent_name = f"{DATASET_NAME}-{run_id}" TERMINAL_STATUSES = {JobStatus.SUCCEEDED, JobStatus.FAILED, JobStatus.CANCELLED} + with ( DefaultAzureCredential() as credential, AIProjectClient(endpoint=endpoint, credential=credential) as project_client, ): - # ------------------------------------------------------------------ - # 1. Submit a data generation job that reads agent traces. 
- # ------------------------------------------------------------------ - print(f"Create a data generation job from traces for agent `{agent_name}` (window: {traces_window_days} day(s)).") - job = DataGenerationJob( - inputs=DataGenerationJobInputs( - name=f"traces-eval-{run_id}", - scenario=DataGenerationJobScenario.EVALUATION, - sources=[ - TracesDataGenerationJobSource( - description="Application Insights conversation traces for the Foundry agent.", - agent_name=agent_name, - start_time=start_time, - end_time=end_time, + created_agent = None + created_conversation_id: Optional[str] = None + submitted_job_id: Optional[str] = None + created_dataset: Optional[DatasetVersion] = None + + try: + # 1. Create an agent and seed traces. + print(f"Create agent `{agent_name}` (model: `{model_deployment}`).") + created_agent = project_client.agents.create_version( + agent_name=agent_name, + definition=PromptAgentDefinition(model=model_deployment, instructions=AGENT_INSTRUCTIONS), + ) + print(f"Agent created (id: {created_agent.id}, version: {created_agent.version}).") + + seed_start = datetime.now(tz=timezone.utc) + print(f"Seed one conversation against the agent (prompt: {SEED_PROMPT!r}).") + with project_client.get_openai_client() as openai_client: + conversation = openai_client.conversations.create() + created_conversation_id = conversation.id + print(f" - conversation id: {conversation.id}") + openai_client.responses.create( + conversation=conversation.id, + input=SEED_PROMPT, + extra_body={ + "agent_reference": { + "name": created_agent.name, + "id": created_agent.id, + "type": "agent_reference", + } + }, + ) + + print(f"Wait {TRACE_INGESTION_WAIT_SECONDS}s for Application Insights to ingest the spans.", flush=True) + time.sleep(TRACE_INGESTION_WAIT_SECONDS) + + # 2. Submit a data generation job that reads the agent's traces. + # Start the window shortly before seeding so the seeded spans fall inside it. 
+ start_time = seed_start - timedelta(minutes=5) + end_time = datetime.now(tz=timezone.utc) + + print( + f"Create a data generation job from traces for agent `{agent_name}` " + f"(window: {start_time.isoformat()} .. {end_time.isoformat()})." + ) + job = project_client.beta.datasets.create_generation_job( + job=DataGenerationJob( + inputs=DataGenerationJobInputs( + name=f"traces-eval-{run_id}", + scenario=DataGenerationJobScenario.EVALUATION, + sources=[ + TracesDataGenerationJobSource( + description="Application Insights conversation traces for the agent.", + agent_name=agent_name, + start_time=start_time, + end_time=end_time, + ), + ], + # Service requires max_samples in [15, 1000]. It's a cap on + # generated samples - one seeded trace turn is enough. + options=TracesDataGenerationJobOptions(max_samples=15), + output_options=DataGenerationJobOutputOptions(name=output_dataset_name), ), - ], - options=TracesDataGenerationJobOptions( - # Service requires max_samples to be between 15 and 1000. - max_samples=15, ), - output_options=DataGenerationJobOutputOptions(name=output_dataset_name), - ), - ) - job = project_client.beta.datasets.create_generation_job(job=job) - print(f"Created data generation job `{job.id}` (status: `{job.status}`).") - - print(f"Poll job `{job.id}` until it reaches a terminal state.", end="", flush=True) - while True: - job = project_client.beta.datasets.get_generation_job(job_id=job.id) - if job.status in TERMINAL_STATUSES: - break - time.sleep(poll_interval_seconds) - print(".", end="", flush=True) - print() - print(f"Final job status: `{job.status}`.") - - if job.status != JobStatus.SUCCEEDED: - message = job.error.message if job.error is not None else "" - raise RuntimeError(f"Job `{job.id}` ended with status `{job.status}`: {message}") - - # Locate the Dataset output produced by the job. 
- output_name: str = "" - output_version: str = "" - for output in (job.result.outputs if job.result is not None else None) or []: - if isinstance(output, DatasetDataGenerationJobOutput): - output_name = output.name or "" - output_version = output.version or "" - break - if not output_name or not output_version: - raise RuntimeError(f"Job `{job.id}` did not produce a dataset output.") - - dataset: DatasetVersion = project_client.datasets.get(name=output_name, version=output_version) - print(f"Generated dataset: name=`{dataset.name}` version=`{dataset.version}` id=`{dataset.id}`") - if job.result is not None and job.result.generated_samples is not None: - print(f"Generated samples: {job.result.generated_samples}") - - # ------------------------------------------------------------------ - # 2. Clean up. - # ------------------------------------------------------------------ - print(f"Delete the generated dataset `{dataset.name}` v{dataset.version}.") - project_client.datasets.delete(name=dataset.name or "", version=dataset.version or "") - - print(f"Delete the data generation job `{job.id}`.") - project_client.beta.datasets.delete_generation_job(job_id=job.id) + ) + submitted_job_id = job.id + print(f"Created data generation job `{job.id}` (status: `{job.status}`).") + + print(f"Poll job `{job.id}` until it reaches a terminal state.", end="", flush=True) + while job.status not in TERMINAL_STATUSES: + time.sleep(POLL_INTERVAL_SECONDS) + print(".", end="", flush=True) + job = project_client.beta.datasets.get_generation_job(job_id=job.id) + print() + print(f"Final job status: `{job.status}`.") + + if job.status != JobStatus.SUCCEEDED: + message = job.error.message if job.error is not None else "" + raise RuntimeError(f"Job `{job.id}` ended with status `{job.status}`: {message}") + + # 3. Resolve the generated dataset. 
+ outputs = (job.result.outputs if job.result is not None else None) or [] + dataset_output = next( + (o for o in outputs if isinstance(o, DatasetDataGenerationJobOutput)), None + ) + if dataset_output is None or not dataset_output.name or not dataset_output.version: + raise RuntimeError(f"Job `{job.id}` did not produce a dataset output.") + + created_dataset = project_client.datasets.get( + name=dataset_output.name, version=dataset_output.version + ) + print( + f"Generated dataset: name=`{created_dataset.name}` " + f"version=`{created_dataset.version}` id=`{created_dataset.id}`" + ) + if job.result is not None and job.result.generated_samples is not None: + print(f"Generated samples: {job.result.generated_samples}") + + finally: + # Best-effort cleanup, outputs first and then their producers (dataset, job, conversation, agent). + if created_dataset is not None: + try: + project_client.datasets.delete( + name=created_dataset.name or "", + version=created_dataset.version or "", + ) + print(f"Deleted dataset `{created_dataset.name}` v{created_dataset.version}.") + except Exception as exc: # pylint: disable=broad-exception-caught + print(f" (warning) could not delete dataset: {exc}") + + if submitted_job_id is not None: + try: + project_client.beta.datasets.delete_generation_job(job_id=submitted_job_id) + print(f"Deleted data generation job `{submitted_job_id}`.") + except Exception as exc: # pylint: disable=broad-exception-caught + print(f" (warning) could not delete job: {exc}") + + if created_conversation_id is not None: + try: + with project_client.get_openai_client() as openai_client: + openai_client.conversations.delete(conversation_id=created_conversation_id) + print(f"Deleted seeded conversation `{created_conversation_id}`.") + except Exception as exc: # pylint: disable=broad-exception-caught + print(f" (warning) could not delete conversation: {exc}") + + if created_agent is not None: + try: + project_client.agents.delete_version( + agent_name=created_agent.name, + 
agent_version=created_agent.version, + ) + print(f"Deleted agent `{created_agent.name}` v{created_agent.version}.") + except Exception as exc: # pylint: disable=broad-exception-caught + print(f" (warning) could not delete agent: {exc}")