diff --git a/doc/_toc.yml b/doc/_toc.yml
index 0845047cf3..2e231122a0 100644
--- a/doc/_toc.yml
+++ b/doc/_toc.yml
@@ -110,6 +110,7 @@ chapters:
- file: code/scoring/prompt_shield_scorer
- file: code/scoring/generic_scorers
- file: code/scoring/8_scorer_metrics
+ - file: code/analytics/1_result_analysis
- file: code/memory/0_memory
sections:
- file: code/memory/1_sqlite_memory
diff --git a/doc/code/analytics/1_result_analysis.ipynb b/doc/code/analytics/1_result_analysis.ipynb
new file mode 100644
index 0000000000..a2dbca70c2
--- /dev/null
+++ b/doc/code/analytics/1_result_analysis.ipynb
@@ -0,0 +1,898 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "id": "0",
+ "metadata": {},
+ "source": [
+ "# Result Analysis\n",
+ "\n",
+ "The `analyze_results` function computes attack success rates from a list of `AttackResult` objects.\n",
+    "It supports flexible grouping across built-in dimensions (`attack_type`, `converter_type`, `label`, `harm_category`)\n",
+ "as well as composite and custom dimensions."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "1",
+ "metadata": {},
+ "source": [
+ "## Setup\n",
+ "\n",
+ "First, let's create some sample `AttackResult` objects to work with."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 2,
+ "id": "2",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Created 5 sample AttackResult objects\n"
+ ]
+ }
+ ],
+ "source": [
+ "from pyrit.analytics import analyze_results\n",
+ "from pyrit.identifiers import ComponentIdentifier\n",
+ "from pyrit.models import AttackOutcome, AttackResult, MessagePiece\n",
+ "\n",
+ "\n",
+ "def make_converter(name: str) -> ComponentIdentifier:\n",
+ " return ComponentIdentifier(class_name=name, class_module=\"pyrit.prompt_converter\")\n",
+ "\n",
+ "\n",
+ "crescendo_id = ComponentIdentifier(class_name=\"CrescendoAttack\", class_module=\"pyrit.executor.attack\")\n",
+ "red_team_id = ComponentIdentifier(class_name=\"RedTeamingAttack\", class_module=\"pyrit.executor.attack\")\n",
+ "\n",
+ "# Build a small set of representative attack results\n",
+ "results = [\n",
+ " # Crescendo attacks with Base64Converter\n",
+ " AttackResult(\n",
+ " conversation_id=\"c1\",\n",
+ " objective=\"bypass safety filter\",\n",
+ " attack_identifier=crescendo_id,\n",
+ " outcome=AttackOutcome.SUCCESS,\n",
+ " last_response=MessagePiece(\n",
+ " role=\"user\",\n",
+ " original_value=\"response 1\",\n",
+ " converter_identifiers=[make_converter(\"Base64Converter\")],\n",
+ " labels={\"operation_name\": \"op_safety_bypass\", \"operator\": \"alice\"},\n",
+ " targeted_harm_categories=[\"violence\", \"hate_speech\"],\n",
+ " ),\n",
+ " ),\n",
+ " AttackResult(\n",
+ " conversation_id=\"c2\",\n",
+ " objective=\"bypass safety filter\",\n",
+ " attack_identifier=crescendo_id,\n",
+ " outcome=AttackOutcome.FAILURE,\n",
+ " last_response=MessagePiece(\n",
+ " role=\"user\",\n",
+ " original_value=\"response 2\",\n",
+ " converter_identifiers=[make_converter(\"Base64Converter\")],\n",
+ " labels={\"operation_name\": \"op_safety_bypass\", \"operator\": \"alice\"},\n",
+ " targeted_harm_categories=[\"violence\"],\n",
+ " ),\n",
+ " ),\n",
+ " # Red teaming attacks with ROT13Converter\n",
+ " AttackResult(\n",
+ " conversation_id=\"c3\",\n",
+ " objective=\"extract secrets\",\n",
+ " attack_identifier=red_team_id,\n",
+ " outcome=AttackOutcome.SUCCESS,\n",
+ " last_response=MessagePiece(\n",
+ " role=\"user\",\n",
+ " original_value=\"response 3\",\n",
+ " converter_identifiers=[make_converter(\"ROT13Converter\")],\n",
+ " labels={\"operation_name\": \"op_secret_extract\", \"operator\": \"bob\"},\n",
+ " targeted_harm_categories=[\"misinformation\"],\n",
+ " ),\n",
+ " ),\n",
+ " AttackResult(\n",
+ " conversation_id=\"c4\",\n",
+ " objective=\"extract secrets\",\n",
+ " attack_identifier=red_team_id,\n",
+ " outcome=AttackOutcome.SUCCESS,\n",
+ " last_response=MessagePiece(\n",
+ " role=\"user\",\n",
+ " original_value=\"response 4\",\n",
+ " converter_identifiers=[make_converter(\"ROT13Converter\")],\n",
+ " labels={\"operation_name\": \"op_secret_extract\", \"operator\": \"bob\"},\n",
+ " targeted_harm_categories=[\"hate_speech\", \"misinformation\"],\n",
+ " ),\n",
+ " ),\n",
+ " # An undetermined result (no converter, no labels)\n",
+ " AttackResult(\n",
+ " conversation_id=\"c5\",\n",
+ " objective=\"test prompt\",\n",
+ " attack_identifier=crescendo_id,\n",
+ " outcome=AttackOutcome.UNDETERMINED,\n",
+ " ),\n",
+ "]\n",
+ "\n",
+ "print(f\"Created {len(results)} sample AttackResult objects\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "3",
+ "metadata": {},
+ "source": [
+ "## Overall Stats (No Grouping)\n",
+ "\n",
+ "Pass `group_by=[]` to compute only the overall attack success rate, with no\n",
+ "dimensional breakdown."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "4",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Overall success rate: 0.75\n",
+ " Successes: 3\n",
+ " Failures: 1\n",
+ " Undetermined: 1\n",
+ " Total decided (excl. undetermined): 4\n"
+ ]
+ }
+ ],
+ "source": [
+ "result = analyze_results(results, group_by=[])\n",
+ "\n",
+ "print(f\"Overall success rate: {result.overall.success_rate}\")\n",
+ "print(f\" Successes: {result.overall.successes}\")\n",
+ "print(f\" Failures: {result.overall.failures}\")\n",
+ "print(f\" Undetermined: {result.overall.undetermined}\")\n",
+ "print(f\" Total decided (excl. undetermined): {result.overall.total_decided}\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "5",
+ "metadata": {},
+ "source": [
+ "## Group by Attack Type\n",
+ "\n",
+    "See how success rates differ across attack strategies (e.g. `CrescendoAttack` vs `RedTeamingAttack`)."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "6",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ " CrescendoAttack: success_rate=0.5, successes=1, failures=1, undetermined=1\n",
+ " RedTeamingAttack: success_rate=1.0, successes=2, failures=0, undetermined=0\n"
+ ]
+ }
+ ],
+ "source": [
+ "result = analyze_results(results, group_by=[\"attack_type\"])\n",
+ "\n",
+ "for attack_type, stats in result.dimensions[\"attack_type\"].items():\n",
+ " print(\n",
+ " f\" {attack_type}: success_rate={stats.success_rate}, \"\n",
+ " f\"successes={stats.successes}, failures={stats.failures}, \"\n",
+ " f\"undetermined={stats.undetermined}\"\n",
+ " )"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "7",
+ "metadata": {},
+ "source": [
+ "## Group by Converter Type\n",
+ "\n",
+ "Break down success rates by which prompt converter was applied."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "8",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ " Base64Converter: success_rate=0.5, successes=1, failures=1\n",
+ " ROT13Converter: success_rate=1.0, successes=2, failures=0\n",
+ " no_converter: success_rate=None, successes=0, failures=0\n"
+ ]
+ }
+ ],
+ "source": [
+ "result = analyze_results(results, group_by=[\"converter_type\"])\n",
+ "\n",
+ "for converter, stats in result.dimensions[\"converter_type\"].items():\n",
+ " print(f\" {converter}: success_rate={stats.success_rate}, successes={stats.successes}, failures={stats.failures}\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "9",
+ "metadata": {},
+ "source": [
+ "## Group by Label\n",
+ "\n",
+ "Labels are key=value metadata attached to messages. Each label pair becomes its own\n",
+ "grouping key."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "10",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ " operation_name=op_safety_bypass: success_rate=0.5, successes=1, failures=1\n",
+ " operator=alice: success_rate=0.5, successes=1, failures=1\n",
+ " operation_name=op_secret_extract: success_rate=1.0, successes=2, failures=0\n",
+ " operator=bob: success_rate=1.0, successes=2, failures=0\n",
+ " no_labels: success_rate=None, successes=0, failures=0\n"
+ ]
+ }
+ ],
+ "source": [
+ "result = analyze_results(results, group_by=[\"label\"])\n",
+ "\n",
+ "for label_key, stats in result.dimensions[\"label\"].items():\n",
+ " print(f\" {label_key}: success_rate={stats.success_rate}, successes={stats.successes}, failures={stats.failures}\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "11",
+ "metadata": {},
+ "source": [
+ "## Group by Harm Category\n",
+ "\n",
+ "Break down success rates by the targeted harm categories associated with each prompt."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "12",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ " violence: success_rate=0.5, successes=1, failures=1\n",
+ " hate_speech: success_rate=1.0, successes=2, failures=0\n",
+ " misinformation: success_rate=1.0, successes=2, failures=0\n",
+ " no_harm_category: success_rate=None, successes=0, failures=0\n"
+ ]
+ }
+ ],
+ "source": [
+ "result = analyze_results(results, group_by=[\"harm_category\"])\n",
+ "\n",
+ "for harm_cat, stats in result.dimensions[\"harm_category\"].items():\n",
+ " print(f\" {harm_cat}: success_rate={stats.success_rate}, successes={stats.successes}, failures={stats.failures}\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "13",
+ "metadata": {},
+ "source": [
+ "## Multiple Dimensions at Once\n",
+ "\n",
+ "Pass several dimension names to `group_by` for independent breakdowns in a single call."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "14",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "--- By attack_type ---\n",
+ " CrescendoAttack: success_rate=0.5\n",
+ " RedTeamingAttack: success_rate=1.0\n",
+ "\n",
+ "--- By converter_type ---\n",
+ " Base64Converter: success_rate=0.5\n",
+ " ROT13Converter: success_rate=1.0\n",
+ " no_converter: success_rate=None\n"
+ ]
+ }
+ ],
+ "source": [
+ "result = analyze_results(results, group_by=[\"attack_type\", \"converter_type\"])\n",
+ "\n",
+ "print(\"--- By attack_type ---\")\n",
+ "for key, stats in result.dimensions[\"attack_type\"].items():\n",
+ " print(f\" {key}: success_rate={stats.success_rate}\")\n",
+ "\n",
+ "print(\"\\n--- By converter_type ---\")\n",
+ "for key, stats in result.dimensions[\"converter_type\"].items():\n",
+ " print(f\" {key}: success_rate={stats.success_rate}\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "15",
+ "metadata": {},
+ "source": [
+ "## Composite Dimensions\n",
+ "\n",
+ "Use a tuple of dimension names to create a cross-product grouping. For example,\n",
+ "`(\"converter_type\", \"attack_type\")` produces keys like `(\"Base64Converter\", \"CrescendoAttack\")`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "16",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ " ('Base64Converter', 'CrescendoAttack'): success_rate=0.5, successes=1, failures=1\n",
+ " ('ROT13Converter', 'RedTeamingAttack'): success_rate=1.0, successes=2, failures=0\n",
+ " ('no_converter', 'CrescendoAttack'): success_rate=None, successes=0, failures=0\n"
+ ]
+ }
+ ],
+ "source": [
+ "result = analyze_results(results, group_by=[(\"converter_type\", \"attack_type\")])\n",
+ "\n",
+ "for combo_key, stats in result.dimensions[(\"converter_type\", \"attack_type\")].items():\n",
+ " print(f\" {combo_key}: success_rate={stats.success_rate}, successes={stats.successes}, failures={stats.failures}\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "17",
+ "metadata": {},
+ "source": [
+ "## Custom Dimensions\n",
+ "\n",
+ "Supply your own extractor function via `custom_dimensions`. An extractor takes an\n",
+ "`AttackResult` and returns a `list[str]` of dimension values. Here we group by the\n",
+ "attack objective."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "18",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ " bypass safety filter: success_rate=0.5, successes=1, failures=1\n",
+ " extract secrets: success_rate=1.0, successes=2, failures=0\n",
+ " test prompt: success_rate=None, successes=0, failures=0\n"
+ ]
+ }
+ ],
+ "source": [
+ "def extract_objective(attack: AttackResult) -> list[str]:\n",
+ " return [attack.objective]\n",
+ "\n",
+ "\n",
+ "result = analyze_results(\n",
+ " results,\n",
+ " group_by=[\"objective\"],\n",
+ " custom_dimensions={\"objective\": extract_objective},\n",
+ ")\n",
+ "\n",
+ "for objective, stats in result.dimensions[\"objective\"].items():\n",
+ " print(f\" {objective}: success_rate={stats.success_rate}, successes={stats.successes}, failures={stats.failures}\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "19",
+ "metadata": {},
+ "source": [
+ "## Default Behavior\n",
+ "\n",
+ "When `group_by` is omitted, `analyze_results` groups by **all** registered\n",
+ "dimensions: `attack_type`, `converter_type`, and `label`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "20",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Dimensions returned: ['attack_type', 'converter_type', 'label']\n",
+ "Overall success rate: 0.75\n"
+ ]
+ }
+ ],
+ "source": [
+ "result = analyze_results(results)\n",
+ "\n",
+ "print(f\"Dimensions returned: {list(result.dimensions.keys())}\")\n",
+ "print(f\"Overall success rate: {result.overall.success_rate}\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "21",
+ "metadata": {},
+ "source": [
+ "## Export to DataFrame\n",
+ "\n",
+ "Use the `to_dataframe()` method to export analysis results as a pandas DataFrame for further\n",
+ "analysis or visualization. Pass a dimension name to export a specific breakdown, or `None`\n",
+ "to export all dimensions in long-form."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "22",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "--- Harm Category DataFrame ---\n"
+ ]
+ },
+ {
+ "data": {
+ "text/html": [
+ "
\n",
+ "\n",
+ "
\n",
+ " \n",
+ " \n",
+ " | \n",
+ " dimension | \n",
+ " key | \n",
+ " successes | \n",
+ " failures | \n",
+ " undetermined | \n",
+ " total_decided | \n",
+ " success_rate | \n",
+ "
\n",
+ " \n",
+ " \n",
+ " \n",
+ " | 0 | \n",
+ " harm_category | \n",
+ " violence | \n",
+ " 1 | \n",
+ " 1 | \n",
+ " 0 | \n",
+ " 2 | \n",
+ " 0.5 | \n",
+ "
\n",
+ " \n",
+ " | 1 | \n",
+ " harm_category | \n",
+ " hate_speech | \n",
+ " 2 | \n",
+ " 0 | \n",
+ " 0 | \n",
+ " 2 | \n",
+ " 1.0 | \n",
+ "
\n",
+ " \n",
+ " | 2 | \n",
+ " harm_category | \n",
+ " misinformation | \n",
+ " 2 | \n",
+ " 0 | \n",
+ " 0 | \n",
+ " 2 | \n",
+ " 1.0 | \n",
+ "
\n",
+ " \n",
+ " | 3 | \n",
+ " harm_category | \n",
+ " no_harm_category | \n",
+ " 0 | \n",
+ " 0 | \n",
+ " 1 | \n",
+ " 0 | \n",
+ " NaN | \n",
+ "
\n",
+ " \n",
+ "
\n",
+ "
"
+ ],
+ "text/plain": [
+ " dimension key successes failures undetermined \\\n",
+ "0 harm_category violence 1 1 0 \n",
+ "1 harm_category hate_speech 2 0 0 \n",
+ "2 harm_category misinformation 2 0 0 \n",
+ "3 harm_category no_harm_category 0 0 1 \n",
+ "\n",
+ " total_decided success_rate \n",
+ "0 2 0.5 \n",
+ "1 2 1.0 \n",
+ "2 2 1.0 \n",
+ "3 0 NaN "
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "--- Attack Type DataFrame ---\n"
+ ]
+ },
+ {
+ "data": {
+ "text/html": [
+ "\n",
+ "\n",
+ "
\n",
+ " \n",
+ " \n",
+ " | \n",
+ " dimension | \n",
+ " key | \n",
+ " successes | \n",
+ " failures | \n",
+ " undetermined | \n",
+ " total_decided | \n",
+ " success_rate | \n",
+ "
\n",
+ " \n",
+ " \n",
+ " \n",
+ " | 0 | \n",
+ " attack_type | \n",
+ " CrescendoAttack | \n",
+ " 1 | \n",
+ " 1 | \n",
+ " 1 | \n",
+ " 2 | \n",
+ " 0.5 | \n",
+ "
\n",
+ " \n",
+ " | 1 | \n",
+ " attack_type | \n",
+ " RedTeamingAttack | \n",
+ " 2 | \n",
+ " 0 | \n",
+ " 0 | \n",
+ " 2 | \n",
+ " 1.0 | \n",
+ "
\n",
+ " \n",
+ "
\n",
+ "
"
+ ],
+ "text/plain": [
+ " dimension key successes failures undetermined \\\n",
+ "0 attack_type CrescendoAttack 1 1 1 \n",
+ "1 attack_type RedTeamingAttack 2 0 0 \n",
+ "\n",
+ " total_decided success_rate \n",
+ "0 2 0.5 \n",
+ "1 2 1.0 "
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "\n",
+ "--- All Dimensions DataFrame ---\n"
+ ]
+ },
+ {
+ "data": {
+ "text/html": [
+ "\n",
+ "\n",
+ "
\n",
+ " \n",
+ " \n",
+ " | \n",
+ " dimension | \n",
+ " key | \n",
+ " successes | \n",
+ " failures | \n",
+ " undetermined | \n",
+ " total_decided | \n",
+ " success_rate | \n",
+ "
\n",
+ " \n",
+ " \n",
+ " \n",
+ " | 0 | \n",
+ " overall | \n",
+ " all | \n",
+ " 3 | \n",
+ " 1 | \n",
+ " 1 | \n",
+ " 4 | \n",
+ " 0.75 | \n",
+ "
\n",
+ " \n",
+ " | 1 | \n",
+ " harm_category | \n",
+ " violence | \n",
+ " 1 | \n",
+ " 1 | \n",
+ " 0 | \n",
+ " 2 | \n",
+ " 0.50 | \n",
+ "
\n",
+ " \n",
+ " | 2 | \n",
+ " harm_category | \n",
+ " hate_speech | \n",
+ " 2 | \n",
+ " 0 | \n",
+ " 0 | \n",
+ " 2 | \n",
+ " 1.00 | \n",
+ "
\n",
+ " \n",
+ " | 3 | \n",
+ " harm_category | \n",
+ " misinformation | \n",
+ " 2 | \n",
+ " 0 | \n",
+ " 0 | \n",
+ " 2 | \n",
+ " 1.00 | \n",
+ "
\n",
+ " \n",
+ " | 4 | \n",
+ " harm_category | \n",
+ " no_harm_category | \n",
+ " 0 | \n",
+ " 0 | \n",
+ " 1 | \n",
+ " 0 | \n",
+ " NaN | \n",
+ "
\n",
+ " \n",
+ " | 5 | \n",
+ " attack_type | \n",
+ " CrescendoAttack | \n",
+ " 1 | \n",
+ " 1 | \n",
+ " 1 | \n",
+ " 2 | \n",
+ " 0.50 | \n",
+ "
\n",
+ " \n",
+ " | 6 | \n",
+ " attack_type | \n",
+ " RedTeamingAttack | \n",
+ " 2 | \n",
+ " 0 | \n",
+ " 0 | \n",
+ " 2 | \n",
+ " 1.00 | \n",
+ "
\n",
+ " \n",
+ "
\n",
+ "
"
+ ],
+ "text/plain": [
+ " dimension key successes failures undetermined \\\n",
+ "0 overall all 3 1 1 \n",
+ "1 harm_category violence 1 1 0 \n",
+ "2 harm_category hate_speech 2 0 0 \n",
+ "3 harm_category misinformation 2 0 0 \n",
+ "4 harm_category no_harm_category 0 0 1 \n",
+ "5 attack_type CrescendoAttack 1 1 1 \n",
+ "6 attack_type RedTeamingAttack 2 0 0 \n",
+ "\n",
+ " total_decided success_rate \n",
+ "0 4 0.75 \n",
+ "1 2 0.50 \n",
+ "2 2 1.00 \n",
+ "3 2 1.00 \n",
+ "4 0 NaN \n",
+ "5 2 0.50 \n",
+ "6 2 1.00 "
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "from IPython.display import display\n",
+ "\n",
+ "result = analyze_results(results, group_by=[\"harm_category\", \"attack_type\"])\n",
+ "\n",
+ "# Export a single dimension as a table\n",
+ "print(\"--- Harm Category DataFrame ---\")\n",
+ "df_harm = result.to_dataframe(dimension=\"harm_category\")\n",
+ "display(df_harm)\n",
+ "\n",
+ "# Export a single dimension as a table\n",
+ "print(\"--- Attack Type DataFrame ---\")\n",
+ "df_attack = result.to_dataframe(dimension=\"attack_type\")\n",
+ "display(df_attack)\n",
+ "\n",
+ "# Export all dimensions in long-form as a table\n",
+ "print(\"\\n--- All Dimensions DataFrame ---\")\n",
+ "df_all = result.to_dataframe()\n",
+ "display(df_all)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "265c9e56",
+ "metadata": {},
+ "source": [
+ "## Interactive HTML Report\n",
+ "\n",
+ "Use `save_html()` to generate a fully interactive HTML report with:\n",
+ "- KPI summary cards\n",
+ "- A dimension selector dropdown to switch between harm_category, attack_type, etc.\n",
+ "- Cross-dimensional heatmaps\n",
+ "- Data coverage table\n",
+ "\n",
+ "The report is self-contained and can be opened in any browser."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 4,
+ "id": "ba3ecf2f",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Report saved to: attack_analysis_report.html\n"
+ ]
+ },
+ {
+ "data": {
+ "text/html": [
+ "\n",
+ " \n",
+ " "
+ ],
+ "text/plain": [
+ ""
+ ]
+ },
+ "execution_count": 4,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "from pyrit.analytics import save_html\n",
+ "from IPython.display import IFrame\n",
+ "\n",
+ "# Run analysis with multiple dimensions for rich visualization\n",
+ "result = analyze_results(\n",
+ " results,\n",
+ " group_by=[\n",
+ " \"harm_category\",\n",
+ " \"attack_type\",\n",
+ " \"converter_type\",\n",
+ " \"label\",\n",
+ " (\"harm_category\", \"attack_type\"),\n",
+ " (\"harm_category\", \"converter_type\"),\n",
+ " ],\n",
+ ")\n",
+ "\n",
+ "# Save the interactive HTML report\n",
+ "report_path = save_html(result, \"attack_analysis_report.html\", title=\"Attack Analysis Report\")\n",
+ "print(f\"Report saved to: {report_path}\")\n",
+ "\n",
+ "# Display the report inline (works in Jupyter notebooks)\n",
+ "IFrame(src=str(report_path), width=\"100%\", height=800)"
+ ]
+ }
+ ],
+ "metadata": {
+ "jupytext": {
+ "cell_metadata_filter": "-all"
+ },
+ "language_info": {
+ "codemirror_mode": {
+ "name": "ipython",
+ "version": 3
+ },
+ "file_extension": ".py",
+ "mimetype": "text/x-python",
+ "name": "python",
+ "nbconvert_exporter": "python",
+ "pygments_lexer": "ipython3",
+ "version": "3.11.13"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}
diff --git a/doc/code/analytics/1_result_analysis.py b/doc/code/analytics/1_result_analysis.py
new file mode 100644
index 0000000000..9c68083259
--- /dev/null
+++ b/doc/code/analytics/1_result_analysis.py
@@ -0,0 +1,295 @@
+# ---
+# jupyter:
+# jupytext:
+# cell_metadata_filter: -all
+# text_representation:
+# extension: .py
+# format_name: percent
+# format_version: '1.3'
+# jupytext_version: 1.19.0
+# kernelspec:
+# display_name: pyrit-dev
+# language: python
+# name: python3
+# ---
+
+# %% [markdown]
+# # Result Analysis
+#
+# The `analyze_results` function computes attack success rates from a list of `AttackResult` objects.
+# It supports flexible grouping across built-in dimensions (`attack_type`, `converter_type`, `label`, `harm_category`)
+# as well as composite and custom dimensions.
+
+# %% [markdown]
+# ## Setup
+#
+# First, let's create some sample `AttackResult` objects to work with.
+
+# %%
+from pyrit.analytics import analyze_results
+from pyrit.identifiers import ComponentIdentifier
+from pyrit.models import AttackOutcome, AttackResult, MessagePiece
+
+
+def make_converter(name: str) -> ComponentIdentifier:
+ return ComponentIdentifier(class_name=name, class_module="pyrit.prompt_converter")
+
+
+crescendo_id = ComponentIdentifier(class_name="CrescendoAttack", class_module="pyrit.executor.attack")
+red_team_id = ComponentIdentifier(class_name="RedTeamingAttack", class_module="pyrit.executor.attack")
+
+# Build a small set of representative attack results
+results = [
+ # Crescendo attacks with Base64Converter
+ AttackResult(
+ conversation_id="c1",
+ objective="bypass safety filter",
+ attack_identifier=crescendo_id,
+ outcome=AttackOutcome.SUCCESS,
+ last_response=MessagePiece(
+ role="user",
+ original_value="response 1",
+ converter_identifiers=[make_converter("Base64Converter")],
+ labels={"operation_name": "op_safety_bypass", "operator": "alice"},
+ targeted_harm_categories=["violence", "hate_speech"],
+ ),
+ ),
+ AttackResult(
+ conversation_id="c2",
+ objective="bypass safety filter",
+ attack_identifier=crescendo_id,
+ outcome=AttackOutcome.FAILURE,
+ last_response=MessagePiece(
+ role="user",
+ original_value="response 2",
+ converter_identifiers=[make_converter("Base64Converter")],
+ labels={"operation_name": "op_safety_bypass", "operator": "alice"},
+ targeted_harm_categories=["violence"],
+ ),
+ ),
+ # Red teaming attacks with ROT13Converter
+ AttackResult(
+ conversation_id="c3",
+ objective="extract secrets",
+ attack_identifier=red_team_id,
+ outcome=AttackOutcome.SUCCESS,
+ last_response=MessagePiece(
+ role="user",
+ original_value="response 3",
+ converter_identifiers=[make_converter("ROT13Converter")],
+ labels={"operation_name": "op_secret_extract", "operator": "bob"},
+ targeted_harm_categories=["misinformation"],
+ ),
+ ),
+ AttackResult(
+ conversation_id="c4",
+ objective="extract secrets",
+ attack_identifier=red_team_id,
+ outcome=AttackOutcome.SUCCESS,
+ last_response=MessagePiece(
+ role="user",
+ original_value="response 4",
+ converter_identifiers=[make_converter("ROT13Converter")],
+ labels={"operation_name": "op_secret_extract", "operator": "bob"},
+ targeted_harm_categories=["hate_speech", "misinformation"],
+ ),
+ ),
+ # An undetermined result (no converter, no labels)
+ AttackResult(
+ conversation_id="c5",
+ objective="test prompt",
+ attack_identifier=crescendo_id,
+ outcome=AttackOutcome.UNDETERMINED,
+ ),
+]
+
+print(f"Created {len(results)} sample AttackResult objects")
+
+# %% [markdown]
+# ## Overall Stats (No Grouping)
+#
+# Pass `group_by=[]` to compute only the overall attack success rate, with no
+# dimensional breakdown.
+
+# %%
+result = analyze_results(results, group_by=[])
+
+print(f"Overall success rate: {result.overall.success_rate}")
+print(f" Successes: {result.overall.successes}")
+print(f" Failures: {result.overall.failures}")
+print(f" Undetermined: {result.overall.undetermined}")
+print(f" Total decided (excl. undetermined): {result.overall.total_decided}")
+
+# %% [markdown]
+# ## Group by Attack Type
+#
+# See how success rates differ across attack strategies (e.g. `CrescendoAttack` vs `RedTeamingAttack`).
+
+# %%
+result = analyze_results(results, group_by=["attack_type"])
+
+for attack_type, stats in result.dimensions["attack_type"].items():
+ print(
+ f" {attack_type}: success_rate={stats.success_rate}, "
+ f"successes={stats.successes}, failures={stats.failures}, "
+ f"undetermined={stats.undetermined}"
+ )
+
+# %% [markdown]
+# ## Group by Converter Type
+#
+# Break down success rates by which prompt converter was applied.
+
+# %%
+result = analyze_results(results, group_by=["converter_type"])
+
+for converter, stats in result.dimensions["converter_type"].items():
+ print(f" {converter}: success_rate={stats.success_rate}, successes={stats.successes}, failures={stats.failures}")
+
+# %% [markdown]
+# ## Group by Label
+#
+# Labels are key=value metadata attached to messages. Each label pair becomes its own
+# grouping key.
+
+# %%
+result = analyze_results(results, group_by=["label"])
+
+for label_key, stats in result.dimensions["label"].items():
+ print(f" {label_key}: success_rate={stats.success_rate}, successes={stats.successes}, failures={stats.failures}")
+
+# %% [markdown]
+# ## Group by Harm Category
+#
+# Break down success rates by the targeted harm categories associated with each prompt.
+
+# %%
+result = analyze_results(results, group_by=["harm_category"])
+
+for harm_cat, stats in result.dimensions["harm_category"].items():
+ print(f" {harm_cat}: success_rate={stats.success_rate}, successes={stats.successes}, failures={stats.failures}")
+
+# %% [markdown]
+# ## Multiple Dimensions at Once
+#
+# Pass several dimension names to `group_by` for independent breakdowns in a single call.
+
+# %%
+result = analyze_results(results, group_by=["attack_type", "converter_type"])
+
+print("--- By attack_type ---")
+for key, stats in result.dimensions["attack_type"].items():
+ print(f" {key}: success_rate={stats.success_rate}")
+
+print("\n--- By converter_type ---")
+for key, stats in result.dimensions["converter_type"].items():
+ print(f" {key}: success_rate={stats.success_rate}")
+
+# %% [markdown]
+# ## Composite Dimensions
+#
+# Use a tuple of dimension names to create a cross-product grouping. For example,
+# `("converter_type", "attack_type")` produces keys like `("Base64Converter", "CrescendoAttack")`.
+
+# %%
+result = analyze_results(results, group_by=[("converter_type", "attack_type")])
+
+for combo_key, stats in result.dimensions[("converter_type", "attack_type")].items():
+ print(f" {combo_key}: success_rate={stats.success_rate}, successes={stats.successes}, failures={stats.failures}")
+
+# %% [markdown]
+# ## Custom Dimensions
+#
+# Supply your own extractor function via `custom_dimensions`. An extractor takes an
+# `AttackResult` and returns a `list[str]` of dimension values. Here we group by the
+# attack objective.
+
+# %%
+
+
+def extract_objective(attack: AttackResult) -> list[str]:
+ return [attack.objective]
+
+
+result = analyze_results(
+ results,
+ group_by=["objective"],
+ custom_dimensions={"objective": extract_objective},
+)
+
+for objective, stats in result.dimensions["objective"].items():
+ print(f" {objective}: success_rate={stats.success_rate}, successes={stats.successes}, failures={stats.failures}")
+
+# %% [markdown]
+# ## Default Behavior
+#
+# When `group_by` is omitted, `analyze_results` groups by **all** registered
+# dimensions: `attack_type`, `converter_type`, and `label`.
+
+# %%
+result = analyze_results(results)
+
+print(f"Dimensions returned: {list(result.dimensions.keys())}")
+print(f"Overall success rate: {result.overall.success_rate}")
+
+# %% [markdown]
+# ## Export to DataFrame
+#
+# Use the `to_dataframe()` method to export analysis results as a pandas DataFrame for further
+# analysis or visualization. Pass a dimension name to export a specific breakdown, or `None`
+# to export all dimensions in long-form.
+
+# %%
+from IPython.display import display
+
+result = analyze_results(results, group_by=["harm_category", "attack_type"])
+
+# Export a single dimension as a table
+print("--- Harm Category DataFrame ---")
+df_harm = result.to_dataframe(dimension="harm_category")
+display(df_harm)
+
+# Export a single dimension as a table
+print("--- Attack Type DataFrame ---")
+df_attack = result.to_dataframe(dimension="attack_type")
+display(df_attack)
+
+# Export all dimensions in long-form as a table
+print("\n--- All Dimensions DataFrame ---")
+df_all = result.to_dataframe()
+display(df_all)
+
+# %% [markdown]
+# ## Interactive HTML Report
+#
+# Use `save_html()` to generate a fully interactive HTML report with:
+# - KPI summary cards
+# - A dimension selector dropdown to switch between harm_category, attack_type, etc.
+# - Cross-dimensional heatmaps
+# - Data coverage table
+#
+# The report is self-contained and can be opened in any browser.
+
+# %%
+from pyrit.analytics import save_html
+from IPython.display import IFrame
+
+# Run analysis with multiple dimensions for rich visualization
+result = analyze_results(
+ results,
+ group_by=[
+ "harm_category",
+ "attack_type",
+ "converter_type",
+ "label",
+ ("harm_category", "attack_type"),
+ ("harm_category", "converter_type"),
+ ],
+)
+
+# Save the interactive HTML report
+report_path = save_html(result, "attack_analysis_report.html", title="Attack Analysis Report")
+print(f"Report saved to: {report_path}")
+
+# Display the report inline (works in Jupyter notebooks)
+IFrame(src=str(report_path), width="100%", height=800)
diff --git a/pyrit/analytics/__init__.py b/pyrit/analytics/__init__.py
index f75d401dd7..04b329df05 100644
--- a/pyrit/analytics/__init__.py
+++ b/pyrit/analytics/__init__.py
@@ -4,18 +4,27 @@
"""Analytics module for PyRIT conversation and result analysis."""
from pyrit.analytics.conversation_analytics import ConversationAnalytics
-from pyrit.analytics.result_analysis import AttackStats, analyze_results
+from pyrit.analytics.result_analysis import (
+ AnalysisResult,
+ AttackStats,
+ DimensionExtractor,
+ analyze_results,
+)
from pyrit.analytics.text_matching import (
ApproximateTextMatching,
ExactTextMatching,
TextMatching,
)
+from pyrit.analytics.visualization import save_html
__all__ = [
"analyze_results",
+ "AnalysisResult",
"ApproximateTextMatching",
"AttackStats",
"ConversationAnalytics",
+ "DimensionExtractor",
"ExactTextMatching",
+ "save_html",
"TextMatching",
]
diff --git a/pyrit/analytics/result_analysis.py b/pyrit/analytics/result_analysis.py
index cc5f58fa70..1cb7947ec7 100644
--- a/pyrit/analytics/result_analysis.py
+++ b/pyrit/analytics/result_analysis.py
@@ -1,13 +1,26 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
+import warnings
from collections import defaultdict
-from dataclasses import dataclass
-from typing import Optional
+from collections.abc import Callable
+from dataclasses import dataclass, field
+from itertools import product
+from typing import Optional, Union
from pyrit.models import AttackOutcome, AttackResult
+# ---------------------------------------------------------------------------
+# Type alias for dimension extractors.
+# An extractor receives an AttackResult and returns a list of string keys
+# (list to support one-to-many mappings, e.g. multiple converters per attack).
+# ---------------------------------------------------------------------------
+DimensionExtractor = Callable[[AttackResult], list[str]]
+
+# ---------------------------------------------------------------------------
+# Data classes
+# ---------------------------------------------------------------------------
@dataclass
class AttackStats:
"""Statistics for attack analysis results."""
@@ -19,7 +32,206 @@ class AttackStats:
undetermined: int
-def _compute_stats(successes: int, failures: int, undetermined: int) -> AttackStats:
+@dataclass
+class AnalysisResult:
+ """
+ Structured result from attack analysis.
+
+ Attributes:
+ overall (AttackStats): Aggregate stats across all attack results.
+ dimensions (dict): Per-dimension breakdown. Keys are dimension names
+ (str) for single dimensions, or tuples of dimension names for
+ composite groupings. Values map dimension keys to AttackStats.
+ """
+
+ overall: AttackStats
+ dimensions: dict[Union[str, tuple[str, ...]], dict[Union[str, tuple[str, ...]], AttackStats]] = field(
+ default_factory=dict
+ )
+
+ def to_dataframe(
+ self,
+ dimension: Optional[Union[str, tuple[str, ...]]] = None,
+ ) -> "pandas.DataFrame": # type: ignore[name-defined] # noqa: F821
+ """
+ Export analysis results as a pandas DataFrame.
+
+ When *dimension* is provided, only that dimension's breakdown is
+ returned. For composite dimensions the tuple keys are exploded into
+ individual columns. When *dimension* is ``None``, all dimensions and
+ the overall stats are returned in a single long-form DataFrame with a
+ ``dimension`` column.
+
+ Args:
+ dimension (str | tuple[str, ...] | None): The dimension to export.
+ Pass a string for a single dimension (e.g. ``"harm_category"``),
+ a tuple for a composite dimension (e.g.
+ ``("harm_category", "attack_type")``), or ``None`` to export
+ everything. Defaults to ``None``.
+
+ Returns:
+ pandas.DataFrame: A DataFrame with columns for dimension key(s)
+ and stats (``successes``, ``failures``, ``undetermined``,
+ ``total_decided``, ``success_rate``).
+
+ Raises:
+ ImportError: If pandas is not installed.
+ KeyError: If the requested dimension is not in the results.
+ """
+ try:
+ import pandas as pd
+ except ImportError as err:
+ raise ImportError("pandas is required for to_dataframe(). Install it with: pip install pandas") from err
+
+ stats_columns = ["successes", "failures", "undetermined", "total_decided", "success_rate"]
+
+ def _stats_row(stats: AttackStats) -> dict[str, object]:
+ return {
+ "successes": stats.successes,
+ "failures": stats.failures,
+ "undetermined": stats.undetermined,
+ "total_decided": stats.total_decided,
+ "success_rate": stats.success_rate,
+ }
+
+ def _dim_rows(
+ dim_name: Union[str, tuple[str, ...]],
+ dim_data: dict[Union[str, tuple[str, ...]], AttackStats],
+ ) -> list[dict[str, object]]:
+ rows = []
+ for key, stats in dim_data.items():
+ row: dict[str, object]
+ if isinstance(dim_name, tuple):
+ # Explode composite key into individual columns
+ row = dict(zip(dim_name, key, strict=True))
+ else:
+ row = {"dimension": dim_name, "key": key}
+ row.update(_stats_row(stats))
+ rows.append(row)
+ return rows
+
+ # Single dimension requested
+ if dimension is not None:
+ if dimension not in self.dimensions:
+ raise KeyError(f"Dimension {dimension!r} not found. Available: {list(self.dimensions.keys())}")
+ rows = _dim_rows(dimension, self.dimensions[dimension])
+ cols: list[str]
+ if isinstance(dimension, tuple):
+ cols = list(dimension) + stats_columns
+ else:
+ cols = ["dimension", "key"] + stats_columns
+ return pd.DataFrame(rows, columns=cols)
+
+ # All dimensions + overall
+ overall_row: dict[str, object] = {"dimension": "overall", "key": "all"}
+ overall_row.update(_stats_row(self.overall))
+ all_rows: list[dict[str, object]] = [overall_row]
+
+ for dim_name, dim_data in self.dimensions.items():
+ if isinstance(dim_name, tuple):
+ # Composite dimensions: flatten as "dim1 × dim2" in the dimension column
+ label = " \u00d7 ".join(dim_name)
+ for key, stats in dim_data.items():
+ row: dict[str, object] = {"dimension": label, "key": " \u00d7 ".join(str(k) for k in key)}
+ row.update(_stats_row(stats))
+ all_rows.append(row)
+ else:
+ all_rows.extend(_dim_rows(dim_name, dim_data))
+
+ return pd.DataFrame(all_rows, columns=["dimension", "key"] + stats_columns)
+
+
+# ---------------------------------------------------------------------------
+# Built-in dimension extractors
+# ---------------------------------------------------------------------------
+def _extract_attack_type(result: AttackResult) -> list[str]:
+ """
+ Extract the attack type from the attack identifier.
+
+ Reads the ``class_name`` attribute from the ComponentIdentifier.
+
+ Returns:
+ list[str]: A single-element list containing the attack type.
+ """
+ return [result.attack_identifier.class_name if result.attack_identifier else "unknown"]
+
+
+def _extract_converter_types(result: AttackResult) -> list[str]:
+ """
+ Extract converter class names from the last response.
+
+ Returns:
+ list[str]: Converter class names, or ``["no_converter"]`` if none.
+ """
+ if result.last_response is not None and result.last_response.converter_identifiers:
+ return [conv.class_name for conv in result.last_response.converter_identifiers]
+ return ["no_converter"]
+
+
+def _extract_labels(result: AttackResult) -> list[str]:
+ """
+ Extract label key=value pairs from the last response.
+
+ Returns:
+ list[str]: Label strings as ``"key=value"``, or ``["no_labels"]`` if none.
+ """
+ if result.last_response is not None and result.last_response.labels:
+ return [f"{k}={v}" for k, v in result.last_response.labels.items()]
+ return ["no_labels"]
+
+
+def _extract_harm_categories(result: AttackResult) -> list[str]:
+ """
+ Extract targeted harm categories from the last response.
+
+ Returns:
+ list[str]: Harm category strings, or ``["no_harm_category"]`` if none.
+ """
+ if result.last_response is not None and result.last_response.targeted_harm_categories:
+ return result.last_response.targeted_harm_categories
+ return ["no_harm_category"]
+
+
+DEFAULT_DIMENSIONS: dict[str, DimensionExtractor] = {
+ "attack_type": _extract_attack_type,
+ "converter_type": _extract_converter_types,
+ "harm_category": _extract_harm_categories,
+ "label": _extract_labels,
+}
+
+# Deprecated aliases — maps old name to canonical name.
+# Using the old name emits a DeprecationWarning.
+_DEPRECATED_DIMENSION_ALIASES: dict[str, str] = {
+ "attack_identifier": "attack_type",
+}
+
+
+# ---------------------------------------------------------------------------
+# Internal helpers
+# ---------------------------------------------------------------------------
+_OUTCOME_KEYS: dict[AttackOutcome, str] = {
+ AttackOutcome.SUCCESS: "successes",
+ AttackOutcome.FAILURE: "failures",
+}
+
+
+def _outcome_key(outcome: AttackOutcome) -> str:
+ """
+ Map an AttackOutcome to its counter key.
+
+ Returns:
+ str: The counter key (``"successes"``, ``"failures"``, or ``"undetermined"``).
+ """
+ return _OUTCOME_KEYS.get(outcome, "undetermined")
+
+
+def _compute_stats(*, successes: int, failures: int, undetermined: int) -> AttackStats:
+ """
+ Compute AttackStats from raw counts.
+
+ Returns:
+ AttackStats: The computed statistics.
+ """
total_decided = successes + failures
success_rate = successes / total_decided if total_decided > 0 else None
return AttackStats(
@@ -31,65 +243,162 @@ def _compute_stats(successes: int, failures: int, undetermined: int) -> AttackSt
)
-def analyze_results(attack_results: list[AttackResult]) -> dict[str, AttackStats | dict[str, AttackStats]]:
+def _build_stats(counts: defaultdict[str, int]) -> AttackStats:
"""
- Analyze a list of AttackResult objects and return overall and grouped statistics.
+ Build AttackStats from a counter dict.
Returns:
- A dictionary of AttackStats objects. The overall stats are accessible with the key
- "Overall", and the stats of any attack can be retrieved using "By_attack_identifier"
- followed by the identifier of the attack.
+ AttackStats: The computed statistics.
+ """
+ return _compute_stats(
+ successes=counts["successes"],
+ failures=counts["failures"],
+ undetermined=counts["undetermined"],
+ )
+
+
+def _resolve_dimension_name(*, name: str, extractors: dict[str, DimensionExtractor]) -> str:
+ """
+ Resolve a single dimension name, handling deprecated aliases.
+
+ Returns:
+ str: The canonical dimension name.
Raises:
- ValueError: if attack_results is empty.
- TypeError: if any element is not an AttackResult.
+ ValueError: If the dimension name is unknown.
+ """
+ if name in extractors:
+ return name
+ canonical = _DEPRECATED_DIMENSION_ALIASES.get(name)
+ if canonical and canonical in extractors:
+ warnings.warn(
+ f"Dimension '{name}' is deprecated and will be removed in v0.13.0. Use '{canonical}' instead.",
+ DeprecationWarning,
+ stacklevel=4,
+ )
+ return canonical
+ raise ValueError(f"Unknown dimension '{name}'. Available: {sorted(extractors.keys())}")
+
- Example:
- >>> analyze_results(attack_results)
- {
- "Overall": AttackStats,
- "By_attack_identifier": dict[str, AttackStats]
- }
+def _resolve_dimension_spec(
+ *, spec: Union[str, tuple[str, ...]], extractors: dict[str, DimensionExtractor]
+) -> Union[str, tuple[str, ...]]:
+ """
+ Resolve a group_by spec (single or composite), handling deprecated aliases.
+
+ Returns:
+ Union[str, tuple[str, ...]]: The resolved spec with canonical dimension names.
+ """
+ if isinstance(spec, str):
+ return _resolve_dimension_name(name=spec, extractors=extractors)
+ return tuple(_resolve_dimension_name(name=n, extractors=extractors) for n in spec)
+
+
+# ---------------------------------------------------------------------------
+# Public API
+# ---------------------------------------------------------------------------
+def analyze_results(
+ attack_results: list[AttackResult],
+ *,
+ group_by: list[Union[str, tuple[str, ...]]] | None = None,
+ custom_dimensions: dict[str, DimensionExtractor] | None = None,
+) -> AnalysisResult:
+ """
+ Analyze attack results with flexible, dimension-based grouping.
+
+ Computes overall stats and breaks down results by one or more dimensions.
+ Dimensions can be single (e.g. ``"converter_type"``) or composite tuples
+ (e.g. ``("converter_type", "attack_type")``) for cross-dimensional
+ grouping.
+
+ Args:
+ attack_results (list[AttackResult]): The attack results to analyze.
+ group_by (list[str | tuple[str, ...]] | None): Dimensions to group by.
+ Each element is either a dimension name (str) for independent
+ grouping, or a tuple of dimension names for composite grouping.
+ Defaults to all registered single dimensions.
+ custom_dimensions (dict[str, DimensionExtractor] | None): Additional
+ or overriding dimension extractors keyed by name. Merged with
+ built-in defaults.
+
+ Returns:
+ AnalysisResult: Overall stats and per-dimension breakdowns.
+
+ Raises:
+ ValueError: If attack_results is empty or a dimension name is unknown.
+ TypeError: If any element is not an AttackResult.
+
+ Examples:
+ Group by a single built-in dimension::
+
+ result = analyze_results(attacks, group_by=["attack_type"])
+ for name, stats in result.dimensions["attack_type"].items():
+ print(f"{name}: {stats.success_rate}")
+
+ Group by a composite (cross-product) of two dimensions::
+
+ result = analyze_results(
+ attacks,
+ group_by=[("converter_type", "attack_type")],
+ )
+
+ Supply a custom dimension extractor::
+
+ def by_objective(r: AttackResult) -> list[str]:
+ return [r.objective]
+
+ result = analyze_results(
+ attacks,
+ group_by=["objective"],
+ custom_dimensions={"objective": by_objective},
+ )
"""
if not attack_results:
raise ValueError("attack_results cannot be empty")
+ # Merge extractors
+ extractors = dict(DEFAULT_DIMENSIONS)
+ if custom_dimensions:
+ extractors.update(custom_dimensions)
+
+ # Resolve group_by — default to every registered dimension independently
+ if group_by is None:
+ group_by = list(extractors.keys())
+
+ # Resolve deprecated aliases and validate dimension names
+ group_by = [_resolve_dimension_spec(spec=spec, extractors=extractors) for spec in group_by]
+
+ # Accumulators
overall_counts: defaultdict[str, int] = defaultdict(int)
- by_type_counts: defaultdict[str, defaultdict[str, int]] = defaultdict(lambda: defaultdict(int))
+ dim_counts: dict[
+ Union[str, tuple[str, ...]],
+ defaultdict[Union[str, tuple[str, ...]], defaultdict[str, int]],
+ ] = {spec: defaultdict(lambda: defaultdict(int)) for spec in group_by}
+ # Single pass over results
for attack in attack_results:
if not isinstance(attack, AttackResult):
raise TypeError(f"Expected AttackResult, got {type(attack).__name__}: {attack!r}")
- outcome = attack.outcome
- attack_type = attack.attack_identifier.class_name if attack.attack_identifier else "unknown"
-
- if outcome == AttackOutcome.SUCCESS:
- overall_counts["successes"] += 1
- by_type_counts[attack_type]["successes"] += 1
- elif outcome == AttackOutcome.FAILURE:
- overall_counts["failures"] += 1
- by_type_counts[attack_type]["failures"] += 1
- else:
- overall_counts["undetermined"] += 1
- by_type_counts[attack_type]["undetermined"] += 1
-
- overall_stats = _compute_stats(
- successes=overall_counts["successes"],
- failures=overall_counts["failures"],
- undetermined=overall_counts["undetermined"],
- )
+ key = _outcome_key(attack.outcome)
+ overall_counts[key] += 1
- by_type_stats = {
- attack_type: _compute_stats(
- successes=counts["successes"],
- failures=counts["failures"],
- undetermined=counts["undetermined"],
- )
- for attack_type, counts in by_type_counts.items()
- }
+ for spec in group_by:
+ if isinstance(spec, str):
+ for dim_value in extractors[spec](attack):
+ dim_counts[spec][dim_value][key] += 1
+ else:
+ # Composite: cross-product of all sub-dimension values
+ sub_values = [extractors[name](attack) for name in spec]
+ for combo in product(*sub_values):
+ dim_counts[spec][combo][key] += 1
+
+ # Build result
+ dimension_stats: dict[Union[str, tuple[str, ...]], dict[Union[str, tuple[str, ...]], AttackStats]] = {}
+ for spec, counts_by_key in dim_counts.items():
+ dimension_stats[spec] = {dim_key: _build_stats(counts) for dim_key, counts in counts_by_key.items()}
- return {
- "Overall": overall_stats,
- "By_attack_identifier": by_type_stats,
- }
+ return AnalysisResult(
+ overall=_build_stats(overall_counts),
+ dimensions=dimension_stats,
+ )
diff --git a/pyrit/analytics/visualization.py b/pyrit/analytics/visualization.py
new file mode 100644
index 0000000000..f1370faa52
--- /dev/null
+++ b/pyrit/analytics/visualization.py
@@ -0,0 +1,720 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT license.
+
+"""
+Visualization utilities for analytics results.
+
+This module provides interactive HTML report generation for ``AnalysisResult``
+using Plotly. Plotly is lazy-imported so the core analytics module has no
+hard dependency on it. If plotly is not installed, a clear error is raised
+at call time.
+"""
+
+from __future__ import annotations
+
+from pathlib import Path
+from typing import TYPE_CHECKING, Any, Optional, Union
+
+if TYPE_CHECKING:
+ from pyrit.analytics.result_analysis import AnalysisResult, AttackStats
+
+# ---------------------------------------------------------------------------
+# Constants
+# ---------------------------------------------------------------------------
+
+_ASR_COLORSCALE: list[list[object]] = [
+ [0.0, "#e74c3c"],
+ [0.5, "#f39c12"],
+ [1.0, "#2ecc71"],
+]
+
+_HTML_TEMPLATE = """\
+<!DOCTYPE html>
+<html>
+<head>
+<meta charset="utf-8">
+    <title>{title}</title>
+<script src="https://cdn.plot.ly/plotly-latest.min.js"></script>
+</head>
+<body>
+<!-- NOTE(review): inline CSS (kpi/section classes) was lost during extraction; restore from original -->
+<h1>{title}</h1>
+{body}
+</body>
+</html>
+"""
+
+_DEFAULT_COMPOSITE_DIMS: list[tuple[str, str]] = [
+ ("harm_category", "attack_type"),
+ ("harm_category", "converter_type"),
+ ("attack_type", "converter_type"),
+]
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+
+def _import_plotly() -> tuple[Any, Any]:
+ """
+ Lazy-import plotly and return (graph_objects, io) modules.
+
+ Returns:
+ tuple: The ``plotly.graph_objects`` and ``plotly.io`` modules.
+
+ Raises:
+ ImportError: If plotly is not installed.
+ """
+ try:
+ import plotly.graph_objects as go
+ import plotly.io as pio
+
+ return go, pio
+ except ImportError as err:
+ raise ImportError("plotly is required for HTML report features. Install it with: pip install plotly") from err
+
+
+def _asr_css_class(rate: Optional[float]) -> str:
+ """
+ Return a CSS class name for colour-coding a success rate.
+
+ Returns:
+ str: ``"green"``, ``"yellow"``, ``"red"``, or ``""`` when unknown.
+ """
+ if rate is None:
+ return ""
+ if rate >= 0.6:
+ return "green"
+ if rate >= 0.3:
+ return "yellow"
+ return "red"
+
+
+def _asr_bar_color(rate: Optional[float]) -> str:
+ """
+ Return an RGB colour string for a bar based on success rate.
+
+ Returns:
+ str: CSS colour string.
+ """
+ if rate is None:
+ return "rgba(180,180,180,0.5)"
+ if rate >= 0.6:
+ return "#2ecc71"
+ if rate >= 0.3:
+ return "#f39c12"
+ return "#e74c3c"
+
+
+def _is_sparse(
+ dim_data: dict[Any, AttackStats],
+ *,
+ threshold: float = 0.5,
+) -> bool:
+ """
+ Return True when more than *threshold* fraction of cells lack decided data.
+
+ Returns:
+ bool: True if the dimension data is too sparse to display.
+ """
+ if not dim_data:
+ return True
+ none_count = sum(1 for s in dim_data.values() if s.success_rate is None)
+ return none_count / len(dim_data) > threshold
+
+
+# ---------------------------------------------------------------------------
+# Section builders
+# ---------------------------------------------------------------------------
+
+
+def _build_summary_html(*, result: AnalysisResult, title: str) -> str:
+ """
+ Build the top-level KPI summary card as an HTML string.
+
+ Returns:
+ str: HTML string for the summary card.
+ """
+ o = result.overall
+ rate_str = f"{o.success_rate:.0%}" if o.success_rate is not None else "N/A"
+ cls = _asr_css_class(o.success_rate)
+ total = o.successes + o.failures + o.undetermined
+ kpis = [
+ (rate_str, "Overall ASR", cls),
+ (str(total), "Total Attacks", ""),
+ (str(o.successes), "Successes", "green"),
+ (str(o.failures), "Failures", "red"),
+ (str(o.undetermined), "Undetermined", ""),
+ ]
+    kpi_html = "".join(
+        f'<div class="kpi {c}"><div class="kpi-value">{v}</div><div class="kpi-label">{lbl}</div></div>'
+        for v, lbl, c in kpis
+    )
+    return f'<div class="summary">{kpi_html}</div>'
+
+
+def _build_bar_figure(
+ go: Any,
+ *,
+ dim_name: str,
+ dim_data: dict[Any, AttackStats],
+) -> Any:
+ """
+ Build a sorted horizontal bar chart of success rates for one dimension.
+
+ Returns:
+ plotly.graph_objects.Figure: The bar chart figure.
+ """
+ items = sorted(
+ dim_data.items(),
+ key=lambda kv: kv[1].success_rate if kv[1].success_rate is not None else -1,
+ reverse=True,
+ )
+ labels = [str(k) for k, _ in items]
+ rates = [s.success_rate if s.success_rate is not None else 0.0 for _, s in items]
+ colors = [_asr_bar_color(s.success_rate) for _, s in items]
+ hover = [
+ (
+            f"<b>{k}</b><br>ASR: {s.success_rate:.1%}<br>✓ {s.successes} ✗ {s.failures} ? {s.undetermined}"
+            if s.success_rate is not None
+            else f"<b>{k}</b><br>No decided outcomes ? {s.undetermined}"
+ )
+ for k, s in items
+ ]
+ text = [f"{r:.0%}" if s.success_rate is not None else "—" for (_, s), r in zip(items, rates, strict=True)]
+ fig = go.Figure(
+ go.Bar(
+ x=rates,
+ y=labels,
+ orientation="h",
+ marker_color=colors,
+ hovertemplate="%{customdata}",
+ customdata=hover,
+ text=text,
+ textposition="outside",
+ )
+ )
+ fig.update_layout(
+ title=f"Success Rate by {dim_name}",
+ xaxis={"title": "Success Rate", "range": [0, 1.25], "tickformat": ".0%"},
+ yaxis={"autorange": "reversed"},
+ height=max(300, len(labels) * 44 + 100),
+ margin={"l": 10, "r": 80, "t": 48, "b": 40},
+ plot_bgcolor="white",
+ paper_bgcolor="white",
+ )
+ return fig
+
+
+def _build_dropdown_bar_figure(
+ go: Any,
+ *,
+ result: "AnalysisResult",
+) -> Any:
+ """
+ Build a bar chart with a dropdown to switch between dimensions.
+
+ Returns:
+ plotly.graph_objects.Figure: The bar chart figure with dropdown.
+ """
+ single_dims = [d for d in result.dimensions if isinstance(d, str)]
+ if not single_dims:
+ return None
+
+ traces = []
+ max_labels = 0
+
+ for i, dim_name in enumerate(single_dims):
+ dim_data = result.dimensions[dim_name]
+ items = sorted(
+ dim_data.items(),
+ key=lambda kv: kv[1].success_rate if kv[1].success_rate is not None else -1,
+ reverse=True,
+ )
+ labels = [str(k) for k, _ in items]
+ max_labels = max(max_labels, len(labels))
+ rates = [s.success_rate if s.success_rate is not None else 0.0 for _, s in items]
+ colors = [_asr_bar_color(s.success_rate) for _, s in items]
+ hover = [
+ (
+                f"<b>{k}</b><br>ASR: {s.success_rate:.1%}<br>✓ {s.successes} ✗ {s.failures} ? {s.undetermined}"
+                if s.success_rate is not None
+                else f"<b>{k}</b><br>No decided outcomes ? {s.undetermined}"
+ )
+ for k, s in items
+ ]
+ text = [f"{r:.0%}" if s.success_rate is not None else "—" for (_, s), r in zip(items, rates, strict=True)]
+
+ traces.append(
+ go.Bar(
+ x=rates,
+ y=labels,
+ orientation="h",
+ marker_color=colors,
+ hovertemplate="%{customdata}",
+ customdata=hover,
+ text=text,
+ textposition="outside",
+ visible=(i == 0),
+ name=dim_name,
+ )
+ )
+
+ # Create dropdown buttons
+ buttons = []
+ for i, dim_name in enumerate(single_dims):
+ visibility = [j == i for j in range(len(single_dims))]
+ label = dim_name.replace("_", " ").title()
+ buttons.append(
+ {
+ "label": label,
+ "method": "update",
+ "args": [
+ {"visible": visibility},
+ {"title": f"Success Rate by {label}"},
+ ],
+ }
+ )
+
+ first_label = single_dims[0].replace("_", " ").title()
+ fig = go.Figure(data=traces)
+ fig.update_layout(
+ title=f"Success Rate by {first_label}",
+ xaxis={"title": "Success Rate", "range": [0, 1.25], "tickformat": ".0%"},
+ yaxis={"autorange": "reversed"},
+ height=max(350, max_labels * 44 + 120),
+ margin={"l": 10, "r": 80, "t": 80, "b": 40},
+ plot_bgcolor="white",
+ paper_bgcolor="white",
+ updatemenus=[
+ {
+ "buttons": buttons,
+ "direction": "down",
+ "showactive": True,
+ "x": 0.0,
+ "xanchor": "left",
+ "y": 1.12,
+ "yanchor": "top",
+ "bgcolor": "white",
+ "bordercolor": "#ccc",
+ }
+ ],
+ )
+ return fig
+
+
+def _build_z_matrix(
+ *,
+ row_keys: list[str],
+ col_keys: list[str],
+ lookup: dict[tuple[str, str], AttackStats],
+) -> tuple[list[list[Optional[float]]], list[list[str]]]:
+ """
+ Build the z-value and annotation matrices for a heatmap.
+
+ Returns:
+ tuple: (z matrix, text annotation matrix).
+ """
+ z: list[list[Optional[float]]] = []
+ text: list[list[str]] = []
+ for row in row_keys:
+ z_row: list[Optional[float]] = []
+ t_row: list[str] = []
+ for col in col_keys:
+ stats = lookup.get((row, col))
+ if stats and stats.success_rate is not None:
+ z_row.append(stats.success_rate)
+                t_row.append(f"{stats.success_rate:.0%}<br>{stats.successes}/{stats.total_decided}")
+ else:
+ z_row.append(None)
+ t_row.append("—" if not stats else f"?{stats.undetermined}")
+ z.append(z_row)
+ text.append(t_row)
+ return z, text
+
+
+def _build_heatmap_figure(
+ go: Any,
+ *,
+ dim_name: tuple[str, str],
+ dim_data: dict[Any, AttackStats],
+) -> Any:
+ """
+ Build a 2D success-rate heatmap for a composite dimension.
+
+ Returns:
+ plotly.graph_objects.Figure: The heatmap figure.
+ """
+ row_dim, col_dim = dim_name
+ row_keys = sorted({str(k[0]) for k in dim_data})
+ col_keys = sorted({str(k[1]) for k in dim_data})
+ lookup = {(str(k[0]), str(k[1])): v for k, v in dim_data.items()}
+ z, text = _build_z_matrix(row_keys=row_keys, col_keys=col_keys, lookup=lookup)
+ fig = go.Figure(
+ go.Heatmap(
+ z=z,
+ x=col_keys,
+ y=row_keys,
+ text=text,
+ texttemplate="%{text}",
+ colorscale=_ASR_COLORSCALE,
+ zmin=0,
+ zmax=1,
+ colorbar={"title": "ASR", "tickformat": ".0%"},
+            hovertemplate=f"{row_dim}: %{{y}}<br>{col_dim}: %{{x}}<br>%{{text}}",
+ )
+ )
+ fig.update_layout(
+ title=f"Success Rate: {row_dim} \u00d7 {col_dim}",
+ xaxis_title=col_dim,
+ yaxis_title=row_dim,
+ height=max(350, len(row_keys) * 54 + 130),
+ margin={"l": 10, "r": 10, "t": 54, "b": 60},
+ plot_bgcolor="white",
+ paper_bgcolor="white",
+ )
+ return fig
+
+
+def _build_dropdown_heatmap_figure(
+ go: Any,
+ *,
+ result: "AnalysisResult",
+ composite_dims: list[tuple[str, str]],
+ sparsity_threshold: float = 0.5,
+) -> Any:
+ """
+ Build a heatmap with a dropdown to switch between composite dimensions.
+
+ Returns:
+ plotly.graph_objects.Figure or None: The heatmap figure with dropdown,
+ or None if no valid composite dimensions.
+ """
+ # Filter to valid composite dimensions
+ valid_dims = []
+ for dims in composite_dims:
+ if dims in result.dimensions and not _is_sparse(
+ result.dimensions[dims], threshold=sparsity_threshold
+ ):
+ valid_dims.append(dims)
+
+ if not valid_dims:
+ return None
+
+ traces = []
+ max_rows = 0
+
+ for i, dim_name in enumerate(valid_dims):
+ dim_data = result.dimensions[dim_name]
+ row_dim, col_dim = dim_name
+ row_keys = sorted({str(k[0]) for k in dim_data})
+ col_keys = sorted({str(k[1]) for k in dim_data})
+ max_rows = max(max_rows, len(row_keys))
+ lookup = {(str(k[0]), str(k[1])): v for k, v in dim_data.items()}
+ z, text = _build_z_matrix(row_keys=row_keys, col_keys=col_keys, lookup=lookup)
+
+ traces.append(
+ go.Heatmap(
+ z=z,
+ x=col_keys,
+ y=row_keys,
+ text=text,
+ texttemplate="%{text}",
+ colorscale=_ASR_COLORSCALE,
+ zmin=0,
+ zmax=1,
+ colorbar={"title": "ASR", "tickformat": ".0%"},
+                hovertemplate=f"{row_dim}: %{{y}}<br>{col_dim}: %{{x}}<br>%{{text}}",
+ visible=(i == 0),
+ name=f"{row_dim} × {col_dim}",
+ )
+ )
+
+ # Create dropdown buttons
+ buttons = []
+ for i, dim_name in enumerate(valid_dims):
+ row_dim, col_dim = dim_name
+ visibility = [j == i for j in range(len(valid_dims))]
+ label = f"{row_dim.replace('_', ' ').title()} × {col_dim.replace('_', ' ').title()}"
+ buttons.append(
+ {
+ "label": label,
+ "method": "update",
+ "args": [
+ {"visible": visibility},
+ {
+ "title": f"Success Rate: {row_dim} × {col_dim}",
+ "xaxis.title.text": col_dim,
+ "yaxis.title.text": row_dim,
+ },
+ ],
+ }
+ )
+
+ first_row_dim, first_col_dim = valid_dims[0]
+ fig = go.Figure(data=traces)
+ fig.update_layout(
+ title=f"Success Rate: {first_row_dim} × {first_col_dim}",
+ xaxis_title=first_col_dim,
+ yaxis_title=first_row_dim,
+ height=max(400, max_rows * 54 + 150),
+ margin={"l": 10, "r": 10, "t": 80, "b": 60},
+ plot_bgcolor="white",
+ paper_bgcolor="white",
+ updatemenus=[
+ {
+ "buttons": buttons,
+ "direction": "down",
+ "showactive": True,
+ "x": 0.0,
+ "xanchor": "left",
+ "y": 1.12,
+ "yanchor": "top",
+ "bgcolor": "white",
+ "bordercolor": "#ccc",
+ }
+ ],
+ )
+ return fig
+
+
+def _build_faceted_heatmap_figure(
+ go: Any,
+ *,
+ dim_name: tuple[str, str, str],
+ dim_data: dict[Any, AttackStats],
+) -> Any:
+ """
+ Build a 3D heatmap with a dropdown to filter by the third dimension.
+
+ Rows = dim_name[0], Columns = dim_name[1], Dropdown = dim_name[2].
+
+ Returns:
+ plotly.graph_objects.Figure: The faceted heatmap figure.
+ """
+ row_dim, col_dim, facet_dim = dim_name
+ row_keys = sorted({str(k[0]) for k in dim_data})
+ col_keys = sorted({str(k[1]) for k in dim_data})
+ facet_vals = sorted({str(k[2]) for k in dim_data})
+
+ traces: list[Any] = []
+ for i, fval in enumerate(facet_vals):
+ lookup = {(str(k[0]), str(k[1])): v for k, v in dim_data.items() if str(k[2]) == fval}
+ z, text = _build_z_matrix(row_keys=row_keys, col_keys=col_keys, lookup=lookup)
+ traces.append(
+ go.Heatmap(
+ z=z,
+ x=col_keys,
+ y=row_keys,
+ text=text,
+ texttemplate="%{text}",
+ colorscale=_ASR_COLORSCALE,
+ zmin=0,
+ zmax=1,
+ visible=(i == 0),
+ name=fval,
+ colorbar={"title": "ASR", "tickformat": ".0%"},
+                hovertemplate=f"{row_dim}: %{{y}}<br>{col_dim}: %{{x}}<br>%{{text}}",
+ )
+ )
+
+ buttons = [
+ {
+ "label": fval,
+ "method": "update",
+ "args": [
+ {"visible": [j == i for j in range(len(facet_vals))]},
+ {"title": f"{row_dim} \u00d7 {col_dim} | {facet_dim}: {fval}"},
+ ],
+ }
+ for i, fval in enumerate(facet_vals)
+ ]
+ fig = go.Figure(traces)
+ fig.update_layout(
+ title=f"{row_dim} \u00d7 {col_dim} | {facet_dim}: {facet_vals[0]}",
+ xaxis_title=col_dim,
+ yaxis_title=row_dim,
+ updatemenus=[
+ {
+ "buttons": buttons,
+ "direction": "down",
+ "showactive": True,
+ "x": 0.01,
+ "xanchor": "left",
+ "y": 1.18,
+ "yanchor": "top",
+ }
+ ],
+ height=max(400, len(row_keys) * 54 + 160),
+ margin={"l": 10, "r": 10, "t": 90, "b": 60},
+ plot_bgcolor="white",
+ paper_bgcolor="white",
+ )
+ return fig
+
+
+def _build_coverage_table_figure(
+ go: Any,
+ *,
+ result: AnalysisResult,
+) -> Any:
+ """
+ Build a data coverage table showing sample sizes per dimension key.
+
+ Cells with fewer than 5 decided outcomes are highlighted in pink.
+
+ Returns:
+ plotly.graph_objects.Figure: The table figure.
+ """
+ rows: list[tuple[str, str, AttackStats]] = [("overall", "all", result.overall)]
+ for dim_name, dim_data in result.dimensions.items():
+ if isinstance(dim_name, str):
+ for key, stats in dim_data.items():
+ rows.append((dim_name, str(key), stats))
+
+ dims = [r[0] for r in rows]
+ keys = [r[1] for r in rows]
+ decided = [r[2].total_decided for r in rows]
+ undetermined = [r[2].undetermined for r in rows]
+ asr = [f"{r[2].success_rate:.0%}" if r[2].success_rate is not None else "—" for r in rows]
+ decided_colors = ["#fff5f5" if n < 5 else "white" for n in decided]
+
+ fig = go.Figure(
+ go.Table(
+ header={
+ "values": [
+ "Dimension",
+ "Key",
+ "Decided",
+ "Undetermined",
+ "ASR",
+ ],
+ "fill_color": "#f0f2f5",
+ "align": "left",
+ "font": {"size": 12, "color": "#444"},
+ "height": 36,
+ },
+ cells={
+ "values": [dims, keys, decided, undetermined, asr],
+ "fill_color": ["white", "white", decided_colors, "white", "white"],
+ "align": "left",
+ "font": {"size": 11},
+ "height": 28,
+ },
+ )
+ )
+ fig.update_layout(
+ title="Data Coverage (pink = fewer than 5 decided outcomes)",
+ height=min(600, max(300, len(rows) * 30 + 120)),
+ margin={"l": 0, "r": 0, "t": 44, "b": 10},
+ )
+ return fig
+
+
+# ---------------------------------------------------------------------------
+# Public API
+# ---------------------------------------------------------------------------
+
+
+def save_html(
+ result: AnalysisResult,
+ path: Union[str, Path],
+ *,
+ title: str = "Attack Analysis Report",
+ composite_dims: Optional[list[tuple[str, str]]] = None,
+ sparsity_threshold: float = 0.5,
+) -> Path:
+ """
+ Save a fully interactive HTML attack analysis report using Plotly.
+
+ The report contains:
+
+ * A KPI summary card (overall ASR, total attacks, outcome counts).
+ * One horizontal bar chart per single dimension in *result*.
+ * Heatmaps for each 2-tuple composite dimension (default: all three
+ combinations of ``attack_type``, ``converter_type``, and
+ ``harm_category``).
+ * A dropdown-faceted heatmap for any 3-tuple composite dimension found
+ in *result* (e.g. ``("harm_category", "converter_type", "attack_type")``).
+ * A data-coverage table flagging low sample sizes.
+
+ The output is a single self-contained ``.html`` file — no server needed.
+
+ Args:
+ result (AnalysisResult): The analysis result to report on.
+ path (str | Path): Output file path (e.g. ``"report.html"``).
+ title (str): Report title shown in the header. Defaults to
+ ``"Attack Analysis Report"``.
+ composite_dims (list[tuple[str, str]] | None): 2D heatmap pairs to
+ include. Defaults to all combinations of ``attack_type``,
+ ``converter_type``, and ``harm_category``.
+ sparsity_threshold (float): Skip a heatmap when the fraction of
+ empty cells exceeds this value. Defaults to ``0.5``.
+
+ Returns:
+ Path: The path to the saved HTML file.
+
+ Raises:
+ ImportError: If plotly is not installed.
+ """
+ go, pio = _import_plotly()
+ path = Path(path)
+ path.parent.mkdir(parents=True, exist_ok=True)
+
+ if composite_dims is None:
+ composite_dims = _DEFAULT_COMPOSITE_DIMS
+
+ def _div(fig: Any) -> str:
+ html: str = pio.to_html(fig, include_plotlyjs=False, full_html=False)
+ return html
+
+ sections: list[str] = [_build_summary_html(result=result, title=title)]
+
+ # Single-dimension bar chart with dropdown selector
+ dropdown_fig = _build_dropdown_bar_figure(go, result=result)
+ if dropdown_fig is not None:
+        sections.append(f'<div class="section"><h2>By Dimension</h2>{_div(dropdown_fig)}</div>')
+
+ # 2D heatmaps with dropdown selector
+ heatmap_parts: list[str] = []
+ dropdown_heatmap_fig = _build_dropdown_heatmap_figure(
+ go, result=result, composite_dims=composite_dims, sparsity_threshold=sparsity_threshold
+ )
+ if dropdown_heatmap_fig is not None:
+ heatmap_parts.append(_div(dropdown_heatmap_fig))
+
+ # 3-tuple faceted heatmaps
+ three_d_dims = [d for d in result.dimensions if isinstance(d, tuple) and len(d) == 3]
+ for dim_name in three_d_dims:
+ if _is_sparse(result.dimensions[dim_name], threshold=sparsity_threshold):
+ continue
+ heatmap_parts.append(
+ _div(_build_faceted_heatmap_figure(go, dim_name=dim_name, dim_data=result.dimensions[dim_name]))
+ )
+
+ if heatmap_parts:
+        sections.append(f'<div class="section"><h2>Cross-Dimensional Analysis</h2>{"".join(heatmap_parts)}</div>')
+
+ # Coverage table
+    sections.append(f'<div class="section">{_div(_build_coverage_table_figure(go, result=result))}</div>')
+
+ body = "\n".join(sections)
+ path.write_text(_HTML_TEMPLATE.format(title=title, body=body), encoding="utf-8")
+ return path
diff --git a/tests/unit/analytics/test_result_analysis.py b/tests/unit/analytics/test_result_analysis.py
index 05d1b94f0e..c55aa146fa 100644
--- a/tests/unit/analytics/test_result_analysis.py
+++ b/tests/unit/analytics/test_result_analysis.py
@@ -1,27 +1,34 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
-from typing import Optional
+import warnings
import pytest
-from pyrit.analytics.result_analysis import AttackStats, analyze_results
+from pyrit.analytics.result_analysis import (
+ AnalysisResult,
+ AttackStats,
+ analyze_results,
+)
from pyrit.identifiers import ComponentIdentifier
-from pyrit.models import AttackOutcome, AttackResult
+from pyrit.models import AttackOutcome, AttackResult, MessagePiece
-# helpers
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
def make_attack(
outcome: AttackOutcome,
- attack_type: Optional[str] = "default",
+ attack_type: str | None = "PromptSendingAttack",
conversation_id: str = "conv-1",
) -> AttackResult:
- """
- Minimal valid AttackResult for analytics tests.
- """
- attack_identifier: Optional[ComponentIdentifier] = None
+ """Minimal valid AttackResult for analytics tests."""
+ attack_identifier = None
if attack_type is not None:
- attack_identifier = ComponentIdentifier(class_name=attack_type, class_module="tests.unit.analytics")
+ attack_identifier = ComponentIdentifier(
+ class_name=attack_type,
+ class_module="pyrit.executor.attack",
+ )
return AttackResult(
conversation_id=conversation_id,
@@ -31,108 +38,729 @@ def make_attack(
)
-def test_analyze_results_empty_raises():
- with pytest.raises(ValueError):
- analyze_results([])
-
-
-def test_analyze_results_raises_on_invalid_object():
- with pytest.raises(TypeError):
- analyze_results(["not-an-AttackResult"])
-
-
-@pytest.mark.parametrize(
- "outcomes, expected_successes, expected_failures, expected_undetermined, expected_rate",
- [
- # all successes
- ([AttackOutcome.SUCCESS, AttackOutcome.SUCCESS], 2, 0, 0, 1.0),
- # all failures
- ([AttackOutcome.FAILURE, AttackOutcome.FAILURE], 0, 2, 0, 0.0),
- # mixed decided
- ([AttackOutcome.SUCCESS, AttackOutcome.FAILURE], 1, 1, 0, 0.5),
- # include undetermined (excluded from denominator)
- ([AttackOutcome.SUCCESS, AttackOutcome.UNDETERMINED], 1, 0, 1, 1.0),
- ([AttackOutcome.FAILURE, AttackOutcome.UNDETERMINED], 0, 1, 1, 0.0),
- # multiple with undetermined
- (
- [AttackOutcome.SUCCESS, AttackOutcome.FAILURE, AttackOutcome.UNDETERMINED],
- 1,
- 1,
- 1,
- 0.5,
- ),
- ],
-)
-def test_overall_success_rate_parametrized(
- outcomes, expected_successes, expected_failures, expected_undetermined, expected_rate
-):
- attacks = [make_attack(o) for o in outcomes]
- result = analyze_results(attacks)
-
- assert isinstance(result["Overall"], AttackStats)
- overall = result["Overall"]
- assert overall.successes == expected_successes
- assert overall.failures == expected_failures
- assert overall.undetermined == expected_undetermined
- assert overall.total_decided == expected_successes + expected_failures
- assert overall.success_rate == expected_rate
-
-
-@pytest.mark.parametrize(
- "items, type_key, exp_succ, exp_fail, exp_und, exp_rate",
- [
- # single type, mixed decided + undetermined
- (
- [
- (AttackOutcome.SUCCESS, "crescendo"),
- (AttackOutcome.FAILURE, "crescendo"),
- (AttackOutcome.UNDETERMINED, "crescendo"),
- ],
- "crescendo",
- 1,
- 1,
- 1,
- 0.5,
- ),
- # two types with different balances
- (
- [
- (AttackOutcome.SUCCESS, "crescendo"),
- (AttackOutcome.FAILURE, "crescendo"),
- (AttackOutcome.SUCCESS, "red_teaming"),
- (AttackOutcome.FAILURE, "red_teaming"),
- (AttackOutcome.SUCCESS, "red_teaming"),
- ],
- "red_teaming",
- 2,
- 1,
- 0,
- 2 / 3,
- ),
- # unknown type fallback (missing "type" key)
- (
- [
- (AttackOutcome.FAILURE, None),
- (AttackOutcome.UNDETERMINED, None),
- (AttackOutcome.SUCCESS, None),
+def make_converter(
+ class_name: str,
+ class_module: str = "pyrit.prompt_converter.test_converter",
+) -> ComponentIdentifier:
+ """Create a test ComponentIdentifier for converter with minimal required fields."""
+ return ComponentIdentifier(
+ class_name=class_name,
+ class_module=class_module,
+ )
+
+
+def make_attack_with_converters(
+ outcome: AttackOutcome,
+ converter_names: list[str],
+ attack_type: str = "test",
+ conversation_id: str = "conv-1",
+) -> AttackResult:
+ """Create an AttackResult with converter identifiers on last_response."""
+ converters = [make_converter(name) for name in converter_names]
+ message = MessagePiece(
+ role="user",
+ original_value="test",
+ converter_identifiers=converters,
+ )
+ attack_identifier = ComponentIdentifier(
+ class_name=attack_type,
+ class_module="pyrit.executor.attack",
+ )
+ return AttackResult(
+ conversation_id=conversation_id,
+ objective="test",
+ attack_identifier=attack_identifier,
+ outcome=outcome,
+ last_response=message,
+ )
+
+
+# ---------------------------------------------------------------------------
+# Validation
+# ---------------------------------------------------------------------------
+class TestAnalyzeResultsValidation:
+ """Input validation for analyze_results."""
+
+ def test_empty_raises(self):
+ with pytest.raises(ValueError, match="cannot be empty"):
+ analyze_results([])
+
+ def test_invalid_object_raises(self):
+ with pytest.raises(TypeError, match="Expected AttackResult"):
+ analyze_results(["not-an-AttackResult"])
+
+ def test_unknown_dimension_raises(self):
+ attacks = [make_attack(AttackOutcome.SUCCESS)]
+ with pytest.raises(ValueError, match="Unknown dimension 'nonexistent'"):
+ analyze_results(attacks, group_by=["nonexistent"])
+
+ def test_unknown_dimension_in_composite_raises(self):
+ attacks = [make_attack(AttackOutcome.SUCCESS)]
+ with pytest.raises(ValueError, match="Unknown dimension 'bad_dim'"):
+ analyze_results(attacks, group_by=[("attack_type", "bad_dim")])
+
+
+# ---------------------------------------------------------------------------
+# Overall stats
+# ---------------------------------------------------------------------------
+class TestOverallStats:
+ """Overall stats computation (no dimension breakdown)."""
+
+ @pytest.mark.parametrize(
+ "outcomes, expected_successes, expected_failures, expected_undetermined, expected_rate",
+ [
+ ([AttackOutcome.SUCCESS, AttackOutcome.SUCCESS], 2, 0, 0, 1.0),
+ ([AttackOutcome.FAILURE, AttackOutcome.FAILURE], 0, 2, 0, 0.0),
+ ([AttackOutcome.SUCCESS, AttackOutcome.FAILURE], 1, 1, 0, 0.5),
+ ([AttackOutcome.SUCCESS, AttackOutcome.UNDETERMINED], 1, 0, 1, 1.0),
+ ([AttackOutcome.FAILURE, AttackOutcome.UNDETERMINED], 0, 1, 1, 0.0),
+ (
+ [AttackOutcome.SUCCESS, AttackOutcome.FAILURE, AttackOutcome.UNDETERMINED],
+ 1,
+ 1,
+ 1,
+ 0.5,
+ ),
+ ],
+ )
+ def test_overall_stats(self, outcomes, expected_successes, expected_failures, expected_undetermined, expected_rate):
+ attacks = [make_attack(o) for o in outcomes]
+ result = analyze_results(attacks, group_by=[])
+
+ assert isinstance(result, AnalysisResult)
+ overall = result.overall
+ assert overall.successes == expected_successes
+ assert overall.failures == expected_failures
+ assert overall.undetermined == expected_undetermined
+ assert overall.total_decided == expected_successes + expected_failures
+ assert overall.success_rate == expected_rate
+
+ def test_all_undetermined_gives_none_rate(self):
+ attacks = [make_attack(AttackOutcome.UNDETERMINED)]
+ result = analyze_results(attacks, group_by=[])
+ assert result.overall.success_rate is None
+ assert result.overall.total_decided == 0
+
+
+# ---------------------------------------------------------------------------
+# Single dimension: attack_identifier
+# ---------------------------------------------------------------------------
+class TestGroupByAttackType:
+ """Group-by a single dimension: attack_type."""
+
+ @pytest.mark.parametrize(
+ "items, type_key, exp_succ, exp_fail, exp_und, exp_rate",
+ [
+ (
+ [
+ (AttackOutcome.SUCCESS, "CrescendoAttack"),
+ (AttackOutcome.FAILURE, "CrescendoAttack"),
+ (AttackOutcome.UNDETERMINED, "CrescendoAttack"),
+ ],
+ "CrescendoAttack",
+ 1,
+ 1,
+ 1,
+ 0.5,
+ ),
+ (
+ [
+ (AttackOutcome.SUCCESS, "CrescendoAttack"),
+ (AttackOutcome.FAILURE, "CrescendoAttack"),
+ (AttackOutcome.SUCCESS, "RedTeamingAttack"),
+ (AttackOutcome.FAILURE, "RedTeamingAttack"),
+ (AttackOutcome.SUCCESS, "RedTeamingAttack"),
+ ],
+ "RedTeamingAttack",
+ 2,
+ 1,
+ 0,
+ 2 / 3,
+ ),
+ (
+ [
+ (AttackOutcome.FAILURE, None),
+ (AttackOutcome.UNDETERMINED, None),
+ (AttackOutcome.SUCCESS, None),
+ ],
+ "unknown",
+ 1,
+ 1,
+ 1,
+ 0.5,
+ ),
+ ],
+ )
+ def test_single_dimension(self, items, type_key, exp_succ, exp_fail, exp_und, exp_rate):
+ attacks = [make_attack(outcome=o, attack_type=t) for (o, t) in items]
+ result = analyze_results(attacks, group_by=["attack_type"])
+
+ assert "attack_type" in result.dimensions
+ stats = result.dimensions["attack_type"][type_key]
+ assert isinstance(stats, AttackStats)
+ assert stats.successes == exp_succ
+ assert stats.failures == exp_fail
+ assert stats.undetermined == exp_und
+ assert stats.total_decided == exp_succ + exp_fail
+ assert stats.success_rate == exp_rate
+
+
+# ---------------------------------------------------------------------------
+# Single dimension: converter_type
+# ---------------------------------------------------------------------------
+class TestGroupByConverterType:
+ """Group-by a single dimension: converter_type."""
+
+ def test_no_converter_tracked(self):
+ attacks = [
+ AttackResult(
+ conversation_id="conv-1",
+ objective="test",
+ attack_identifier=ComponentIdentifier(
+ class_name="PromptSendingAttack", class_module="pyrit.executor.attack"
+ ),
+ outcome=AttackOutcome.SUCCESS,
+ last_response=None,
+ ),
+ AttackResult(
+ conversation_id="conv-2",
+ objective="test",
+ attack_identifier=ComponentIdentifier(
+ class_name="PromptSendingAttack", class_module="pyrit.executor.attack"
+ ),
+ outcome=AttackOutcome.FAILURE,
+ last_response=None,
+ ),
+ ]
+ result = analyze_results(attacks, group_by=["converter_type"])
+
+ stats = result.dimensions["converter_type"]["no_converter"]
+ assert stats.successes == 1
+ assert stats.failures == 1
+ assert stats.success_rate == 0.5
+
+ def test_multiple_converter_types(self):
+ attacks = [
+ make_attack_with_converters(AttackOutcome.SUCCESS, ["Base64Converter"]),
+ make_attack_with_converters(AttackOutcome.FAILURE, ["ROT13Converter"]),
+ make_attack_with_converters(AttackOutcome.SUCCESS, ["Base64Converter"]),
+ ]
+ result = analyze_results(attacks, group_by=["converter_type"])
+
+ base64 = result.dimensions["converter_type"]["Base64Converter"]
+ assert base64.successes == 2
+ assert base64.failures == 0
+ assert base64.success_rate == 1.0
+
+ rot13 = result.dimensions["converter_type"]["ROT13Converter"]
+ assert rot13.successes == 0
+ assert rot13.failures == 1
+ assert rot13.success_rate == 0.0
+
+ def test_multiple_converters_per_attack(self):
+ attacks = [
+ make_attack_with_converters(AttackOutcome.SUCCESS, ["Base64Converter", "ROT13Converter"]),
+ ]
+ result = analyze_results(attacks, group_by=["converter_type"])
+
+ assert result.dimensions["converter_type"]["Base64Converter"].successes == 1
+ assert result.dimensions["converter_type"]["ROT13Converter"].successes == 1
+
+ def test_undetermined_tracked(self):
+ attacks = [
+ make_attack_with_converters(AttackOutcome.SUCCESS, ["Base64Converter"]),
+ make_attack_with_converters(AttackOutcome.UNDETERMINED, ["Base64Converter"]),
+ ]
+ result = analyze_results(attacks, group_by=["converter_type"])
+
+ stats = result.dimensions["converter_type"]["Base64Converter"]
+ assert stats.successes == 1
+ assert stats.undetermined == 1
+ assert stats.total_decided == 1
+ assert stats.success_rate == 1.0
+
+
+# ---------------------------------------------------------------------------
+# Composite dimensions
+# ---------------------------------------------------------------------------
+class TestCompositeDimensions:
+ """Group-by composite (cross-product) dimensions."""
+
+ def test_composite_two_dimensions(self):
+ attacks = [
+ make_attack_with_converters(AttackOutcome.SUCCESS, ["Base64Converter"], attack_type="CrescendoAttack"),
+ make_attack_with_converters(AttackOutcome.FAILURE, ["ROT13Converter"], attack_type="CrescendoAttack"),
+ make_attack_with_converters(AttackOutcome.SUCCESS, ["Base64Converter"], attack_type="RedTeamingAttack"),
+ ]
+ result = analyze_results(attacks, group_by=[("converter_type", "attack_type")])
+
+ dim = result.dimensions[("converter_type", "attack_type")]
+ assert dim[("Base64Converter", "CrescendoAttack")].successes == 1
+ assert dim[("Base64Converter", "CrescendoAttack")].failures == 0
+ assert dim[("ROT13Converter", "CrescendoAttack")].failures == 1
+ assert dim[("Base64Converter", "RedTeamingAttack")].successes == 1
+
+ def test_composite_with_multi_converter_creates_cross_product(self):
+ attacks = [
+ make_attack_with_converters(
+ AttackOutcome.SUCCESS,
+ ["Base64Converter", "ROT13Converter"],
+ attack_type="CrescendoAttack",
+ ),
+ ]
+ result = analyze_results(attacks, group_by=[("converter_type", "attack_type")])
+
+ dim = result.dimensions[("converter_type", "attack_type")]
+ assert ("Base64Converter", "CrescendoAttack") in dim
+ assert ("ROT13Converter", "CrescendoAttack") in dim
+ assert dim[("Base64Converter", "CrescendoAttack")].successes == 1
+ assert dim[("ROT13Converter", "CrescendoAttack")].successes == 1
+
+ def test_mixed_single_and_composite(self):
+ attacks = [
+ make_attack_with_converters(AttackOutcome.SUCCESS, ["Base64Converter"], attack_type="CrescendoAttack"),
+ make_attack_with_converters(AttackOutcome.FAILURE, ["ROT13Converter"], attack_type="RedTeamingAttack"),
+ ]
+ result = analyze_results(
+ attacks,
+ group_by=[
+ "attack_type",
+ ("converter_type", "attack_type"),
],
- "unknown",
- 1,
- 1,
- 1,
- 0.5,
- ),
- ],
-)
-def test_group_by_attack_type_parametrized(items, type_key, exp_succ, exp_fail, exp_und, exp_rate):
- attacks = [make_attack(outcome=o, attack_type=t) for (o, t) in items]
- result = analyze_results(attacks)
-
- assert type_key in result["By_attack_identifier"]
- stats = result["By_attack_identifier"][type_key]
- assert isinstance(stats, AttackStats)
- assert stats.successes == exp_succ
- assert stats.failures == exp_fail
- assert stats.undetermined == exp_und
- assert stats.total_decided == exp_succ + exp_fail
- assert stats.success_rate == exp_rate
+ )
+
+ # Single dimension present
+ assert "attack_type" in result.dimensions
+ assert result.dimensions["attack_type"]["CrescendoAttack"].successes == 1
+ assert result.dimensions["attack_type"]["RedTeamingAttack"].failures == 1
+
+ # Composite dimension present
+ composite = result.dimensions[("converter_type", "attack_type")]
+ assert composite[("Base64Converter", "CrescendoAttack")].successes == 1
+ assert composite[("ROT13Converter", "RedTeamingAttack")].failures == 1
+
+
+# ---------------------------------------------------------------------------
+# Custom dimensions
+# ---------------------------------------------------------------------------
+class TestCustomDimensions:
+ """User-supplied custom dimension extractors."""
+
+ def test_custom_extractor(self):
+ def _extract_objective(result: AttackResult) -> list[str]:
+ return [result.objective]
+
+ attacks = [
+ AttackResult(
+ conversation_id="c1",
+ objective="steal secrets",
+ attack_identifier=ComponentIdentifier(
+ class_name="PromptSendingAttack", class_module="pyrit.executor.attack"
+ ),
+ outcome=AttackOutcome.SUCCESS,
+ ),
+ AttackResult(
+ conversation_id="c2",
+ objective="bypass filter",
+ attack_identifier=ComponentIdentifier(
+ class_name="PromptSendingAttack", class_module="pyrit.executor.attack"
+ ),
+ outcome=AttackOutcome.FAILURE,
+ ),
+ ]
+ result = analyze_results(
+ attacks,
+ group_by=["objective"],
+ custom_dimensions={"objective": _extract_objective},
+ )
+
+ assert result.dimensions["objective"]["steal secrets"].successes == 1
+ assert result.dimensions["objective"]["bypass filter"].failures == 1
+
+ def test_custom_dimension_in_composite(self):
+ def _extract_objective(result: AttackResult) -> list[str]:
+ return [result.objective]
+
+ attacks = [
+ make_attack_with_converters(AttackOutcome.SUCCESS, ["Base64Converter"]),
+ ]
+ # Override objective on the attack for testing
+ attacks[0].objective = "test_obj"
+
+ result = analyze_results(
+ attacks,
+ group_by=[("converter_type", "objective")],
+ custom_dimensions={"objective": _extract_objective},
+ )
+
+ composite = result.dimensions[("converter_type", "objective")]
+ assert ("Base64Converter", "test_obj") in composite
+
+
+# ---------------------------------------------------------------------------
+# Single dimension: label
+# ---------------------------------------------------------------------------
+class TestGroupByLabel:
+ """Group-by a single dimension: label."""
+
+ def test_no_labels_tracked(self):
+ attacks = [make_attack(AttackOutcome.SUCCESS)]
+ result = analyze_results(attacks, group_by=["label"])
+
+ stats = result.dimensions["label"]["no_labels"]
+ assert stats.successes == 1
+ assert stats.total_decided == 1
+
+ def test_single_label(self):
+ message = MessagePiece(
+ role="user",
+ original_value="test",
+ labels={"operation_name": "op_trash_panda"},
+ )
+ attacks = [
+ AttackResult(
+ conversation_id="c1",
+ objective="test",
+ attack_identifier=ComponentIdentifier(
+ class_name="PromptSendingAttack", class_module="pyrit.executor.attack"
+ ),
+ outcome=AttackOutcome.SUCCESS,
+ last_response=message,
+ ),
+ ]
+ result = analyze_results(attacks, group_by=["label"])
+
+ assert "operation_name=op_trash_panda" in result.dimensions["label"]
+ assert result.dimensions["label"]["operation_name=op_trash_panda"].successes == 1
+
+ def test_multiple_labels_per_attack(self):
+ """Each label key=value pair creates its own stats entry."""
+ message = MessagePiece(
+ role="user",
+ original_value="test",
+ labels={"operation_name": "op_trash_panda", "operator": "roakey"},
+ )
+ attacks = [
+ AttackResult(
+ conversation_id="c1",
+ objective="test",
+ attack_identifier=ComponentIdentifier(
+ class_name="PromptSendingAttack", class_module="pyrit.executor.attack"
+ ),
+ outcome=AttackOutcome.SUCCESS,
+ last_response=message,
+ ),
+ ]
+ result = analyze_results(attacks, group_by=["label"])
+
+ assert result.dimensions["label"]["operation_name=op_trash_panda"].successes == 1
+ assert result.dimensions["label"]["operator=roakey"].successes == 1
+
+ def test_label_composite_with_attack_type(self):
+ message = MessagePiece(
+ role="user",
+ original_value="test",
+ labels={"operator": "roakey"},
+ )
+ attacks = [
+ AttackResult(
+ conversation_id="c1",
+ objective="test",
+ attack_identifier=ComponentIdentifier(
+ class_name="CrescendoAttack", class_module="pyrit.executor.attack"
+ ),
+ outcome=AttackOutcome.SUCCESS,
+ last_response=message,
+ ),
+ AttackResult(
+ conversation_id="c2",
+ objective="test",
+ attack_identifier=ComponentIdentifier(
+ class_name="CrescendoAttack", class_module="pyrit.executor.attack"
+ ),
+ outcome=AttackOutcome.FAILURE,
+ last_response=message,
+ ),
+ ]
+ result = analyze_results(attacks, group_by=[("label", "attack_type")])
+
+ dim = result.dimensions[("label", "attack_type")]
+ assert ("operator=roakey", "CrescendoAttack") in dim
+ assert dim[("operator=roakey", "CrescendoAttack")].successes == 1
+ assert dim[("operator=roakey", "CrescendoAttack")].failures == 1
+
+
+# ---------------------------------------------------------------------------
+# Default group_by behavior
+# ---------------------------------------------------------------------------
+class TestDefaultGroupBy:
+ """When group_by=None, all built-in dimensions are used."""
+
+ def test_defaults_include_all_builtin_dimensions(self):
+ attacks = [make_attack(AttackOutcome.SUCCESS)]
+ result = analyze_results(attacks)
+
+ assert "attack_type" in result.dimensions
+ assert "converter_type" in result.dimensions
+ assert "label" in result.dimensions
+
+ def test_empty_group_by_returns_only_overall(self):
+ attacks = [make_attack(AttackOutcome.SUCCESS)]
+ result = analyze_results(attacks, group_by=[])
+
+ assert result.dimensions == {}
+ assert result.overall.successes == 1
+
+
+# ---------------------------------------------------------------------------
+# Deprecated dimension alias: attack_identifier -> attack_type
+# ---------------------------------------------------------------------------
+class TestDeprecatedAttackIdentifierAlias:
+ """Using 'attack_identifier' in group_by should work but warn."""
+
+ def test_alias_emits_deprecation_warning(self):
+ attacks = [make_attack(AttackOutcome.SUCCESS, attack_type="CrescendoAttack")]
+ with pytest.warns(DeprecationWarning, match="'attack_identifier' is deprecated"):
+ analyze_results(attacks, group_by=["attack_identifier"])
+
+ def test_alias_resolves_to_canonical_key(self):
+ attacks = [
+ make_attack(AttackOutcome.SUCCESS, attack_type="CrescendoAttack"),
+ make_attack(AttackOutcome.FAILURE, attack_type="CrescendoAttack"),
+ ]
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", DeprecationWarning)
+ result = analyze_results(attacks, group_by=["attack_identifier"])
+
+ # The dimension key in the result should be the canonical "attack_type"
+ assert "attack_type" in result.dimensions
+ assert "attack_identifier" not in result.dimensions
+ assert result.dimensions["attack_type"]["CrescendoAttack"].successes == 1
+
+ def test_alias_in_composite(self):
+ attacks = [
+ make_attack_with_converters(AttackOutcome.SUCCESS, ["Base64Converter"], attack_type="CrescendoAttack"),
+ ]
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", DeprecationWarning)
+ result = analyze_results(attacks, group_by=[("converter_type", "attack_identifier")])
+
+ # Composite key uses canonical names
+ assert ("converter_type", "attack_type") in result.dimensions
+ dim = result.dimensions[("converter_type", "attack_type")]
+ assert ("Base64Converter", "CrescendoAttack") in dim
+
+
+# ---------------------------------------------------------------------------
+# Single dimension: harm_category
+# ---------------------------------------------------------------------------
+class TestGroupByHarmCategory:
+ """Group-by a single dimension: harm_category."""
+
+ def test_no_harm_category_tracked(self):
+ attacks = [make_attack(AttackOutcome.SUCCESS)]
+ result = analyze_results(attacks, group_by=["harm_category"])
+
+ stats = result.dimensions["harm_category"]["no_harm_category"]
+ assert stats.successes == 1
+ assert stats.total_decided == 1
+
+ def test_single_harm_category(self):
+ message = MessagePiece(
+ role="user",
+ original_value="test",
+ targeted_harm_categories=["hate_speech"],
+ )
+ attacks = [
+ AttackResult(
+ conversation_id="c1",
+ objective="test",
+ attack_identifier=ComponentIdentifier(
+ class_name="PromptSendingAttack", class_module="pyrit.executor.attack"
+ ),
+ outcome=AttackOutcome.SUCCESS,
+ last_response=message,
+ ),
+ ]
+ result = analyze_results(attacks, group_by=["harm_category"])
+
+ stats = result.dimensions["harm_category"]["hate_speech"]
+ assert stats.successes == 1
+ assert stats.total_decided == 1
+
+ def test_multiple_harm_categories_per_attack(self):
+ message = MessagePiece(
+ role="user",
+ original_value="test",
+ targeted_harm_categories=["violence", "hate_speech"],
+ )
+ attacks = [
+ AttackResult(
+ conversation_id="c1",
+ objective="test",
+ attack_identifier=ComponentIdentifier(
+ class_name="PromptSendingAttack", class_module="pyrit.executor.attack"
+ ),
+ outcome=AttackOutcome.SUCCESS,
+ last_response=message,
+ ),
+ ]
+ result = analyze_results(attacks, group_by=["harm_category"])
+
+ # Attack counted under both categories
+ assert result.dimensions["harm_category"]["violence"].successes == 1
+ assert result.dimensions["harm_category"]["hate_speech"].successes == 1
+
+ def test_multiple_attacks_different_harm_categories(self):
+ def _make(category: str, outcome: AttackOutcome) -> AttackResult:
+ message = MessagePiece(
+ role="user",
+ original_value="test",
+ targeted_harm_categories=[category],
+ )
+ return AttackResult(
+ conversation_id="c1",
+ objective="test",
+ attack_identifier=ComponentIdentifier(
+ class_name="PromptSendingAttack", class_module="pyrit.executor.attack"
+ ),
+ outcome=outcome,
+ last_response=message,
+ )
+
+ attacks = [
+ _make("violence", AttackOutcome.SUCCESS),
+ _make("violence", AttackOutcome.FAILURE),
+ _make("self_harm", AttackOutcome.SUCCESS),
+ ]
+ result = analyze_results(attacks, group_by=["harm_category"])
+
+ violence = result.dimensions["harm_category"]["violence"]
+ assert violence.successes == 1
+ assert violence.failures == 1
+
+ self_harm = result.dimensions["harm_category"]["self_harm"]
+ assert self_harm.successes == 1
+ assert self_harm.failures == 0
+
+ def test_harm_category_composite_with_attack_type(self):
+ message = MessagePiece(
+ role="user",
+ original_value="test",
+ targeted_harm_categories=["violence"],
+ )
+ attacks = [
+ AttackResult(
+ conversation_id="c1",
+ objective="test",
+ attack_identifier=ComponentIdentifier(
+ class_name="CrescendoAttack", class_module="pyrit.executor.attack"
+ ),
+ outcome=AttackOutcome.SUCCESS,
+ last_response=message,
+ ),
+ ]
+ result = analyze_results(attacks, group_by=[("harm_category", "attack_type")])
+
+ dim = result.dimensions[("harm_category", "attack_type")]
+ assert ("violence", "CrescendoAttack") in dim
+ assert dim[("violence", "CrescendoAttack")].successes == 1
+
+
+# ---------------------------------------------------------------------------
+# to_dataframe
+# ---------------------------------------------------------------------------
+class TestToDataframe:
+ """Tests for AnalysisResult.to_dataframe()."""
+
+ def test_single_dimension_columns(self):
+ attacks = [
+ make_attack(AttackOutcome.SUCCESS, attack_type="CrescendoAttack"),
+ make_attack(AttackOutcome.FAILURE, attack_type="CrescendoAttack"),
+ ]
+ result = analyze_results(attacks, group_by=["attack_type"])
+ df = result.to_dataframe("attack_type")
+
+ assert list(df.columns) == [
+ "dimension",
+ "key",
+ "successes",
+ "failures",
+ "undetermined",
+ "total_decided",
+ "success_rate",
+ ]
+ assert len(df) == 1
+ row = df.iloc[0]
+ assert row["dimension"] == "attack_type"
+ assert row["key"] == "CrescendoAttack"
+ assert row["successes"] == 1
+ assert row["failures"] == 1
+ assert row["success_rate"] == 0.5
+
+ def test_single_dimension_multiple_keys(self):
+ attacks = [
+ make_attack(AttackOutcome.SUCCESS, attack_type="CrescendoAttack"),
+ make_attack(AttackOutcome.SUCCESS, attack_type="RedTeamingAttack"),
+ make_attack(AttackOutcome.FAILURE, attack_type="RedTeamingAttack"),
+ ]
+ result = analyze_results(attacks, group_by=["attack_type"])
+ df = result.to_dataframe("attack_type")
+
+ assert len(df) == 2
+ keys = set(df["key"])
+ assert keys == {"CrescendoAttack", "RedTeamingAttack"}
+
+ def test_composite_dimension_explodes_columns(self):
+ attacks = [make_attack_with_converters(AttackOutcome.SUCCESS, ["Base64Converter"], attack_type="Crescendo")]
+ result = analyze_results(attacks, group_by=[("converter_type", "attack_type")])
+ df = result.to_dataframe(("converter_type", "attack_type"))
+
+ assert "converter_type" in df.columns
+ assert "attack_type" in df.columns
+ assert "dimension" not in df.columns
+ assert "key" not in df.columns
+ assert df.iloc[0]["converter_type"] == "Base64Converter"
+ assert df.iloc[0]["attack_type"] == "Crescendo"
+ assert df.iloc[0]["successes"] == 1
+
+ def test_no_arg_includes_overall_and_all_dimensions(self):
+ attacks = [
+ make_attack(AttackOutcome.SUCCESS, attack_type="CrescendoAttack"),
+ make_attack(AttackOutcome.FAILURE, attack_type="CrescendoAttack"),
+ ]
+ result = analyze_results(attacks, group_by=["attack_type"])
+ df = result.to_dataframe()
+
+ assert "overall" in df["dimension"].values
+ assert "attack_type" in df["dimension"].values
+
+ overall_row = df[df["dimension"] == "overall"].iloc[0]
+ assert overall_row["key"] == "all"
+ assert overall_row["successes"] == 1
+ assert overall_row["failures"] == 1
+
+ def test_no_arg_composite_dimension_flattened(self):
+ attacks = [make_attack_with_converters(AttackOutcome.SUCCESS, ["Base64Converter"], attack_type="Crescendo")]
+ result = analyze_results(attacks, group_by=[("converter_type", "attack_type")])
+ df = result.to_dataframe()
+
+ dims = df["dimension"].unique()
+ assert any("×" in d for d in dims)
+
+ def test_unknown_dimension_raises(self):
+ attacks = [make_attack(AttackOutcome.SUCCESS)]
+ result = analyze_results(attacks, group_by=["attack_type"])
+
+ with pytest.raises(KeyError):
+ result.to_dataframe("nonexistent_dimension")
+
+ def test_undetermined_included(self):
+ attacks = [make_attack(AttackOutcome.UNDETERMINED, attack_type="CrescendoAttack")]
+ result = analyze_results(attacks, group_by=["attack_type"])
+ df = result.to_dataframe("attack_type")
+
+ row = df.iloc[0]
+ assert row["undetermined"] == 1
+ assert row["total_decided"] == 0
+ assert row["success_rate"] is None
diff --git a/tests/unit/analytics/test_visualization.py b/tests/unit/analytics/test_visualization.py
new file mode 100644
index 0000000000..12be91f165
--- /dev/null
+++ b/tests/unit/analytics/test_visualization.py
@@ -0,0 +1,261 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT license.
+
+from pathlib import Path
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from pyrit.analytics.result_analysis import AnalysisResult, AttackStats, analyze_results
+from pyrit.models import AttackOutcome, AttackResult
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+
+def _make_attack(
+ *,
+ conversation_id: str,
+ attack_type: str = "CrescendoAttack",
+ outcome: AttackOutcome = AttackOutcome.SUCCESS,
+ harm_category: str = "violence",
+) -> AttackResult:
+ mock_piece = MagicMock()
+ mock_piece.targeted_harm_categories = [harm_category]
+ mock_piece.converter_identifiers = []
+
+ attack = MagicMock(spec=AttackResult)
+ attack.conversation_id = conversation_id
+ attack.objective = "test"
+ attack.outcome = outcome
+ attack.last_response = mock_piece
+ attack.attack_identifier = MagicMock()
+ attack.attack_identifier.class_name = attack_type
+ attack.converter_identifiers = []
+ return attack
+
+
+def _make_result(*, group_by: list[str] | None = None) -> AnalysisResult:
+ attacks = [
+ _make_attack(conversation_id="c1", attack_type="CrescendoAttack", outcome=AttackOutcome.SUCCESS),
+ _make_attack(conversation_id="c2", attack_type="CrescendoAttack", outcome=AttackOutcome.FAILURE),
+ _make_attack(
+ conversation_id="c3",
+ attack_type="RedTeamingAttack",
+ outcome=AttackOutcome.SUCCESS,
+ harm_category="hate_speech",
+ ),
+ _make_attack(
+ conversation_id="c4",
+ attack_type="RedTeamingAttack",
+ outcome=AttackOutcome.UNDETERMINED,
+ harm_category="hate_speech",
+ ),
+ ]
+ return analyze_results(attacks, group_by=group_by or ["attack_type", "harm_category"])
+
+
+# ---------------------------------------------------------------------------
+# Tests: save_html
+# ---------------------------------------------------------------------------
+
+
+class TestSaveHtml:
+ """Tests for save_html function."""
+
+ def test_save_html_creates_file(self, tmp_path: Path) -> None:
+ """save_html writes an HTML file at the given path."""
+ plotly = pytest.importorskip("plotly") # noqa: F841
+ from pyrit.analytics.visualization import save_html
+
+ result = _make_result()
+ out = tmp_path / "report.html"
+ returned = save_html(result, out)
+
+ assert returned == out
+ assert out.exists()
+ assert out.stat().st_size > 0
+
+ def test_save_html_returns_path(self, tmp_path: Path) -> None:
+ """save_html returns a Path object."""
+ pytest.importorskip("plotly")
+ from pyrit.analytics.visualization import save_html
+
+ result = _make_result()
+ returned = save_html(result, tmp_path / "r.html")
+ assert isinstance(returned, Path)
+
+ def test_save_html_accepts_string_path(self, tmp_path: Path) -> None:
+ """save_html accepts a plain string as the path argument."""
+ pytest.importorskip("plotly")
+ from pyrit.analytics.visualization import save_html
+
+ result = _make_result()
+ str_path = str(tmp_path / "report.html")
+ returned = save_html(result, str_path)
+ assert returned == Path(str_path)
+ assert Path(str_path).exists()
+
+ def test_save_html_creates_parent_dirs(self, tmp_path: Path) -> None:
+ """save_html creates missing parent directories."""
+ pytest.importorskip("plotly")
+ from pyrit.analytics.visualization import save_html
+
+ result = _make_result()
+ nested = tmp_path / "a" / "b" / "report.html"
+ save_html(result, nested)
+ assert nested.exists()
+
+ def test_save_html_contains_plotly_cdn(self, tmp_path: Path) -> None:
+ """The saved HTML includes the Plotly CDN script tag."""
+ pytest.importorskip("plotly")
+ from pyrit.analytics.visualization import save_html
+
+ result = _make_result()
+ out = tmp_path / "report.html"
+ save_html(result, out)
+ content = out.read_text(encoding="utf-8")
+ assert "plotly" in content.lower()
+
+ def test_save_html_contains_title(self, tmp_path: Path) -> None:
+ """Custom title appears in the saved HTML."""
+ pytest.importorskip("plotly")
+ from pyrit.analytics.visualization import save_html
+
+ result = _make_result()
+ out = tmp_path / "report.html"
+ save_html(result, out, title="My Custom Report")
+ content = out.read_text(encoding="utf-8")
+ assert "My Custom Report" in content
+
+ def test_save_html_contains_overall_stats(self, tmp_path: Path) -> None:
+ """The KPI card includes overall ASR and counts."""
+ pytest.importorskip("plotly")
+ from pyrit.analytics.visualization import save_html
+
+ result = _make_result()
+ out = tmp_path / "report.html"
+ save_html(result, out)
+ content = out.read_text(encoding="utf-8")
+ assert "Overall ASR" in content
+ assert "Total Attacks" in content
+
+ def test_save_html_no_plotly_raises(self, tmp_path: Path) -> None:
+ """save_html raises ImportError when plotly is not installed."""
+ with patch.dict("sys.modules", {"plotly": None, "plotly.graph_objects": None, "plotly.io": None}):
+ import importlib
+
+ import pyrit.analytics.visualization as vis_mod
+
+ importlib.reload(vis_mod)
+
+ result = _make_result()
+ with pytest.raises(ImportError, match="plotly is required"):
+ vis_mod.save_html(result, tmp_path / "r.html")
+
+
+# ---------------------------------------------------------------------------
+# Tests: sparsity guard
+# ---------------------------------------------------------------------------
+
+
+class TestSparsityGuard:
+ """Tests for the sparsity filtering in save_html."""
+
+ def test_sparse_heatmap_skipped(self, tmp_path: Path) -> None:
+ """A composite dim with >50% empty cells is not rendered as a heatmap."""
+ pytest.importorskip("plotly")
+ from pyrit.analytics.visualization import _is_sparse
+
+ sparse_data = {
+ ("a", "x"): AttackStats(success_rate=0.5, total_decided=2, successes=1, failures=1, undetermined=0),
+ ("b", "x"): AttackStats(success_rate=None, total_decided=0, successes=0, failures=0, undetermined=1),
+ ("c", "x"): AttackStats(success_rate=None, total_decided=0, successes=0, failures=0, undetermined=1),
+ }
+ assert _is_sparse(sparse_data, threshold=0.5) is True
+
+ def test_dense_heatmap_not_skipped(self) -> None:
+ """A composite dim with ≤50% empty cells passes the sparsity check."""
+ from pyrit.analytics.visualization import _is_sparse
+
+ dense_data = {
+ ("a", "x"): AttackStats(success_rate=0.8, total_decided=5, successes=4, failures=1, undetermined=0),
+ ("b", "x"): AttackStats(success_rate=0.4, total_decided=5, successes=2, failures=3, undetermined=0),
+ ("c", "x"): AttackStats(success_rate=None, total_decided=0, successes=0, failures=0, undetermined=1),
+ }
+ assert _is_sparse(dense_data, threshold=0.5) is False
+
+ def test_empty_dim_data_is_sparse(self) -> None:
+ """Empty dim_data is considered sparse."""
+ from pyrit.analytics.visualization import _is_sparse
+
+ assert _is_sparse({}) is True
+
+
+# ---------------------------------------------------------------------------
+# Tests: heatmap builders
+# ---------------------------------------------------------------------------
+
+
+class TestBuildZMatrix:
+ """Tests for _build_z_matrix."""
+
+ def test_z_matrix_shape(self) -> None:
+ """z matrix dimensions match row_keys × col_keys."""
+ from pyrit.analytics.visualization import _build_z_matrix
+
+ row_keys = ["r1", "r2"]
+ col_keys = ["c1", "c2", "c3"]
+ lookup = {
+ ("r1", "c1"): AttackStats(success_rate=0.8, total_decided=5, successes=4, failures=1, undetermined=0),
+ }
+ z, text = _build_z_matrix(row_keys=row_keys, col_keys=col_keys, lookup=lookup)
+ assert len(z) == 2
+ assert len(z[0]) == 3
+
+ def test_missing_cell_is_none(self) -> None:
+ """Cells absent from lookup get None in the z matrix."""
+ from pyrit.analytics.visualization import _build_z_matrix
+
+ z, _ = _build_z_matrix(row_keys=["r1"], col_keys=["c1"], lookup={})
+ assert z[0][0] is None
+
+ def test_present_cell_has_rate(self) -> None:
+ """Cells present in lookup get the correct success_rate."""
+ from pyrit.analytics.visualization import _build_z_matrix
+
+ lookup = {
+ ("r1", "c1"): AttackStats(success_rate=0.75, total_decided=4, successes=3, failures=1, undetermined=0),
+ }
+ z, _ = _build_z_matrix(row_keys=["r1"], col_keys=["c1"], lookup=lookup)
+ assert z[0][0] == pytest.approx(0.75)
+
+
+# ---------------------------------------------------------------------------
+# Tests: helper functions
+# ---------------------------------------------------------------------------
+
+
+class TestAsrCssClass:
+ """Tests for _asr_css_class."""
+
+ def test_high_rate_is_green(self) -> None:
+ from pyrit.analytics.visualization import _asr_css_class
+
+ assert _asr_css_class(0.9) == "green"
+
+ def test_mid_rate_is_yellow(self) -> None:
+ from pyrit.analytics.visualization import _asr_css_class
+
+ assert _asr_css_class(0.45) == "yellow"
+
+ def test_low_rate_is_red(self) -> None:
+ from pyrit.analytics.visualization import _asr_css_class
+
+ assert _asr_css_class(0.1) == "red"
+
+ def test_none_rate_is_empty(self) -> None:
+ from pyrit.analytics.visualization import _asr_css_class
+
+ assert _asr_css_class(None) == ""