diff --git a/doc/_toc.yml b/doc/_toc.yml index 0845047cf3..2e231122a0 100644 --- a/doc/_toc.yml +++ b/doc/_toc.yml @@ -110,6 +110,7 @@ chapters: - file: code/scoring/prompt_shield_scorer - file: code/scoring/generic_scorers - file: code/scoring/8_scorer_metrics + - file: code/analytics/1_result_analysis - file: code/memory/0_memory sections: - file: code/memory/1_sqlite_memory diff --git a/doc/code/analytics/1_result_analysis.ipynb b/doc/code/analytics/1_result_analysis.ipynb new file mode 100644 index 0000000000..a2dbca70c2 --- /dev/null +++ b/doc/code/analytics/1_result_analysis.ipynb @@ -0,0 +1,898 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "0", + "metadata": {}, + "source": [ + "# Result Analysis\n", + "\n", + "The `analyze_results` function computes attack success rates from a list of `AttackResult` objects.\n", + "It supports flexible grouping across built-in dimensions (`attack_type`, `converter_type`, `label`)\n", + "as well as composite and custom dimensions." + ] + }, + { + "cell_type": "markdown", + "id": "1", + "metadata": {}, + "source": [ + "## Setup\n", + "\n", + "First, let's create some sample `AttackResult` objects to work with." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "2", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Created 5 sample AttackResult objects\n" + ] + } + ], + "source": [ + "from pyrit.analytics import analyze_results\n", + "from pyrit.identifiers import ComponentIdentifier\n", + "from pyrit.models import AttackOutcome, AttackResult, MessagePiece\n", + "\n", + "\n", + "def make_converter(name: str) -> ComponentIdentifier:\n", + " return ComponentIdentifier(class_name=name, class_module=\"pyrit.prompt_converter\")\n", + "\n", + "\n", + "crescendo_id = ComponentIdentifier(class_name=\"CrescendoAttack\", class_module=\"pyrit.executor.attack\")\n", + "red_team_id = ComponentIdentifier(class_name=\"RedTeamingAttack\", class_module=\"pyrit.executor.attack\")\n", + "\n", + "# Build a small set of representative attack results\n", + "results = [\n", + " # Crescendo attacks with Base64Converter\n", + " AttackResult(\n", + " conversation_id=\"c1\",\n", + " objective=\"bypass safety filter\",\n", + " attack_identifier=crescendo_id,\n", + " outcome=AttackOutcome.SUCCESS,\n", + " last_response=MessagePiece(\n", + " role=\"user\",\n", + " original_value=\"response 1\",\n", + " converter_identifiers=[make_converter(\"Base64Converter\")],\n", + " labels={\"operation_name\": \"op_safety_bypass\", \"operator\": \"alice\"},\n", + " targeted_harm_categories=[\"violence\", \"hate_speech\"],\n", + " ),\n", + " ),\n", + " AttackResult(\n", + " conversation_id=\"c2\",\n", + " objective=\"bypass safety filter\",\n", + " attack_identifier=crescendo_id,\n", + " outcome=AttackOutcome.FAILURE,\n", + " last_response=MessagePiece(\n", + " role=\"user\",\n", + " original_value=\"response 2\",\n", + " converter_identifiers=[make_converter(\"Base64Converter\")],\n", + " labels={\"operation_name\": \"op_safety_bypass\", \"operator\": \"alice\"},\n", + " targeted_harm_categories=[\"violence\"],\n", + " ),\n", + " ),\n", + " # Red 
teaming attacks with ROT13Converter\n", + " AttackResult(\n", + " conversation_id=\"c3\",\n", + " objective=\"extract secrets\",\n", + " attack_identifier=red_team_id,\n", + " outcome=AttackOutcome.SUCCESS,\n", + " last_response=MessagePiece(\n", + " role=\"user\",\n", + " original_value=\"response 3\",\n", + " converter_identifiers=[make_converter(\"ROT13Converter\")],\n", + " labels={\"operation_name\": \"op_secret_extract\", \"operator\": \"bob\"},\n", + " targeted_harm_categories=[\"misinformation\"],\n", + " ),\n", + " ),\n", + " AttackResult(\n", + " conversation_id=\"c4\",\n", + " objective=\"extract secrets\",\n", + " attack_identifier=red_team_id,\n", + " outcome=AttackOutcome.SUCCESS,\n", + " last_response=MessagePiece(\n", + " role=\"user\",\n", + " original_value=\"response 4\",\n", + " converter_identifiers=[make_converter(\"ROT13Converter\")],\n", + " labels={\"operation_name\": \"op_secret_extract\", \"operator\": \"bob\"},\n", + " targeted_harm_categories=[\"hate_speech\", \"misinformation\"],\n", + " ),\n", + " ),\n", + " # An undetermined result (no converter, no labels)\n", + " AttackResult(\n", + " conversation_id=\"c5\",\n", + " objective=\"test prompt\",\n", + " attack_identifier=crescendo_id,\n", + " outcome=AttackOutcome.UNDETERMINED,\n", + " ),\n", + "]\n", + "\n", + "print(f\"Created {len(results)} sample AttackResult objects\")" + ] + }, + { + "cell_type": "markdown", + "id": "3", + "metadata": {}, + "source": [ + "## Overall Stats (No Grouping)\n", + "\n", + "Pass `group_by=[]` to compute only the overall attack success rate, with no\n", + "dimensional breakdown." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Overall success rate: 0.75\n", + " Successes: 3\n", + " Failures: 1\n", + " Undetermined: 1\n", + " Total decided (excl. 
undetermined): 4\n" + ] + } + ], + "source": [ + "result = analyze_results(results, group_by=[])\n", + "\n", + "print(f\"Overall success rate: {result.overall.success_rate}\")\n", + "print(f\" Successes: {result.overall.successes}\")\n", + "print(f\" Failures: {result.overall.failures}\")\n", + "print(f\" Undetermined: {result.overall.undetermined}\")\n", + "print(f\" Total decided (excl. undetermined): {result.overall.total_decided}\")" + ] + }, + { + "cell_type": "markdown", + "id": "5", + "metadata": {}, + "source": [ + "## Group by Attack Type\n", + "\n", + "See how success rates differ across attack strategies (e.g. `CrescendoAttack` vs `RedTeamingAttack`)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + " CrescendoAttack: success_rate=0.5, successes=1, failures=1, undetermined=1\n", + " RedTeamingAttack: success_rate=1.0, successes=2, failures=0, undetermined=0\n" + ] + } + ], + "source": [ + "result = analyze_results(results, group_by=[\"attack_type\"])\n", + "\n", + "for attack_type, stats in result.dimensions[\"attack_type\"].items():\n", + " print(\n", + " f\" {attack_type}: success_rate={stats.success_rate}, \"\n", + " f\"successes={stats.successes}, failures={stats.failures}, \"\n", + " f\"undetermined={stats.undetermined}\"\n", + " )" + ] + }, + { + "cell_type": "markdown", + "id": "7", + "metadata": {}, + "source": [ + "## Group by Converter Type\n", + "\n", + "Break down success rates by which prompt converter was applied." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + " Base64Converter: success_rate=0.5, successes=1, failures=1\n", + " ROT13Converter: success_rate=1.0, successes=2, failures=0\n", + " no_converter: success_rate=None, successes=0, failures=0\n" + ] + } + ], + "source": [ + "result = analyze_results(results, group_by=[\"converter_type\"])\n", + "\n", + "for converter, stats in result.dimensions[\"converter_type\"].items():\n", + " print(f\" {converter}: success_rate={stats.success_rate}, successes={stats.successes}, failures={stats.failures}\")" + ] + }, + { + "cell_type": "markdown", + "id": "9", + "metadata": {}, + "source": [ + "## Group by Label\n", + "\n", + "Labels are key=value metadata attached to messages. Each label pair becomes its own\n", + "grouping key." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "10", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + " operation_name=op_safety_bypass: success_rate=0.5, successes=1, failures=1\n", + " operator=alice: success_rate=0.5, successes=1, failures=1\n", + " operation_name=op_secret_extract: success_rate=1.0, successes=2, failures=0\n", + " operator=bob: success_rate=1.0, successes=2, failures=0\n", + " no_labels: success_rate=None, successes=0, failures=0\n" + ] + } + ], + "source": [ + "result = analyze_results(results, group_by=[\"label\"])\n", + "\n", + "for label_key, stats in result.dimensions[\"label\"].items():\n", + " print(f\" {label_key}: success_rate={stats.success_rate}, successes={stats.successes}, failures={stats.failures}\")" + ] + }, + { + "cell_type": "markdown", + "id": "11", + "metadata": {}, + "source": [ + "## Group by Harm Category\n", + "\n", + "Break down success rates by the targeted harm categories associated with each prompt." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "12", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + " violence: success_rate=0.5, successes=1, failures=1\n", + " hate_speech: success_rate=1.0, successes=2, failures=0\n", + " misinformation: success_rate=1.0, successes=2, failures=0\n", + " no_harm_category: success_rate=None, successes=0, failures=0\n" + ] + } + ], + "source": [ + "result = analyze_results(results, group_by=[\"harm_category\"])\n", + "\n", + "for harm_cat, stats in result.dimensions[\"harm_category\"].items():\n", + " print(f\" {harm_cat}: success_rate={stats.success_rate}, successes={stats.successes}, failures={stats.failures}\")" + ] + }, + { + "cell_type": "markdown", + "id": "13", + "metadata": {}, + "source": [ + "## Multiple Dimensions at Once\n", + "\n", + "Pass several dimension names to `group_by` for independent breakdowns in a single call." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "14", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "--- By attack_type ---\n", + " CrescendoAttack: success_rate=0.5\n", + " RedTeamingAttack: success_rate=1.0\n", + "\n", + "--- By converter_type ---\n", + " Base64Converter: success_rate=0.5\n", + " ROT13Converter: success_rate=1.0\n", + " no_converter: success_rate=None\n" + ] + } + ], + "source": [ + "result = analyze_results(results, group_by=[\"attack_type\", \"converter_type\"])\n", + "\n", + "print(\"--- By attack_type ---\")\n", + "for key, stats in result.dimensions[\"attack_type\"].items():\n", + " print(f\" {key}: success_rate={stats.success_rate}\")\n", + "\n", + "print(\"\\n--- By converter_type ---\")\n", + "for key, stats in result.dimensions[\"converter_type\"].items():\n", + " print(f\" {key}: success_rate={stats.success_rate}\")" + ] + }, + { + "cell_type": "markdown", + "id": "15", + "metadata": {}, + "source": [ + "## Composite 
Dimensions\n", + "\n", + "Use a tuple of dimension names to create a cross-product grouping. For example,\n", + "`(\"converter_type\", \"attack_type\")` produces keys like `(\"Base64Converter\", \"CrescendoAttack\")`." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "16", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + " ('Base64Converter', 'CrescendoAttack'): success_rate=0.5, successes=1, failures=1\n", + " ('ROT13Converter', 'RedTeamingAttack'): success_rate=1.0, successes=2, failures=0\n", + " ('no_converter', 'CrescendoAttack'): success_rate=None, successes=0, failures=0\n" + ] + } + ], + "source": [ + "result = analyze_results(results, group_by=[(\"converter_type\", \"attack_type\")])\n", + "\n", + "for combo_key, stats in result.dimensions[(\"converter_type\", \"attack_type\")].items():\n", + " print(f\" {combo_key}: success_rate={stats.success_rate}, successes={stats.successes}, failures={stats.failures}\")" + ] + }, + { + "cell_type": "markdown", + "id": "17", + "metadata": {}, + "source": [ + "## Custom Dimensions\n", + "\n", + "Supply your own extractor function via `custom_dimensions`. An extractor takes an\n", + "`AttackResult` and returns a `list[str]` of dimension values. Here we group by the\n", + "attack objective." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "18", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + " bypass safety filter: success_rate=0.5, successes=1, failures=1\n", + " extract secrets: success_rate=1.0, successes=2, failures=0\n", + " test prompt: success_rate=None, successes=0, failures=0\n" + ] + } + ], + "source": [ + "def extract_objective(attack: AttackResult) -> list[str]:\n", + " return [attack.objective]\n", + "\n", + "\n", + "result = analyze_results(\n", + " results,\n", + " group_by=[\"objective\"],\n", + " custom_dimensions={\"objective\": extract_objective},\n", + ")\n", + "\n", + "for objective, stats in result.dimensions[\"objective\"].items():\n", + " print(f\" {objective}: success_rate={stats.success_rate}, successes={stats.successes}, failures={stats.failures}\")" + ] + }, + { + "cell_type": "markdown", + "id": "19", + "metadata": {}, + "source": [ + "## Default Behavior\n", + "\n", + "When `group_by` is omitted, `analyze_results` groups by **all** registered\n", + "dimensions: `attack_type`, `converter_type`, and `label`." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "20", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Dimensions returned: ['attack_type', 'converter_type', 'label']\n", + "Overall success rate: 0.75\n" + ] + } + ], + "source": [ + "result = analyze_results(results)\n", + "\n", + "print(f\"Dimensions returned: {list(result.dimensions.keys())}\")\n", + "print(f\"Overall success rate: {result.overall.success_rate}\")" + ] + }, + { + "cell_type": "markdown", + "id": "21", + "metadata": {}, + "source": [ + "## Export to DataFrame\n", + "\n", + "Use the `to_dataframe()` method to export analysis results as a pandas DataFrame for further\n", + "analysis or visualization. Pass a dimension name to export a specific breakdown, or `None`\n", + "to export all dimensions in long-form." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "22", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "--- Harm Category DataFrame ---\n" + ] + }, + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
dimensionkeysuccessesfailuresundeterminedtotal_decidedsuccess_rate
0harm_categoryviolence11020.5
1harm_categoryhate_speech20021.0
2harm_categorymisinformation20021.0
3harm_categoryno_harm_category0010NaN
\n", + "
" + ], + "text/plain": [ + " dimension key successes failures undetermined \\\n", + "0 harm_category violence 1 1 0 \n", + "1 harm_category hate_speech 2 0 0 \n", + "2 harm_category misinformation 2 0 0 \n", + "3 harm_category no_harm_category 0 0 1 \n", + "\n", + " total_decided success_rate \n", + "0 2 0.5 \n", + "1 2 1.0 \n", + "2 2 1.0 \n", + "3 0 NaN " + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "--- Attack Type DataFrame ---\n" + ] + }, + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
dimensionkeysuccessesfailuresundeterminedtotal_decidedsuccess_rate
0attack_typeCrescendoAttack11120.5
1attack_typeRedTeamingAttack20021.0
\n", + "
" + ], + "text/plain": [ + " dimension key successes failures undetermined \\\n", + "0 attack_type CrescendoAttack 1 1 1 \n", + "1 attack_type RedTeamingAttack 2 0 0 \n", + "\n", + " total_decided success_rate \n", + "0 2 0.5 \n", + "1 2 1.0 " + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "--- All Dimensions DataFrame ---\n" + ] + }, + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
dimensionkeysuccessesfailuresundeterminedtotal_decidedsuccess_rate
0overallall31140.75
1harm_categoryviolence11020.50
2harm_categoryhate_speech20021.00
3harm_categorymisinformation20021.00
4harm_categoryno_harm_category0010NaN
5attack_typeCrescendoAttack11120.50
6attack_typeRedTeamingAttack20021.00
\n", + "
" + ], + "text/plain": [ + " dimension key successes failures undetermined \\\n", + "0 overall all 3 1 1 \n", + "1 harm_category violence 1 1 0 \n", + "2 harm_category hate_speech 2 0 0 \n", + "3 harm_category misinformation 2 0 0 \n", + "4 harm_category no_harm_category 0 0 1 \n", + "5 attack_type CrescendoAttack 1 1 1 \n", + "6 attack_type RedTeamingAttack 2 0 0 \n", + "\n", + " total_decided success_rate \n", + "0 4 0.75 \n", + "1 2 0.50 \n", + "2 2 1.00 \n", + "3 2 1.00 \n", + "4 0 NaN \n", + "5 2 0.50 \n", + "6 2 1.00 " + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "from IPython.display import display\n", + "\n", + "result = analyze_results(results, group_by=[\"harm_category\", \"attack_type\"])\n", + "\n", + "# Export a single dimension as a table\n", + "print(\"--- Harm Category DataFrame ---\")\n", + "df_harm = result.to_dataframe(dimension=\"harm_category\")\n", + "display(df_harm)\n", + "\n", + "# Export a single dimension as a table\n", + "print(\"--- Attack Type DataFrame ---\")\n", + "df_attack = result.to_dataframe(dimension=\"attack_type\")\n", + "display(df_attack)\n", + "\n", + "# Export all dimensions in long-form as a table\n", + "print(\"\\n--- All Dimensions DataFrame ---\")\n", + "df_all = result.to_dataframe()\n", + "display(df_all)" + ] + }, + { + "cell_type": "markdown", + "id": "265c9e56", + "metadata": {}, + "source": [ + "## Interactive HTML Report\n", + "\n", + "Use `save_html()` to generate a fully interactive HTML report with:\n", + "- KPI summary cards\n", + "- A dimension selector dropdown to switch between harm_category, attack_type, etc.\n", + "- Cross-dimensional heatmaps\n", + "- Data coverage table\n", + "\n", + "The report is self-contained and can be opened in any browser." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "ba3ecf2f", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Report saved to: attack_analysis_report.html\n" + ] + }, + { + "data": { + "text/html": [ + "\n", + " \n", + " " + ], + "text/plain": [ + "" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "from pyrit.analytics import save_html\n", + "from IPython.display import IFrame\n", + "\n", + "# Run analysis with multiple dimensions for rich visualization\n", + "result = analyze_results(\n", + " results,\n", + " group_by=[\n", + " \"harm_category\",\n", + " \"attack_type\",\n", + " \"converter_type\",\n", + " \"label\",\n", + " (\"harm_category\", \"attack_type\"),\n", + " (\"harm_category\", \"converter_type\"),\n", + " ],\n", + ")\n", + "\n", + "# Save the interactive HTML report\n", + "report_path = save_html(result, \"attack_analysis_report.html\", title=\"Attack Analysis Report\")\n", + "print(f\"Report saved to: {report_path}\")\n", + "\n", + "# Display the report inline (works in Jupyter notebooks)\n", + "IFrame(src=str(report_path), width=\"100%\", height=800)" + ] + } + ], + "metadata": { + "jupytext": { + "cell_metadata_filter": "-all" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.13" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/doc/code/analytics/1_result_analysis.py b/doc/code/analytics/1_result_analysis.py new file mode 100644 index 0000000000..9c68083259 --- /dev/null +++ b/doc/code/analytics/1_result_analysis.py @@ -0,0 +1,295 @@ +# --- +# jupyter: +# jupytext: +# cell_metadata_filter: -all +# text_representation: +# extension: .py +# format_name: percent +# format_version: '1.3' +# jupytext_version: 
1.19.0 +# kernelspec: +# display_name: pyrit-dev +# language: python +# name: python3 +# --- + +# %% [markdown] +# # Result Analysis +# +# The `analyze_results` function computes attack success rates from a list of `AttackResult` objects. +# It supports flexible grouping across built-in dimensions (`attack_type`, `converter_type`, `label`) +# as well as composite and custom dimensions. + +# %% [markdown] +# ## Setup +# +# First, let's create some sample `AttackResult` objects to work with. + +# %% +from pyrit.analytics import analyze_results +from pyrit.identifiers import ComponentIdentifier +from pyrit.models import AttackOutcome, AttackResult, MessagePiece + + +def make_converter(name: str) -> ComponentIdentifier: + return ComponentIdentifier(class_name=name, class_module="pyrit.prompt_converter") + + +crescendo_id = ComponentIdentifier(class_name="CrescendoAttack", class_module="pyrit.executor.attack") +red_team_id = ComponentIdentifier(class_name="RedTeamingAttack", class_module="pyrit.executor.attack") + +# Build a small set of representative attack results +results = [ + # Crescendo attacks with Base64Converter + AttackResult( + conversation_id="c1", + objective="bypass safety filter", + attack_identifier=crescendo_id, + outcome=AttackOutcome.SUCCESS, + last_response=MessagePiece( + role="user", + original_value="response 1", + converter_identifiers=[make_converter("Base64Converter")], + labels={"operation_name": "op_safety_bypass", "operator": "alice"}, + targeted_harm_categories=["violence", "hate_speech"], + ), + ), + AttackResult( + conversation_id="c2", + objective="bypass safety filter", + attack_identifier=crescendo_id, + outcome=AttackOutcome.FAILURE, + last_response=MessagePiece( + role="user", + original_value="response 2", + converter_identifiers=[make_converter("Base64Converter")], + labels={"operation_name": "op_safety_bypass", "operator": "alice"}, + targeted_harm_categories=["violence"], + ), + ), + # Red teaming attacks with ROT13Converter + 
AttackResult( + conversation_id="c3", + objective="extract secrets", + attack_identifier=red_team_id, + outcome=AttackOutcome.SUCCESS, + last_response=MessagePiece( + role="user", + original_value="response 3", + converter_identifiers=[make_converter("ROT13Converter")], + labels={"operation_name": "op_secret_extract", "operator": "bob"}, + targeted_harm_categories=["misinformation"], + ), + ), + AttackResult( + conversation_id="c4", + objective="extract secrets", + attack_identifier=red_team_id, + outcome=AttackOutcome.SUCCESS, + last_response=MessagePiece( + role="user", + original_value="response 4", + converter_identifiers=[make_converter("ROT13Converter")], + labels={"operation_name": "op_secret_extract", "operator": "bob"}, + targeted_harm_categories=["hate_speech", "misinformation"], + ), + ), + # An undetermined result (no converter, no labels) + AttackResult( + conversation_id="c5", + objective="test prompt", + attack_identifier=crescendo_id, + outcome=AttackOutcome.UNDETERMINED, + ), +] + +print(f"Created {len(results)} sample AttackResult objects") + +# %% [markdown] +# ## Overall Stats (No Grouping) +# +# Pass `group_by=[]` to compute only the overall attack success rate, with no +# dimensional breakdown. + +# %% +result = analyze_results(results, group_by=[]) + +print(f"Overall success rate: {result.overall.success_rate}") +print(f" Successes: {result.overall.successes}") +print(f" Failures: {result.overall.failures}") +print(f" Undetermined: {result.overall.undetermined}") +print(f" Total decided (excl. undetermined): {result.overall.total_decided}") + +# %% [markdown] +# ## Group by Attack Type +# +# See how success rates differ across attack strategies (e.g. `CrescendoAttack` vs `RedTeamingAttack`). 
+ +# %% +result = analyze_results(results, group_by=["attack_type"]) + +for attack_type, stats in result.dimensions["attack_type"].items(): + print( + f" {attack_type}: success_rate={stats.success_rate}, " + f"successes={stats.successes}, failures={stats.failures}, " + f"undetermined={stats.undetermined}" + ) + +# %% [markdown] +# ## Group by Converter Type +# +# Break down success rates by which prompt converter was applied. + +# %% +result = analyze_results(results, group_by=["converter_type"]) + +for converter, stats in result.dimensions["converter_type"].items(): + print(f" {converter}: success_rate={stats.success_rate}, successes={stats.successes}, failures={stats.failures}") + +# %% [markdown] +# ## Group by Label +# +# Labels are key=value metadata attached to messages. Each label pair becomes its own +# grouping key. + +# %% +result = analyze_results(results, group_by=["label"]) + +for label_key, stats in result.dimensions["label"].items(): + print(f" {label_key}: success_rate={stats.success_rate}, successes={stats.successes}, failures={stats.failures}") + +# %% [markdown] +# ## Group by Harm Category +# +# Break down success rates by the targeted harm categories associated with each prompt. + +# %% +result = analyze_results(results, group_by=["harm_category"]) + +for harm_cat, stats in result.dimensions["harm_category"].items(): + print(f" {harm_cat}: success_rate={stats.success_rate}, successes={stats.successes}, failures={stats.failures}") + +# %% [markdown] +# ## Multiple Dimensions at Once +# +# Pass several dimension names to `group_by` for independent breakdowns in a single call. 
+ +# %% +result = analyze_results(results, group_by=["attack_type", "converter_type"]) + +print("--- By attack_type ---") +for key, stats in result.dimensions["attack_type"].items(): + print(f" {key}: success_rate={stats.success_rate}") + +print("\n--- By converter_type ---") +for key, stats in result.dimensions["converter_type"].items(): + print(f" {key}: success_rate={stats.success_rate}") + +# %% [markdown] +# ## Composite Dimensions +# +# Use a tuple of dimension names to create a cross-product grouping. For example, +# `("converter_type", "attack_type")` produces keys like `("Base64Converter", "CrescendoAttack")`. + +# %% +result = analyze_results(results, group_by=[("converter_type", "attack_type")]) + +for combo_key, stats in result.dimensions[("converter_type", "attack_type")].items(): + print(f" {combo_key}: success_rate={stats.success_rate}, successes={stats.successes}, failures={stats.failures}") + +# %% [markdown] +# ## Custom Dimensions +# +# Supply your own extractor function via `custom_dimensions`. An extractor takes an +# `AttackResult` and returns a `list[str]` of dimension values. Here we group by the +# attack objective. + +# %% + + +def extract_objective(attack: AttackResult) -> list[str]: + return [attack.objective] + + +result = analyze_results( + results, + group_by=["objective"], + custom_dimensions={"objective": extract_objective}, +) + +for objective, stats in result.dimensions["objective"].items(): + print(f" {objective}: success_rate={stats.success_rate}, successes={stats.successes}, failures={stats.failures}") + +# %% [markdown] +# ## Default Behavior +# +# When `group_by` is omitted, `analyze_results` groups by **all** registered +# dimensions: `attack_type`, `converter_type`, and `label`. 
+ +# %% +result = analyze_results(results) + +print(f"Dimensions returned: {list(result.dimensions.keys())}") +print(f"Overall success rate: {result.overall.success_rate}") + +# %% [markdown] +# ## Export to DataFrame +# +# Use the `to_dataframe()` method to export analysis results as a pandas DataFrame for further +# analysis or visualization. Pass a dimension name to export a specific breakdown, or `None` +# to export all dimensions in long-form. + +# %% +from IPython.display import display + +result = analyze_results(results, group_by=["harm_category", "attack_type"]) + +# Export a single dimension as a table +print("--- Harm Category DataFrame ---") +df_harm = result.to_dataframe(dimension="harm_category") +display(df_harm) + +# Export a single dimension as a table +print("--- Attack Type DataFrame ---") +df_attack = result.to_dataframe(dimension="attack_type") +display(df_attack) + +# Export all dimensions in long-form as a table +print("\n--- All Dimensions DataFrame ---") +df_all = result.to_dataframe() +display(df_all) + +# %% [markdown] +# ## Interactive HTML Report +# +# Use `save_html()` to generate a fully interactive HTML report with: +# - KPI summary cards +# - A dimension selector dropdown to switch between harm_category, attack_type, etc. +# - Cross-dimensional heatmaps +# - Data coverage table +# +# The report is self-contained and can be opened in any browser. 
+ +# %% +from pyrit.analytics import save_html +from IPython.display import IFrame + +# Run analysis with multiple dimensions for rich visualization +result = analyze_results( + results, + group_by=[ + "harm_category", + "attack_type", + "converter_type", + "label", + ("harm_category", "attack_type"), + ("harm_category", "converter_type"), + ], +) + +# Save the interactive HTML report +report_path = save_html(result, "attack_analysis_report.html", title="Attack Analysis Report") +print(f"Report saved to: {report_path}") + +# Display the report inline (works in Jupyter notebooks) +IFrame(src=str(report_path), width="100%", height=800) diff --git a/pyrit/analytics/__init__.py b/pyrit/analytics/__init__.py index f75d401dd7..04b329df05 100644 --- a/pyrit/analytics/__init__.py +++ b/pyrit/analytics/__init__.py @@ -4,18 +4,27 @@ """Analytics module for PyRIT conversation and result analysis.""" from pyrit.analytics.conversation_analytics import ConversationAnalytics -from pyrit.analytics.result_analysis import AttackStats, analyze_results +from pyrit.analytics.result_analysis import ( + AnalysisResult, + AttackStats, + DimensionExtractor, + analyze_results, +) from pyrit.analytics.text_matching import ( ApproximateTextMatching, ExactTextMatching, TextMatching, ) +from pyrit.analytics.visualization import save_html __all__ = [ "analyze_results", + "AnalysisResult", "ApproximateTextMatching", "AttackStats", "ConversationAnalytics", + "DimensionExtractor", "ExactTextMatching", + "save_html", "TextMatching", ] diff --git a/pyrit/analytics/result_analysis.py b/pyrit/analytics/result_analysis.py index cc5f58fa70..1cb7947ec7 100644 --- a/pyrit/analytics/result_analysis.py +++ b/pyrit/analytics/result_analysis.py @@ -1,13 +1,26 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT license. 
+import warnings from collections import defaultdict -from dataclasses import dataclass -from typing import Optional +from collections.abc import Callable +from dataclasses import dataclass, field +from itertools import product +from typing import Optional, Union from pyrit.models import AttackOutcome, AttackResult +# --------------------------------------------------------------------------- +# Type alias for dimension extractors. +# An extractor receives an AttackResult and returns a list of string keys +# (list to support one-to-many mappings, e.g. multiple converters per attack). +# --------------------------------------------------------------------------- +DimensionExtractor = Callable[[AttackResult], list[str]] + +# --------------------------------------------------------------------------- +# Data classes +# --------------------------------------------------------------------------- @dataclass class AttackStats: """Statistics for attack analysis results.""" @@ -19,7 +32,206 @@ class AttackStats: undetermined: int -def _compute_stats(successes: int, failures: int, undetermined: int) -> AttackStats: +@dataclass +class AnalysisResult: + """ + Structured result from attack analysis. + + Attributes: + overall (AttackStats): Aggregate stats across all attack results. + dimensions (dict): Per-dimension breakdown. Keys are dimension names + (str) for single dimensions, or tuples of dimension names for + composite groupings. Values map dimension keys to AttackStats. + """ + + overall: AttackStats + dimensions: dict[Union[str, tuple[str, ...]], dict[Union[str, tuple[str, ...]], AttackStats]] = field( + default_factory=dict + ) + + def to_dataframe( + self, + dimension: Optional[Union[str, tuple[str, ...]]] = None, + ) -> "pandas.DataFrame": # type: ignore[name-defined] # noqa: F821 + """ + Export analysis results as a pandas DataFrame. + + When *dimension* is provided, only that dimension's breakdown is + returned. 
For composite dimensions the tuple keys are exploded into + individual columns. When *dimension* is ``None``, all dimensions and + the overall stats are returned in a single long-form DataFrame with a + ``dimension`` column. + + Args: + dimension (str | tuple[str, ...] | None): The dimension to export. + Pass a string for a single dimension (e.g. ``"harm_category"``), + a tuple for a composite dimension (e.g. + ``("harm_category", "attack_type")``), or ``None`` to export + everything. Defaults to ``None``. + + Returns: + pandas.DataFrame: A DataFrame with columns for dimension key(s) + and stats (``successes``, ``failures``, ``undetermined``, + ``total_decided``, ``success_rate``). + + Raises: + ImportError: If pandas is not installed. + KeyError: If the requested dimension is not in the results. + """ + try: + import pandas as pd + except ImportError as err: + raise ImportError("pandas is required for to_dataframe(). Install it with: pip install pandas") from err + + stats_columns = ["successes", "failures", "undetermined", "total_decided", "success_rate"] + + def _stats_row(stats: AttackStats) -> dict[str, object]: + return { + "successes": stats.successes, + "failures": stats.failures, + "undetermined": stats.undetermined, + "total_decided": stats.total_decided, + "success_rate": stats.success_rate, + } + + def _dim_rows( + dim_name: Union[str, tuple[str, ...]], + dim_data: dict[Union[str, tuple[str, ...]], AttackStats], + ) -> list[dict[str, object]]: + rows = [] + for key, stats in dim_data.items(): + row: dict[str, object] + if isinstance(dim_name, tuple): + # Explode composite key into individual columns + row = dict(zip(dim_name, key, strict=True)) + else: + row = {"dimension": dim_name, "key": key} + row.update(_stats_row(stats)) + rows.append(row) + return rows + + # Single dimension requested + if dimension is not None: + if dimension not in self.dimensions: + raise KeyError(f"Dimension {dimension!r} not found. 
Available: {list(self.dimensions.keys())}") + rows = _dim_rows(dimension, self.dimensions[dimension]) + cols: list[str] + if isinstance(dimension, tuple): + cols = list(dimension) + stats_columns + else: + cols = ["dimension", "key"] + stats_columns + return pd.DataFrame(rows, columns=cols) + + # All dimensions + overall + overall_row: dict[str, object] = {"dimension": "overall", "key": "all"} + overall_row.update(_stats_row(self.overall)) + all_rows: list[dict[str, object]] = [overall_row] + + for dim_name, dim_data in self.dimensions.items(): + if isinstance(dim_name, tuple): + # Composite dimensions: flatten as "dim1 × dim2" in the dimension column + label = " \u00d7 ".join(dim_name) + for key, stats in dim_data.items(): + row: dict[str, object] = {"dimension": label, "key": " \u00d7 ".join(str(k) for k in key)} + row.update(_stats_row(stats)) + all_rows.append(row) + else: + all_rows.extend(_dim_rows(dim_name, dim_data)) + + return pd.DataFrame(all_rows, columns=["dimension", "key"] + stats_columns) + + +# --------------------------------------------------------------------------- +# Built-in dimension extractors +# --------------------------------------------------------------------------- +def _extract_attack_type(result: AttackResult) -> list[str]: + """ + Extract the attack type from the attack identifier. + + Reads the ``class_name`` attribute from the ComponentIdentifier. + + Returns: + list[str]: A single-element list containing the attack type. + """ + return [result.attack_identifier.class_name if result.attack_identifier else "unknown"] + + +def _extract_converter_types(result: AttackResult) -> list[str]: + """ + Extract converter class names from the last response. + + Returns: + list[str]: Converter class names, or ``["no_converter"]`` if none. 
+ """ + if result.last_response is not None and result.last_response.converter_identifiers: + return [conv.class_name for conv in result.last_response.converter_identifiers] + return ["no_converter"] + + +def _extract_labels(result: AttackResult) -> list[str]: + """ + Extract label key=value pairs from the last response. + + Returns: + list[str]: Label strings as ``"key=value"``, or ``["no_labels"]`` if none. + """ + if result.last_response is not None and result.last_response.labels: + return [f"{k}={v}" for k, v in result.last_response.labels.items()] + return ["no_labels"] + + +def _extract_harm_categories(result: AttackResult) -> list[str]: + """ + Extract targeted harm categories from the last response. + + Returns: + list[str]: Harm category strings, or ``["no_harm_category"]`` if none. + """ + if result.last_response is not None and result.last_response.targeted_harm_categories: + return result.last_response.targeted_harm_categories + return ["no_harm_category"] + + +DEFAULT_DIMENSIONS: dict[str, DimensionExtractor] = { + "attack_type": _extract_attack_type, + "converter_type": _extract_converter_types, + "harm_category": _extract_harm_categories, + "label": _extract_labels, +} + +# Deprecated aliases — maps old name to canonical name. +# Using the old name emits a DeprecationWarning. +_DEPRECATED_DIMENSION_ALIASES: dict[str, str] = { + "attack_identifier": "attack_type", +} + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- +_OUTCOME_KEYS: dict[AttackOutcome, str] = { + AttackOutcome.SUCCESS: "successes", + AttackOutcome.FAILURE: "failures", +} + + +def _outcome_key(outcome: AttackOutcome) -> str: + """ + Map an AttackOutcome to its counter key. + + Returns: + str: The counter key (``"successes"``, ``"failures"``, or ``"undetermined"``). 
+ """ + return _OUTCOME_KEYS.get(outcome, "undetermined") + + +def _compute_stats(*, successes: int, failures: int, undetermined: int) -> AttackStats: + """ + Compute AttackStats from raw counts. + + Returns: + AttackStats: The computed statistics. + """ total_decided = successes + failures success_rate = successes / total_decided if total_decided > 0 else None return AttackStats( @@ -31,65 +243,162 @@ def _compute_stats(successes: int, failures: int, undetermined: int) -> AttackSt ) -def analyze_results(attack_results: list[AttackResult]) -> dict[str, AttackStats | dict[str, AttackStats]]: +def _build_stats(counts: defaultdict[str, int]) -> AttackStats: """ - Analyze a list of AttackResult objects and return overall and grouped statistics. + Build AttackStats from a counter dict. Returns: - A dictionary of AttackStats objects. The overall stats are accessible with the key - "Overall", and the stats of any attack can be retrieved using "By_attack_identifier" - followed by the identifier of the attack. + AttackStats: The computed statistics. + """ + return _compute_stats( + successes=counts["successes"], + failures=counts["failures"], + undetermined=counts["undetermined"], + ) + + +def _resolve_dimension_name(*, name: str, extractors: dict[str, DimensionExtractor]) -> str: + """ + Resolve a single dimension name, handling deprecated aliases. + + Returns: + str: The canonical dimension name. Raises: - ValueError: if attack_results is empty. - TypeError: if any element is not an AttackResult. + ValueError: If the dimension name is unknown. + """ + if name in extractors: + return name + canonical = _DEPRECATED_DIMENSION_ALIASES.get(name) + if canonical and canonical in extractors: + warnings.warn( + f"Dimension '{name}' is deprecated and will be removed in v0.13.0. Use '{canonical}' instead.", + DeprecationWarning, + stacklevel=4, + ) + return canonical + raise ValueError(f"Unknown dimension '{name}'. 
Available: {sorted(extractors.keys())}") + - Example: - >>> analyze_results(attack_results) - { - "Overall": AttackStats, - "By_attack_identifier": dict[str, AttackStats] - } +def _resolve_dimension_spec( + *, spec: Union[str, tuple[str, ...]], extractors: dict[str, DimensionExtractor] +) -> Union[str, tuple[str, ...]]: + """ + Resolve a group_by spec (single or composite), handling deprecated aliases. + + Returns: + Union[str, tuple[str, ...]]: The resolved spec with canonical dimension names. + """ + if isinstance(spec, str): + return _resolve_dimension_name(name=spec, extractors=extractors) + return tuple(_resolve_dimension_name(name=n, extractors=extractors) for n in spec) + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- +def analyze_results( + attack_results: list[AttackResult], + *, + group_by: list[Union[str, tuple[str, ...]]] | None = None, + custom_dimensions: dict[str, DimensionExtractor] | None = None, +) -> AnalysisResult: + """ + Analyze attack results with flexible, dimension-based grouping. + + Computes overall stats and breaks down results by one or more dimensions. + Dimensions can be single (e.g. ``"converter_type"``) or composite tuples + (e.g. ``("converter_type", "attack_type")``) for cross-dimensional + grouping. + + Args: + attack_results (list[AttackResult]): The attack results to analyze. + group_by (list[str | tuple[str, ...]] | None): Dimensions to group by. + Each element is either a dimension name (str) for independent + grouping, or a tuple of dimension names for composite grouping. + Defaults to all registered single dimensions. + custom_dimensions (dict[str, DimensionExtractor] | None): Additional + or overriding dimension extractors keyed by name. Merged with + built-in defaults. + + Returns: + AnalysisResult: Overall stats and per-dimension breakdowns. 
+ + Raises: + ValueError: If attack_results is empty or a dimension name is unknown. + TypeError: If any element is not an AttackResult. + + Examples: + Group by a single built-in dimension:: + + result = analyze_results(attacks, group_by=["attack_type"]) + for name, stats in result.dimensions["attack_type"].items(): + print(f"{name}: {stats.success_rate}") + + Group by a composite (cross-product) of two dimensions:: + + result = analyze_results( + attacks, + group_by=[("converter_type", "attack_type")], + ) + + Supply a custom dimension extractor:: + + def by_objective(r: AttackResult) -> list[str]: + return [r.objective] + + result = analyze_results( + attacks, + group_by=["objective"], + custom_dimensions={"objective": by_objective}, + ) """ if not attack_results: raise ValueError("attack_results cannot be empty") + # Merge extractors + extractors = dict(DEFAULT_DIMENSIONS) + if custom_dimensions: + extractors.update(custom_dimensions) + + # Resolve group_by — default to every registered dimension independently + if group_by is None: + group_by = list(extractors.keys()) + + # Resolve deprecated aliases and validate dimension names + group_by = [_resolve_dimension_spec(spec=spec, extractors=extractors) for spec in group_by] + + # Accumulators overall_counts: defaultdict[str, int] = defaultdict(int) - by_type_counts: defaultdict[str, defaultdict[str, int]] = defaultdict(lambda: defaultdict(int)) + dim_counts: dict[ + Union[str, tuple[str, ...]], + defaultdict[Union[str, tuple[str, ...]], defaultdict[str, int]], + ] = {spec: defaultdict(lambda: defaultdict(int)) for spec in group_by} + # Single pass over results for attack in attack_results: if not isinstance(attack, AttackResult): raise TypeError(f"Expected AttackResult, got {type(attack).__name__}: {attack!r}") - outcome = attack.outcome - attack_type = attack.attack_identifier.class_name if attack.attack_identifier else "unknown" - - if outcome == AttackOutcome.SUCCESS: - overall_counts["successes"] += 1 - 
by_type_counts[attack_type]["successes"] += 1 - elif outcome == AttackOutcome.FAILURE: - overall_counts["failures"] += 1 - by_type_counts[attack_type]["failures"] += 1 - else: - overall_counts["undetermined"] += 1 - by_type_counts[attack_type]["undetermined"] += 1 - - overall_stats = _compute_stats( - successes=overall_counts["successes"], - failures=overall_counts["failures"], - undetermined=overall_counts["undetermined"], - ) + key = _outcome_key(attack.outcome) + overall_counts[key] += 1 - by_type_stats = { - attack_type: _compute_stats( - successes=counts["successes"], - failures=counts["failures"], - undetermined=counts["undetermined"], - ) - for attack_type, counts in by_type_counts.items() - } + for spec in group_by: + if isinstance(spec, str): + for dim_value in extractors[spec](attack): + dim_counts[spec][dim_value][key] += 1 + else: + # Composite: cross-product of all sub-dimension values + sub_values = [extractors[name](attack) for name in spec] + for combo in product(*sub_values): + dim_counts[spec][combo][key] += 1 + + # Build result + dimension_stats: dict[Union[str, tuple[str, ...]], dict[Union[str, tuple[str, ...]], AttackStats]] = {} + for spec, counts_by_key in dim_counts.items(): + dimension_stats[spec] = {dim_key: _build_stats(counts) for dim_key, counts in counts_by_key.items()} - return { - "Overall": overall_stats, - "By_attack_identifier": by_type_stats, - } + return AnalysisResult( + overall=_build_stats(overall_counts), + dimensions=dimension_stats, + ) diff --git a/pyrit/analytics/visualization.py b/pyrit/analytics/visualization.py new file mode 100644 index 0000000000..f1370faa52 --- /dev/null +++ b/pyrit/analytics/visualization.py @@ -0,0 +1,720 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +""" +Visualization utilities for analytics results. + +This module provides interactive HTML report generation for ``AnalysisResult`` +using Plotly. 
Plotly is lazy-imported so the core analytics module has no +hard dependency on it. If plotly is not installed, a clear error is raised +at call time. +""" + +from __future__ import annotations + +from pathlib import Path +from typing import TYPE_CHECKING, Any, Optional, Union + +if TYPE_CHECKING: + from pyrit.analytics.result_analysis import AnalysisResult, AttackStats + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +_ASR_COLORSCALE: list[list[object]] = [ + [0.0, "#e74c3c"], + [0.5, "#f39c12"], + [1.0, "#2ecc71"], +] + +_HTML_TEMPLATE = """\ + + + + + {title} + + + + +

{title}

+{body} + + +""" + +_DEFAULT_COMPOSITE_DIMS: list[tuple[str, str]] = [ + ("harm_category", "attack_type"), + ("harm_category", "converter_type"), + ("attack_type", "converter_type"), +] + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _import_plotly() -> tuple[Any, Any]: + """ + Lazy-import plotly and return (graph_objects, io) modules. + + Returns: + tuple: The ``plotly.graph_objects`` and ``plotly.io`` modules. + + Raises: + ImportError: If plotly is not installed. + """ + try: + import plotly.graph_objects as go + import plotly.io as pio + + return go, pio + except ImportError as err: + raise ImportError("plotly is required for HTML report features. Install it with: pip install plotly") from err + + +def _asr_css_class(rate: Optional[float]) -> str: + """ + Return a CSS class name for colour-coding a success rate. + + Returns: + str: ``"green"``, ``"yellow"``, ``"red"``, or ``""`` when unknown. + """ + if rate is None: + return "" + if rate >= 0.6: + return "green" + if rate >= 0.3: + return "yellow" + return "red" + + +def _asr_bar_color(rate: Optional[float]) -> str: + """ + Return an RGB colour string for a bar based on success rate. + + Returns: + str: CSS colour string. + """ + if rate is None: + return "rgba(180,180,180,0.5)" + if rate >= 0.6: + return "#2ecc71" + if rate >= 0.3: + return "#f39c12" + return "#e74c3c" + + +def _is_sparse( + dim_data: dict[Any, AttackStats], + *, + threshold: float = 0.5, +) -> bool: + """ + Return True when more than *threshold* fraction of cells lack decided data. + + Returns: + bool: True if the dimension data is too sparse to display. 
+ """ + if not dim_data: + return True + none_count = sum(1 for s in dim_data.values() if s.success_rate is None) + return none_count / len(dim_data) > threshold + + +# --------------------------------------------------------------------------- +# Section builders +# --------------------------------------------------------------------------- + + +def _build_summary_html(*, result: AnalysisResult, title: str) -> str: + """ + Build the top-level KPI summary card as an HTML string. + + Returns: + str: HTML string for the summary card. + """ + o = result.overall + rate_str = f"{o.success_rate:.0%}" if o.success_rate is not None else "N/A" + cls = _asr_css_class(o.success_rate) + total = o.successes + o.failures + o.undetermined + kpis = [ + (rate_str, "Overall ASR", cls), + (str(total), "Total Attacks", ""), + (str(o.successes), "Successes", "green"), + (str(o.failures), "Failures", "red"), + (str(o.undetermined), "Undetermined", ""), + ] + kpi_html = "".join( + f'
{v}
{lbl}
' + for v, lbl, c in kpis + ) + return f'
{kpi_html}
' + + +def _build_bar_figure( + go: Any, + *, + dim_name: str, + dim_data: dict[Any, AttackStats], +) -> Any: + """ + Build a sorted horizontal bar chart of success rates for one dimension. + + Returns: + plotly.graph_objects.Figure: The bar chart figure. + """ + items = sorted( + dim_data.items(), + key=lambda kv: kv[1].success_rate if kv[1].success_rate is not None else -1, + reverse=True, + ) + labels = [str(k) for k, _ in items] + rates = [s.success_rate if s.success_rate is not None else 0.0 for _, s in items] + colors = [_asr_bar_color(s.success_rate) for _, s in items] + hover = [ + ( + f"{k}
ASR: {s.success_rate:.1%}
✓ {s.successes}   ✗ {s.failures}   ? {s.undetermined}" + if s.success_rate is not None + else f"{k}
No decided outcomes   ? {s.undetermined}" + ) + for k, s in items + ] + text = [f"{r:.0%}" if s.success_rate is not None else "—" for (_, s), r in zip(items, rates, strict=True)] + fig = go.Figure( + go.Bar( + x=rates, + y=labels, + orientation="h", + marker_color=colors, + hovertemplate="%{customdata}", + customdata=hover, + text=text, + textposition="outside", + ) + ) + fig.update_layout( + title=f"Success Rate by {dim_name}", + xaxis={"title": "Success Rate", "range": [0, 1.25], "tickformat": ".0%"}, + yaxis={"autorange": "reversed"}, + height=max(300, len(labels) * 44 + 100), + margin={"l": 10, "r": 80, "t": 48, "b": 40}, + plot_bgcolor="white", + paper_bgcolor="white", + ) + return fig + + +def _build_dropdown_bar_figure( + go: Any, + *, + result: "AnalysisResult", +) -> Any: + """ + Build a bar chart with a dropdown to switch between dimensions. + + Returns: + plotly.graph_objects.Figure: The bar chart figure with dropdown. + """ + single_dims = [d for d in result.dimensions if isinstance(d, str)] + if not single_dims: + return None + + traces = [] + max_labels = 0 + + for i, dim_name in enumerate(single_dims): + dim_data = result.dimensions[dim_name] + items = sorted( + dim_data.items(), + key=lambda kv: kv[1].success_rate if kv[1].success_rate is not None else -1, + reverse=True, + ) + labels = [str(k) for k, _ in items] + max_labels = max(max_labels, len(labels)) + rates = [s.success_rate if s.success_rate is not None else 0.0 for _, s in items] + colors = [_asr_bar_color(s.success_rate) for _, s in items] + hover = [ + ( + f"{k}
ASR: {s.success_rate:.1%}
✓ {s.successes}   ✗ {s.failures}   ? {s.undetermined}" + if s.success_rate is not None + else f"{k}
No decided outcomes   ? {s.undetermined}" + ) + for k, s in items + ] + text = [f"{r:.0%}" if s.success_rate is not None else "—" for (_, s), r in zip(items, rates, strict=True)] + + traces.append( + go.Bar( + x=rates, + y=labels, + orientation="h", + marker_color=colors, + hovertemplate="%{customdata}", + customdata=hover, + text=text, + textposition="outside", + visible=(i == 0), + name=dim_name, + ) + ) + + # Create dropdown buttons + buttons = [] + for i, dim_name in enumerate(single_dims): + visibility = [j == i for j in range(len(single_dims))] + label = dim_name.replace("_", " ").title() + buttons.append( + { + "label": label, + "method": "update", + "args": [ + {"visible": visibility}, + {"title": f"Success Rate by {label}"}, + ], + } + ) + + first_label = single_dims[0].replace("_", " ").title() + fig = go.Figure(data=traces) + fig.update_layout( + title=f"Success Rate by {first_label}", + xaxis={"title": "Success Rate", "range": [0, 1.25], "tickformat": ".0%"}, + yaxis={"autorange": "reversed"}, + height=max(350, max_labels * 44 + 120), + margin={"l": 10, "r": 80, "t": 80, "b": 40}, + plot_bgcolor="white", + paper_bgcolor="white", + updatemenus=[ + { + "buttons": buttons, + "direction": "down", + "showactive": True, + "x": 0.0, + "xanchor": "left", + "y": 1.12, + "yanchor": "top", + "bgcolor": "white", + "bordercolor": "#ccc", + } + ], + ) + return fig + + +def _build_z_matrix( + *, + row_keys: list[str], + col_keys: list[str], + lookup: dict[tuple[str, str], AttackStats], +) -> tuple[list[list[Optional[float]]], list[list[str]]]: + """ + Build the z-value and annotation matrices for a heatmap. + + Returns: + tuple: (z matrix, text annotation matrix). 
+ """ + z: list[list[Optional[float]]] = [] + text: list[list[str]] = [] + for row in row_keys: + z_row: list[Optional[float]] = [] + t_row: list[str] = [] + for col in col_keys: + stats = lookup.get((row, col)) + if stats and stats.success_rate is not None: + z_row.append(stats.success_rate) + t_row.append(f"{stats.success_rate:.0%}
{stats.successes}/{stats.total_decided}") + else: + z_row.append(None) + t_row.append("—" if not stats else f"?{stats.undetermined}") + z.append(z_row) + text.append(t_row) + return z, text + + +def _build_heatmap_figure( + go: Any, + *, + dim_name: tuple[str, str], + dim_data: dict[Any, AttackStats], +) -> Any: + """ + Build a 2D success-rate heatmap for a composite dimension. + + Returns: + plotly.graph_objects.Figure: The heatmap figure. + """ + row_dim, col_dim = dim_name + row_keys = sorted({str(k[0]) for k in dim_data}) + col_keys = sorted({str(k[1]) for k in dim_data}) + lookup = {(str(k[0]), str(k[1])): v for k, v in dim_data.items()} + z, text = _build_z_matrix(row_keys=row_keys, col_keys=col_keys, lookup=lookup) + fig = go.Figure( + go.Heatmap( + z=z, + x=col_keys, + y=row_keys, + text=text, + texttemplate="%{text}", + colorscale=_ASR_COLORSCALE, + zmin=0, + zmax=1, + colorbar={"title": "ASR", "tickformat": ".0%"}, + hovertemplate=f"{row_dim}: %{{y}}
{col_dim}: %{{x}}
%{{text}}", + ) + ) + fig.update_layout( + title=f"Success Rate: {row_dim} \u00d7 {col_dim}", + xaxis_title=col_dim, + yaxis_title=row_dim, + height=max(350, len(row_keys) * 54 + 130), + margin={"l": 10, "r": 10, "t": 54, "b": 60}, + plot_bgcolor="white", + paper_bgcolor="white", + ) + return fig + + +def _build_dropdown_heatmap_figure( + go: Any, + *, + result: "AnalysisResult", + composite_dims: list[tuple[str, str]], + sparsity_threshold: float = 0.5, +) -> Any: + """ + Build a heatmap with a dropdown to switch between composite dimensions. + + Returns: + plotly.graph_objects.Figure or None: The heatmap figure with dropdown, + or None if no valid composite dimensions. + """ + # Filter to valid composite dimensions + valid_dims = [] + for dims in composite_dims: + if dims in result.dimensions and not _is_sparse( + result.dimensions[dims], threshold=sparsity_threshold + ): + valid_dims.append(dims) + + if not valid_dims: + return None + + traces = [] + max_rows = 0 + + for i, dim_name in enumerate(valid_dims): + dim_data = result.dimensions[dim_name] + row_dim, col_dim = dim_name + row_keys = sorted({str(k[0]) for k in dim_data}) + col_keys = sorted({str(k[1]) for k in dim_data}) + max_rows = max(max_rows, len(row_keys)) + lookup = {(str(k[0]), str(k[1])): v for k, v in dim_data.items()} + z, text = _build_z_matrix(row_keys=row_keys, col_keys=col_keys, lookup=lookup) + + traces.append( + go.Heatmap( + z=z, + x=col_keys, + y=row_keys, + text=text, + texttemplate="%{text}", + colorscale=_ASR_COLORSCALE, + zmin=0, + zmax=1, + colorbar={"title": "ASR", "tickformat": ".0%"}, + hovertemplate=f"{row_dim}: %{{y}}
{col_dim}: %{{x}}
%{{text}}", + visible=(i == 0), + name=f"{row_dim} × {col_dim}", + ) + ) + + # Create dropdown buttons + buttons = [] + for i, dim_name in enumerate(valid_dims): + row_dim, col_dim = dim_name + visibility = [j == i for j in range(len(valid_dims))] + label = f"{row_dim.replace('_', ' ').title()} × {col_dim.replace('_', ' ').title()}" + buttons.append( + { + "label": label, + "method": "update", + "args": [ + {"visible": visibility}, + { + "title": f"Success Rate: {row_dim} × {col_dim}", + "xaxis.title.text": col_dim, + "yaxis.title.text": row_dim, + }, + ], + } + ) + + first_row_dim, first_col_dim = valid_dims[0] + fig = go.Figure(data=traces) + fig.update_layout( + title=f"Success Rate: {first_row_dim} × {first_col_dim}", + xaxis_title=first_col_dim, + yaxis_title=first_row_dim, + height=max(400, max_rows * 54 + 150), + margin={"l": 10, "r": 10, "t": 80, "b": 60}, + plot_bgcolor="white", + paper_bgcolor="white", + updatemenus=[ + { + "buttons": buttons, + "direction": "down", + "showactive": True, + "x": 0.0, + "xanchor": "left", + "y": 1.12, + "yanchor": "top", + "bgcolor": "white", + "bordercolor": "#ccc", + } + ], + ) + return fig + + +def _build_faceted_heatmap_figure( + go: Any, + *, + dim_name: tuple[str, str, str], + dim_data: dict[Any, AttackStats], +) -> Any: + """ + Build a 3D heatmap with a dropdown to filter by the third dimension. + + Rows = dim_name[0], Columns = dim_name[1], Dropdown = dim_name[2]. + + Returns: + plotly.graph_objects.Figure: The faceted heatmap figure. 
+ """ + row_dim, col_dim, facet_dim = dim_name + row_keys = sorted({str(k[0]) for k in dim_data}) + col_keys = sorted({str(k[1]) for k in dim_data}) + facet_vals = sorted({str(k[2]) for k in dim_data}) + + traces: list[Any] = [] + for i, fval in enumerate(facet_vals): + lookup = {(str(k[0]), str(k[1])): v for k, v in dim_data.items() if str(k[2]) == fval} + z, text = _build_z_matrix(row_keys=row_keys, col_keys=col_keys, lookup=lookup) + traces.append( + go.Heatmap( + z=z, + x=col_keys, + y=row_keys, + text=text, + texttemplate="%{text}", + colorscale=_ASR_COLORSCALE, + zmin=0, + zmax=1, + visible=(i == 0), + name=fval, + colorbar={"title": "ASR", "tickformat": ".0%"}, + hovertemplate=f"{row_dim}: %{{y}}
{col_dim}: %{{x}}
%{{text}}", + ) + ) + + buttons = [ + { + "label": fval, + "method": "update", + "args": [ + {"visible": [j == i for j in range(len(facet_vals))]}, + {"title": f"{row_dim} \u00d7 {col_dim} | {facet_dim}: {fval}"}, + ], + } + for i, fval in enumerate(facet_vals) + ] + fig = go.Figure(traces) + fig.update_layout( + title=f"{row_dim} \u00d7 {col_dim} | {facet_dim}: {facet_vals[0]}", + xaxis_title=col_dim, + yaxis_title=row_dim, + updatemenus=[ + { + "buttons": buttons, + "direction": "down", + "showactive": True, + "x": 0.01, + "xanchor": "left", + "y": 1.18, + "yanchor": "top", + } + ], + height=max(400, len(row_keys) * 54 + 160), + margin={"l": 10, "r": 10, "t": 90, "b": 60}, + plot_bgcolor="white", + paper_bgcolor="white", + ) + return fig + + +def _build_coverage_table_figure( + go: Any, + *, + result: AnalysisResult, +) -> Any: + """ + Build a data coverage table showing sample sizes per dimension key. + + Cells with fewer than 5 decided outcomes are highlighted in pink. + + Returns: + plotly.graph_objects.Figure: The table figure. 
+ """ + rows: list[tuple[str, str, AttackStats]] = [("overall", "all", result.overall)] + for dim_name, dim_data in result.dimensions.items(): + if isinstance(dim_name, str): + for key, stats in dim_data.items(): + rows.append((dim_name, str(key), stats)) + + dims = [r[0] for r in rows] + keys = [r[1] for r in rows] + decided = [r[2].total_decided for r in rows] + undetermined = [r[2].undetermined for r in rows] + asr = [f"{r[2].success_rate:.0%}" if r[2].success_rate is not None else "—" for r in rows] + decided_colors = ["#fff5f5" if n < 5 else "white" for n in decided] + + fig = go.Figure( + go.Table( + header={ + "values": [ + "Dimension", + "Key", + "Decided", + "Undetermined", + "ASR", + ], + "fill_color": "#f0f2f5", + "align": "left", + "font": {"size": 12, "color": "#444"}, + "height": 36, + }, + cells={ + "values": [dims, keys, decided, undetermined, asr], + "fill_color": ["white", "white", decided_colors, "white", "white"], + "align": "left", + "font": {"size": 11}, + "height": 28, + }, + ) + ) + fig.update_layout( + title="Data Coverage (pink = fewer than 5 decided outcomes)", + height=min(600, max(300, len(rows) * 30 + 120)), + margin={"l": 0, "r": 0, "t": 44, "b": 10}, + ) + return fig + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + + +def save_html( + result: AnalysisResult, + path: Union[str, Path], + *, + title: str = "Attack Analysis Report", + composite_dims: Optional[list[tuple[str, str]]] = None, + sparsity_threshold: float = 0.5, +) -> Path: + """ + Save a fully interactive HTML attack analysis report using Plotly. + + The report contains: + + * A KPI summary card (overall ASR, total attacks, outcome counts). + * One horizontal bar chart per single dimension in *result*. 
+ * Heatmaps for each 2-tuple composite dimension (default: all three + combinations of ``attack_type``, ``converter_type``, and + ``harm_category``). + * A dropdown-faceted heatmap for any 3-tuple composite dimension found + in *result* (e.g. ``("harm_category", "converter_type", "attack_type")``). + * A data-coverage table flagging low sample sizes. + + The output is a single self-contained ``.html`` file — no server needed. + + Args: + result (AnalysisResult): The analysis result to report on. + path (str | Path): Output file path (e.g. ``"report.html"``). + title (str): Report title shown in the header. Defaults to + ``"Attack Analysis Report"``. + composite_dims (list[tuple[str, str]] | None): 2D heatmap pairs to + include. Defaults to all combinations of ``attack_type``, + ``converter_type``, and ``harm_category``. + sparsity_threshold (float): Skip a heatmap when the fraction of + empty cells exceeds this value. Defaults to ``0.5``. + + Returns: + Path: The path to the saved HTML file. + + Raises: + ImportError: If plotly is not installed. + """ + go, pio = _import_plotly() + path = Path(path) + path.parent.mkdir(parents=True, exist_ok=True) + + if composite_dims is None: + composite_dims = _DEFAULT_COMPOSITE_DIMS + + def _div(fig: Any) -> str: + html: str = pio.to_html(fig, include_plotlyjs=False, full_html=False) + return html + + sections: list[str] = [_build_summary_html(result=result, title=title)] + + # Single-dimension bar chart with dropdown selector + dropdown_fig = _build_dropdown_bar_figure(go, result=result) + if dropdown_fig is not None: + sections.append(f'

By Dimension

{_div(dropdown_fig)}
') + + # 2D heatmaps with dropdown selector + heatmap_parts: list[str] = [] + dropdown_heatmap_fig = _build_dropdown_heatmap_figure( + go, result=result, composite_dims=composite_dims, sparsity_threshold=sparsity_threshold + ) + if dropdown_heatmap_fig is not None: + heatmap_parts.append(_div(dropdown_heatmap_fig)) + + # 3-tuple faceted heatmaps + three_d_dims = [d for d in result.dimensions if isinstance(d, tuple) and len(d) == 3] + for dim_name in three_d_dims: + if _is_sparse(result.dimensions[dim_name], threshold=sparsity_threshold): + continue + heatmap_parts.append( + _div(_build_faceted_heatmap_figure(go, dim_name=dim_name, dim_data=result.dimensions[dim_name])) + ) + + if heatmap_parts: + sections.append(f'

Cross-Dimensional Analysis

{"".join(heatmap_parts)}
') + + # Coverage table + sections.append(f'
{_div(_build_coverage_table_figure(go, result=result))}
') + + body = "\n".join(sections) + path.write_text(_HTML_TEMPLATE.format(title=title, body=body), encoding="utf-8") + return path diff --git a/tests/unit/analytics/test_result_analysis.py b/tests/unit/analytics/test_result_analysis.py index 05d1b94f0e..c55aa146fa 100644 --- a/tests/unit/analytics/test_result_analysis.py +++ b/tests/unit/analytics/test_result_analysis.py @@ -1,27 +1,34 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT license. -from typing import Optional +import warnings import pytest -from pyrit.analytics.result_analysis import AttackStats, analyze_results +from pyrit.analytics.result_analysis import ( + AnalysisResult, + AttackStats, + analyze_results, +) from pyrit.identifiers import ComponentIdentifier -from pyrit.models import AttackOutcome, AttackResult +from pyrit.models import AttackOutcome, AttackResult, MessagePiece -# helpers +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- def make_attack( outcome: AttackOutcome, - attack_type: Optional[str] = "default", + attack_type: str | None = "PromptSendingAttack", conversation_id: str = "conv-1", ) -> AttackResult: - """ - Minimal valid AttackResult for analytics tests. 
- """ - attack_identifier: Optional[ComponentIdentifier] = None + """Minimal valid AttackResult for analytics tests.""" + attack_identifier = None if attack_type is not None: - attack_identifier = ComponentIdentifier(class_name=attack_type, class_module="tests.unit.analytics") + attack_identifier = ComponentIdentifier( + class_name=attack_type, + class_module="pyrit.executor.attack", + ) return AttackResult( conversation_id=conversation_id, @@ -31,108 +38,729 @@ def make_attack( ) -def test_analyze_results_empty_raises(): - with pytest.raises(ValueError): - analyze_results([]) - - -def test_analyze_results_raises_on_invalid_object(): - with pytest.raises(TypeError): - analyze_results(["not-an-AttackResult"]) - - -@pytest.mark.parametrize( - "outcomes, expected_successes, expected_failures, expected_undetermined, expected_rate", - [ - # all successes - ([AttackOutcome.SUCCESS, AttackOutcome.SUCCESS], 2, 0, 0, 1.0), - # all failures - ([AttackOutcome.FAILURE, AttackOutcome.FAILURE], 0, 2, 0, 0.0), - # mixed decided - ([AttackOutcome.SUCCESS, AttackOutcome.FAILURE], 1, 1, 0, 0.5), - # include undetermined (excluded from denominator) - ([AttackOutcome.SUCCESS, AttackOutcome.UNDETERMINED], 1, 0, 1, 1.0), - ([AttackOutcome.FAILURE, AttackOutcome.UNDETERMINED], 0, 1, 1, 0.0), - # multiple with undetermined - ( - [AttackOutcome.SUCCESS, AttackOutcome.FAILURE, AttackOutcome.UNDETERMINED], - 1, - 1, - 1, - 0.5, - ), - ], -) -def test_overall_success_rate_parametrized( - outcomes, expected_successes, expected_failures, expected_undetermined, expected_rate -): - attacks = [make_attack(o) for o in outcomes] - result = analyze_results(attacks) - - assert isinstance(result["Overall"], AttackStats) - overall = result["Overall"] - assert overall.successes == expected_successes - assert overall.failures == expected_failures - assert overall.undetermined == expected_undetermined - assert overall.total_decided == expected_successes + expected_failures - assert overall.success_rate == 
def make_converter(
    class_name: str,
    class_module: str = "pyrit.prompt_converter.test_converter",
) -> ComponentIdentifier:
    """Build a converter ComponentIdentifier carrying only a class name and module."""
    identifier = ComponentIdentifier(class_name=class_name, class_module=class_module)
    return identifier


def make_attack_with_converters(
    outcome: AttackOutcome,
    converter_names: list[str],
    attack_type: str = "test",
    conversation_id: str = "conv-1",
) -> AttackResult:
    """Build an AttackResult whose last_response lists one converter identifier per name.

    Args:
        outcome: Outcome recorded on the result.
        converter_names: Converter class names, turned into identifiers via make_converter.
        attack_type: Class name used for the attack identifier.
        conversation_id: Conversation id stored on the result.

    Returns:
        The assembled AttackResult.
    """
    return AttackResult(
        conversation_id=conversation_id,
        objective="test",
        attack_identifier=ComponentIdentifier(
            class_name=attack_type,
            class_module="pyrit.executor.attack",
        ),
        outcome=outcome,
        last_response=MessagePiece(
            role="user",
            original_value="test",
            converter_identifiers=list(map(make_converter, converter_names)),
        ),
    )
TestAnalyzeResultsValidation: + """Input validation for analyze_results.""" + + def test_empty_raises(self): + with pytest.raises(ValueError, match="cannot be empty"): + analyze_results([]) + + def test_invalid_object_raises(self): + with pytest.raises(TypeError, match="Expected AttackResult"): + analyze_results(["not-an-AttackResult"]) + + def test_unknown_dimension_raises(self): + attacks = [make_attack(AttackOutcome.SUCCESS)] + with pytest.raises(ValueError, match="Unknown dimension 'nonexistent'"): + analyze_results(attacks, group_by=["nonexistent"]) + + def test_unknown_dimension_in_composite_raises(self): + attacks = [make_attack(AttackOutcome.SUCCESS)] + with pytest.raises(ValueError, match="Unknown dimension 'bad_dim'"): + analyze_results(attacks, group_by=[("attack_type", "bad_dim")]) + + +# --------------------------------------------------------------------------- +# Overall stats +# --------------------------------------------------------------------------- +class TestOverallStats: + """Overall stats computation (no dimension breakdown).""" + + @pytest.mark.parametrize( + "outcomes, expected_successes, expected_failures, expected_undetermined, expected_rate", + [ + ([AttackOutcome.SUCCESS, AttackOutcome.SUCCESS], 2, 0, 0, 1.0), + ([AttackOutcome.FAILURE, AttackOutcome.FAILURE], 0, 2, 0, 0.0), + ([AttackOutcome.SUCCESS, AttackOutcome.FAILURE], 1, 1, 0, 0.5), + ([AttackOutcome.SUCCESS, AttackOutcome.UNDETERMINED], 1, 0, 1, 1.0), + ([AttackOutcome.FAILURE, AttackOutcome.UNDETERMINED], 0, 1, 1, 0.0), + ( + [AttackOutcome.SUCCESS, AttackOutcome.FAILURE, AttackOutcome.UNDETERMINED], + 1, + 1, + 1, + 0.5, + ), + ], + ) + def test_overall_stats(self, outcomes, expected_successes, expected_failures, expected_undetermined, expected_rate): + attacks = [make_attack(o) for o in outcomes] + result = analyze_results(attacks, group_by=[]) + + assert isinstance(result, AnalysisResult) + overall = result.overall + assert overall.successes == expected_successes + assert 
# ---------------------------------------------------------------------------
# Single dimension: attack_type
# ---------------------------------------------------------------------------
class TestGroupByAttackType:
    """Stats broken down by the single dimension ``attack_type``."""

    @pytest.mark.parametrize(
        "items, type_key, exp_succ, exp_fail, exp_und, exp_rate",
        [
            # One attack type with a decided pair plus one undetermined result
            (
                [
                    (AttackOutcome.SUCCESS, "CrescendoAttack"),
                    (AttackOutcome.FAILURE, "CrescendoAttack"),
                    (AttackOutcome.UNDETERMINED, "CrescendoAttack"),
                ],
                "CrescendoAttack",
                1,
                1,
                1,
                0.5,
            ),
            # Two attack types; only the RedTeamingAttack bucket is inspected
            (
                [
                    (AttackOutcome.SUCCESS, "CrescendoAttack"),
                    (AttackOutcome.FAILURE, "CrescendoAttack"),
                    (AttackOutcome.SUCCESS, "RedTeamingAttack"),
                    (AttackOutcome.FAILURE, "RedTeamingAttack"),
                    (AttackOutcome.SUCCESS, "RedTeamingAttack"),
                ],
                "RedTeamingAttack",
                2,
                1,
                0,
                2 / 3,
            ),
            # No attack identifier at all: results are expected under the "unknown" key
            (
                [
                    (AttackOutcome.FAILURE, None),
                    (AttackOutcome.UNDETERMINED, None),
                    (AttackOutcome.SUCCESS, None),
                ],
                "unknown",
                1,
                1,
                1,
                0.5,
            ),
        ],
    )
    def test_single_dimension(self, items, type_key, exp_succ, exp_fail, exp_und, exp_rate):
        attacks = []
        for outcome, attack_type in items:
            attacks.append(make_attack(outcome=outcome, attack_type=attack_type))

        dimensions = analyze_results(attacks, group_by=["attack_type"]).dimensions
        assert "attack_type" in dimensions

        bucket = dimensions["attack_type"][type_key]
        assert isinstance(bucket, AttackStats)
        assert bucket.successes == exp_succ
        assert bucket.failures == exp_fail
        assert bucket.undetermined == exp_und
        assert bucket.total_decided == exp_succ + exp_fail
        assert bucket.success_rate == exp_rate
class TestCompositeDimensions:
    """Grouping by tuples of dimensions (cross-product keys)."""

    def test_composite_two_dimensions(self):
        pair = ("converter_type", "attack_type")
        specs = [
            (AttackOutcome.SUCCESS, "Base64Converter", "CrescendoAttack"),
            (AttackOutcome.FAILURE, "ROT13Converter", "CrescendoAttack"),
            (AttackOutcome.SUCCESS, "Base64Converter", "RedTeamingAttack"),
        ]
        attacks = [
            make_attack_with_converters(outcome, [converter], attack_type=attack_type)
            for outcome, converter, attack_type in specs
        ]

        cells = analyze_results(attacks, group_by=[pair]).dimensions[pair]

        base64_crescendo = cells[("Base64Converter", "CrescendoAttack")]
        assert base64_crescendo.successes == 1
        assert base64_crescendo.failures == 0
        assert cells[("ROT13Converter", "CrescendoAttack")].failures == 1
        assert cells[("Base64Converter", "RedTeamingAttack")].successes == 1

    def test_composite_with_multi_converter_creates_cross_product(self):
        pair = ("converter_type", "attack_type")
        # A single attack carrying two converters should land in two composite cells
        attack = make_attack_with_converters(
            AttackOutcome.SUCCESS,
            ["Base64Converter", "ROT13Converter"],
            attack_type="CrescendoAttack",
        )

        cells = analyze_results([attack], group_by=[pair]).dimensions[pair]

        assert ("Base64Converter", "CrescendoAttack") in cells
        assert ("ROT13Converter", "CrescendoAttack") in cells
        assert cells[("Base64Converter", "CrescendoAttack")].successes == 1
        assert cells[("ROT13Converter", "CrescendoAttack")].successes == 1

    def test_mixed_single_and_composite(self):
        pair = ("converter_type", "attack_type")
        attacks = [
            make_attack_with_converters(AttackOutcome.SUCCESS, ["Base64Converter"], attack_type="CrescendoAttack"),
            make_attack_with_converters(AttackOutcome.FAILURE, ["ROT13Converter"], attack_type="RedTeamingAttack"),
        ]

        dimensions = analyze_results(attacks, group_by=["attack_type", pair]).dimensions

        # The plain dimension is reported under its own string key
        assert "attack_type" in dimensions
        by_type = dimensions["attack_type"]
        assert by_type["CrescendoAttack"].successes == 1
        assert by_type["RedTeamingAttack"].failures == 1

        # The composite dimension is reported under the tuple key
        by_pair = dimensions[pair]
        assert by_pair[("Base64Converter", "CrescendoAttack")].successes == 1
        assert by_pair[("ROT13Converter", "RedTeamingAttack")].failures == 1
class TestCustomDimensions:
    """Caller-provided dimension extractors passed via ``custom_dimensions``."""

    @staticmethod
    def _objective_of(result: AttackResult) -> list[str]:
        """Extractor used by both tests: the result's objective as a one-element list."""
        return [result.objective]

    def test_custom_extractor(self):
        def _build(conversation_id: str, objective: str, outcome: AttackOutcome) -> AttackResult:
            return AttackResult(
                conversation_id=conversation_id,
                objective=objective,
                attack_identifier=ComponentIdentifier(
                    class_name="PromptSendingAttack", class_module="pyrit.executor.attack"
                ),
                outcome=outcome,
            )

        attacks = [
            _build("c1", "steal secrets", AttackOutcome.SUCCESS),
            _build("c2", "bypass filter", AttackOutcome.FAILURE),
        ]

        analysis = analyze_results(
            attacks,
            group_by=["objective"],
            custom_dimensions={"objective": self._objective_of},
        )

        by_objective = analysis.dimensions["objective"]
        assert by_objective["steal secrets"].successes == 1
        assert by_objective["bypass filter"].failures == 1

    def test_custom_dimension_in_composite(self):
        attack = make_attack_with_converters(AttackOutcome.SUCCESS, ["Base64Converter"])
        # The helper hard-codes the objective, so override it to get a recognizable key
        attack.objective = "test_obj"
        pair = ("converter_type", "objective")

        analysis = analyze_results(
            [attack],
            group_by=[pair],
            custom_dimensions={"objective": self._objective_of},
        )

        assert ("Base64Converter", "test_obj") in analysis.dimensions[pair]
group_by=["label"]) + + stats = result.dimensions["label"]["no_labels"] + assert stats.successes == 1 + assert stats.total_decided == 1 + + def test_single_label(self): + message = MessagePiece( + role="user", + original_value="test", + labels={"operation_name": "op_trash_panda"}, + ) + attacks = [ + AttackResult( + conversation_id="c1", + objective="test", + attack_identifier=ComponentIdentifier( + class_name="PromptSendingAttack", class_module="pyrit.executor.attack" + ), + outcome=AttackOutcome.SUCCESS, + last_response=message, + ), + ] + result = analyze_results(attacks, group_by=["label"]) + + assert "operation_name=op_trash_panda" in result.dimensions["label"] + assert result.dimensions["label"]["operation_name=op_trash_panda"].successes == 1 + + def test_multiple_labels_per_attack(self): + """Each label key=value pair creates its own stats entry.""" + message = MessagePiece( + role="user", + original_value="test", + labels={"operation_name": "op_trash_panda", "operator": "roakey"}, + ) + attacks = [ + AttackResult( + conversation_id="c1", + objective="test", + attack_identifier=ComponentIdentifier( + class_name="PromptSendingAttack", class_module="pyrit.executor.attack" + ), + outcome=AttackOutcome.SUCCESS, + last_response=message, + ), + ] + result = analyze_results(attacks, group_by=["label"]) + + assert result.dimensions["label"]["operation_name=op_trash_panda"].successes == 1 + assert result.dimensions["label"]["operator=roakey"].successes == 1 + + def test_label_composite_with_attack_type(self): + message = MessagePiece( + role="user", + original_value="test", + labels={"operator": "roakey"}, + ) + attacks = [ + AttackResult( + conversation_id="c1", + objective="test", + attack_identifier=ComponentIdentifier( + class_name="CrescendoAttack", class_module="pyrit.executor.attack" + ), + outcome=AttackOutcome.SUCCESS, + last_response=message, + ), + AttackResult( + conversation_id="c2", + objective="test", + attack_identifier=ComponentIdentifier( + 
class TestDefaultGroupBy:
    """Behavior when group_by is omitted versus passed as an empty list."""

    def test_defaults_include_all_builtin_dimensions(self):
        analysis = analyze_results([make_attack(AttackOutcome.SUCCESS)])

        reported = analysis.dimensions
        for expected_name in ("attack_type", "converter_type", "label"):
            assert expected_name in reported

    def test_empty_group_by_returns_only_overall(self):
        analysis = analyze_results([make_attack(AttackOutcome.SUCCESS)], group_by=[])

        # No breakdowns requested: only the overall stats are populated
        assert analysis.dimensions == {}
        assert analysis.overall.successes == 1
make_attack(AttackOutcome.FAILURE, attack_type="CrescendoAttack"), + ] + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + result = analyze_results(attacks, group_by=["attack_identifier"]) + + # The dimension key in the result should be the canonical "attack_type" + assert "attack_type" in result.dimensions + assert "attack_identifier" not in result.dimensions + assert result.dimensions["attack_type"]["CrescendoAttack"].successes == 1 + + def test_alias_in_composite(self): + attacks = [ + make_attack_with_converters(AttackOutcome.SUCCESS, ["Base64Converter"], attack_type="CrescendoAttack"), + ] + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + result = analyze_results(attacks, group_by=[("converter_type", "attack_identifier")]) + + # Composite key uses canonical names + assert ("converter_type", "attack_type") in result.dimensions + dim = result.dimensions[("converter_type", "attack_type")] + assert ("Base64Converter", "CrescendoAttack") in dim + + +# --------------------------------------------------------------------------- +# Single dimension: harm_category +# --------------------------------------------------------------------------- +class TestGroupByHarmCategory: + """Group-by a single dimension: harm_category.""" + + def test_no_harm_category_tracked(self): + attacks = [make_attack(AttackOutcome.SUCCESS)] + result = analyze_results(attacks, group_by=["harm_category"]) + + stats = result.dimensions["harm_category"]["no_harm_category"] + assert stats.successes == 1 + assert stats.total_decided == 1 + + def test_single_harm_category(self): + message = MessagePiece( + role="user", + original_value="test", + targeted_harm_categories=["hate_speech"], + ) + attacks = [ + AttackResult( + conversation_id="c1", + objective="test", + attack_identifier=ComponentIdentifier( + class_name="PromptSendingAttack", class_module="pyrit.executor.attack" + ), + outcome=AttackOutcome.SUCCESS, + 
def test_multiple_attacks_different_harm_categories(self):
    """Results with different single harm categories are tallied per category."""
    cases = [
        ("violence", AttackOutcome.SUCCESS),
        ("violence", AttackOutcome.FAILURE),
        ("self_harm", AttackOutcome.SUCCESS),
    ]
    attacks = [
        AttackResult(
            conversation_id="c1",
            objective="test",
            attack_identifier=ComponentIdentifier(
                class_name="PromptSendingAttack", class_module="pyrit.executor.attack"
            ),
            outcome=outcome,
            last_response=MessagePiece(
                role="user",
                original_value="test",
                targeted_harm_categories=[category],
            ),
        )
        for category, outcome in cases
    ]

    by_category = analyze_results(attacks, group_by=["harm_category"]).dimensions["harm_category"]

    assert by_category["violence"].successes == 1
    assert by_category["violence"].failures == 1

    assert by_category["self_harm"].successes == 1
    assert by_category["self_harm"].failures == 0
targeted_harm_categories=["violence"], + ) + attacks = [ + AttackResult( + conversation_id="c1", + objective="test", + attack_identifier=ComponentIdentifier( + class_name="CrescendoAttack", class_module="pyrit.executor.attack" + ), + outcome=AttackOutcome.SUCCESS, + last_response=message, + ), + ] + result = analyze_results(attacks, group_by=[("harm_category", "attack_type")]) + + dim = result.dimensions[("harm_category", "attack_type")] + assert ("violence", "CrescendoAttack") in dim + assert dim[("violence", "CrescendoAttack")].successes == 1 + + +# --------------------------------------------------------------------------- +# to_dataframe +# --------------------------------------------------------------------------- +class TestToDataframe: + """Tests for AnalysisResult.to_dataframe().""" + + def test_single_dimension_columns(self): + attacks = [ + make_attack(AttackOutcome.SUCCESS, attack_type="CrescendoAttack"), + make_attack(AttackOutcome.FAILURE, attack_type="CrescendoAttack"), + ] + result = analyze_results(attacks, group_by=["attack_type"]) + df = result.to_dataframe("attack_type") + + assert list(df.columns) == [ + "dimension", + "key", + "successes", + "failures", + "undetermined", + "total_decided", + "success_rate", + ] + assert len(df) == 1 + row = df.iloc[0] + assert row["dimension"] == "attack_type" + assert row["key"] == "CrescendoAttack" + assert row["successes"] == 1 + assert row["failures"] == 1 + assert row["success_rate"] == 0.5 + + def test_single_dimension_multiple_keys(self): + attacks = [ + make_attack(AttackOutcome.SUCCESS, attack_type="CrescendoAttack"), + make_attack(AttackOutcome.SUCCESS, attack_type="RedTeamingAttack"), + make_attack(AttackOutcome.FAILURE, attack_type="RedTeamingAttack"), + ] + result = analyze_results(attacks, group_by=["attack_type"]) + df = result.to_dataframe("attack_type") + + assert len(df) == 2 + keys = set(df["key"]) + assert keys == {"CrescendoAttack", "RedTeamingAttack"} + + def 
test_composite_dimension_explodes_columns(self): + attacks = [make_attack_with_converters(AttackOutcome.SUCCESS, ["Base64Converter"], attack_type="Crescendo")] + result = analyze_results(attacks, group_by=[("converter_type", "attack_type")]) + df = result.to_dataframe(("converter_type", "attack_type")) + + assert "converter_type" in df.columns + assert "attack_type" in df.columns + assert "dimension" not in df.columns + assert "key" not in df.columns + assert df.iloc[0]["converter_type"] == "Base64Converter" + assert df.iloc[0]["attack_type"] == "Crescendo" + assert df.iloc[0]["successes"] == 1 + + def test_no_arg_includes_overall_and_all_dimensions(self): + attacks = [ + make_attack(AttackOutcome.SUCCESS, attack_type="CrescendoAttack"), + make_attack(AttackOutcome.FAILURE, attack_type="CrescendoAttack"), + ] + result = analyze_results(attacks, group_by=["attack_type"]) + df = result.to_dataframe() + + assert "overall" in df["dimension"].values + assert "attack_type" in df["dimension"].values + + overall_row = df[df["dimension"] == "overall"].iloc[0] + assert overall_row["key"] == "all" + assert overall_row["successes"] == 1 + assert overall_row["failures"] == 1 + + def test_no_arg_composite_dimension_flattened(self): + attacks = [make_attack_with_converters(AttackOutcome.SUCCESS, ["Base64Converter"], attack_type="Crescendo")] + result = analyze_results(attacks, group_by=[("converter_type", "attack_type")]) + df = result.to_dataframe() + + dims = df["dimension"].unique() + assert any("×" in d for d in dims) + + def test_unknown_dimension_raises(self): + attacks = [make_attack(AttackOutcome.SUCCESS)] + result = analyze_results(attacks, group_by=["attack_type"]) + + with pytest.raises(KeyError): + result.to_dataframe("nonexistent_dimension") + + def test_undetermined_included(self): + attacks = [make_attack(AttackOutcome.UNDETERMINED, attack_type="CrescendoAttack")] + result = analyze_results(attacks, group_by=["attack_type"]) + df = 
def _make_attack(
    *,
    conversation_id: str,
    attack_type: str = "CrescendoAttack",
    outcome: AttackOutcome = AttackOutcome.SUCCESS,
    harm_category: str = "violence",
) -> AttackResult:
    """Return a MagicMock specced as AttackResult with the attributes the tests set up.

    The mocked last_response carries a single targeted harm category and no converters;
    the mocked attack_identifier exposes ``class_name`` equal to ``attack_type``.
    """
    response = MagicMock()
    response.configure_mock(
        targeted_harm_categories=[harm_category],
        converter_identifiers=[],
    )

    identifier = MagicMock()
    identifier.class_name = attack_type

    mocked_attack = MagicMock(spec=AttackResult)
    mocked_attack.configure_mock(
        conversation_id=conversation_id,
        objective="test",
        outcome=outcome,
        last_response=response,
        attack_identifier=identifier,
        converter_identifiers=[],
    )
    return mocked_attack
class TestSaveHtml:
    """Tests for save_html function."""

    @pytest.fixture
    def save_html(self):
        """Provide ``save_html``, skipping the requesting test when plotly is not importable.

        The import is done lazily at test time so the function is always taken from the
        module object currently registered for ``pyrit.analytics.visualization``.
        """
        pytest.importorskip("plotly")
        from pyrit.analytics.visualization import save_html

        return save_html

    def test_save_html_creates_file(self, save_html, tmp_path: Path) -> None:
        """save_html writes an HTML file at the given path."""
        out = tmp_path / "report.html"
        returned = save_html(_make_result(), out)

        assert returned == out
        assert out.exists()
        assert out.stat().st_size > 0

    def test_save_html_returns_path(self, save_html, tmp_path: Path) -> None:
        """save_html returns a Path object."""
        returned = save_html(_make_result(), tmp_path / "r.html")
        assert isinstance(returned, Path)

    def test_save_html_accepts_string_path(self, save_html, tmp_path: Path) -> None:
        """save_html accepts a plain string as the path argument."""
        str_path = str(tmp_path / "report.html")
        returned = save_html(_make_result(), str_path)
        assert returned == Path(str_path)
        assert Path(str_path).exists()

    def test_save_html_creates_parent_dirs(self, save_html, tmp_path: Path) -> None:
        """save_html creates missing parent directories."""
        nested = tmp_path / "a" / "b" / "report.html"
        save_html(_make_result(), nested)
        assert nested.exists()

    def test_save_html_contains_plotly_cdn(self, save_html, tmp_path: Path) -> None:
        """The saved HTML includes the Plotly CDN script tag."""
        out = tmp_path / "report.html"
        save_html(_make_result(), out)
        content = out.read_text(encoding="utf-8")
        assert "plotly" in content.lower()

    def test_save_html_contains_title(self, save_html, tmp_path: Path) -> None:
        """Custom title appears in the saved HTML."""
        out = tmp_path / "report.html"
        save_html(_make_result(), out, title="My Custom Report")
        content = out.read_text(encoding="utf-8")
        assert "My Custom Report" in content

    def test_save_html_contains_overall_stats(self, save_html, tmp_path: Path) -> None:
        """The KPI card includes overall ASR and counts."""
        out = tmp_path / "report.html"
        save_html(_make_result(), out)
        content = out.read_text(encoding="utf-8")
        assert "Overall ASR" in content
        assert "Total Attacks" in content

    def test_save_html_no_plotly_raises(self, tmp_path: Path) -> None:
        """save_html raises ImportError when plotly is not installed."""
        import importlib

        # Import before masking so the module is part of the sys.modules snapshot that
        # patch.dict restores; importlib.reload needs it registered in the finally block.
        import pyrit.analytics.visualization as vis_mod

        result = _make_result()
        try:
            # A None entry in sys.modules makes "import plotly" raise ImportError
            with patch.dict("sys.modules", {"plotly": None, "plotly.graph_objects": None, "plotly.io": None}):
                importlib.reload(vis_mod)
                with pytest.raises(ImportError, match="plotly is required"):
                    vis_mod.save_html(result, tmp_path / "r.html")
        finally:
            # Reload once more with the real sys.modules so the plotly-masked module
            # state does not leak into tests that run after this one.
            importlib.reload(vis_mod)


# ---------------------------------------------------------------------------
# Tests: sparsity guard
# ---------------------------------------------------------------------------


class TestSparsityGuard:
    """Tests for the sparsity filtering in save_html."""

    def test_sparse_heatmap_skipped(self) -> None:
        """A composite dim with >50% empty cells is not rendered as a heatmap."""
        # _is_sparse is a pure helper: no plotly and no temp directory are needed here
        from pyrit.analytics.visualization import _is_sparse

        sparse_data = {
            ("a", "x"): AttackStats(success_rate=0.5, total_decided=2, successes=1, failures=1, undetermined=0),
            ("b", "x"): AttackStats(success_rate=None, total_decided=0, successes=0, failures=0, undetermined=1),
            ("c", "x"): AttackStats(success_rate=None, total_decided=0, successes=0, failures=0, undetermined=1),
        }
        assert _is_sparse(sparse_data, threshold=0.5) is True

    def test_dense_heatmap_not_skipped(self) -> None:
        """A composite dim with ≤50% empty cells passes the sparsity check."""
        from pyrit.analytics.visualization import _is_sparse

        dense_data = {
            ("a", "x"): AttackStats(success_rate=0.8, total_decided=5, successes=4, failures=1, undetermined=0),
            ("b", "x"): AttackStats(success_rate=0.4, total_decided=5, successes=2, failures=3, undetermined=0),
            ("c", "x"): AttackStats(success_rate=None, total_decided=0, successes=0, failures=0, undetermined=1),
        }
        assert _is_sparse(dense_data, threshold=0.5) is False

    def test_empty_dim_data_is_sparse(self) -> None:
        """Empty dim_data is considered sparse."""
        from pyrit.analytics.visualization import _is_sparse

        assert _is_sparse({}) is True


# ---------------------------------------------------------------------------
# Tests: heatmap builders
# ---------------------------------------------------------------------------


class TestBuildZMatrix:
    """Tests for _build_z_matrix."""

    def test_z_matrix_shape(self) -> None:
        """z matrix dimensions match row_keys × col_keys."""
        from pyrit.analytics.visualization import _build_z_matrix

        row_keys = ["r1", "r2"]
        col_keys = ["c1", "c2", "c3"]
        lookup = {
            ("r1", "c1"): AttackStats(success_rate=0.8, total_decided=5, successes=4, failures=1, undetermined=0),
        }
        z, _ = _build_z_matrix(row_keys=row_keys, col_keys=col_keys, lookup=lookup)
        assert len(z) == len(row_keys)
        # Every row must span all columns, not only the first one
        assert all(len(row) == len(col_keys) for row in z)

    def test_missing_cell_is_none(self) -> None:
        """Cells absent from lookup get None in the z matrix."""
        from pyrit.analytics.visualization import _build_z_matrix

        z, _ = _build_z_matrix(row_keys=["r1"], col_keys=["c1"], lookup={})
        assert z[0][0] is None

    def test_present_cell_has_rate(self) -> None:
        """Cells present in lookup get the correct success_rate."""
        from pyrit.analytics.visualization import _build_z_matrix

        lookup = {
            ("r1", "c1"): AttackStats(success_rate=0.75, total_decided=4, successes=3, failures=1, undetermined=0),
        }
        z, _ = _build_z_matrix(row_keys=["r1"], col_keys=["c1"], lookup=lookup)
        assert z[0][0] == pytest.approx(0.75)


# ---------------------------------------------------------------------------
# Tests: helper functions
# ---------------------------------------------------------------------------


class TestAsrCssClass:
    """Tests for _asr_css_class."""

    def test_high_rate_is_green(self) -> None:
        """A rate of 0.9 maps to the "green" class."""
        from pyrit.analytics.visualization import _asr_css_class

        assert _asr_css_class(0.9) == "green"

    def test_mid_rate_is_yellow(self) -> None:
        """A rate of 0.45 maps to the "yellow" class."""
        from pyrit.analytics.visualization import _asr_css_class

        assert _asr_css_class(0.45) == "yellow"

    def test_low_rate_is_red(self) -> None:
        """A rate of 0.1 maps to the "red" class."""
        from pyrit.analytics.visualization import _asr_css_class

        assert _asr_css_class(0.1) == "red"

    def test_none_rate_is_empty(self) -> None:
        """A missing rate maps to the empty string."""
        from pyrit.analytics.visualization import _asr_css_class

        assert _asr_css_class(None) == ""