diff --git a/CHANGELOG.md b/CHANGELOG.md index 12d6be7f..aca603e8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,7 @@ how a consumer would use the library or CLI tool (e.g. adding unit tests, updating documentation, etc) are not captured here. -## Unreleased +## 2.12.0 - 2026-05-04 ### Added - Added the `sdk.file-events.v2.search_groups` method to get approximate aggregate file event counts by a given grouping term. @@ -19,6 +19,8 @@ - Added the `is_high_value` option to trusted activity methods in the SDK, and the `--high-value` option to trusted activity methods in the CLI. - Added the ability to specify domain trust for browser destinations, allowing users to specify when users should be allowed to use certain destinations when logged-in using a trusted domain. - Added the ability to specify trust for file-transfer tools when adding a trusted domain. +- Added the `risk-indicator-categories` client to the SDK, allowing the listing of risk indicator categories, subcategories, and risk indicators. +- Added the `risk-indicator-categories` command to the CLI, allowing the listing of risk indicator categories. ## 2.11.0 - 2026-02-10 diff --git a/docs/cli/cmds/risk-indicator-categories.md b/docs/cli/cmds/risk-indicator-categories.md new file mode 100644 index 00000000..5edc1a76 --- /dev/null +++ b/docs/cli/cmds/risk-indicator-categories.md @@ -0,0 +1,6 @@ +# Risk Indicator Categories Commands + +::: mkdocs-click + :module: _incydr_cli.cmds.risk_indicator_categories + :command: risk_indicator_categories + :list_subcommands: diff --git a/docs/sdk/client.md b/docs/sdk/client.md index 0467b56a..dcf6b2b1 100644 --- a/docs/sdk/client.md +++ b/docs/sdk/client.md @@ -3,4 +3,4 @@ ::: incydr.Client :docstring: - :members: settings session request_history actors agents alerts alert_rules audit_log cases customer departments devices directory_groups file_events sessions trusted_activities users risk_profiles watchlists + :members: settings session request_history actors agents alerts alert_rules audit_log cases customer departments devices directory_groups file_events sessions trusted_activities users risk_profiles watchlists risk_indicator_categories diff --git a/docs/sdk/clients/risk_indicator_categories.md b/docs/sdk/clients/risk_indicator_categories.md new file mode 100644 index 00000000..8a2f3520 --- /dev/null +++ b/docs/sdk/clients/risk_indicator_categories.md @@ -0,0 +1,5 @@ +# Risk Indicator Categories + +::: _incydr_sdk.risk_indicator_categories.client.RiskIndicatorCategoriesV1 + :docstring: + :members: diff --git a/docs/sdk/models.md b/docs/sdk/models.md index dd4015fe..1f7f40a4 100644 --- a/docs/sdk/models.md +++ b/docs/sdk/models.md @@ -329,3 +329,26 @@ ExcludedUsersList is deprecated. Use ExcludedActorsList instead. ::: incydr.models.IncludedDirectoryGroup :docstring: + +## Risk Indicator Categories +--- + +### `RiskIndicator` model + +::: incydr.models.RiskIndicator + :docstring: + +### `RiskIndicatorSubcategory` model + +::: incydr.models.RiskIndicatorSubcategory + :docstring: + +### `RiskIndicatorCategory` model + +::: incydr.models.RiskIndicatorCategory + :docstring: + +### `RiskIndicatorCategoriesResponsePage` model + +::: incydr.models.RiskIndicatorCategoriesResponsePage + :docstring: diff --git a/mkdocs.yml b/mkdocs.yml index dff770d8..a27fff55 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -55,6 +55,7 @@ nav: - File Event Querying: 'sdk/clients/file_event_queries.md' - Legal Hold: 'sdk/clients/legal_hold.md' - Orgs: 'sdk/clients/orgs.md' + - Risk Indicator Categories: 'sdk/clients/risk_indicator_categories.md' - Sessions: 'sdk/clients/sessions.md' - Trusted Activites: 'sdk/clients/trusted_activities.md' - Users: 'sdk/clients/users.md' @@ -84,6 +85,7 @@ nav: - Files: 'cli/cmds/files.md' - Legal Hold: 'cli/cmds/legal_hold.md' - Orgs: 'cli/cmds/orgs.md' + - Risk Indicator Categories: 'cli/cmds/risk_indicator_categories.md' - Sessions: 'cli/cmds/sessions.md' - Trusted Activites: 'cli/cmds/trusted_activities.md' - Users: 'cli/cmds/users.md' diff --git a/src/_incydr_cli/cmds/risk_indicator_categories.py b/src/_incydr_cli/cmds/risk_indicator_categories.py new file mode 100644 index 00000000..e41496d5 --- /dev/null +++ b/src/_incydr_cli/cmds/risk_indicator_categories.py @@ -0,0 +1,103 @@ +import itertools +from typing import Iterator +from typing import Optional + +import click + +from _incydr_cli import console +from _incydr_cli import logging_options +from _incydr_cli import render +from _incydr_cli.cmds.options.output_options import columns_option +from _incydr_cli.cmds.options.output_options import table_format_option +from _incydr_cli.cmds.options.output_options import TableFormat +from _incydr_cli.core import IncydrCommand +from _incydr_cli.core import IncydrGroup +from _incydr_sdk.core.client import Client +from _incydr_sdk.risk_indicator_categories.models import RiskIndicator + + +@click.group(cls=IncydrGroup) +@logging_options +def risk_indicator_categories(): + """View and manage risk indicators.""" + + +@risk_indicator_categories.command("list", cls=IncydrCommand) +@table_format_option +@columns_option +@logging_options +def list_categories( + format_: Optional[TableFormat] = None, + columns: Optional[str] = None, +): + """ + List Risk Indicators by category and subcategory. + """ + client = Client() + categories = client.risk_indicator_categories.v1.list_categories().categories + + if format_ == TableFormat.table: + columns = columns or [ + "id", + "name", + "description", + "category_name", + "category_id", + "subcategory_name", + "subcategory_id", + "type", + ] + render.table( + RiskIndicatorTableEntry, + iter_risk_indicator_table_entries(categories), + columns=columns, + flat=False, + ) + elif format_ == TableFormat.csv: + render.csv( + RiskIndicatorTableEntry, + iter_risk_indicator_table_entries(categories), + columns=columns, + flat=True, + ) + else: + printed = False + for indicator in iter_risk_indicator_table_entries(categories): + printed = True + if format_ == TableFormat.json_pretty: + console.print_json(indicator.json()) + else: + click.echo(indicator.json()) + if not printed: + console.print("No results found.") + + +class RiskIndicatorTableEntry(RiskIndicator): + category_name: str + category_id: str + category_description: Optional[str] + subcategory_name: str + subcategory_id: str + subcategory_description: Optional[str] + type: str + + +def iter_risk_indicator_table_entries(categories) -> Iterator[RiskIndicatorTableEntry]: + for category in categories: + for subcategory in category.subcategories: + for indicator, indicator_type in itertools.chain( + ((i, "standard") for i in subcategory.standard_indicators), + ((i, "custom") for i in subcategory.custom_indicators), + ): + yield RiskIndicatorTableEntry( + id=indicator.id, + name=indicator.name, + description=indicator.description, + category_name=category.name, + category_id=category.id, + category_description=category.description, + subcategory_name=subcategory.name, + subcategory_id=subcategory.id, + subcategory_description=subcategory.description, + type=indicator_type, + ) diff --git a/src/_incydr_cli/main.py b/src/_incydr_cli/main.py index fbdf0fd0..f7b25a67 100644 --- a/src/_incydr_cli/main.py +++ b/src/_incydr_cli/main.py @@ -21,6 +21,7 @@ from _incydr_cli.cmds.files import files as files_client from _incydr_cli.cmds.legal_hold import legal_hold from _incydr_cli.cmds.orgs import orgs +from _incydr_cli.cmds.risk_indicator_categories import risk_indicator_categories from _incydr_cli.cmds.risk_profiles import risk_profiles from _incydr_cli.cmds.sessions import sessions from _incydr_cli.cmds.trusted_activities import trusted_activities @@ -87,6 +88,7 @@ def incydr(version, python, script_dir): incydr.add_command(files_client) incydr.add_command(cases) incydr.add_command(risk_profiles) +incydr.add_command(risk_indicator_categories) incydr.add_command(sessions) incydr.add_command(trusted_activities) incydr.add_command(users) diff --git a/src/_incydr_sdk/__version__.py b/src/_incydr_sdk/__version__.py index e3c379d8..482653ef 100644 --- a/src/_incydr_sdk/__version__.py +++ b/src/_incydr_sdk/__version__.py @@ -1,4 +1,4 @@ # SPDX-FileCopyrightText: 2022-present Code42 Software # # SPDX-License-Identifier: MIT -__version__ = "2.11.0" +__version__ = "2.12.0" diff --git a/src/_incydr_sdk/core/client.py b/src/_incydr_sdk/core/client.py index 487b9e71..0748c8b0 100644 --- a/src/_incydr_sdk/core/client.py +++ b/src/_incydr_sdk/core/client.py @@ -24,6 +24,7 @@ from _incydr_sdk.files.client import FilesClient from _incydr_sdk.legal_hold.client import LegalHoldClient from _incydr_sdk.orgs.client import OrgsClient +from _incydr_sdk.risk_indicator_categories.client import RiskIndicatorCategories from _incydr_sdk.risk_profiles.client import RiskProfiles from _incydr_sdk.sessions.client import SessionsClient from _incydr_sdk.trusted_activities.client import TrustedActivitiesClient @@ -114,6 +115,7 @@ def response_hook(response, *args, **kwargs): self._trusted_activities = TrustedActivitiesClient(self) self._users = UsersClient(self) self._risk_profiles = RiskProfiles(self) + self._risk_indicator_categories = RiskIndicatorCategories(self) self._watchlists = WatchlistsClient(self) if not skip_auth: @@ -366,6 +368,17 @@ def risk_profiles(self): """ return self._risk_profiles + @property + def risk_indicator_categories(self): + """ + Property returning a [`RiskIndicatorCategories`](../risk_indicator_categories) client for interacting + with `/v*/risk_indicator_categories` API endpoints. + + Usage: + >>> client.risk_indicator_categories.v1.list_categories(active=True) + """ + return self._risk_indicator_categories + @property def watchlists(self): """ diff --git a/src/_incydr_sdk/risk_indicator_categories/client.py b/src/_incydr_sdk/risk_indicator_categories/client.py new file mode 100644 index 00000000..722fede7 --- /dev/null +++ b/src/_incydr_sdk/risk_indicator_categories/client.py @@ -0,0 +1,84 @@ +from _incydr_sdk.enums import SortDirection +from _incydr_sdk.risk_indicator_categories.models import ( + RiskIndicatorCategoriesResponsePage, +) +from _incydr_sdk.risk_indicator_categories.models import RiskIndicatorCategory +from _incydr_sdk.risk_indicator_categories.models import RiskIndicatorSubcategory + + +class RiskIndicatorCategories: + def __init__(self, parent): + self._parent = parent + self._v1 = None + + @property + def v1(self): + if self._v1 is None: + self._v1 = RiskIndicatorCategoriesV1(self._parent) + return self._v1 + + +class RiskIndicatorCategoriesV1: + """ + Client for `/v1/risk-indicator-categories` endpoints. + + Usage example: + + >>> import incydr + >>> client = incydr.Client(**kwargs) + >>> client.risk_indicators.v1.list_categories() + """ + + def __init__(self, parent): + self._parent = parent + + def list_categories( + self, active: bool = None, sort_direction: SortDirection = None + ) -> RiskIndicatorCategoriesResponsePage: + """ + Returns all risk indicator categories, including their subcategories and associated risk indicators. + Filter results by passing the appropriate parameters: + + **Parameters**: + + * **active**: `bool` - When provided, returns only those risk indicators which match the provided value (true or false). When not provided, returns both. + * **sort_direction**: `SortDirection` - The order in which to sort the returned list. + + **Returns**: A [`RiskIndicatorCategoriesResponsePage`][riskindicatorcategoriesresponsepage-model] object. + """ + response = self._parent.session.get( + "/v1/risk-indicator-categories", + params={"isActive": active, "sort_direction": sort_direction}, + ) + return RiskIndicatorCategoriesResponsePage.parse_response(response) + + def get_category(self, id: str) -> RiskIndicatorCategory: + """ + Returns a single risk indicator category, including its subcategories and associated risk indicators. + + **Parameters**: + + * **id**: `str` - The unique ID of the category you wish to retrieve. + + **Returns**: A [`RiskIndicatorCategory`][riskindicatorcategory-model] object. + """ + response = self._parent.session.get(f"/v1/risk-indicator-categories/{id}") + return RiskIndicatorCategory.parse_response(response) + + def get_subcategory( + self, category_id: str, subcategory_id: str + ) -> RiskIndicatorSubcategory: + """ + Returns a single risk indicator category, including its subcategories and associated risk indicators. + + **Parameters**: + + * **category_id**: `str` - The unique ID of the category in which the subcategory lives. + * **subcategory_id**: `str` - The unique ID of the subcategory you wish to retrieve. + + **Returns**: A [`RiskIndicatorSubcategory`][riskindicatorsubcategory-model] object. + """ + response = self._parent.session.get( + f"/v1/risk-indicator-categories/{category_id}/subcategories/{subcategory_id}" + ) + return RiskIndicatorSubcategory.parse_response(response) diff --git a/src/_incydr_sdk/risk_indicator_categories/models.py b/src/_incydr_sdk/risk_indicator_categories/models.py new file mode 100644 index 00000000..379fb2b0 --- /dev/null +++ b/src/_incydr_sdk/risk_indicator_categories/models.py @@ -0,0 +1,75 @@ +from typing import List +from typing import Optional + +from pydantic import Field + +from _incydr_sdk.core.models import Model +from _incydr_sdk.core.models import ResponseModel + + +class RiskIndicator(Model): + """ + A model representing a Risk Indicator. + + **Fields**: + + * **id**: `str` - The unique ID of the indicator. + * **name**: `str` - The name of the indicator. + * **description**: `Optional[str]` - The description of the indicator. + """ + + id: str + name: str + description: Optional[str] = None + + +class RiskIndicatorSubcategory(ResponseModel): + """ + A model representing a Risk Indicator Subcategory. + + **Fields**: + + * **id**: `str` - The unique ID of the subcategory. + * **name**: `str` - The name of the subcategory. + * **description**: `Optional[str]` - The description of the subcategory. + * **standard_indicators**: `List[RiskIndicator]` - A list of standard risk indicators. + * **custom_indicators**: `List[RiskIndicator]` - A list of custom risk indicators. + """ + + id: str + name: str + description: Optional[str] = None + standard_indicators: List[RiskIndicator] = Field([], alias="standardIndicators") + custom_indicators: List[RiskIndicator] = Field([], alias="customIndicators") + + +class RiskIndicatorCategory(ResponseModel): + """ + A model representing a Risk Indicator Category. + + **Fields**: + + * **id**: `str` - The unique ID of the category. + * **name**: `str` - The name of the category. + * **description**: `Optional[str]` - The description of the category. + * **subcategories**: `List[RiskIndicatorSubcategory]` - A list of Risk Indicator Subcategories + """ + + id: str + name: str + description: Optional[str] = None + subcategories: List[RiskIndicatorSubcategory] + + +class RiskIndicatorCategoriesResponsePage(ResponseModel): + """ + A model representing a page of Risk Indicator Categories. + + **Fields**: + + * **categories**: `Optional[List[RiskIndicatorCategory]]` - A list of Risk Indicator Categories. + """ + + categories: Optional[List[RiskIndicatorCategory]] = Field( + None, description="A list of Risk Indicator Categories." + ) diff --git a/src/incydr/models.py b/src/incydr/models.py index cf7ab167..6f22228b 100644 --- a/src/incydr/models.py +++ b/src/incydr/models.py @@ -25,6 +25,12 @@ from _incydr_sdk.file_events.models.response import FileEventsPage from _incydr_sdk.file_events.models.response import GroupedFileEventResponse from _incydr_sdk.file_events.models.response import SavedSearch +from _incydr_sdk.risk_indicator_categories.models import RiskIndicator +from _incydr_sdk.risk_indicator_categories.models import ( + RiskIndicatorCategoriesResponsePage, +) +from _incydr_sdk.risk_indicator_categories.models import RiskIndicatorCategory +from _incydr_sdk.risk_indicator_categories.models import RiskIndicatorSubcategory from _incydr_sdk.risk_profiles.models import RiskProfile from _incydr_sdk.risk_profiles.models import RiskProfilesPage from _incydr_sdk.sessions.models.response import Session @@ -107,6 +113,10 @@ "AuditEventsPage", "RiskProfilesPage", "RiskProfile", + "RiskIndicator", + "RiskIndicatorSubcategory", + "RiskIndicatorCategory", + "RiskIndicatorCategoriesResponsePage", ] diff --git a/tests/test_risk_indicator_categories.py b/tests/test_risk_indicator_categories.py new file mode 100644 index 00000000..c933a516 --- /dev/null +++ b/tests/test_risk_indicator_categories.py @@ -0,0 +1,190 @@ +import json +from csv import DictReader +from io import StringIO +from urllib.parse import urlencode + +import pytest +from pytest_httpserver import HTTPServer + +from _incydr_cli import render as render_module +from _incydr_cli.cmds import risk_indicator_categories as risk_indicator_categories_cmd +from _incydr_cli.main import incydr +from _incydr_sdk.enums import SortDirection +from _incydr_sdk.risk_indicator_categories.models import ( + RiskIndicatorCategoriesResponsePage, +) +from _incydr_sdk.risk_indicator_categories.models import RiskIndicatorCategory +from _incydr_sdk.risk_indicator_categories.models import RiskIndicatorSubcategory +from incydr import Client + +TEST_CATEGORY_ID = "cat-1" +TEST_SUBCATEGORY_ID = "sub-1" + +TEST_INDICATOR_STANDARD = { + "id": "ind-std-1", + "name": "Standard indicator", + "description": "Standard description", +} + +TEST_INDICATOR_CUSTOM = { + "id": "ind-cust-1", + "name": "Custom indicator", + "description": None, +} + +TEST_SUBCATEGORY_PAYLOAD = { + "id": TEST_SUBCATEGORY_ID, + "name": "Cloud storage", + "description": "Cloud-related indicators", + "standardIndicators": [TEST_INDICATOR_STANDARD], + "customIndicators": [TEST_INDICATOR_CUSTOM], +} + +TEST_CATEGORY_PAYLOAD = { + "id": TEST_CATEGORY_ID, + "name": "Data exfiltration", + "description": "Category description", + "subcategories": [TEST_SUBCATEGORY_PAYLOAD], +} + +TEST_LIST_RESPONSE = {"categories": [TEST_CATEGORY_PAYLOAD]} + + +def test_list_categories_when_default_params_returns_expected_data( + httpserver_auth: HTTPServer, +): + httpserver_auth.expect_request("/v1/risk-indicator-categories").respond_with_json( + TEST_LIST_RESPONSE + ) + + client = Client() + page = client.risk_indicator_categories.v1.list_categories() + assert isinstance(page, RiskIndicatorCategoriesResponsePage) + assert len(page.categories) == 1 + cat = page.categories[0] + assert isinstance(cat, RiskIndicatorCategory) + assert cat.json() == json.dumps(TEST_CATEGORY_PAYLOAD, separators=(",", ":")) + + +def test_list_categories_when_active_and_sort_returns_expected_data( + httpserver_auth: HTTPServer, +): + query = urlencode({"isActive": True, "sort_direction": SortDirection.ASC.value}) + httpserver_auth.expect_request( + "/v1/risk-indicator-categories", query_string=query + ).respond_with_json(TEST_LIST_RESPONSE) + + client = Client() + page = client.risk_indicator_categories.v1.list_categories( + active=True, sort_direction=SortDirection.ASC + ) + assert isinstance(page, RiskIndicatorCategoriesResponsePage) + assert page.categories[0].id == TEST_CATEGORY_ID + + +def test_get_category_returns_expected_data(httpserver_auth: HTTPServer): + httpserver_auth.expect_request( + f"/v1/risk-indicator-categories/{TEST_CATEGORY_ID}" + ).respond_with_json(TEST_CATEGORY_PAYLOAD) + + client = Client() + category = client.risk_indicator_categories.v1.get_category(TEST_CATEGORY_ID) + assert isinstance(category, RiskIndicatorCategory) + assert category.json() == json.dumps(TEST_CATEGORY_PAYLOAD, separators=(",", ":")) + + +def test_get_subcategory_returns_expected_data(httpserver_auth: HTTPServer): + httpserver_auth.expect_request( + f"/v1/risk-indicator-categories/{TEST_CATEGORY_ID}/subcategories/{TEST_SUBCATEGORY_ID}" + ).respond_with_json(TEST_SUBCATEGORY_PAYLOAD) + + client = Client() + sub = client.risk_indicator_categories.v1.get_subcategory( + TEST_CATEGORY_ID, TEST_SUBCATEGORY_ID + ) + assert isinstance(sub, RiskIndicatorSubcategory) + assert sub.json() == json.dumps(TEST_SUBCATEGORY_PAYLOAD, separators=(",", ":")) + + +# ************************************************ CLI ************************************************ + + +def test_cli_list_when_default_params_makes_expected_call( + httpserver_auth: HTTPServer, runner +): + httpserver_auth.expect_request( + "/v1/risk-indicator-categories", method="GET" + ).respond_with_json(TEST_LIST_RESPONSE) + + result = runner.invoke(incydr, ["risk-indicator-categories", "list"]) + httpserver_auth.check() + assert result.exit_code == 0 + assert "Data exfiltration" in result.output + assert "Cloud storage" in result.output + assert "Standard indicator" in result.output + assert "Custom indicator" in result.output + + +def test_cli_list_json_lines_outputs_one_record_per_indicator( + httpserver_auth: HTTPServer, runner +): + httpserver_auth.expect_request( + "/v1/risk-indicator-categories", method="GET" + ).respond_with_json(TEST_LIST_RESPONSE) + + result = runner.invoke( + incydr, ["risk-indicator-categories", "list", "-f", "json-lines"] + ) + httpserver_auth.check() + assert result.exit_code == 0 + lines = [ln for ln in result.output.strip().splitlines() if ln.strip()] + assert len(lines) == 2 + rows = [json.loads(ln) for ln in lines] + by_id = {r["id"]: r for r in rows} + assert by_id["ind-std-1"]["type"] == "standard" + assert by_id["ind-std-1"]["category_name"] == "Data exfiltration" + assert by_id["ind-std-1"]["subcategory_name"] == "Cloud storage" + assert by_id["ind-cust-1"]["type"] == "custom" + assert by_id["ind-cust-1"]["description"] is None + + +def test_cli_list_csv_outputs_header_and_indicator_rows( + httpserver_auth: HTTPServer, runner, monkeypatch +): + """CSV uses render.csv default file= bound at import time; patch so output is readable.""" + buf = StringIO() + real_csv = render_module.csv + + def csv_to_buffer(model, models, columns=None, flat=False, file=None): + return real_csv(model, models, columns=columns, flat=flat, file=buf) + + monkeypatch.setattr(risk_indicator_categories_cmd.render, "csv", csv_to_buffer) + + httpserver_auth.expect_request( + "/v1/risk-indicator-categories", method="GET" + ).respond_with_json(TEST_LIST_RESPONSE) + + result = runner.invoke(incydr, ["risk-indicator-categories", "list", "-f", "csv"]) + httpserver_auth.check() + assert result.exit_code == 0 + reader = DictReader(StringIO(buf.getvalue())) + rows = list(reader) + assert len(rows) == 2 + by_id = {r["id"]: r for r in rows} + assert by_id["ind-std-1"]["type"] == "standard" + assert by_id["ind-cust-1"]["type"] == "custom" + assert by_id["ind-std-1"]["category_name"] == "Data exfiltration" + + +@pytest.mark.parametrize("format_", ["table", "csv", "json-pretty", "json-lines"]) +def test_cli_list_when_empty_returns_no_results( + httpserver_auth: HTTPServer, runner, format_ +): + httpserver_auth.expect_request( + "/v1/risk-indicator-categories", method="GET" + ).respond_with_json({"categories": []}) + + result = runner.invoke(incydr, ["risk-indicator-categories", "list", "-f", format_]) + httpserver_auth.check() + assert result.exit_code == 0 + assert "No results found" in result.output