diff --git a/scripts/classify_real_pilot_findings.py b/scripts/classify_real_pilot_findings.py new file mode 100644 index 0000000..57c70b9 --- /dev/null +++ b/scripts/classify_real_pilot_findings.py @@ -0,0 +1,598 @@ +#!/usr/bin/env python3 +"""Create sanitized reviewer classification artifacts for real-pilot findings.""" + +from __future__ import annotations + +import argparse +import json +import os +import re +import sys +from collections import Counter +from pathlib import Path +from typing import Any + +REPO_ROOT = Path(__file__).resolve().parents[1] +TEST_OVERRIDE_ENV = "IAMSCOPE_ALLOW_REPO_OUTPUT_FOR_TESTS" + +CLASSIFICATIONS = { + "valid_path", + "expected_benign", + "blocked_or_controlled", + "inconclusive_needs_context", + "environmental_extra", + "tool_bug", + "needs_more_evidence", +} +CONFIDENCE_VALUES = {"low", "medium", "high"} +ARN_RE = re.compile(r"arn:aws:(iam|sts)::[0-9]{12}:([A-Za-z0-9+=,.@_:/-]+)") +ACCOUNT_ID_RE = re.compile(r"\b[0-9]{12}\b") + + +def _load_json(path: Path) -> Any: + with path.open(encoding="utf-8") as handle: + return json.load(handle) + + +def _write_json(path: Path, payload: Any) -> None: + path.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8") + + +def _is_relative_to(path: Path, parent: Path) -> bool: + try: + path.relative_to(parent) + except ValueError: + return False + return True + + +def _ensure_output_dir(path: Path) -> Path: + output = path.resolve() + repo = REPO_ROOT.resolve() + if _is_relative_to(output, repo) and os.environ.get(TEST_OVERRIDE_ENV) != "1": + raise ValueError(f"refusing to write real-pilot review artifacts inside repository tree: {output}") + output.mkdir(parents=True, exist_ok=True) + return output + + +def _tail_name(value: str) -> str: + resource = value.split(":", 5)[-1] + if resource.startswith("assumed-role/"): + parts = resource.split("/") + return parts[1] if len(parts) > 1 and parts[1] else "assumed-role" + if "/" in resource: + return 
resource.rstrip("/").split("/")[-1] or resource.split("/", 1)[0] + return resource or "resource" + + +def sanitize_text(value: Any) -> str: + """Return ``str(value)`` with IAM/STS ARNs and bare 12-digit account IDs redacted. + + Each ARN matched by ``ARN_RE`` becomes a bracketed ``[service-arn:name]`` placeholder + built from the matched service and ``_tail_name``, so the resource name survives + while the account ID does not. Any remaining standalone 12-digit number is removed. + """ + text = str(value) + + def replace_arn(match: re.Match[str]) -> str: + """Build the redacted placeholder for a single ARN match.""" + service = match.group(1) + tail = _tail_name(match.group(0)) + return f"[{service}-arn:{tail}]" + + text = ARN_RE.sub(replace_arn, text) + return ACCOUNT_ID_RE.sub("", text) + + +def sanitize_json(value: Any) -> Any: + if isinstance(value, dict): + return {sanitize_text(key): sanitize_json(item) for key, item in value.items()} + if isinstance(value, list): + return [sanitize_json(item) for item in value] + if isinstance(value, str): + return sanitize_text(value) + return value + + +def _iter_strings(value: Any) -> list[str]: + if isinstance(value, dict): + strings: list[str] = [] + for key, item in value.items(): + strings.append(str(key)) + strings.extend(_iter_strings(item)) + return strings + if isinstance(value, list): + strings = [] + for item in value: + strings.extend(_iter_strings(item)) + return strings + if isinstance(value, str): + return [value] + if isinstance(value, bool): + return [str(value).lower()] + if value is None: + return [] + return [str(value)] + + +def _find_values_by_key(value: Any, wanted: set[str]) -> dict[str, list[Any]]: + found: dict[str, list[Any]] = {key: [] for key in wanted} + if isinstance(value, dict): + for key, item in value.items(): + if key in wanted: + found[key].append(item) + nested = _find_values_by_key(item, wanted) + for nested_key, nested_values in nested.items(): + found[nested_key].extend(nested_values) + elif isinstance(value, list): + for item in value: + nested = _find_values_by_key(item, wanted) + for nested_key, nested_values in nested.items(): + found[nested_key].extend(nested_values) + return found + + +def _boolish(value: Any) -> str: + if isinstance(value, bool): + return "yes" if value else "no" + if value is None: + return "unknown" + return sanitize_text(value) + + +def _required_check_states(finding: dict[str, Any]) 
-> dict[str, str]: + checks: dict[str, str] = {} + required_checks = finding.get("required_checks") + if isinstance(required_checks, list): + for check in required_checks: + if not isinstance(check, dict): + continue + name = check.get("name") + state = check.get("state") + if isinstance(name, str): + checks[name] = sanitize_text(state if state is not None else "unknown") + states = finding.get("required_check_states") + if isinstance(states, dict): + for name, state in states.items(): + checks[str(name)] = sanitize_text(state) + return dict(sorted(checks.items())) + + +def _source_or_target_name(finding: dict[str, Any], side: str) -> str: + direct_keys = { + "source": ["source_name", "source_principal", "source_principal_arn", "source_arn"], + "target": ["target_name", "target", "target_role_arn", "target_arn"], + }[side] + for key in direct_keys: + value = finding.get(key) + if isinstance(value, str) and value: + return sanitize_text(_tail_name(value) if value.startswith("arn:aws:") else value) + nested = finding.get(side) + if isinstance(nested, dict): + provider_id = nested.get("provider_id") or nested.get("arn") or nested.get("name") + if isinstance(provider_id, str) and provider_id: + return sanitize_text(_tail_name(provider_id) if provider_id.startswith("arn:aws:") else provider_id) + return "unknown" + + +def _finding_id(finding: dict[str, Any]) -> str: + finding_id = finding.get("finding_id") + if not isinstance(finding_id, str) or not finding_id: + raise ValueError("finding missing non-empty finding_id") + return finding_id + + +def _finding_prefix(finding_id: str) -> str: + prefix = finding_id[:12] + if ACCOUNT_ID_RE.fullmatch(prefix): + prefix = finding_id[:10] + return prefix + + +def _evidence_summary(finding: dict[str, Any]) -> list[str]: + pattern = finding.get("pattern_id") + values = _find_values_by_key( + finding, + { + "trust_scope", + "naked_trust", + "wildcard_principal", + "has_external_id", + "has_conditions", + "reachable_admins_count", + 
}, + ) + strings = " ".join(_iter_strings(finding)).lower() + checks = _required_check_states(finding) + parts: list[str] = [] + + if pattern == "cross_account_trust": + for key in ("trust_scope", "naked_trust", "wildcard_principal", "has_external_id", "has_conditions"): + if values[key]: + parts.append(f"{key}: {_boolish(values[key][0])}") + if "externalid" in strings and not values["has_external_id"]: + parts.append("has_external_id: mentioned") + if "wildcard" in strings and not values["wildcard_principal"]: + parts.append("wildcard_principal: mentioned") + if "account-root" in strings or "account root" in strings: + parts.append("account_root_trust: mentioned") + + if pattern == "admin_reachability": + assume_role_state = next( + (state for name, state in checks.items() if "assume" in name.lower() and "role" in name.lower()), + None, + ) + if assume_role_state: + parts.append(f"source_has_assume_role: {assume_role_state}") + if values["reachable_admins_count"]: + parts.append(f"reachable_admins_count: {sanitize_text(values['reachable_admins_count'][0])}") + clean_witness_state = checks.get("at_least_one_reachable_chain_uses_clean_witnesses") + if clean_witness_state: + parts.append(f"clean_witness_check: {clean_witness_state}") + if "administratoraccess" in strings: + parts.append("admin_witness_policy: AdministratorAccess") + + if checks: + summary = ", ".join(f"{name}={state}" for name, state in list(checks.items())[:6]) + parts.append(f"check_states: {summary}") + + if not parts: + edge_refs = finding.get("evidence", {}).get("edge_refs") if isinstance(finding.get("evidence"), dict) else None + if isinstance(edge_refs, list): + parts.append(f"evidence_edges: {len(edge_refs)}") + return [sanitize_text(part) for part in parts[:8]] + + +def _extract_findings(payload: Any) -> list[dict[str, Any]]: + """Return the finding objects contained in ``payload``. + + Accepts either a bare list or an object carrying a ``findings`` list, and + raises ``ValueError`` for any other shape or when an entry is not a JSON object. + """ + if isinstance(payload, dict) and isinstance(payload.get("findings"), list): + findings = payload["findings"] + elif isinstance(payload, list): + findings = payload + else: 
+ raise ValueError("findings JSON must be a list or an object with a findings list") + if not all(isinstance(finding, dict) for finding in findings): + raise ValueError("all findings must be JSON objects") + return list(findings) + + +def _scenario_counts(scenario: Any) -> dict[str, int]: + if not isinstance(scenario, dict): + return {"nodes": 0, "edges": 0, "constraints": 0, "edge_constraints": 0} + return { + "nodes": len(scenario.get("nodes", []) or []), + "edges": len(scenario.get("edges", []) or []), + "constraints": len(scenario.get("constraints", []) or []), + "edge_constraints": len(scenario.get("edge_constraints", []) or []), + } + + +def _scenario_edges_by_id(scenario: Any) -> dict[str, dict[str, Any]]: + if not isinstance(scenario, dict): + return {} + edges = scenario.get("edges", []) + if not isinstance(edges, list): + return {} + indexed: dict[str, dict[str, Any]] = {} + for edge in edges: + if not isinstance(edge, dict): + continue + for key in ("id", "edge_id"): + edge_id = edge.get(key) + if isinstance(edge_id, str) and edge_id: + indexed[edge_id] = edge + return indexed + + +def _short_value(value: Any) -> str: + if isinstance(value, dict): + pieces = [] + for key in sorted(value): + item = value[key] + item_text = f"{type(item).__name__}[{len(item)}]" if isinstance(item, dict | list) else sanitize_text(item) + pieces.append(f"{sanitize_text(key)}={item_text}") + return ", ".join(pieces) if pieces else "empty" + if isinstance(value, list): + return ", ".join(sanitize_text(item) for item in value[:6]) if value else "empty" + if value is None: + return "not_provided" + return sanitize_text(value) + + +def _collection_context_summary(finding: dict[str, Any]) -> str: + for key in ("collection_context", "collection_context_summary"): + value = finding.get(key) + if value: + return _short_value(value) + evidence = finding.get("evidence") + if isinstance(evidence, dict): + for key in ("collection_context", "collection_context_summary"): + value = 
evidence.get(key) + if value: + return _short_value(value) + return "not_provided" + + +def _assumptions_summary(finding: dict[str, Any]) -> list[str]: + assumptions = finding.get("assumptions") + if not isinstance(assumptions, list): + return [] + entries: list[str] = [] + for assumption in assumptions: + if isinstance(assumption, dict): + kind = sanitize_text(assumption.get("kind", "unknown")) + detail = sanitize_text(assumption.get("detail", "")) + entries.append(f"{kind}: {detail}" if detail else kind) + else: + entries.append(sanitize_text(assumption)) + return entries + + +def _blockers_summary(finding: dict[str, Any]) -> list[str]: + blockers = finding.get("blockers_observed") + if not isinstance(blockers, list): + return [] + entries: list[str] = [] + for blocker in blockers: + if isinstance(blocker, dict): + kind = sanitize_text(blocker.get("kind", "unknown")) + reason = sanitize_text(blocker.get("reason", "")) + constraint_id = sanitize_text(blocker.get("constraint_id", "")) + if reason: + entries.append(f"{kind}: {reason}") + elif constraint_id: + entries.append(f"{kind}: {constraint_id}") + else: + entries.append(kind) + else: + entries.append(sanitize_text(blocker)) + return entries + + +def _collect_evidence_refs(finding: dict[str, Any]) -> list[str]: + refs: set[str] = set() + evidence = finding.get("evidence") + if isinstance(evidence, dict): + for key in ("edge_refs", "evidence_refs"): + values = evidence.get(key) + if isinstance(values, list): + refs.update(str(value) for value in values if value) + required_checks = finding.get("required_checks") + if isinstance(required_checks, list): + for check in required_checks: + if not isinstance(check, dict): + continue + values = check.get("evidence_refs") + if isinstance(values, list): + refs.update(str(value) for value in values if value) + return sorted(refs) + + +def _short_ref(ref: str) -> str: + if len(ref) <= 16: + return sanitize_text(ref) + prefix = ref[:12] + if ACCOUNT_ID_RE.fullmatch(prefix): 
+ prefix = ref[:10] + return sanitize_text(prefix) + + +def _edge_endpoint_tail(edge: dict[str, Any], keys: tuple[str, ...]) -> str: + for key in keys: + value = edge.get(key) + if isinstance(value, str) and value: + return sanitize_text(_tail_name(value) if value.startswith("arn:aws:") else value) + if isinstance(value, dict): + provider_id = value.get("provider_id") or value.get("arn") or value.get("name") + if isinstance(provider_id, str) and provider_id: + return sanitize_text(_tail_name(provider_id) if provider_id.startswith("arn:aws:") else provider_id) + return "unknown" + + +def _referenced_edge_summaries( + finding: dict[str, Any], + edges_by_id: dict[str, dict[str, Any]], +) -> list[str]: + summaries: list[str] = [] + for ref in _collect_evidence_refs(finding): + edge = edges_by_id.get(ref) + if not edge: + continue + source = _edge_endpoint_tail(edge, ("src", "source", "source_provider_id", "source_arn")) + target = _edge_endpoint_tail(edge, ("dst", "target", "target_provider_id", "target_arn")) + action = sanitize_text(edge.get("action") or edge.get("action_or_precondition") or edge.get("kind") or "") + relation = sanitize_text(edge.get("relation") or edge.get("type") or "") + descriptor = action or relation or "edge" + summaries.append(f"{_short_ref(ref)}: {source} -> {target} ({descriptor})") + return summaries[:6] + + +def _load_labels(path: Path | None, findings_by_id: dict[str, dict[str, Any]]) -> dict[str, dict[str, Any]]: + if path is None: + return {} + payload = _load_json(path) + if not isinstance(payload, dict) or not isinstance(payload.get("labels"), list): + raise ValueError("labels JSON must be an object with a labels list") + + matched: dict[str, dict[str, Any]] = {} + for label in payload["labels"]: + if not isinstance(label, dict): + raise ValueError("each label must be an object") + prefix = label.get("finding_id_prefix") + if not isinstance(prefix, str) or not prefix: + raise ValueError("each label requires a non-empty 
finding_id_prefix") + classification = label.get("classification") + if classification not in CLASSIFICATIONS: + raise ValueError(f"invalid classification for {prefix}: {classification}") + confidence = label.get("reviewer_confidence") + if confidence is not None and confidence not in CONFIDENCE_VALUES: + raise ValueError(f"invalid reviewer_confidence for {prefix}: {confidence}") + owner_confirmed = label.get("owner_confirmed") + if owner_confirmed is not None and not isinstance(owner_confirmed, bool): + raise ValueError(f"owner_confirmed must be boolean for {prefix}") + + matches = [finding_id for finding_id in findings_by_id if finding_id.startswith(prefix)] + if len(matches) != 1: + raise ValueError(f"finding_id_prefix {prefix!r} matched {len(matches)} findings") + finding_id = matches[0] + if finding_id in matched: + raise ValueError(f"duplicate label for finding_id_prefix {prefix!r}") + matched[finding_id] = sanitize_json(label) + return matched + + +def build_review_artifacts( + *, + scenario_payload: Any, + findings_payload: Any, + labels_payload_path: Path | None = None, +) -> dict[str, Any]: + findings = _extract_findings(findings_payload) + findings_by_id = {_finding_id(finding): finding for finding in findings} + if len(findings_by_id) != len(findings): + raise ValueError("duplicate finding_id values are not allowed") + labels_by_id = _load_labels(labels_payload_path, findings_by_id) + edges_by_id = _scenario_edges_by_id(scenario_payload) + + inventory: list[dict[str, Any]] = [] + template: list[dict[str, Any]] = [] + for finding_id in sorted(findings_by_id): + finding = findings_by_id[finding_id] + label = labels_by_id.get(finding_id) + classification = label.get("classification") if label else "unlabeled" + label_status = "labeled" if label else "unlabeled" + entry: dict[str, Any] = { + "finding_id_prefix": _finding_prefix(finding_id), + "pattern_id": sanitize_text(finding.get("pattern_id", "unknown")), + "verdict": sanitize_text(finding.get("verdict", 
"unknown")), + "severity": sanitize_text(finding.get("severity", "unknown")), + "source_name": _source_or_target_name(finding, "source"), + "target_name": _source_or_target_name(finding, "target"), + "title": sanitize_text(finding.get("title", "")), + "collection_context_summary": _collection_context_summary(finding), + "assumptions": _assumptions_summary(finding), + "blockers_observed": _blockers_summary(finding), + "required_check_states": _required_check_states(finding), + "evidence_summary": _evidence_summary(finding), + "referenced_edges": _referenced_edge_summaries(finding, edges_by_id), + "reviewer_classification": classification, + "label_status": label_status, + } + if label: + for key in ( + "reviewer_confidence", + "owner_confirmed", + "notes", + "recommended_followup", + "sanitized_evidence_refs", + ): + if key in label: + entry[key] = label[key] + inventory.append(entry) + template.append( + { + "finding_id_prefix": entry["finding_id_prefix"], + "pattern_id": entry["pattern_id"], + "verdict": entry["verdict"], + "severity": entry["severity"], + "source_name": entry["source_name"], + "target_name": entry["target_name"], + "collection_context_summary": entry["collection_context_summary"], + "classification": "", + "reviewer_confidence": "", + "owner_confirmed": False, + "notes": "", + "recommended_followup": "", + "sanitized_evidence_refs": [], + } + ) + + unlabeled = [entry for entry in inventory if entry["label_status"] == "unlabeled"] + summary = { + "scenario_counts": _scenario_counts(scenario_payload), + "finding_count": len(inventory), + "counts": { + "by_pattern_id": dict(sorted(Counter(entry["pattern_id"] for entry in inventory).items())), + "by_iamscope_verdict": dict(sorted(Counter(entry["verdict"] for entry in inventory).items())), + "by_reviewer_classification": dict( + sorted(Counter(entry["reviewer_classification"] for entry in inventory).items()) + ), + "by_severity": dict(sorted(Counter(entry["severity"] for entry in inventory).items())), 
+ "by_label_status": dict(sorted(Counter(entry["label_status"] for entry in inventory).items())), + }, + "non_claims": { + "no_quantitative_performance_metric": True, + "no_composite_benchmark_label": True, + "reviewer_judgment_required": True, + }, + } + return { + "inventory": inventory, + "unlabeled": unlabeled, + "template": { + "pilot_id": "real-pilot-dev-001", + "label_schema_version": 1, + "labels": template, + }, + "summary": summary, + } + + +def _markdown_table(inventory: list[dict[str, Any]]) -> str: + """Render ``inventory`` as a Markdown review table preceded by the honesty reminders. + + Every cell is sanitized, has embedded line breaks joined with a space, and has + ``|`` escaped, so a multi-line or pipe-bearing value cannot break its table row. + """ + rows = [ + "# Real Pilot Finding Review Table", + "", + "Capability-honesty reminders:", + "", + "- No findings does not mean safe.", + "- Validated is not exploitability proof.", + "- `collection_context` matters.", + "- No composite score.", + "- No pass/fail benchmark label.", + "", + "| Finding | Pattern | IAMScope verdict | Severity | Reviewer classification | Source | Target | " + "Collection context | Evidence summary |", + "| --- | --- | --- | --- | --- | --- | --- | --- | --- |", + ] + for entry in inventory: + evidence_parts = [*entry["evidence_summary"], *entry["referenced_edges"]] + evidence = "; ".join(evidence_parts) if evidence_parts else "none" + cells = [ + entry["finding_id_prefix"], + entry["pattern_id"], + entry["verdict"], + entry["severity"], + entry["reviewer_classification"], + entry["source_name"], + entry["target_name"], + entry["collection_context_summary"], + evidence, + ] + rows.append("| " + " | ".join(" ".join(sanitize_text(cell).splitlines()).replace("|", "\\|") for cell in cells) + " |") + rows.append("") + return "\n".join(rows) + + +def write_review_artifacts(output_dir: Path, artifacts: dict[str, Any]) -> None: + """Write the review table and JSON artifacts into ``output_dir``. + + The directory is first validated and created by ``_ensure_output_dir``, which + raises ``ValueError`` when the path lies inside the repository tree. + """ + output = _ensure_output_dir(output_dir) + (output / "review-table.md").write_text(_markdown_table(artifacts["inventory"]), encoding="utf-8") + _write_json(output / "review-summary.json", artifacts["summary"]) + _write_json(output / "unlabeled-findings.json", {"findings": artifacts["unlabeled"]}) + _write_json(output / 
"reviewer-label-template.json", artifacts["template"]) + + +def main(argv: list[str] | None = None) -> int: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--scenario", required=True, help="Path to scenario.json") + parser.add_argument("--findings", required=True, help="Path to findings.json") + parser.add_argument("--labels", default=None, help="Optional reviewer-labels.json") + parser.add_argument("--out", required=True, help="Output directory outside the repository") + args = parser.parse_args(argv) + + try: + artifacts = build_review_artifacts( + scenario_payload=_load_json(Path(args.scenario)), + findings_payload=_load_json(Path(args.findings)), + labels_payload_path=Path(args.labels) if args.labels else None, + ) + write_review_artifacts(Path(args.out), artifacts) + except Exception as exc: + print(f"error: {exc}", file=sys.stderr) + return 1 + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tests/test_real_pilot_finding_classification.py b/tests/test_real_pilot_finding_classification.py new file mode 100644 index 0000000..db7e7bf --- /dev/null +++ b/tests/test_real_pilot_finding_classification.py @@ -0,0 +1,421 @@ +from __future__ import annotations + +import json +import subprocess +import sys +from pathlib import Path +from typing import Any + +from scripts.classify_real_pilot_findings import build_review_artifacts + +REPO_ROOT = Path(__file__).resolve().parents[1] +SCRIPT = REPO_ROOT / "scripts" / "classify_real_pilot_findings.py" + + +def _account_id() -> str: + return "1111" + "2222" + "3333" + + +def _iam_arn(resource: str) -> str: + return f"arn:aws:iam::{_account_id()}:{resource}" + + +def _sts_arn(resource: str) -> str: + return f"arn:aws:sts::{_account_id()}:{resource}" + + +def _scenario() -> dict[str, Any]: + return { + "nodes": [ + {"provider_id": _iam_arn("role/path/SourceWildcardTrustRole")}, + {"provider_id": _iam_arn("role/ProdDBAdminRole")}, + ], + "edges": [ + { + "id": 
"edge-trust-1", + "src": _iam_arn("role/path/SourceWildcardTrustRole"), + "dst": _iam_arn("role/TrustedTargetRole"), + "action": "sts:AssumeRole", + } + ], + "constraints": [{"id": "constraint-1"}], + "edge_constraints": [{"id": "edge-constraint-1"}], + } + + +def _findings() -> dict[str, Any]: + return { + "findings": [ + { + "finding_id": "aaaabbbbcccc1111", + "pattern_id": "cross_account_trust", + "verdict": "validated", + "severity": "high", + "source": {"provider_id": _iam_arn("role/path/SourceWildcardTrustRole")}, + "target": {"provider_id": _iam_arn("role/TrustedTargetRole")}, + "title": f"{_iam_arn('role/path/SourceWildcardTrustRole')} trusts {_iam_arn('role/TrustedTargetRole')}", + "evidence": { + "edge_refs": ["edge-trust-1"], + "trust_scope": "cross_account", + "naked_trust": True, + "wildcard_principal": True, + "has_external_id": True, + "has_conditions": True, + "principal": _sts_arn("assumed-role/ExternalReviewer/session"), + }, + "collection_context": { + "scope": "partial_org", + "org_membership_status": "unknown", + }, + "assumptions": [ + { + "kind": "org_membership_status", + "detail": "source account membership is unknown in partial collection context", + } + ], + "required_checks": [{"name": "trust_policy_allows_source", "state": "pass"}], + }, + { + "finding_id": "ddddeeeeffff2222", + "pattern_id": "admin_reachability", + "verdict": "inconclusive", + "severity": "critical", + "source_principal_arn": _iam_arn("role/SourceAdminProbeRole"), + "target_role_arn": _iam_arn("role/ProdDBAdminRole"), + "title": f"{_iam_arn('role/SourceAdminProbeRole')} may reach AdministratorAccess target", + "required_checks": [ + {"name": "source_has_assume_role_permission", "state": "pass"}, + { + "name": "at_least_one_reachable_chain_uses_clean_witnesses", + "state": "unknown", + }, + ], + "evidence": { + "reachable_admins_count": 1, + "admin_policy_name": "AdministratorAccess", + }, + "blockers_observed": [ + { + "kind": "permission_boundary", + "reason": 
"reviewer should inspect boundary context", + "constraint_id": "constraint-1", + } + ], + }, + { + "finding_id": "gggghhhhiiii3333", + "pattern_id": "cross_account_trust", + "verdict": "validated", + "severity": "medium", + "source_principal": "SpecificSourceRole", + "target": "SpecificTargetRole", + "title": "Specific role-to-role trust", + "required_checks": [{"name": "trust_policy_allows_source", "state": "pass"}], + }, + ] + } + + +def _write_inputs(tmp_path: Path) -> tuple[Path, Path]: + scenario = tmp_path / "scenario.json" + findings = tmp_path / "findings.json" + scenario.write_text(json.dumps(_scenario()), encoding="utf-8") + findings.write_text(json.dumps(_findings()), encoding="utf-8") + return scenario, findings + + +def _run_script(*args: str) -> subprocess.CompletedProcess[str]: + return subprocess.run( + [sys.executable, str(SCRIPT), *args], + cwd=REPO_ROOT, + check=False, + text=True, + capture_output=True, + ) + + +def _load(path: Path) -> Any: + return json.loads(path.read_text(encoding="utf-8")) + + +def _walk_keys(value: Any) -> list[str]: + if isinstance(value, dict): + keys: list[str] = [] + for key, item in value.items(): + keys.append(str(key)) + keys.extend(_walk_keys(item)) + return keys + if isinstance(value, list): + keys = [] + for item in value: + keys.extend(_walk_keys(item)) + return keys + return [] + + +def test_script_refuses_output_inside_repo(tmp_path: Path) -> None: + scenario, findings = _write_inputs(tmp_path) + output = REPO_ROOT / "real-pilot-review-output" + result = _run_script("--scenario", str(scenario), "--findings", str(findings), "--out", str(output)) + assert result.returncode == 1 + assert "refusing to write real-pilot review artifacts inside repository tree" in result.stderr + assert not output.exists() + + +def test_no_label_run_emits_all_review_artifacts(tmp_path: Path) -> None: + scenario, findings = _write_inputs(tmp_path) + output = tmp_path / "review" + result = _run_script("--scenario", str(scenario), 
"--findings", str(findings), "--out", str(output)) + assert result.returncode == 0, result.stderr + assert (output / "review-table.md").exists() + assert (output / "review-summary.json").exists() + assert (output / "unlabeled-findings.json").exists() + assert (output / "reviewer-label-template.json").exists() + assert len(_load(output / "reviewer-label-template.json")["labels"]) == 3 + assert len(_load(output / "unlabeled-findings.json")["findings"]) == 3 + + +def test_review_table_includes_capability_honesty_reminders(tmp_path: Path) -> None: + scenario, findings = _write_inputs(tmp_path) + output = tmp_path / "review" + result = _run_script("--scenario", str(scenario), "--findings", str(findings), "--out", str(output)) + assert result.returncode == 0, result.stderr + table = (output / "review-table.md").read_text(encoding="utf-8") + assert "No findings does not mean safe" in table + assert "Validated is not exploitability proof" in table + assert "`collection_context` matters" in table + assert "No composite score" in table + assert "No pass/fail benchmark label" in table + + +def test_label_run_matches_by_unique_finding_id_prefix(tmp_path: Path) -> None: + scenario, findings = _write_inputs(tmp_path) + labels = tmp_path / "reviewer-labels.json" + labels.write_text( + json.dumps( + { + "pilot_id": "real-pilot-dev-001", + "label_schema_version": 1, + "labels": [ + { + "finding_id_prefix": "aaaabbbbcccc", + "classification": "valid_path", + "reviewer_confidence": "high", + "owner_confirmed": False, + "notes": "Wildcard principal and ExternalId present.", + "recommended_followup": "Confirm whether trust is intentional.", + "sanitized_evidence_refs": ["ticket-123"], + } + ], + } + ), + encoding="utf-8", + ) + output = tmp_path / "review" + result = _run_script( + "--scenario", + str(scenario), + "--findings", + str(findings), + "--labels", + str(labels), + "--out", + str(output), + ) + assert result.returncode == 0, result.stderr + summary = _load(output / 
"review-summary.json") + assert summary["counts"]["by_label_status"] == {"labeled": 1, "unlabeled": 2} + assert summary["counts"]["by_reviewer_classification"]["valid_path"] == 1 + + +def test_duplicate_or_ambiguous_finding_id_prefix_fails(tmp_path: Path) -> None: + scenario, findings = _write_inputs(tmp_path) + findings_payload = _findings() + findings_payload["findings"][2]["finding_id"] = "aaaazzzzxxxx3333" + findings.write_text(json.dumps(findings_payload), encoding="utf-8") + labels = tmp_path / "reviewer-labels.json" + labels.write_text( + json.dumps( + { + "pilot_id": "real-pilot-dev-001", + "label_schema_version": 1, + "labels": [{"finding_id_prefix": "aaaa", "classification": "valid_path"}], + } + ), + encoding="utf-8", + ) + result = _run_script( + "--scenario", + str(scenario), + "--findings", + str(findings), + "--labels", + str(labels), + "--out", + str(tmp_path / "review"), + ) + assert result.returncode == 1 + assert "matched" in result.stderr + + +def test_duplicate_label_for_same_finding_id_fails(tmp_path: Path) -> None: + scenario, findings = _write_inputs(tmp_path) + labels = tmp_path / "reviewer-labels.json" + labels.write_text( + json.dumps( + { + "pilot_id": "real-pilot-dev-001", + "label_schema_version": 1, + "labels": [ + {"finding_id_prefix": "aaaabbbbcccc", "classification": "valid_path"}, + { + "finding_id_prefix": "aaaabbbbcccc1111", + "classification": "needs_more_evidence", + }, + ], + } + ), + encoding="utf-8", + ) + result = _run_script( + "--scenario", + str(scenario), + "--findings", + str(findings), + "--labels", + str(labels), + "--out", + str(tmp_path / "review"), + ) + assert result.returncode == 1 + assert "duplicate label" in result.stderr + + +def test_invalid_classification_category_fails(tmp_path: Path) -> None: + scenario, findings = _write_inputs(tmp_path) + labels = tmp_path / "reviewer-labels.json" + labels.write_text( + json.dumps( + { + "pilot_id": "real-pilot-dev-001", + "label_schema_version": 1, + "labels": 
[{"finding_id_prefix": "aaaabbbbcccc", "classification": "confirmed"}], + } + ), + encoding="utf-8", + ) + result = _run_script( + "--scenario", + str(scenario), + "--findings", + str(findings), + "--labels", + str(labels), + "--out", + str(tmp_path / "review"), + ) + assert result.returncode == 1 + assert "invalid classification" in result.stderr + + +def test_raw_account_ids_and_arns_are_absent_from_outputs(tmp_path: Path) -> None: + scenario, findings = _write_inputs(tmp_path) + output = tmp_path / "review" + result = _run_script("--scenario", str(scenario), "--findings", str(findings), "--out", str(output)) + assert result.returncode == 0, result.stderr + combined = "\n".join(path.read_text(encoding="utf-8") for path in output.iterdir()) + assert _account_id() not in combined + assert "arn:aws:iam::" not in combined + assert "arn:aws:sts::" not in combined + assert "SourceWildcardTrustRole" in combined + assert "ProdDBAdminRole" in combined + + +def test_summary_counts_without_score_or_performance_metric_fields(tmp_path: Path) -> None: + artifacts = build_review_artifacts(scenario_payload=_scenario(), findings_payload=_findings()) + summary = artifacts["summary"] + assert summary["finding_count"] == 3 + assert summary["scenario_counts"] == { + "nodes": 2, + "edges": 1, + "constraints": 1, + "edge_constraints": 1, + } + assert summary["counts"]["by_pattern_id"] == {"admin_reachability": 1, "cross_account_trust": 2} + assert summary["counts"]["by_iamscope_verdict"] == {"inconclusive": 1, "validated": 2} + assert summary["counts"]["by_reviewer_classification"] == {"unlabeled": 3} + assert summary["counts"]["by_severity"] == {"critical": 1, "high": 1, "medium": 1} + assert summary["counts"]["by_label_status"] == {"unlabeled": 3} + forbidden_field_names = {"score", "pass_fail", "accuracy", "precision", "recall"} + assert forbidden_field_names.isdisjoint(set(_walk_keys(summary))) + + +def test_collection_context_assumptions_and_blockers_are_surfaced() -> None: + 
artifacts = build_review_artifacts(scenario_payload=_scenario(), findings_payload=_findings()) + cross_account = artifacts["inventory"][0] + assert "org_membership_status=unknown" in cross_account["collection_context_summary"] + assert "scope=partial_org" in cross_account["collection_context_summary"] + assert cross_account["assumptions"] == [ + "org_membership_status: source account membership is unknown in partial collection context" + ] + + admin = [entry for entry in artifacts["inventory"] if entry["pattern_id"] == "admin_reachability"][0] + assert admin["blockers_observed"] == ["permission_boundary: reviewer should inspect boundary context"] + assert admin["required_check_states"]["at_least_one_reachable_chain_uses_clean_witnesses"] == "unknown" + + +def test_cross_account_trust_summary_preserves_reviewer_signal() -> None: + artifacts = build_review_artifacts(scenario_payload=_scenario(), findings_payload=_findings()) + cross_account = artifacts["inventory"][0] + summary = " ".join(cross_account["evidence_summary"]) + assert "trust_scope: cross_account" in summary + assert "naked_trust: yes" in summary + assert "wildcard_principal: yes" in summary + assert "has_external_id: yes" in summary + assert "has_conditions: yes" in summary + + +def test_referenced_edge_summary_preserves_sanitized_edge_context() -> None: + artifacts = build_review_artifacts(scenario_payload=_scenario(), findings_payload=_findings()) + cross_account = artifacts["inventory"][0] + assert cross_account["referenced_edges"] == [ + "edge-trust-1: SourceWildcardTrustRole -> TrustedTargetRole (sts:AssumeRole)" + ] + + +def test_admin_reachability_summary_preserves_administratoraccess_signal() -> None: + artifacts = build_review_artifacts(scenario_payload=_scenario(), findings_payload=_findings()) + admin = [entry for entry in artifacts["inventory"] if entry["pattern_id"] == "admin_reachability"][0] + summary = " ".join(admin["evidence_summary"]) + assert "source_has_assume_role: pass" in summary 
+ assert "reachable_admins_count: 1" in summary + assert "clean_witness_check: unknown" in summary + assert "admin_witness_policy: AdministratorAccess" in summary + + +def test_template_contains_one_entry_per_finding(tmp_path: Path) -> None: + artifacts = build_review_artifacts(scenario_payload=_scenario(), findings_payload=_findings()) + template = artifacts["template"] + assert template["pilot_id"] == "real-pilot-dev-001" + assert template["label_schema_version"] == 1 + assert len(template["labels"]) == 3 + assert {label["classification"] for label in template["labels"]} == {""} + + +def test_output_is_deterministic_across_repeated_runs(tmp_path: Path) -> None: + scenario, findings = _write_inputs(tmp_path) + output_one = tmp_path / "review-one" + output_two = tmp_path / "review-two" + + first = _run_script("--scenario", str(scenario), "--findings", str(findings), "--out", str(output_one)) + second = _run_script("--scenario", str(scenario), "--findings", str(findings), "--out", str(output_two)) + assert first.returncode == 0, first.stderr + assert second.returncode == 0, second.stderr + + for name in ( + "review-table.md", + "review-summary.json", + "unlabeled-findings.json", + "reviewer-label-template.json", + ): + assert (output_one / name).read_text(encoding="utf-8") == (output_two / name).read_text(encoding="utf-8")