From f401a9735070c21ca567e936dd138ec787066b20 Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Fri, 20 Feb 2026 15:11:45 -0800 Subject: [PATCH 1/5] Automate sample validation: part 1 --- .../_handoff.py | 9 +- .../redis/redis_conversation.py | 1 - python/samples/_sample_validation/README.md | 191 +++++++++++++++ python/samples/_sample_validation/__init__.py | 25 ++ python/samples/_sample_validation/__main__.py | 122 ++++++++++ .../create_dynamic_workflow_executor.py | 217 ++++++++++++++++++ .../samples/_sample_validation/discovery.py | 116 ++++++++++ python/samples/_sample_validation/models.py | 162 +++++++++++++ python/samples/_sample_validation/report.py | 100 ++++++++ ...un_dynamic_validation_workflow_executor.py | 59 +++++ python/samples/_sample_validation/workflow.py | 42 ++++ 11 files changed, 1039 insertions(+), 5 deletions(-) create mode 100644 python/samples/_sample_validation/README.md create mode 100644 python/samples/_sample_validation/__init__.py create mode 100644 python/samples/_sample_validation/__main__.py create mode 100644 python/samples/_sample_validation/create_dynamic_workflow_executor.py create mode 100644 python/samples/_sample_validation/discovery.py create mode 100644 python/samples/_sample_validation/models.py create mode 100644 python/samples/_sample_validation/report.py create mode 100644 python/samples/_sample_validation/run_dynamic_validation_workflow_executor.py create mode 100644 python/samples/_sample_validation/workflow.py diff --git a/python/packages/orchestrations/agent_framework_orchestrations/_handoff.py b/python/packages/orchestrations/agent_framework_orchestrations/_handoff.py index e76b34b62a..c807a675f6 100644 --- a/python/packages/orchestrations/agent_framework_orchestrations/_handoff.py +++ b/python/packages/orchestrations/agent_framework_orchestrations/_handoff.py @@ -355,9 +355,9 @@ def _create_handoff_tool(self, target_id: str, description: str | None = None) - # results, so the function body never actually runs in 
practice. @tool(name=tool_name, description=doc, approval_mode="never_require") - def _handoff_tool(context: str | None = None) -> str: - """Return a deterministic acknowledgement that encodes the target alias.""" - return f"Handoff to {target_id}" + def _handoff_tool() -> None: + """This function will be intercepted by the auto-handoff middleware thus the body will never execute.""" + pass return _handoff_tool @@ -414,7 +414,8 @@ async def _run_agent_and_emit( ) await cast(WorkflowContext[AgentExecutorRequest], ctx).send_message( - AgentExecutorRequest(messages=[], should_respond=True), target_id=handoff_target + AgentExecutorRequest(messages=[], should_respond=True), + target_id=handoff_target, ) await ctx.add_event( WorkflowEvent("handoff_sent", data=HandoffSentEvent(source=self.id, target=handoff_target)) diff --git a/python/samples/02-agents/context_providers/redis/redis_conversation.py b/python/samples/02-agents/context_providers/redis/redis_conversation.py index 72192767e8..8e8199fc78 100644 --- a/python/samples/02-agents/context_providers/redis/redis_conversation.py +++ b/python/samples/02-agents/context_providers/redis/redis_conversation.py @@ -21,7 +21,6 @@ import asyncio import os -from agent_framework import AgentSession from agent_framework.azure import AzureOpenAIResponsesClient from agent_framework.redis import RedisContextProvider from azure.identity import AzureCliCredential diff --git a/python/samples/_sample_validation/README.md b/python/samples/_sample_validation/README.md new file mode 100644 index 0000000000..04e8c16fad --- /dev/null +++ b/python/samples/_sample_validation/README.md @@ -0,0 +1,191 @@ +# Sample Validation System + +An AI-powered workflow system for validating Python samples by discovering them, dynamically creating a nested concurrent workflow, and producing a report. 
+ +## Architecture + +``` +┌─────────────────────────────────────────────────────────────────────┐ +│ Sample Validation Workflow │ +│ (Sequential - 4 Executors) │ +└─────────────────────────────────────────────────────────────────────┘ + │ + ┌──────────────────────────┼──────────────────────────┐ + ▼ ▼ ▼ +┌───────────────┐ ┌─────────────────┐ ┌─────────────────┐ +│ Discover │ ──► │ Create Dynamic │ ──► │ Run Nested │ +│ Samples │ │ Concurrent Flow │ │ Workflow │ +└───────────────┘ └─────────────────┘ └─────────────────┘ + │ │ │ + ▼ ▼ ▼ + List[SampleInfo] WorkflowCreationResult ExecutionResult + (N GitHub agents) │ + ▼ + ┌─────────────────┐ + │ Generate Report │ + └─────────────────┘ + │ + ▼ + Report +``` + +### Nested Workflow Strategy + +``` +┌─────────────────────────────────────────────────────────────────────┐ +│ Nested Concurrent Workflow (dynamic) │ +├─────────────────────────────────────────────────────────────────────┤ +│ │ +│ ┌─────────────────────────────────────────────────────────────┐ │ +│ │ ConcurrentBuilder(participants=[agent_1 ... 
agent_N]) │ │ +│ │ - N equals number of discovered samples │ │ +│ │ - Each agent validates one assigned sample │ │ +│ │ - Agents run in parallel using GitHub Copilot │ │ +│ │ - Custom aggregator converts agent JSON -> RunResult │ │ +│ └─────────────────────────────────────────────────────────────┘ │ +└─────────────────────────────────────────────────────────────────────┘ +``` + +## File Structure + +``` +samples/ +├── _sample_validation/ +│ ├── __init__.py # Package exports +│ ├── README.md # This file +│ ├── models.py # Data classes +│ │ ├── SampleInfo # Discovered sample metadata +│ │ ├── RunResult # Execution result +│ │ └── Report # Final validation report +│ ├── discovery.py # Sample discovery +│ │ ├── discover_samples() # Finds all .py files +│ │ └── DiscoverSamplesExecutor +│ ├── report.py # Report generation +│ │ ├── generate_report() # Create Report from results +│ │ ├── save_report() # Write to markdown/JSON +│ │ ├── print_summary() # Console output +│ │ └── GenerateReportExecutor +│ ├── workflow_types.py # Shared dataclasses for workflow steps +│ ├── create_dynamic_workflow_executor.py # CreateConcurrentValidationWorkflowExecutor +│ ├── run_dynamic_validation_workflow_executor.py # RunDynamicValidationWorkflowExecutor +│ └── workflow.py # Workflow assembly entrypoint +├── __main__.py # CLI entry point +``` + +## Dependencies + +### Required + +- **agent-framework** - Core workflow and agent functionality +- **agent-framework-orchestrations** - ConcurrentBuilder for fan-out/fan-in +- **agent-framework-github-copilot** - GitHub Copilot agent integration + +### Optional + +- `GITHUB_COPILOT_MODEL` to override default Copilot model selection. + +## Environment Variables + +No required environment variables. 
Optional: + +| Variable | Description | Required | +| ------------------------ | --------------------------------- | -------- | +| `GITHUB_COPILOT_MODEL` | Copilot model override | No | +| `GITHUB_COPILOT_TIMEOUT` | Copilot request timeout (seconds) | No | + +## Usage + +### Basic Usage + +```bash +# Validate all samples +uv run python -m _sample_validation + +# Validate specific subdirectory +uv run python -m _sample_validation --subdir 03-workflows + +# Save reports to files +uv run python -m _sample_validation --save-report --output-dir ./reports +``` + +### Configuration Options + +```bash +uv run python -m _sample_validation [OPTIONS] + +Options: + --subdir TEXT Subdirectory to validate (relative to samples/) + --output-dir TEXT Report output directory (default: ./_sample_validation/reports) + --save-report Save reports to files +``` + +### Examples + +```bash +# Quick validation of a small directory +uv run python -m _sample_validation --subdir 03-workflows/_start-here + +# Save report artifacts +uv run python -m _sample_validation --save-report +``` + +## How It Works + +### 1. Discovery + +Walks the samples directory and finds all `.py` files that: + +- Don't start with `_` (excludes private files) +- Aren't in `__pycache__` directories +- Aren't in directories starting with `_` (excludes `_sample_validation`) + +### 2. Dynamic Workflow Creation + +Creates a nested `ConcurrentBuilder` workflow with one GitHub Copilot agent per sample. + +Each agent receives: + +- The sample path +- Full source code +- A strict JSON output schema for validation result + +### 3. Nested Workflow Execution + +Runs all per-sample agents in parallel. A custom aggregator converts each agent response into `RunResult`. + +### 4. 
Report Generation + +Produces: + +- **Console summary** - Pass/fail counts with emoji indicators +- **Markdown report** - Detailed results grouped by status +- **JSON report** - Machine-readable for CI integration + +## Report Status Codes + +| Status | Label | Description | +| ------- | --------- | ----------------------------------------- | +| SUCCESS | [PASS] | Sample ran to completion with exit code 0 | +| FAILURE | [FAIL] | Sample exited with non-zero code | +| TIMEOUT | [TIMEOUT] | Sample exceeded timeout limit | +| ERROR | [ERROR] | Exception during execution | + +## Troubleshooting + +### Agent output parsing errors + +If an agent returns non-JSON content, that sample is marked as `ERROR` with parser details in the report. + +### GitHub Copilot authentication or CLI issues + +Ensure GitHub Copilot is authenticated in your environment and the Copilot CLI is available. + +## Extending + +### Custom Per-Sample Validation Prompt + +Modify `agent_prompt()` in `create_dynamic_workflow_executor.py`. + +### Custom Report Formats + +Extend `Report.to_markdown()` or `Report.to_dict()` in `models.py`. diff --git a/python/samples/_sample_validation/__init__.py b/python/samples/_sample_validation/__init__.py new file mode 100644 index 0000000000..5658c7a987 --- /dev/null +++ b/python/samples/_sample_validation/__init__.py @@ -0,0 +1,25 @@ +# Copyright (c) Microsoft. All rights reserved. + +""" +Sample Validation System + +A workflow-based system for validating Python samples by: +1. Discovering all sample files +2. Creating a dynamic nested concurrent workflow (one GitHub agent per sample) +3. Running the nested workflow +4. 
Generating a validation report + +Usage: + uv run python -m _sample_validation + uv run python -m _sample_validation --subdir getting_started +""" + +from _sample_validation.models import Report, RunResult, SampleInfo +from _sample_validation.workflow import create_validation_workflow + +__all__ = [ + "SampleInfo", + "RunResult", + "Report", + "create_validation_workflow", +] diff --git a/python/samples/_sample_validation/__main__.py b/python/samples/_sample_validation/__main__.py new file mode 100644 index 0000000000..f2f8b2b7e6 --- /dev/null +++ b/python/samples/_sample_validation/__main__.py @@ -0,0 +1,122 @@ +# Copyright (c) Microsoft. All rights reserved. + +""" +Sample Validation Script + +Validates all Python samples in the samples directory using a workflow that: +1. Discovers all sample files +2. Builds a nested concurrent workflow with one GitHub agent per sample +3. Runs the nested workflow +4. Generates a validation report + +Usage: + uv run python -m _sample_validation + uv run python -m _sample_validation --subdir 03-workflows + uv run python -m _sample_validation --output-dir ./reports +""" + +import argparse +import asyncio +import os +import sys +from pathlib import Path + +# Add the samples directory to the path for imports +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from _sample_validation.models import Report +from _sample_validation.report import save_report +from _sample_validation.workflow import ValidationConfig, create_validation_workflow + + +def parse_arguments() -> argparse.Namespace: + """Parse command line arguments.""" + parser = argparse.ArgumentParser( + description="Validate Python samples using a dynamic nested concurrent workflow", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + uv run python -m _sample_validation # Validate all samples + uv run python -m _sample_validation --subdir 03-workflows # Validate only workflows + uv run python -m _sample_validation --output-dir ./reports # 
Save reports to custom dir + """, + ) + + parser.add_argument( + "--subdir", + type=str, + help="Validate samples only in the specified subdirectory (relative to samples/)", + ) + + parser.add_argument( + "--output-dir", + type=str, + default="./_sample_validation/reports", + help="Directory to save validation reports (default: ./_sample_validation/reports)", + ) + + parser.add_argument( + "--save-report", + action="store_true", + help="Save the validation report to files", + ) + + return parser.parse_args() + + +async def main() -> int: + """Main entry point.""" + args = parse_arguments() + + # Determine paths + samples_dir = Path(__file__).parent.parent + python_root = samples_dir.parent + + print("=" * 80) + print("SAMPLE VALIDATION WORKFLOW") + print("=" * 80) + print(f"Samples directory: {samples_dir}") + print(f"Python root: {python_root}") + + if os.environ.get("GITHUB_COPILOT_MODEL"): + print(f"Using GitHub Copilot model override: {os.environ['GITHUB_COPILOT_MODEL']}") + + # Create validation config + config = ValidationConfig( + samples_dir=samples_dir, + python_root=python_root, + subdir=args.subdir, + ) + + # Create and run the workflow + workflow = create_validation_workflow(config) + + print("\nStarting validation workflow...") + print("-" * 80) + + # Run the workflow + events = await workflow.run("start") + outputs = events.get_outputs() + + if not outputs: + print("\n[ERROR] Workflow did not produce any output") + return 1 + + report: Report = outputs[0] + + # Save report if requested + if args.save_report: + output_dir = samples_dir / args.output_dir + md_path, json_path = save_report(report, output_dir) + print("\nReports saved:") + print(f" Markdown: {md_path}") + print(f" JSON: {json_path}") + + # Return appropriate exit code + failed = report.failure_count + report.timeout_count + report.error_count + return 1 if failed > 0 else 0 + + +if __name__ == "__main__": + exit_code = asyncio.run(main()) + sys.exit(exit_code) diff --git 
a/python/samples/_sample_validation/create_dynamic_workflow_executor.py b/python/samples/_sample_validation/create_dynamic_workflow_executor.py new file mode 100644 index 0000000000..27f14f8e83 --- /dev/null +++ b/python/samples/_sample_validation/create_dynamic_workflow_executor.py @@ -0,0 +1,217 @@ +# Copyright (c) Microsoft. All rights reserved. + + +import logging + +from _sample_validation.discovery import DiscoveryResult +from _sample_validation.models import ( + ExecutionResult, + RunResult, + RunStatus, + SampleInfo, + ValidationConfig, + WorkflowCreationResult, +) +from agent_framework import ( + AgentExecutorRequest, + AgentExecutorResponse, + AgentResponse, + Executor, + Message, + WorkflowContext, + handler, +) +from agent_framework.github import GitHubCopilotAgent +from agent_framework.orchestrations import ConcurrentBuilder +from copilot import PermissionRequest, PermissionRequestResult +from pydantic import BaseModel + +logger = logging.getLogger(__name__) + + +class AgentResponseFormat(BaseModel): + status: str + output: str + error: str + + +def agent_prompt(sample: SampleInfo) -> str: + """Build per-sample instructions for a GitHub Copilot validator agent.""" + return ( + "You are validating exactly one Python sample.\n" + f"Sample path: {sample.relative_path}\n" + "Analyze the code and execute it. Determine if it runs successfully, fails, or times out.\n" + "The sample can be interactive. If it is interactive, response to the sample when prompted " + "based on your analysis of the code. 
You do not need to consult human on what to respond\n" + "Return ONLY valid JSON with this schema:\n" + "{\n" + ' "status": "success|failure|timeout|error",\n' + ' "output": "short summary of the result and what you did if the sample was interactive",\n' + ' "error": "error details or empty string"\n' + "}\n\n" + ) + + +def parse_agent_json(text: str) -> AgentResponseFormat: + """Parse JSON object from an agent response.""" + stripped = text.strip() + if stripped.startswith("{") and stripped.endswith("}"): + return AgentResponseFormat.model_validate_json(stripped) + + start = stripped.find("{") + end = stripped.rfind("}") + if start == -1 or end == -1 or end <= start: + raise ValueError("No JSON object found in response") + + return AgentResponseFormat.model_validate_json(stripped[start : end + 1]) + + +def status_from_text(value: str) -> RunStatus: + """Convert a string value to RunStatus with safe fallback.""" + normalized = value.strip().lower() + for status in RunStatus: + if status.value == normalized: + return status + return RunStatus.ERROR + + +def prompt_permission(request: PermissionRequest, context: dict[str, str]) -> PermissionRequestResult: + """Permission handler that always approves.""" + kind = request.get("kind", "unknown") + logger.debug(f"[Permission Request: {kind}] ({context})Automatically approved for sample validation.") + return PermissionRequestResult(kind="approved") + + +class CustomAgentExecutor(Executor): + """Executor that runs a GitHub Copilot agent and returns its response. + + We need the custome executor to wrap the agent call in a try/except to ensure that any exceptions are caught and + returned as error responses, otherwise an exception in one agent could crash the entire workflow. 
+ """ + + def __init__(self, agent: GitHubCopilotAgent): + super().__init__(id=agent.id) + self.agent = agent + + @handler + async def handle_request(self, request: AgentExecutorRequest, ctx: WorkflowContext[AgentExecutorResponse]) -> None: + """Execute the agent with the given request and return its response.""" + try: + response = await self.agent.run(request.messages) + await ctx.send_message(AgentExecutorResponse(executor_id=self.id, agent_response=response)) + except Exception as ex: + logger.error(f"Error executing agent {self.agent.id}: {ex}") + error_response = AgentExecutorResponse( + executor_id=self.id, + agent_response=AgentResponse( + messages=Message( + role="assistant", + text=AgentResponseFormat( + status="error", + output="", + error=str(ex), + ).model_dump_json(), + ) + ), + ) + await ctx.send_message(error_response) + + +class CreateConcurrentValidationWorkflowExecutor(Executor): + """Executor that builds a nested concurrent workflow with one agent per sample.""" + + def __init__(self, config: ValidationConfig): + super().__init__(id="create_dynamic_workflow") + self.config = config + + @handler + async def create( + self, + discovery: DiscoveryResult, + ctx: WorkflowContext[WorkflowCreationResult], + ) -> None: + """Create a nested concurrent workflow with N GitHub Copilot agents.""" + sample_count = len(discovery.samples) + print(f"\nCreating nested concurrent workflow with {sample_count} parallel GitHub agents...") + + if sample_count == 0: + await ctx.send_message(WorkflowCreationResult(samples=[], workflow=None, agents=[])) + return + + agents: list[GitHubCopilotAgent] = [] + sample_by_agent_id: dict[str, SampleInfo] = {} + + for index, sample in enumerate(discovery.samples, start=1): + agent_id = f"sample_validator_{index}" + agent = GitHubCopilotAgent( + id=agent_id, + name=agent_id, + instructions=agent_prompt(sample), + default_options={"on_permission_request": prompt_permission, "timeout": 180}, # type: ignore + ) + agents.append(agent) 
+ sample_by_agent_id[agent_id] = sample + + async def aggregate_results(results: list[AgentExecutorResponse]) -> ExecutionResult: + run_results: list[RunResult] = [] + + for result in results: + executor_id = result.executor_id + sample = sample_by_agent_id.get(executor_id) + + if sample is None: + continue + + try: + result_payload = parse_agent_json(result.agent_response.text) + run_results.append( + RunResult( + sample=sample, + status=status_from_text(result_payload.status), + output=result_payload.output, + error=result_payload.error, + ) + ) + except Exception as ex: + run_results.append( + RunResult( + sample=sample, + status=RunStatus.ERROR, + output="", + error=( + f"Failed to parse agent output for {sample.relative_path}: {ex}. " + f"Raw: {result.agent_response.text}" # type: ignore + ), + ) + ) + + unresolved = [ + sample + for sample in discovery.samples + if sample.relative_path not in {r.sample.relative_path for r in run_results} + ] + for sample in unresolved: + run_results.append( + RunResult( + sample=sample, + status=RunStatus.ERROR, + output="", + error=f"No response from agent for sample {sample.relative_path}.", + ) + ) + + return ExecutionResult(results=run_results) + + nested_workflow = ( + ConcurrentBuilder(participants=[CustomAgentExecutor(agent) for agent in agents]) + .with_aggregator(aggregate_results) + .build() + ) + + await ctx.send_message( + WorkflowCreationResult( + samples=discovery.samples, + workflow=nested_workflow, + agents=agents, + ) + ) diff --git a/python/samples/_sample_validation/discovery.py b/python/samples/_sample_validation/discovery.py new file mode 100644 index 0000000000..c71db32425 --- /dev/null +++ b/python/samples/_sample_validation/discovery.py @@ -0,0 +1,116 @@ +# Copyright (c) Microsoft. All rights reserved. 
+ +"""Sample discovery module.""" + +import ast +import os +from pathlib import Path + +from _sample_validation.models import DiscoveryResult, SampleInfo, ValidationConfig +from agent_framework import Executor, WorkflowContext, handler + + +def _is_main_entrypoint_guard(test: ast.expr) -> bool: + """Check whether an expression is ``__name__ == '__main__'``.""" + if not isinstance(test, ast.Compare): + return False + + if len(test.ops) != 1 or not isinstance(test.ops[0], ast.Eq): + return False + + if len(test.comparators) != 1: + return False + + left = test.left + right = test.comparators[0] + + return ( + isinstance(left, ast.Name) + and left.id == "__name__" + and isinstance(right, ast.Constant) + and right.value == "__main__" + ) or ( + isinstance(right, ast.Name) + and right.id == "__name__" + and isinstance(left, ast.Constant) + and left.value == "__main__" + ) + + +def _has_main_entrypoint_guard(path: Path) -> bool: + """Check whether a Python file defines a top-level main entrypoint guard.""" + try: + source = path.read_text(encoding="utf-8") + tree = ast.parse(source) + except Exception: + return False + + return any(isinstance(node, ast.If) and _is_main_entrypoint_guard(node.test) for node in tree.body) + + +def discover_samples(samples_dir: Path, subdir: str | None = None) -> list[SampleInfo]: + """ + Find all Python sample files in the samples directory. 
+ + Args: + samples_dir: Root samples directory + subdir: Optional subdirectory to filter to + + Returns: + List of SampleInfo objects for each discovered sample + """ + # Determine the search directory + if subdir: + search_dir = samples_dir / subdir + if not search_dir.exists(): + print(f"Warning: Subdirectory '{subdir}' does not exist in {samples_dir}") + return [] + else: + search_dir = samples_dir + + python_files: list[Path] = [] + + # Walk through all subdirectories and find .py files + for root, dirs, files in os.walk(search_dir): + # Skip directories that start with _ (like _sample_validation) + dirs[:] = [d for d in dirs if not d.startswith("_") and d != "__pycache__"] + + for file in files: + # Skip files that start with _ and include only scripts with a main entrypoint guard + if file.endswith(".py") and not file.startswith("_"): + file_path = Path(root) / file + if _has_main_entrypoint_guard(file_path): + python_files.append(file_path) + + # Sort files for consistent execution order + python_files = sorted(python_files) + + # Convert to SampleInfo objects + samples: list[SampleInfo] = [] + for path in python_files: + try: + samples.append(SampleInfo.from_path(path, samples_dir)) + except Exception as e: + print(f"Warning: Could not read {path}: {e}") + + return samples + + +class DiscoverSamplesExecutor(Executor): + """Executor that discovers all samples in the samples directory.""" + + def __init__(self, config: ValidationConfig): + super().__init__(id="discover_samples") + self.config = config + + @handler + async def discover(self, _: str, ctx: WorkflowContext[DiscoveryResult]) -> None: + """Discover all Python samples.""" + print(f"🔍 Discovering samples in {self.config.samples_dir}") + if self.config.subdir: + print(f" Filtering to subdirectory: {self.config.subdir}") + + samples = discover_samples(self.config.samples_dir, self.config.subdir) + print(f" Found {len(samples)} samples") + + await ctx.send_message(DiscoveryResult(samples=samples)) diff 
--git a/python/samples/_sample_validation/models.py b/python/samples/_sample_validation/models.py new file mode 100644 index 0000000000..8e9eb1e9fb --- /dev/null +++ b/python/samples/_sample_validation/models.py @@ -0,0 +1,162 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Data models for the sample validation system.""" + +from dataclasses import dataclass, field +from datetime import datetime +from enum import Enum +from pathlib import Path + +from agent_framework import Workflow +from agent_framework.github import GitHubCopilotAgent + + +@dataclass +class ValidationConfig: + """Configuration for the validation workflow.""" + + samples_dir: Path + python_root: Path + subdir: str | None = None + + +@dataclass +class SampleInfo: + """Information about a discovered sample file.""" + + path: Path + relative_path: str + code: str + + @classmethod + def from_path(cls, path: Path, samples_dir: Path) -> "SampleInfo": + """Create SampleInfo from a file path.""" + return cls( + path=path, + relative_path=str(path.relative_to(samples_dir)), + code=path.read_text(encoding="utf-8"), + ) + + +@dataclass +class DiscoveryResult: + """Result of sample discovery.""" + + samples: list[SampleInfo] + + +@dataclass +class WorkflowCreationResult: + """Result of creating a nested per-sample concurrent workflow.""" + + samples: list[SampleInfo] + workflow: Workflow | None + agents: list[GitHubCopilotAgent] + + +class RunStatus(Enum): + """Status of a sample run.""" + + SUCCESS = "success" + FAILURE = "failure" + TIMEOUT = "timeout" + ERROR = "error" + + +@dataclass +class RunResult: + """Result of running a single sample.""" + + sample: SampleInfo + status: RunStatus + output: str + error: str + + +@dataclass +class ExecutionResult: + """Result of sample execution.""" + + results: list[RunResult] + + +@dataclass +class Report: + """Final validation report.""" + + timestamp: datetime + total_samples: int + success_count: int + failure_count: int + timeout_count: int + 
error_count: int + results: list[RunResult] = field(default_factory=list) # type: ignore + + def to_markdown(self) -> str: + """Generate a markdown report.""" + lines = [ + "# Sample Validation Report", + "", + f"**Generated:** {self.timestamp.isoformat()}", + "", + "## Summary", + "", + "| Metric | Count |", + "|--------|-------|", + f"| Total Samples | {self.total_samples} |", + f"| [PASS] Success | {self.success_count} |", + f"| [FAIL] Failure | {self.failure_count} |", + f"| [TIMEOUT] Timeout | {self.timeout_count} |", + f"| [ERROR] Error | {self.error_count} |", + "", + "## Detailed Results", + "", + ] + + # Group by status + for status in [RunStatus.FAILURE, RunStatus.TIMEOUT, RunStatus.ERROR, RunStatus.SUCCESS]: + status_results = [r for r in self.results if r.status == status] + if not status_results: + continue + + status_label = { + RunStatus.SUCCESS: "[PASS]", + RunStatus.FAILURE: "[FAIL]", + RunStatus.TIMEOUT: "[TIMEOUT]", + RunStatus.ERROR: "[ERROR]", + } + + lines.append(f"### {status_label[status]} {status.value.title()} ({len(status_results)})") + lines.append("") + + for result in status_results: + lines.append(f"- **{result.sample.relative_path}**") + if result.error: + # Truncate long errors + error_preview = result.error[:200] + "..." 
if len(result.error) > 200 else result.error + lines.append(f" - Error: `{error_preview}`") + lines.append("") + + return "\n".join(lines) + + def to_dict(self) -> dict[str, object]: + """Convert report to dictionary for JSON serialization.""" + return { + "timestamp": self.timestamp.isoformat(), + "summary": { + "total_samples": self.total_samples, + "success_count": self.success_count, + "failure_count": self.failure_count, + "timeout_count": self.timeout_count, + "error_count": self.error_count, + }, + "results": [ + { + "path": r.sample.relative_path, + "status": r.status.value, + "output": r.output, + "error": r.error, + } + for r in self.results + ], + } diff --git a/python/samples/_sample_validation/report.py b/python/samples/_sample_validation/report.py new file mode 100644 index 0000000000..e6af5f46c9 --- /dev/null +++ b/python/samples/_sample_validation/report.py @@ -0,0 +1,100 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Report generation for sample validation results.""" + +import json +from datetime import datetime +from pathlib import Path + +from _sample_validation.models import ExecutionResult, Report, RunResult, RunStatus +from agent_framework import Executor, WorkflowContext, handler +from typing_extensions import Never + + +def generate_report(results: list[RunResult]) -> Report: + """ + Generate a validation report from run results. 
+ + Args: + results: List of RunResult objects from sample execution + + Returns: + Report object with aggregated statistics + """ + + return Report( + timestamp=datetime.now(), + total_samples=len(results), + success_count=sum(1 for r in results if r.status == RunStatus.SUCCESS), + failure_count=sum(1 for r in results if r.status == RunStatus.FAILURE), + timeout_count=sum(1 for r in results if r.status == RunStatus.TIMEOUT), + error_count=sum(1 for r in results if r.status == RunStatus.ERROR), + results=results, + ) + + +def save_report(report: Report, output_dir: Path) -> tuple[Path, Path]: + """ + Save the report to markdown and JSON files. + + Args: + report: The report to save + output_dir: Directory to save the report files + + Returns: + Tuple of (markdown_path, json_path) + """ + output_dir.mkdir(parents=True, exist_ok=True) + + timestamp_str = report.timestamp.strftime("%Y%m%d_%H%M%S") + + # Save markdown + md_path = output_dir / f"validation_report_{timestamp_str}.md" + md_path.write_text(report.to_markdown(), encoding="utf-8") + + # Save JSON + json_path = output_dir / f"validation_report_{timestamp_str}.json" + json_path.write_text( + json.dumps(report.to_dict(), indent=2), + encoding="utf-8", + ) + + return md_path, json_path + + +def print_summary(report: Report) -> None: + """Print a summary of the validation report to console.""" + print("\n" + "=" * 80) + print("SAMPLE VALIDATION SUMMARY") + print("=" * 80) + + if report.failure_count == 0 and report.timeout_count == 0 and report.error_count == 0: + print("[PASS] ALL SAMPLES PASSED!") + else: + print("[FAIL] SOME SAMPLES FAILED") + + print(f"\nTotal samples: {report.total_samples}") + print() + print("Results:") + print(f" [PASS] Success: {report.success_count}") + print(f" [FAIL] Failure: {report.failure_count}") + print(f" [TIMEOUT] Timeout: {report.timeout_count}") + print(f" [ERROR] Error: {report.error_count}") + print("=" * 80) + + +class GenerateReportExecutor(Executor): + """Executor that 
generates the final validation report.""" + + def __init__(self) -> None: + super().__init__(id="generate_report") + + @handler + async def generate(self, execution: ExecutionResult, ctx: WorkflowContext[Never, Report]) -> None: + """Generate the validation report from fan-in results.""" + print("\nGenerating report...") + + report = generate_report(execution.results) + print_summary(report) + + await ctx.yield_output(report) diff --git a/python/samples/_sample_validation/run_dynamic_validation_workflow_executor.py b/python/samples/_sample_validation/run_dynamic_validation_workflow_executor.py new file mode 100644 index 0000000000..fdad1d585b --- /dev/null +++ b/python/samples/_sample_validation/run_dynamic_validation_workflow_executor.py @@ -0,0 +1,59 @@ +# Copyright (c) Microsoft. All rights reserved. + +from typing import Sequence + +from _sample_validation.models import ( + ExecutionResult, + RunResult, + RunStatus, + WorkflowCreationResult, +) +from agent_framework import Executor, WorkflowContext, handler +from agent_framework_github_copilot import GitHubCopilotAgent + + +async def stop_agents(agents: Sequence[GitHubCopilotAgent]) -> None: + """Stop all GitHub Copilot agents used by the nested workflow.""" + for agent in agents: + try: + await agent.stop() + except Exception: + continue + + +class RunDynamicValidationWorkflowExecutor(Executor): + """Executor that runs the nested workflow created in the previous step.""" + + def __init__(self) -> None: + super().__init__(id="run_dynamic_workflow") + + @handler + async def run(self, creation: WorkflowCreationResult, ctx: WorkflowContext[ExecutionResult]) -> None: + """Run the nested workflow and emit execution results.""" + if creation.workflow is None: + await ctx.send_message(ExecutionResult(results=[])) + return + + print("\nRunning nested concurrent workflow...") + prompt = ( + "Validate your assigned sample using your instructions. Return only the JSON object in the required schema." 
+ ) + + try: + events = await creation.workflow.run(prompt) + outputs = events.get_outputs() + if outputs and isinstance(outputs[0], ExecutionResult): + await ctx.send_message(outputs[0]) + else: + fallback_results = [ + RunResult( + sample=sample, + status=RunStatus.ERROR, + output="", + error="Nested workflow did not return an ExecutionResult.", + ) + for sample in creation.samples + ] + await ctx.send_message(ExecutionResult(results=fallback_results)) + finally: + await stop_agents(creation.agents) diff --git a/python/samples/_sample_validation/workflow.py b/python/samples/_sample_validation/workflow.py new file mode 100644 index 0000000000..51cbd3d410 --- /dev/null +++ b/python/samples/_sample_validation/workflow.py @@ -0,0 +1,42 @@ +# Copyright (c) Microsoft. All rights reserved. + +""" +Sample Validation Workflow using Microsoft Agent Framework. + +Workflow composition for sample validation. +""" + +from _sample_validation.create_dynamic_workflow_executor import CreateConcurrentValidationWorkflowExecutor +from _sample_validation.discovery import DiscoverSamplesExecutor, ValidationConfig +from _sample_validation.report import GenerateReportExecutor +from _sample_validation.run_dynamic_validation_workflow_executor import RunDynamicValidationWorkflowExecutor +from agent_framework import Workflow, WorkflowBuilder + + +def create_validation_workflow( + config: ValidationConfig, +) -> Workflow: + """ + Create the sample validation workflow. 
+ + Args: + config: Validation configuration + + Returns: + Configured Workflow instance + """ + discover = DiscoverSamplesExecutor(config) + create_dynamic_workflow = CreateConcurrentValidationWorkflowExecutor(config) + run_dynamic_workflow = RunDynamicValidationWorkflowExecutor() + generate = GenerateReportExecutor() + + return ( + WorkflowBuilder(start_executor=discover) + .add_edge(discover, create_dynamic_workflow) + .add_edge(create_dynamic_workflow, run_dynamic_workflow) + .add_edge(run_dynamic_workflow, generate) + .build() + ) + + +__all__ = ["ValidationConfig", "create_validation_workflow"] From 7ed67c3b63da24b30590a3824d14d0ef42ffd8ca Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Fri, 20 Feb 2026 22:58:46 -0800 Subject: [PATCH 2/5] Automate sample validation: part 2 --- python/samples/01-get-started/04_memory.py | 3 + python/samples/_sample_validation/README.md | 42 ++-- python/samples/_sample_validation/__main__.py | 17 +- python/samples/_sample_validation/const.py | 3 + .../create_dynamic_workflow_executor.py | 228 ++++++++++-------- python/samples/_sample_validation/models.py | 1 + ...un_dynamic_validation_workflow_executor.py | 35 +-- 7 files changed, 195 insertions(+), 134 deletions(-) create mode 100644 python/samples/_sample_validation/const.py diff --git a/python/samples/01-get-started/04_memory.py b/python/samples/01-get-started/04_memory.py index 774ccf927d..c554be7337 100644 --- a/python/samples/01-get-started/04_memory.py +++ b/python/samples/01-get-started/04_memory.py @@ -31,6 +31,9 @@ class UserMemoryProvider(BaseContextProvider): DEFAULT_SOURCE_ID = "user_memory" + def __init__(self): + super().__init__(self.DEFAULT_SOURCE_ID) + async def before_run( self, *, diff --git a/python/samples/_sample_validation/README.md b/python/samples/_sample_validation/README.md index 04e8c16fad..c25058c850 100644 --- a/python/samples/_sample_validation/README.md +++ b/python/samples/_sample_validation/README.md @@ -1,6 +1,6 @@ # Sample Validation System 
-An AI-powered workflow system for validating Python samples by discovering them, dynamically creating a nested concurrent workflow, and producing a report. +An AI-powered workflow system for validating Python samples by discovering them, creating a nested batched workflow, and producing a report. ## Architecture @@ -14,12 +14,12 @@ An AI-powered workflow system for validating Python samples by discovering them, ▼ ▼ ▼ ┌───────────────┐ ┌─────────────────┐ ┌─────────────────┐ │ Discover │ ──► │ Create Dynamic │ ──► │ Run Nested │ -│ Samples │ │ Concurrent Flow │ │ Workflow │ +│ Samples │ │ Batched Flow │ │ Workflow │ └───────────────┘ └─────────────────┘ └─────────────────┘ │ │ │ ▼ ▼ ▼ List[SampleInfo] WorkflowCreationResult ExecutionResult - (N GitHub agents) │ + (workers + coordinator) │ ▼ ┌─────────────────┐ │ Generate Report │ @@ -33,15 +33,15 @@ An AI-powered workflow system for validating Python samples by discovering them, ``` ┌─────────────────────────────────────────────────────────────────────┐ -│ Nested Concurrent Workflow (dynamic) │ +│ Nested Batched Workflow (coordinator + workers) │ ├─────────────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────────────────────────────────────────────────────────┐ │ -│ │ ConcurrentBuilder(participants=[agent_1 ... 
agent_N]) │ │ -│ │ - N equals number of discovered samples │ │ -│ │ - Each agent validates one assigned sample │ │ -│ │ - Agents run in parallel using GitHub Copilot │ │ -│ │ - Custom aggregator converts agent JSON -> RunResult │ │ +│ │ WorkflowBuilder + fan-out/fan-in edges │ │ +│ │ - Coordinator dispatches tasks in bounded batches │ │ +│ │ - Worker executors run GitHub Copilot agents │ │ +│ │ - Collector aggregates per-sample RunResult messages │ │ +│ │ - Max in-flight workers set by --max-parallel-workers │ │ │ └─────────────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────────────┘ ``` @@ -65,11 +65,10 @@ samples/ │ │ ├── save_report() # Write to markdown/JSON │ │ ├── print_summary() # Console output │ │ └── GenerateReportExecutor -│ ├── workflow_types.py # Shared dataclasses for workflow steps -│ ├── create_dynamic_workflow_executor.py # CreateConcurrentValidationWorkflowExecutor +│ ├── create_dynamic_workflow_executor.py # Coordinator, workers, collector, CreateConcurrentValidationWorkflowExecutor │ ├── run_dynamic_validation_workflow_executor.py # RunDynamicValidationWorkflowExecutor │ └── workflow.py # Workflow assembly entrypoint -├── __main__.py # CLI entry point +├── __main__.py # CLI entry point ``` ## Dependencies @@ -77,7 +76,6 @@ samples/ ### Required - **agent-framework** - Core workflow and agent functionality -- **agent-framework-orchestrations** - ConcurrentBuilder for fan-out/fan-in - **agent-framework-github-copilot** - GitHub Copilot agent integration ### Optional @@ -116,6 +114,7 @@ uv run python -m _sample_validation [OPTIONS] Options: --subdir TEXT Subdirectory to validate (relative to samples/) --output-dir TEXT Report output directory (default: ./_sample_validation/reports) + --max-parallel-workers INT Max in-flight workers per batch (default: 10) --save-report Save reports to files ``` @@ -125,6 +124,9 @@ Options: # Quick validation of a small directory uv run python -m 
_sample_validation --subdir 03-workflows/_start-here +# Limit parallel workers for large sample sets +uv run python -m _sample_validation --subdir 02-agents --max-parallel-workers 8 + # Save report artifacts uv run python -m _sample_validation --save-report ``` @@ -141,17 +143,17 @@ Walks the samples directory and finds all `.py` files that: ### 2. Dynamic Workflow Creation -Creates a nested `ConcurrentBuilder` workflow with one GitHub Copilot agent per sample. - -Each agent receives: +Creates a nested workflow with: -- The sample path -- Full source code -- A strict JSON output schema for validation result +- A coordinator executor +- One worker executor per discovered sample +- A collector executor ### 3. Nested Workflow Execution -Runs all per-sample agents in parallel. A custom aggregator converts each agent response into `RunResult`. +The coordinator sends initial work to the first `max_parallel_workers` workers. As each worker finishes, it notifies +the coordinator, which dispatches the next queued sample. Workers also send result items to the collector, which emits +the final `ExecutionResult` once all samples are processed. ### 4. 
Report Generation diff --git a/python/samples/_sample_validation/__main__.py b/python/samples/_sample_validation/__main__.py index f2f8b2b7e6..15c36bc324 100644 --- a/python/samples/_sample_validation/__main__.py +++ b/python/samples/_sample_validation/__main__.py @@ -19,6 +19,7 @@ import asyncio import os import sys +import time from pathlib import Path # Add the samples directory to the path for imports @@ -61,6 +62,13 @@ def parse_arguments() -> argparse.Namespace: help="Save the validation report to files", ) + parser.add_argument( + "--max-parallel-workers", + type=int, + default=10, + help="Maximum number of samples to run in parallel per batch (default: 10)", + ) + return parser.parse_args() @@ -86,6 +94,7 @@ async def main() -> int: samples_dir=samples_dir, python_root=python_root, subdir=args.subdir, + max_parallel_workers=max(1, args.max_parallel_workers), ) # Create and run the workflow @@ -95,7 +104,13 @@ async def main() -> int: print("-" * 80) # Run the workflow - events = await workflow.run("start") + run_start = time.perf_counter() + try: + events = await workflow.run("start") + finally: + run_duration = time.perf_counter() - run_start + print(f"\nWorkflow run completed in {run_duration:.2f}s") + outputs = events.get_outputs() if not outputs: diff --git a/python/samples/_sample_validation/const.py b/python/samples/_sample_validation/const.py new file mode 100644 index 0000000000..1ae0d4b38d --- /dev/null +++ b/python/samples/_sample_validation/const.py @@ -0,0 +1,3 @@ +# Copyright (c) Microsoft. All rights reserved. + +WORKER_COMPLETED = "worker_completed" diff --git a/python/samples/_sample_validation/create_dynamic_workflow_executor.py b/python/samples/_sample_validation/create_dynamic_workflow_executor.py index 27f14f8e83..db01f6b5b0 100644 --- a/python/samples/_sample_validation/create_dynamic_workflow_executor.py +++ b/python/samples/_sample_validation/create_dynamic_workflow_executor.py @@ -1,8 +1,10 @@ # Copyright (c) Microsoft. 
All rights reserved. - import logging +from collections import deque +from dataclasses import dataclass +from _sample_validation.const import WORKER_COMPLETED from _sample_validation.discovery import DiscoveryResult from _sample_validation.models import ( ExecutionResult, @@ -13,18 +15,18 @@ WorkflowCreationResult, ) from agent_framework import ( - AgentExecutorRequest, - AgentExecutorResponse, - AgentResponse, Executor, Message, + Workflow, + WorkflowBuilder, WorkflowContext, + WorkflowEvent, handler, ) from agent_framework.github import GitHubCopilotAgent -from agent_framework.orchestrations import ConcurrentBuilder from copilot import PermissionRequest, PermissionRequestResult from pydantic import BaseModel +from typing_extensions import Never logger = logging.getLogger(__name__) @@ -35,21 +37,32 @@ class AgentResponseFormat(BaseModel): error: str -def agent_prompt(sample: SampleInfo) -> str: - """Build per-sample instructions for a GitHub Copilot validator agent.""" - return ( - "You are validating exactly one Python sample.\n" - f"Sample path: {sample.relative_path}\n" - "Analyze the code and execute it. Determine if it runs successfully, fails, or times out.\n" - "The sample can be interactive. If it is interactive, response to the sample when prompted " - "based on your analysis of the code. You do not need to consult human on what to respond\n" - "Return ONLY valid JSON with this schema:\n" - "{\n" - ' "status": "success|failure|timeout|error",\n' - ' "output": "short summary of the result and what you did if the sample was interactive",\n' - ' "error": "error details or empty string"\n' - "}\n\n" - ) +@dataclass +class CoordinatorStart: + samples: list[SampleInfo] + + +@dataclass +class WorkerFreed: + worker_id: str + + +class BatchCompletion: + pass + + +AgentInstruction = ( + "You are validating exactly one Python sample.\n" + "Analyze the sample code and execute it. 
Determine if it runs successfully, fails, or times out.\n"
+    "The sample can be interactive. If it is interactive, respond to the sample when prompted "
+    "based on your analysis of the code. You do not need to consult a human on what to respond.\n"
+    "Return ONLY valid JSON with this schema:\n"
+    "{\n"
+    '    "status": "success|failure|timeout|error",\n'
+    '    "output": "short summary of the result and what you did if the sample was interactive",\n'
+    '    "error": "error details or empty string"\n'
+    "}\n\n"
+)
 
 
 def parse_agent_json(text: str) -> AgentResponseFormat:
@@ -94,27 +107,87 @@ def __init__(self, agent: GitHubCopilotAgent):
         self.agent = agent
 
     @handler
-    async def handle_request(self, request: AgentExecutorRequest, ctx: WorkflowContext[AgentExecutorResponse]) -> None:
-        """Execute the agent with the given request and return its response."""
+    async def handle_task(self, sample: SampleInfo, ctx: WorkflowContext[WorkerFreed | RunResult]) -> None:
+        """Execute one sample task and notify collector + coordinator."""
         try:
-            response = await self.agent.run(request.messages)
-            await ctx.send_message(AgentExecutorResponse(executor_id=self.id, agent_response=response))
+            response = await self.agent.run([
+                Message(role="user", text=f"Validate the following sample:\n\n{sample.relative_path}")
+            ])
+            result_payload = parse_agent_json(response.text)
+            result = RunResult(
+                sample=sample,
+                status=status_from_text(result_payload.status),
+                output=result_payload.output,
+                error=result_payload.error,
+            )
         except Exception as ex:
             logger.error(f"Error executing agent {self.agent.id}: {ex}")
-            error_response = AgentExecutorResponse(
-                executor_id=self.id,
-                agent_response=AgentResponse(
-                    messages=Message(
-                        role="assistant",
-                        text=AgentResponseFormat(
-                            status="error",
-                            output="",
-                            error=str(ex),
-                        ).model_dump_json(),
-                    )
-                ),
+            result = RunResult(
+                sample=sample,
+                status=RunStatus.ERROR,
+                output="",
+                error=str(ex),
             )
-            await ctx.send_message(error_response)
+
+        await 
ctx.send_message(result, target_id="collector")
+        await ctx.send_message(WorkerFreed(worker_id=self.id), target_id="coordinator")
+
+        await ctx.add_event(WorkflowEvent(WORKER_COMPLETED, sample))  # type: ignore
+
+
+class BatchCoordinatorExecutor(Executor):
+    """Dispatch sample tasks to worker executors in bounded batches."""
+
+    def __init__(self, worker_ids: list[str], max_parallel_workers: int) -> None:
+        super().__init__(id="coordinator")
+        self._worker_ids = worker_ids
+        self._max_parallel_workers = max(1, max_parallel_workers)
+        self._pending: deque[SampleInfo] = deque()
+        self._inflight: set[str] = set()
+
+    async def _assign_next(self, worker_id: str, ctx: WorkflowContext[SampleInfo | BatchCompletion]) -> None:
+        if not self._pending:
+            await ctx.send_message(BatchCompletion(), target_id="collector")
+            return
+
+        sample = self._pending.popleft()
+        self._inflight.add(worker_id)
+        # Messages will get queued in the runner until the next superstep when all workers are freed,
+        # thus achieving automatic batching without needing complex synchronization logic
+        await ctx.send_message(sample, target_id=worker_id)
+
+    @handler
+    async def on_start(self, start: CoordinatorStart, ctx: WorkflowContext[SampleInfo | BatchCompletion]) -> None:
+        """Initialize queue and dispatch first wave of tasks."""
+        self._pending = deque(start.samples)
+        self._inflight.clear()
+
+        for worker_id in self._worker_ids[: self._max_parallel_workers]:
+            await self._assign_next(worker_id, ctx)
+
+    @handler
+    async def on_worker_freed(self, freed: WorkerFreed, ctx: WorkflowContext[SampleInfo | BatchCompletion]) -> None:
+        """Dispatch next queued sample when a worker finishes."""
+        self._inflight.discard(freed.worker_id)
+        await self._assign_next(freed.worker_id, ctx)
+
+
+class CollectorExecutor(Executor):
+    """Collect per-sample results and emit the final execution result."""
+
+    def __init__(self) -> None:
+        super().__init__(id="collector")
+        self._results: list[RunResult] = []
+
+    @handler
+    
async def on_all(self, batch_completion: BatchCompletion, ctx: WorkflowContext[Never, ExecutionResult]) -> None:
+        """Emit the final execution result once the coordinator signals batch completion."""
+        await ctx.yield_output(ExecutionResult(results=self._results))
+
+    @handler
+    async def on_item(self, item: RunResult, ctx: WorkflowContext) -> None:
+        """Record a per-sample result sent by a worker."""
+        self._results.append(item)
 
 
 class CreateConcurrentValidationWorkflowExecutor(Executor):
@@ -130,83 +203,42 @@ async def create(
         discovery: DiscoveryResult,
         ctx: WorkflowContext[WorkflowCreationResult],
     ) -> None:
-        """Create a nested concurrent workflow with N GitHub Copilot agents."""
+        """Create a nested workflow with a coordinator + worker fan-out/fan-in."""
         sample_count = len(discovery.samples)
-        print(f"\nCreating nested concurrent workflow with {sample_count} parallel GitHub agents...")
+        print(f"\nCreating nested batched workflow for {sample_count} samples...")
 
         if sample_count == 0:
             await ctx.send_message(WorkflowCreationResult(samples=[], workflow=None, agents=[]))
             return
 
         agents: list[GitHubCopilotAgent] = []
-        sample_by_agent_id: dict[str, SampleInfo] = {}
+        workers: list[CustomAgentExecutor] = []
 
         for index, sample in enumerate(discovery.samples, start=1):
-            agent_id = f"sample_validator_{index}"
+            agent_id = f"sample_validator_{index}({sample.relative_path})"
             agent = GitHubCopilotAgent(
                 id=agent_id,
                 name=agent_id,
-                instructions=agent_prompt(sample),
+                instructions=AgentInstruction,
                 default_options={"on_permission_request": prompt_permission, "timeout": 180},  # type: ignore
             )
             agents.append(agent)
-            sample_by_agent_id[agent_id] = sample
-
-        async def aggregate_results(results: list[AgentExecutorResponse]) -> ExecutionResult:
-            run_results: list[RunResult] = []
-
-            for result in results:
-                executor_id = result.executor_id
-                sample = sample_by_agent_id.get(executor_id)
-
-                if sample is None:
-                    continue
-
-                try:
-                    result_payload = 
parse_agent_json(result.agent_response.text) - run_results.append( - RunResult( - sample=sample, - status=status_from_text(result_payload.status), - output=result_payload.output, - error=result_payload.error, - ) - ) - except Exception as ex: - run_results.append( - RunResult( - sample=sample, - status=RunStatus.ERROR, - output="", - error=( - f"Failed to parse agent output for {sample.relative_path}: {ex}. " - f"Raw: {result.agent_response.text}" # type: ignore - ), - ) - ) - - unresolved = [ - sample - for sample in discovery.samples - if sample.relative_path not in {r.sample.relative_path for r in run_results} - ] - for sample in unresolved: - run_results.append( - RunResult( - sample=sample, - status=RunStatus.ERROR, - output="", - error=f"No response from agent for sample {sample.relative_path}.", - ) - ) - - return ExecutionResult(results=run_results) - - nested_workflow = ( - ConcurrentBuilder(participants=[CustomAgentExecutor(agent) for agent in agents]) - .with_aggregator(aggregate_results) - .build() + + workers.append(CustomAgentExecutor(agent)) + + coordinator = BatchCoordinatorExecutor( + worker_ids=[worker.id for worker in workers], + max_parallel_workers=self.config.max_parallel_workers, ) + collector = CollectorExecutor() + + nested_builder = WorkflowBuilder(start_executor=coordinator, output_executors=[collector]) + nested_builder.add_edge(coordinator, collector) + for worker in workers: + nested_builder.add_edge(coordinator, worker) + nested_builder.add_edge(worker, coordinator) + nested_builder.add_edge(worker, collector) + nested_workflow: Workflow = nested_builder.build() await ctx.send_message( WorkflowCreationResult( diff --git a/python/samples/_sample_validation/models.py b/python/samples/_sample_validation/models.py index 8e9eb1e9fb..ca9f26adab 100644 --- a/python/samples/_sample_validation/models.py +++ b/python/samples/_sample_validation/models.py @@ -18,6 +18,7 @@ class ValidationConfig: samples_dir: Path python_root: Path subdir: str | 
None = None + max_parallel_workers: int = 10 @dataclass diff --git a/python/samples/_sample_validation/run_dynamic_validation_workflow_executor.py b/python/samples/_sample_validation/run_dynamic_validation_workflow_executor.py index fdad1d585b..b9ec558487 100644 --- a/python/samples/_sample_validation/run_dynamic_validation_workflow_executor.py +++ b/python/samples/_sample_validation/run_dynamic_validation_workflow_executor.py @@ -2,14 +2,11 @@ from typing import Sequence -from _sample_validation.models import ( - ExecutionResult, - RunResult, - RunStatus, - WorkflowCreationResult, -) +from _sample_validation.const import WORKER_COMPLETED +from _sample_validation.create_dynamic_workflow_executor import CoordinatorStart +from _sample_validation.models import ExecutionResult, RunResult, RunStatus, SampleInfo, WorkflowCreationResult from agent_framework import Executor, WorkflowContext, handler -from agent_framework_github_copilot import GitHubCopilotAgent +from agent_framework.github import GitHubCopilotAgent async def stop_agents(agents: Sequence[GitHubCopilotAgent]) -> None: @@ -34,16 +31,24 @@ async def run(self, creation: WorkflowCreationResult, ctx: WorkflowContext[Execu await ctx.send_message(ExecutionResult(results=[])) return - print("\nRunning nested concurrent workflow...") - prompt = ( - "Validate your assigned sample using your instructions. Return only the JSON object in the required schema." 
- ) + print("\nRunning nested batched workflow...") + print("-" * 80) try: - events = await creation.workflow.run(prompt) - outputs = events.get_outputs() - if outputs and isinstance(outputs[0], ExecutionResult): - await ctx.send_message(outputs[0]) + remaining_sample_counts = len(creation.samples) + result: ExecutionResult | None = None + async for event in creation.workflow.run(CoordinatorStart(samples=creation.samples), stream=True): + if event.type == "output" and isinstance(event.data, ExecutionResult): + result = event.data # type: ignore + elif event.type == WORKER_COMPLETED and isinstance(event.data, SampleInfo): # type: ignore + remaining_sample_counts -= 1 + print( + f"Completed validation for sample: {event.data.relative_path:<80} | " + f"Remaining: {remaining_sample_counts:>4}" + ) + + if result is not None: + await ctx.send_message(result) else: fallback_results = [ RunResult( From 67675a1c27c7745a4eed192561299023d850c62a Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Mon, 23 Feb 2026 12:01:58 -0800 Subject: [PATCH 3/5] Create GH workflow --- .../workflows/python-sample-validation.yml | 304 ++++++++++++++++++ python/samples/_run_all_samples.py | 304 ------------------ python/samples/_sample_validation/__main__.py | 8 +- python/samples/_sample_validation/report.py | 13 +- 4 files changed, 320 insertions(+), 309 deletions(-) create mode 100644 .github/workflows/python-sample-validation.yml delete mode 100644 python/samples/_run_all_samples.py diff --git a/.github/workflows/python-sample-validation.yml b/.github/workflows/python-sample-validation.yml new file mode 100644 index 0000000000..ba43394483 --- /dev/null +++ b/.github/workflows/python-sample-validation.yml @@ -0,0 +1,304 @@ +name: Python - Sample Validation + +on: + workflow_dispatch: + schedule: + - cron: "0 0 * * *" # Run at midnight UTC daily + +env: + # Configure a constant location for the uv cache + UV_CACHE_DIR: /tmp/.uv-cache + +jobs: + validate-01-get-started: + name: Validate 
01-get-started + runs-on: ubuntu-latest + permissions: + contents: read + env: + # Azure AI configuration for get-started samples + AZURE_AI_PROJECT_ENDPOINT: ${{ secrets.AZURE_AI_PROJECT_ENDPOINT }} + AZURE_AI_MODEL_DEPLOYMENT_NAME: ${{ secrets.AZURE_AI_MODEL_DEPLOYMENT_NAME }} + # GitHub Copilot configuration + GITHUB_COPILOT_MODEL: ${{ vars.GITHUB_COPILOT_MODEL }} + defaults: + run: + working-directory: python + steps: + - uses: actions/checkout@v6 + + - name: Set up python and install the project + uses: ./.github/actions/python-setup + with: + python-version: "3.12" + os: ${{ runner.os }} + env: + UV_CACHE_DIR: /tmp/.uv-cache + + - name: Run sample validation + run: | + cd samples && uv run python -m _sample_validation --subdir 01-get-started --save-report --report-name 01-get-started + + - name: Upload validation report + uses: actions/upload-artifact@v4 + if: always() + with: + name: validation-report-01-get-started + path: python/samples/_sample_validation/reports/ + + validate-02-agents: + name: Validate 02-agents + runs-on: ubuntu-latest + permissions: + contents: read + env: + # Azure AI configuration + AZURE_AI_PROJECT_ENDPOINT: ${{ secrets.AZURE_AI_PROJECT_ENDPOINT }} + AZURE_AI_MODEL_DEPLOYMENT_NAME: ${{ secrets.AZURE_AI_MODEL_DEPLOYMENT_NAME }} + # Azure OpenAI configuration + AZURE_OPENAI_ENDPOINT: ${{ secrets.AZURE_OPENAI_ENDPOINT }} + AZURE_OPENAI_CHAT_DEPLOYMENT_NAME: ${{ secrets.AZURE_OPENAI_CHAT_DEPLOYMENT_NAME }} + AZURE_OPENAI_RESPONSES_DEPLOYMENT_NAME: ${{ secrets.AZURE_OPENAI_RESPONSES_DEPLOYMENT_NAME }} + # OpenAI configuration + OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }} + OPENAI_CHAT_MODEL_ID: ${{ vars.OPENAI_CHAT_MODEL_ID }} + OPENAI_RESPONSES_MODEL_ID: ${{ vars.OPENAI_RESPONSES_MODEL_ID }} + # Observability + ENABLE_INSTRUMENTATION: "true" + # GitHub Copilot configuration + GITHUB_COPILOT_MODEL: ${{ vars.GITHUB_COPILOT_MODEL }} + defaults: + run: + working-directory: python + steps: + - uses: actions/checkout@v6 + + - name: Set up 
python and install the project + uses: ./.github/actions/python-setup + with: + python-version: "3.12" + os: ${{ runner.os }} + env: + UV_CACHE_DIR: /tmp/.uv-cache + + - name: Run sample validation + run: | + cd samples && uv run python -m _sample_validation --subdir 02-agents --save-report --report-name 02-agents + + - name: Upload validation report + uses: actions/upload-artifact@v4 + if: always() + with: + name: validation-report-02-agents + path: python/samples/_sample_validation/reports/ + + validate-03-workflows: + name: Validate 03-workflows + runs-on: ubuntu-latest + permissions: + contents: read + env: + # Azure AI configuration + AZURE_AI_PROJECT_ENDPOINT: ${{ secrets.AZURE_AI_PROJECT_ENDPOINT }} + AZURE_AI_MODEL_DEPLOYMENT_NAME: ${{ secrets.AZURE_AI_MODEL_DEPLOYMENT_NAME }} + # Azure OpenAI configuration + AZURE_OPENAI_ENDPOINT: ${{ secrets.AZURE_OPENAI_ENDPOINT }} + AZURE_OPENAI_CHAT_DEPLOYMENT_NAME: ${{ secrets.AZURE_OPENAI_CHAT_DEPLOYMENT_NAME }} + AZURE_OPENAI_RESPONSES_DEPLOYMENT_NAME: ${{ secrets.AZURE_OPENAI_RESPONSES_DEPLOYMENT_NAME }} + # GitHub Copilot configuration + GITHUB_COPILOT_MODEL: ${{ vars.GITHUB_COPILOT_MODEL }} + defaults: + run: + working-directory: python + steps: + - uses: actions/checkout@v6 + + - name: Set up python and install the project + uses: ./.github/actions/python-setup + with: + python-version: "3.12" + os: ${{ runner.os }} + env: + UV_CACHE_DIR: /tmp/.uv-cache + + - name: Run sample validation + run: | + cd samples && uv run python -m _sample_validation --subdir 03-workflows --save-report --report-name 03-workflows + + - name: Upload validation report + uses: actions/upload-artifact@v4 + if: always() + with: + name: validation-report-03-workflows + path: python/samples/_sample_validation/reports/ + + validate-04-hosting: + name: Validate 04-hosting + runs-on: ubuntu-latest + permissions: + contents: read + env: + # Azure AI configuration + AZURE_AI_PROJECT_ENDPOINT: ${{ secrets.AZURE_AI_PROJECT_ENDPOINT }} + 
AZURE_AI_MODEL_DEPLOYMENT_NAME: ${{ secrets.AZURE_AI_MODEL_DEPLOYMENT_NAME }} + # Azure OpenAI configuration + AZURE_OPENAI_ENDPOINT: ${{ secrets.AZURE_OPENAI_ENDPOINT }} + AZURE_OPENAI_RESPONSES_DEPLOYMENT_NAME: ${{ secrets.AZURE_OPENAI_RESPONSES_DEPLOYMENT_NAME }} + # GitHub Copilot configuration + GITHUB_COPILOT_MODEL: ${{ vars.GITHUB_COPILOT_MODEL }} + defaults: + run: + working-directory: python + steps: + - uses: actions/checkout@v6 + + - name: Set up python and install the project + uses: ./.github/actions/python-setup + with: + python-version: "3.12" + os: ${{ runner.os }} + env: + UV_CACHE_DIR: /tmp/.uv-cache + + - name: Run sample validation + run: | + cd samples && uv run python -m _sample_validation --subdir 04-hosting --save-report --report-name 04-hosting + + - name: Upload validation report + uses: actions/upload-artifact@v4 + if: always() + with: + name: validation-report-04-hosting + path: python/samples/_sample_validation/reports/ + + validate-05-end-to-end: + name: Validate 05-end-to-end + runs-on: ubuntu-latest + permissions: + contents: read + env: + # Azure AI configuration + AZURE_AI_PROJECT_ENDPOINT: ${{ secrets.AZURE_AI_PROJECT_ENDPOINT }} + AZURE_AI_MODEL_DEPLOYMENT_NAME: ${{ secrets.AZURE_AI_MODEL_DEPLOYMENT_NAME }} + # Azure OpenAI configuration + AZURE_OPENAI_ENDPOINT: ${{ secrets.AZURE_OPENAI_ENDPOINT }} + AZURE_OPENAI_CHAT_DEPLOYMENT_NAME: ${{ secrets.AZURE_OPENAI_CHAT_DEPLOYMENT_NAME }} + AZURE_OPENAI_RESPONSES_DEPLOYMENT_NAME: ${{ secrets.AZURE_OPENAI_RESPONSES_DEPLOYMENT_NAME }} + # Azure AI Search (for evaluation samples) + AZURE_SEARCH_ENDPOINT: ${{ secrets.AZURE_SEARCH_ENDPOINT }} + AZURE_SEARCH_API_KEY: ${{ secrets.AZURE_SEARCH_API_KEY }} + AZURE_SEARCH_INDEX_NAME: ${{ secrets.AZURE_SEARCH_INDEX_NAME }} + # GitHub Copilot configuration + GITHUB_COPILOT_MODEL: ${{ vars.GITHUB_COPILOT_MODEL }} + defaults: + run: + working-directory: python + steps: + - uses: actions/checkout@v6 + + - name: Set up python and install the project + 
uses: ./.github/actions/python-setup + with: + python-version: "3.12" + os: ${{ runner.os }} + env: + UV_CACHE_DIR: /tmp/.uv-cache + + - name: Run sample validation + run: | + cd samples && uv run python -m _sample_validation --subdir 05-end-to-end --save-report --report-name 05-end-to-end + + - name: Upload validation report + uses: actions/upload-artifact@v4 + if: always() + with: + name: validation-report-05-end-to-end + path: python/samples/_sample_validation/reports/ + + validate-autogen-migration: + name: Validate autogen-migration + runs-on: ubuntu-latest + permissions: + contents: read + env: + # Azure AI configuration + AZURE_AI_PROJECT_ENDPOINT: ${{ secrets.AZURE_AI_PROJECT_ENDPOINT }} + AZURE_AI_MODEL_DEPLOYMENT_NAME: ${{ secrets.AZURE_AI_MODEL_DEPLOYMENT_NAME }} + # Azure OpenAI configuration + AZURE_OPENAI_ENDPOINT: ${{ secrets.AZURE_OPENAI_ENDPOINT }} + AZURE_OPENAI_CHAT_DEPLOYMENT_NAME: ${{ secrets.AZURE_OPENAI_CHAT_DEPLOYMENT_NAME }} + # GitHub Copilot configuration + GITHUB_COPILOT_MODEL: ${{ vars.GITHUB_COPILOT_MODEL }} + defaults: + run: + working-directory: python + steps: + - uses: actions/checkout@v6 + + - name: Set up python and install the project + uses: ./.github/actions/python-setup + with: + python-version: "3.12" + os: ${{ runner.os }} + env: + UV_CACHE_DIR: /tmp/.uv-cache + + - name: Run sample validation + run: | + cd samples && uv run python -m _sample_validation --subdir autogen-migration --save-report --report-name autogen-migration + + - name: Upload validation report + uses: actions/upload-artifact@v4 + if: always() + with: + name: validation-report-autogen-migration + path: python/samples/_sample_validation/reports/ + + validate-semantic-kernel-migration: + name: Validate semantic-kernel-migration + runs-on: ubuntu-latest + permissions: + contents: read + env: + # Azure AI configuration + AZURE_AI_PROJECT_ENDPOINT: ${{ secrets.AZURE_AI_PROJECT_ENDPOINT }} + AZURE_AI_MODEL_DEPLOYMENT_NAME: ${{ 
secrets.AZURE_AI_MODEL_DEPLOYMENT_NAME }} + # Azure OpenAI configuration + AZURE_OPENAI_ENDPOINT: ${{ secrets.AZURE_OPENAI_ENDPOINT }} + AZURE_OPENAI_CHAT_DEPLOYMENT_NAME: ${{ secrets.AZURE_OPENAI_CHAT_DEPLOYMENT_NAME }} + AZURE_OPENAI_RESPONSES_DEPLOYMENT_NAME: ${{ secrets.AZURE_OPENAI_RESPONSES_DEPLOYMENT_NAME }} + # OpenAI configuration + OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }} + OPENAI_CHAT_MODEL_ID: ${{ vars.OPENAI_CHAT_MODEL_ID }} + OPENAI_RESPONSES_MODEL_ID: ${{ vars.OPENAI_RESPONSES_MODEL_ID }} + # Copilot Studio + COPILOTSTUDIOAGENT__ENVIRONMENTID: ${{ secrets.COPILOTSTUDIOAGENT__ENVIRONMENTID }} + COPILOTSTUDIOAGENT__SCHEMANAME: ${{ secrets.COPILOTSTUDIOAGENT__SCHEMANAME }} + COPILOTSTUDIOAGENT__TENANTID: ${{ secrets.COPILOTSTUDIOAGENT__TENANTID }} + COPILOTSTUDIOAGENT__AGENTAPPID: ${{ secrets.COPILOTSTUDIOAGENT__AGENTAPPID }} + # GitHub Copilot configuration + GITHUB_COPILOT_MODEL: ${{ vars.GITHUB_COPILOT_MODEL }} + defaults: + run: + working-directory: python + steps: + - uses: actions/checkout@v6 + + - name: Set up python and install the project + uses: ./.github/actions/python-setup + with: + python-version: "3.12" + os: ${{ runner.os }} + env: + UV_CACHE_DIR: /tmp/.uv-cache + + - name: Run sample validation + run: | + cd samples && uv run python -m _sample_validation --subdir semantic-kernel-migration --save-report --report-name semantic-kernel-migration + + - name: Upload validation report + uses: actions/upload-artifact@v4 + if: always() + with: + name: validation-report-semantic-kernel-migration + path: python/samples/_sample_validation/reports/ diff --git a/python/samples/_run_all_samples.py b/python/samples/_run_all_samples.py deleted file mode 100644 index 7d1a226e5c..0000000000 --- a/python/samples/_run_all_samples.py +++ /dev/null @@ -1,304 +0,0 @@ -# Copyright (c) Microsoft. All rights reserved. - -""" -Script to run all Python samples in the samples directory concurrently. 
-This script will run all samples and report results at the end. - -Note: This script is AI generated. This is for internal validation purposes only. - -Samples that require human interaction are known to fail. - -Usage: - python run_all_samples.py # Run all samples using uv run (concurrent) - python run_all_samples.py --direct # Run all samples directly (concurrent, - # assumes environment is set up) - python run_all_samples.py --subdir # Run samples only in specific subdirectory - python run_all_samples.py --subdir getting_started/workflows # Example: run only workflow samples -""" - -import argparse -import os -import subprocess -import sys -from concurrent.futures import ThreadPoolExecutor, as_completed -from pathlib import Path - - -def find_python_samples(samples_dir: Path, subdir: str | None = None) -> list[Path]: - """Find all Python sample files in the samples directory or a subdirectory.""" - python_files: list[Path] = [] - - # Determine the search directory - if subdir: - search_dir = samples_dir / subdir - if not search_dir.exists(): - print(f"Warning: Subdirectory '{subdir}' does not exist in {samples_dir}") - return [] - print(f"Searching in subdirectory: {search_dir}") - else: - search_dir = samples_dir - print(f"Searching in all samples: {search_dir}") - - # Walk through all subdirectories and find .py files - for root, dirs, files in os.walk(search_dir): - # Skip __pycache__ directories - dirs[:] = [d for d in dirs if d != "__pycache__"] - - for file in files: - if file.endswith(".py") and not file.startswith("_") and file != "_run_all_samples.py": - python_files.append(Path(root) / file) - - # Sort files for consistent execution order - return sorted(python_files) - - -def run_sample( - sample_path: Path, - use_uv: bool = True, - python_root: Path | None = None, -) -> tuple[bool, str, str, str]: - """ - Run a single sample file using subprocess and return (success, output, error_info, error_type). 
- - Args: - sample_path: Path to the sample file - use_uv: Whether to use uv run - python_root: Root directory for uv run - - Returns: - Tuple of (success, output, error_info, error_type) - error_type can be: "timeout", "input_hang", "execution_error", "exception" - """ - if use_uv and python_root: - cmd = ["uv", "run", "python", str(sample_path)] - cwd = python_root - else: - cmd = [sys.executable, sample_path.name] - cwd = sample_path.parent - - # Set environment variables to handle Unicode properly - env = os.environ.copy() - env["PYTHONIOENCODING"] = "utf-8" # Force Python to use UTF-8 for I/O - env["PYTHONUTF8"] = "1" # Enable UTF-8 mode in Python 3.7+ - - try: - # Use Popen for better timeout handling with stdin for samples that may wait for input - # Popen gives us more control over process lifecycle compared to subprocess.run() - process = subprocess.Popen( - cmd, # Command to execute as a list [program, arg1, arg2, ...] - cwd=cwd, # Working directory for the subprocess - stdout=subprocess.PIPE, # Capture stdout so we can read the output - stderr=subprocess.PIPE, # Capture stderr so we can read error messages - stdin=subprocess.PIPE, # Create a pipe for stdin so we can send input - text=True, # Handle input/output as text strings (not bytes) - encoding="utf-8", # Use UTF-8 encoding to handle Unicode characters like emojis - errors="replace", # Replace problematic characters instead of failing - env=env, # Pass environment variables for proper Unicode handling - ) - - try: - # communicate() sends input to stdin and waits for process to complete - # input="" sends an empty string to stdin, which causes input() calls to - # immediately receive EOFError (End Of File) since there's no data to read. - # This prevents the process from hanging indefinitely waiting for user input. 
- stdout, stderr = process.communicate(input="", timeout=60) - except subprocess.TimeoutExpired: - # If the process doesn't complete within the timeout period, we need to - # forcibly terminate it. This is especially important for processes that - # ignore EOFError and continue to hang on input() calls. - - # First attempt: Send SIGKILL (immediate termination) on Unix or TerminateProcess on Windows - process.kill() - try: - # Give the process a few seconds to clean up after being killed - stdout, stderr = process.communicate(timeout=5) - except subprocess.TimeoutExpired: - # If the process is still alive after kill(), use terminate() as a last resort - # terminate() sends SIGTERM (graceful termination request) which may work - # when kill() doesn't on some systems - process.terminate() - stdout, stderr = "", "Process forcibly terminated" - return False, "", f"TIMEOUT: {sample_path.name} (exceeded 60 seconds)", "timeout" - - if process.returncode == 0: - output = stdout.strip() if stdout.strip() else "No output" - return True, output, "", "success" - - error_info = f"Exit code: {process.returncode}" - if stderr.strip(): - error_info += f"\nSTDERR: {stderr}" - - # Check if this looks like an input/interaction related error - error_type = "execution_error" - stderr_safe = stderr.encode("utf-8", errors="replace").decode("utf-8") if stderr else "" - if "EOFError" in stderr_safe or "input" in stderr_safe.lower() or "stdin" in stderr_safe.lower(): - error_type = "input_hang" - elif "UnicodeEncodeError" in stderr_safe and ("charmap" in stderr_safe or "codec can't encode" in stderr_safe): - error_type = "input_hang" # Unicode errors often indicate interactive samples with emojis - - return False, stdout.strip() if stdout.strip() else "", error_info, error_type - except Exception as e: - return False, "", f"ERROR: {sample_path.name} - Exception: {str(e)}", "exception" - - -def parse_arguments() -> argparse.Namespace: - """Parse command line arguments.""" - parser = 
argparse.ArgumentParser( - description="Run Python samples concurrently", - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog=""" -Examples: - python run_all_samples.py # Run all samples - python run_all_samples.py --direct # Run all samples directly - python run_all_samples.py --subdir getting_started # Run only getting_started samples - python run_all_samples.py --subdir getting_started/workflows # Run only workflow samples - python run_all_samples.py --subdir semantic-kernel-migration # Run only SK migration samples - """, - ) - - parser.add_argument( - "--direct", action="store_true", help="Run samples directly with python instead of using uv run" - ) - - parser.add_argument( - "--subdir", type=str, help="Run samples only in the specified subdirectory (relative to samples/)" - ) - - parser.add_argument( - "--max-workers", type=int, default=16, help="Maximum number of concurrent workers (default: 16)" - ) - - return parser.parse_args() - - -def main() -> None: - """Main function to run all samples concurrently.""" - args = parse_arguments() - - # Get the samples directory (assuming this script is in the samples directory) - samples_dir = Path(__file__).parent - python_root = samples_dir.parent # Go up to the python/ directory - - print("Python samples runner") - print(f"Samples directory: {samples_dir}") - - if args.direct: - print("Running samples directly (assuming environment is set up)") - else: - print(f"Using uv run from: {python_root}") - - if args.subdir: - print(f"Filtering to subdirectory: {args.subdir}") - - print("🚀 Running samples concurrently...") - - # Find all Python sample files - sample_files = find_python_samples(samples_dir, args.subdir) - - if not sample_files: - print("No Python sample files found!") - return - - print(f"Found {len(sample_files)} Python sample files") - - # Run samples concurrently - results: list[tuple[Path, bool, str, str, str]] = [] - - with ThreadPoolExecutor(max_workers=args.max_workers) as executor: - # 
Submit all tasks - future_to_sample = { - executor.submit(run_sample, sample_path, not args.direct, python_root): sample_path - for sample_path in sample_files - } - - # Collect results as they complete - for future in as_completed(future_to_sample): - sample_path = future_to_sample[future] - try: - success, output, error_info, error_type = future.result() - results.append((sample_path, success, output, error_info, error_type)) - - # Print progress - show relative path from samples directory - relative_path = sample_path.relative_to(samples_dir) - if success: - print(f"✅ {relative_path}") - else: - # Show error type in progress display - error_display = f"{error_type.upper()}" if error_type != "execution_error" else "ERROR" - print(f"❌ {relative_path} - {error_display}") - - except Exception as e: - error_info = f"Future exception: {str(e)}" - results.append((sample_path, False, "", error_info, "exception")) - relative_path = sample_path.relative_to(samples_dir) - print(f"❌ {relative_path} - EXCEPTION") - - # Sort results by original file order for consistent reporting - sample_to_index = {path: i for i, path in enumerate(sample_files)} - results.sort(key=lambda x: sample_to_index[x[0]]) - - successful_runs = sum(1 for _, success, _, _, _ in results if success) - failed_runs = len(results) - successful_runs - - # Categorize failures by type - timeout_failures = [r for r in results if not r[1] and r[4] == "timeout"] - input_hang_failures = [r for r in results if not r[1] and r[4] == "input_hang"] - execution_errors = [r for r in results if not r[1] and r[4] == "execution_error"] - exceptions = [r for r in results if not r[1] and r[4] == "exception"] - - # Print detailed results - print(f"\n{'=' * 80}") - print("DETAILED RESULTS:") - print(f"{'=' * 80}") - - for sample_path, success, output, error_info, error_type in results: - relative_path = sample_path.relative_to(samples_dir) - if success: - print(f"✅ {relative_path}") - if output and output != "No output": - 
print(f" Output preview: {output[:100]}{'...' if len(output) > 100 else ''}") - else: - # Display error with type indicator - if error_type == "timeout": - print(f"⏱️ {relative_path} - TIMEOUT (likely waiting for input)") - elif error_type == "input_hang": - print(f"⌨️ {relative_path} - INPUT ERROR (interactive sample)") - elif error_type == "exception": - print(f"💥 {relative_path} - EXCEPTION") - else: - print(f"❌ {relative_path} - EXECUTION ERROR") - print(f" Error: {error_info}") - - # Print categorized summary - print(f"\n{'=' * 80}") - if failed_runs == 0: - print("🎉 ALL SAMPLES COMPLETED SUCCESSFULLY!") - else: - print(f"❌ {failed_runs} SAMPLE(S) FAILED!") - - print(f"Successful runs: {successful_runs}") - print(f"Failed runs: {failed_runs}") - - if failed_runs > 0: - print("\nFailure breakdown:") - if len(timeout_failures) > 0: - print(f" ⏱️ Timeouts (likely interactive): {len(timeout_failures)}") - if len(input_hang_failures) > 0: - print(f" ⌨️ Input errors (interactive): {len(input_hang_failures)}") - if len(execution_errors) > 0: - print(f" ❌ Execution errors: {len(execution_errors)}") - if len(exceptions) > 0: - print(f" 💥 Exceptions: {len(exceptions)}") - - if args.subdir: - print(f"Subdirectory filter: {args.subdir}") - - print(f"{'=' * 80}") - - # Exit with error code if any samples failed - if failed_runs > 0: - sys.exit(1) - - -if __name__ == "__main__": - main() diff --git a/python/samples/_sample_validation/__main__.py b/python/samples/_sample_validation/__main__.py index 15c36bc324..55d7df4b91 100644 --- a/python/samples/_sample_validation/__main__.py +++ b/python/samples/_sample_validation/__main__.py @@ -69,6 +69,12 @@ def parse_arguments() -> argparse.Namespace: help="Maximum number of samples to run in parallel per batch (default: 10)", ) + parser.add_argument( + "--report-name", + type=str, + help="Custom name for the report files (without extension). 
If not provided, uses timestamp.", + ) + return parser.parse_args() @@ -122,7 +128,7 @@ async def main() -> int: # Save report if requested if args.save_report: output_dir = samples_dir / args.output_dir - md_path, json_path = save_report(report, output_dir) + md_path, json_path = save_report(report, output_dir, name=args.report_name) print("\nReports saved:") print(f" Markdown: {md_path}") print(f" JSON: {json_path}") diff --git a/python/samples/_sample_validation/report.py b/python/samples/_sample_validation/report.py index e6af5f46c9..d6083f44f6 100644 --- a/python/samples/_sample_validation/report.py +++ b/python/samples/_sample_validation/report.py @@ -33,27 +33,32 @@ def generate_report(results: list[RunResult]) -> Report: ) -def save_report(report: Report, output_dir: Path) -> tuple[Path, Path]: +def save_report(report: Report, output_dir: Path, name: str | None = None) -> tuple[Path, Path]: """ Save the report to markdown and JSON files. Args: report: The report to save output_dir: Directory to save the report files + name: Optional custom name for the report files (without extension) Returns: Tuple of (markdown_path, json_path) """ output_dir.mkdir(parents=True, exist_ok=True) - timestamp_str = report.timestamp.strftime("%Y%m%d_%H%M%S") + if name: + base_name = name + else: + timestamp_str = report.timestamp.strftime("%Y%m%d_%H%M%S") + base_name = f"validation_report_{timestamp_str}" # Save markdown - md_path = output_dir / f"validation_report_{timestamp_str}.md" + md_path = output_dir / f"{base_name}.md" md_path.write_text(report.to_markdown(), encoding="utf-8") # Save JSON - json_path = output_dir / f"validation_report_{timestamp_str}.json" + json_path = output_dir / f"{base_name}.json" json_path.write_text( json.dumps(report.to_dict(), indent=2), encoding="utf-8", From b75b5dd02be16ad7845bde05f7645fa94e26053a Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Mon, 23 Feb 2026 14:10:28 -0800 Subject: [PATCH 4/5] comments --- 
python/samples/_sample_validation/README.md | 10 ---------- python/samples/_sample_validation/__init__.py | 2 +- .../create_dynamic_workflow_executor.py | 13 ++++++++----- 3 files changed, 9 insertions(+), 16 deletions(-) diff --git a/python/samples/_sample_validation/README.md b/python/samples/_sample_validation/README.md index c25058c850..4ed84b4c41 100644 --- a/python/samples/_sample_validation/README.md +++ b/python/samples/_sample_validation/README.md @@ -181,13 +181,3 @@ If an agent returns non-JSON content, that sample is marked as `ERROR` with pars ### GitHub Copilot authentication or CLI issues Ensure GitHub Copilot is authenticated in your environment and the Copilot CLI is available. - -## Extending - -### Custom Per-Sample Validation Prompt - -Modify `agent_prompt()` in `create_dynamic_workflow_executor.py`. - -### Custom Report Formats - -Extend `Report.to_markdown()` or `Report.to_dict()` in `models.py`. diff --git a/python/samples/_sample_validation/__init__.py b/python/samples/_sample_validation/__init__.py index 5658c7a987..afa0f47291 100644 --- a/python/samples/_sample_validation/__init__.py +++ b/python/samples/_sample_validation/__init__.py @@ -11,7 +11,7 @@ Usage: uv run python -m _sample_validation - uv run python -m _sample_validation --subdir getting_started + uv run python -m _sample_validation --subdir 01-get-started """ from _sample_validation.models import Report, RunResult, SampleInfo diff --git a/python/samples/_sample_validation/create_dynamic_workflow_executor.py b/python/samples/_sample_validation/create_dynamic_workflow_executor.py index db01f6b5b0..a8fd2011b4 100644 --- a/python/samples/_sample_validation/create_dynamic_workflow_executor.py +++ b/python/samples/_sample_validation/create_dynamic_workflow_executor.py @@ -24,7 +24,7 @@ handler, ) from agent_framework.github import GitHubCopilotAgent -from copilot import PermissionRequest, PermissionRequestResult +from copilot.types import PermissionRequest, PermissionRequestResult 
from pydantic import BaseModel from typing_extensions import Never @@ -54,7 +54,7 @@ class BatchCompletion: AgentInstruction = ( "You are validating exactly one Python sample.\n" "Analyze the sample code and execute it. Determine if it runs successfully, fails, or times out.\n" - "The sample can be interactive. If it is interactive, response to the sample when prompted " + "The sample can be interactive. If it is interactive, respond to the sample when prompted " "based on your analysis of the code. You do not need to consult human on what to respond\n" "Return ONLY valid JSON with this schema:\n" "{\n" @@ -98,7 +98,7 @@ def prompt_permission(request: PermissionRequest, context: dict[str, str]) -> Pe class CustomAgentExecutor(Executor): """Executor that runs a GitHub Copilot agent and returns its response. - We need the custome executor to wrap the agent call in a try/except to ensure that any exceptions are caught and + We need the custom executor to wrap the agent call in a try/except to ensure that any exceptions are caught and returned as error responses, otherwise an exception in one agent could crash the entire workflow. 
""" @@ -147,12 +147,15 @@ def __init__(self, worker_ids: list[str], max_parallel_workers: int) -> None: async def _assign_next(self, worker_id: str, ctx: WorkflowContext[SampleInfo | BatchCompletion]) -> None: if not self._pending: - await ctx.send_message(BatchCompletion(), target_id="collector") + # No more samples to assign + if not self._inflight: + # All tasks are completed, notify collector and exit + await ctx.send_message(BatchCompletion(), target_id="collector") return sample = self._pending.popleft() self._inflight.add(worker_id) - # Messages will get queued in the runner until the next superstep whe all workers are freed, + # Messages will get queued in the runner until the next superstep when all workers are freed, # thus achieving automatic batching without needing complex synchronization logic await ctx.send_message(sample, target_id=worker_id) From 8092d5ef1a2423c2f29a3abf1d952ec7ce4a4b98 Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Mon, 23 Feb 2026 15:56:53 -0800 Subject: [PATCH 5/5] Fix mypy --- .../run_dynamic_validation_workflow_executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/samples/_sample_validation/run_dynamic_validation_workflow_executor.py b/python/samples/_sample_validation/run_dynamic_validation_workflow_executor.py index b9ec558487..c5e7c8616b 100644 --- a/python/samples/_sample_validation/run_dynamic_validation_workflow_executor.py +++ b/python/samples/_sample_validation/run_dynamic_validation_workflow_executor.py @@ -1,6 +1,6 @@ # Copyright (c) Microsoft. All rights reserved. -from typing import Sequence +from collections.abc import Sequence from _sample_validation.const import WORKER_COMPLETED from _sample_validation.create_dynamic_workflow_executor import CoordinatorStart