diff --git a/apps/discord_bot/src/five08/discord_bot/cogs/invoices.py b/apps/discord_bot/src/five08/discord_bot/cogs/invoices.py new file mode 100644 index 00000000..5216da71 --- /dev/null +++ b/apps/discord_bot/src/five08/discord_bot/cogs/invoices.py @@ -0,0 +1,262 @@ +"""Invoice validation cog for the 508.dev Discord bot.""" + +import asyncio +import logging +from typing import Any + +import discord +from discord import app_commands +from discord.ext import commands + +from five08.clients.erpnext import ERPNextClient, ERPNextAPIError +from five08.erpnext_validation import validate_invoice +from five08.projects import ( + list_dashboard_projects, + project_viewer_emails_for_discord, +) +from five08.discord_bot.config import settings +from five08.discord_bot.utils.role_decorators import check_user_roles_with_hierarchy + +logger = logging.getLogger(__name__) + +DOCTYPE_CHOICES = [ + app_commands.Choice(name="Sales Invoice", value="Sales Invoice"), + app_commands.Choice(name="Purchase Invoice", value="Purchase Invoice"), +] + +STATUS_LABEL = {0: "Draft", 1: "Submitted", 2: "Cancelled"} + +# Invoice access rules — a caller may validate an invoice if any of these hold: +# 1. They have Steering Committee role or above (full access). +# 2. They created the invoice (invoice owner matches one of their ERP emails). +# 3. They are on the invoice's ERP project roster. +_PRIVILEGED_ROLES = ["Steering Committee"] + + +def _can_view_invoice( + invoice: dict[str, Any], + include_all: bool, + emails: list[str], + project_ids: list[str], +) -> bool: + """Return whether the caller is authorized to see this specific invoice.""" + if include_all: + return True + owner = str(invoice.get("owner") or "").strip().casefold() + if owner and owner in {email.casefold() for email in emails}: + return True + project = invoice.get("project") + return bool(project and project in project_ids) + + +class InvoicesCog(commands.Cog, name="Invoices"): + """Cog for ERPNext invoice validation.""" + + def __init__(self, bot: commands.Bot) -> None: + self.bot = bot + base_url = (settings.erpnext_base_url or "").strip() + api_key = (settings.erpnext_api_key or "").strip() + self.client = ERPNextClient( + base_url=base_url, + api_key=api_key, + timeout_seconds=settings.erpnext_api_timeout_seconds, + ) + logger.info("Invoices cog initialized") + + def _resolve_access( + self, interaction: discord.Interaction + ) -> tuple[bool, list[str], list[str]]: + """Resolve the caller's invoice access. Run inside a worker thread. + + Returns (include_all, emails, project_ids). A non-privileged caller + with no ERP identity returns (False, [], []). + """ + roles = getattr(interaction.user, "roles", []) + if check_user_roles_with_hierarchy(roles, _PRIVILEGED_ROLES): + return True, [], [] + + emails = project_viewer_emails_for_discord(settings, str(interaction.user.id)) + if not emails: + return False, [], [] + + projects = list_dashboard_projects( + settings, + viewer_emails=emails, + include_all=False, + limit=500, + include_roster=False, + ) + project_ids = [ + str(pid) + for project in projects + if (pid := project.get("erpnext_project_id")) + ] + return False, emails, project_ids + + @app_commands.command( + name="validate-invoice", + description="Check an ERPNext invoice for common validation errors", + ) + @app_commands.describe( + doctype="Invoice type to validate", + invoice_name="Invoice number (autocomplete). Changed doctype? Retype here to refresh options.", + ) + @app_commands.choices(doctype=DOCTYPE_CHOICES) + async def validate_invoice_command( + self, + interaction: discord.Interaction, + doctype: app_commands.Choice[str], + invoice_name: str, + ) -> None: + await interaction.response.defer(ephemeral=True) + + try: + include_all, emails, project_ids = await asyncio.to_thread( + self._resolve_access, interaction + ) + if not include_all and not emails: + await interaction.followup.send( + "Invoice validation is available to Steering Committee members " + "or confirmed ERP project members.", + ephemeral=True, + ) + return + + invoice = await asyncio.to_thread( + self.client.get_invoice, doctype.value, invoice_name + ) + + if invoice is None or not _can_view_invoice( + invoice, include_all, emails, project_ids + ): + await interaction.followup.send( + f"{doctype.value} `{invoice_name}` not found or you don't have access to it.", + ephemeral=True, + ) + return + + result = validate_invoice(invoice, doctype.value) + + docstatus = invoice.get("docstatus", 0) + status_label = STATUS_LABEL.get(docstatus, str(docstatus)) + summary_lines = [ + f"**Status:** {status_label}", + f"**Owner:** {invoice.get('owner') or '—'}", + f"**Project:** {invoice.get('project') or '—'}", + f"**Cost Center:** {invoice.get('cost_center') or '—'}", + f"**Posting Date:** {invoice.get('posting_date') or '—'}", + f"**Due Date:** {invoice.get('due_date') or '—'}", + ] + + if result.passed: + embed = discord.Embed( + title=f"✅ {invoice_name} — No issues found", + color=discord.Color.green(), + ) + else: + embed = discord.Embed( + title=f"⚠️ {invoice_name} — {len(result.issues)} issue(s) found", + color=discord.Color.red(), + ) + + embed.add_field( + name="Invoice Info", + value="\n".join(summary_lines), + inline=False, + ) + + if not result.passed: + issue_lines = "\n\n".join( + f"❌ {issue.message}" for issue in result.issues + ) + if len(issue_lines) > 1024: + cutoff = issue_lines.rfind("\n\n", 0, 984) + if cutoff == -1: + cutoff = 984 + shown = issue_lines[:cutoff].count("❌") + remaining = len(result.issues) - shown + issue_lines = ( + issue_lines[:cutoff] + + f"\n\n_…{remaining} more issue(s) hidden_" + ) + embed.add_field( + name="Issues", + value=issue_lines, + inline=False, + ) + + embed.set_footer(text=f"Type: {doctype.value}") + await interaction.followup.send(embed=embed, ephemeral=True) + + except ERPNextAPIError as e: + logger.exception("ERPNext API error in validate_invoice") + await interaction.followup.send( + f"Failed to fetch invoice from ERPNext: {e}", ephemeral=True + ) + except Exception: + logger.exception("Unexpected error in validate_invoice") + await interaction.followup.send( + "An unexpected error occurred.", ephemeral=True + ) + + @validate_invoice_command.autocomplete("invoice_name") + async def invoice_name_autocomplete( + self, + interaction: discord.Interaction, + current: str, + ) -> list[app_commands.Choice[str]]: + doctype_value = interaction.namespace.doctype + if not doctype_value: + return [] + + try: + include_all, emails, project_ids = await asyncio.to_thread( + self._resolve_access, interaction + ) + if not include_all and not emails: + return [] + + invoices = await asyncio.to_thread( + self.client.search_invoices, + doctype=doctype_value, + query=current, + limit=25, + owners=None if include_all else emails, + projects=None if include_all else project_ids, + ) + choices = [] + for inv in invoices[:25]: + inv_name = inv.get("name") + # Ensure inv_name is a valid string and within Discord's 100-char limit + if not isinstance(inv_name, str) or len(inv_name) > 100: + continue # Skip invalid or overlong IDs to prevent lookup/API failures + + try: + status_int = int(inv.get("docstatus", 0)) + except (TypeError, ValueError): + status_int = -1 + + label = f"[{STATUS_LABEL.get(status_int, '?')}] {inv_name} · {inv.get('owner', '')} · {inv.get('posting_date', '')}" + + # If label exceeds 100 chars, truncate gracefully with trailing dots + display_name = label if len(label) <= 100 else f"{label[:97]}..." + + choices.append( + app_commands.Choice( + name=display_name, + value=inv_name, + ) + ) + return choices + except Exception as e: + logger.warning("ERPNext autocomplete error for %r: %s", doctype_value, e) + return [] + + +async def setup(bot: commands.Bot) -> None: + if not all([settings.erpnext_base_url, settings.erpnext_api_key]): + logger.warning( + "ERPNext cog not loaded: missing ERPNEXT_BASE_URL or ERPNEXT_API_KEY" + ) + return + await bot.add_cog(InvoicesCog(bot)) diff --git a/packages/shared/src/five08/clients/erpnext.py b/packages/shared/src/five08/clients/erpnext.py index 02cc8b55..d872070e 100644 --- a/packages/shared/src/five08/clients/erpnext.py +++ b/packages/shared/src/five08/clients/erpnext.py @@ -666,6 +666,76 @@ def search_suppliers(self, query: str, *, limit: int = 10) -> list[dict[str, Any limit=limit, ) + def get_invoice(self, doctype: str, name: str) -> dict[str, Any] | None: + """Fetch a single Sales Invoice or Purchase Invoice by name. Returns None on 404.""" + doctype = doctype.strip() + name = name.strip() + if not doctype: + raise ERPNextAPIError("DocType is required") + if not name: + raise ERPNextAPIError("Invoice name is required") + try: + data = self.request( + "GET", + f"/api/resource/{quote(doctype, safe='')}/{quote(name, safe='')}", + ) + except ERPNextAPIError as exc: + if exc.status_code == 404: + return None + raise + row = data.get("data") + if not isinstance(row, dict): + raise ERPNextAPIError( + f"ERPNext returned unexpected payload for {doctype} {name!r}: {type(row).__name__}" + ) + return row + + def search_invoices( + self, + doctype: str, + query: str = "", + docstatus: int | None = None, + limit: int = 10, + owners: list[str] | None = None, + projects: list[str] | None = None, + ) -> list[dict[str, Any]]: + """Search invoices for autocomplete, ordered newest first. + + When owners or projects are given, results are scoped to invoices + created by those owners OR belonging to those projects. + """ + filters: list[Any] = [] + if query: + filters.append([doctype, "name", "like", f"%{query}%"]) + if docstatus is not None: + filters.append([doctype, "docstatus", "=", docstatus]) + + or_filters: list[Any] = [] + if owners: + or_filters.append([doctype, "owner", "in", owners]) + if projects: + or_filters.append([doctype, "project", "in", projects]) + + params: dict[str, Any] = { + "fields": json.dumps(["name", "posting_date", "docstatus", "owner"]), + "order_by": "posting_date desc", + "limit_page_length": max(1, limit), + } + if filters: + params["filters"] = json.dumps(filters) + if or_filters: + params["or_filters"] = json.dumps(or_filters) + + data = self.request( + "GET", + f"/api/resource/{quote(doctype, safe='')}", + params=params, + ) + rows = data.get("data") + if not isinstance(rows, list): + return [] + return [r for r in rows if isinstance(r, dict) and r.get("name")] + def get_project(self, project_id: str) -> dict[str, Any]: """Read one ERPNext Project detail document.""" normalized_id = project_id.strip() diff --git a/packages/shared/src/five08/erpnext_validation.py b/packages/shared/src/five08/erpnext_validation.py new file mode 100644 index 00000000..60911c32 --- /dev/null +++ b/packages/shared/src/five08/erpnext_validation.py @@ -0,0 +1,144 @@ +"""Invoice validation rules for ERPNext invoices.""" + +from dataclasses import dataclass, field +from datetime import datetime, timedelta +from typing import Any + + +PROJECTS_COST_CENTER_PREFIX = "Projects" +VALID_DOCTYPES = {"Sales Invoice", "Purchase Invoice"} +# Per community billing guidelines (see wiki); update here if the policy changes. +MIN_DUE_DATE_DAYS = 29 + + +@dataclass +class ValidationIssue: + field: str + message: str + + +@dataclass +class ValidationResult: + invoice_name: str + issues: list[ValidationIssue] = field(default_factory=list) + + @property + def passed(self) -> bool: + return len(self.issues) == 0 + + +def validate_invoice(invoice: dict[str, Any], doctype: str) -> ValidationResult: + """Run all validation rules against an invoice.""" + if doctype not in VALID_DOCTYPES: + raise ValueError(f"Unsupported doctype: {doctype!r}") + result = ValidationResult(invoice_name=invoice.get("name", "Unknown")) + + _check_cost_center(invoice, result) + _check_line_item_cost_center_and_project(invoice, result) + + if doctype == "Purchase Invoice": + _check_due_date(invoice, result) + + return result + + +def _check_cost_center(invoice: dict[str, Any], result: ValidationResult) -> None: + """If Project is set, Cost Center must start with 'Projects'.""" + project = invoice.get("project") + cost_center = (invoice.get("cost_center") or "").strip() + + if not project: + return + + if not cost_center: + result.issues.append( + ValidationIssue( + field="cost_center", + message=( + f"**Cost Center not set**\n" + f"Invoice has Project `{project}` but Cost Center is empty.\n" + f"→ Set **Cost Center** to your Projects cost center (e.g. `Projects - 5`)" + ), + ) + ) + elif not cost_center.startswith(PROJECTS_COST_CENTER_PREFIX): + result.issues.append( + ValidationIssue( + field="cost_center", + message=( + f"**Cost Center mismatch**\n" + f"`{cost_center}` should be `Projects` when a Project is selected.\n" + f"→ Change **Cost Center** to your Projects cost center (e.g. `Projects - 5`)" + ), + ) + ) + + +def _check_line_item_cost_center_and_project( + invoice: dict[str, Any], result: ValidationResult +) -> None: + """Each line item's Cost Center and Project must match the invoice level.""" + invoice_project = invoice.get("project") + invoice_cost_center = (invoice.get("cost_center") or "").strip() + + if not invoice_cost_center and not invoice_project: + return + + for idx, item in enumerate(invoice.get("items") or [], start=1): + item_cost_center = (item.get("cost_center") or "").strip() + item_project = item.get("project") + + if invoice_cost_center and item_cost_center != invoice_cost_center: + result.issues.append( + ValidationIssue( + field=f"items[{idx}].cost_center", + message=( + f"**Line item #{idx} — Cost Center mismatch**\n" + f"Item has `{item_cost_center or '(empty)'}` but invoice is `{invoice_cost_center}`.\n" + f"→ In the Items table, set line #{idx} **Cost Center** to `{invoice_cost_center}`" + ), + ) + ) + + if invoice_project and item_project != invoice_project: + result.issues.append( + ValidationIssue( + field=f"items[{idx}].project", + message=( + f"**Line item #{idx} — Project mismatch**\n" + f"Item has `{item_project or '(empty)'}` but invoice is `{invoice_project}`.\n" + f"→ In the Items table, set line #{idx} **Project** to `{invoice_project}`" + ), + ) + ) + + +def _check_due_date(invoice: dict[str, Any], result: ValidationResult) -> None: + """Flag Purchase Invoices whose due date is too soon after the posting date.""" + posting_date_raw = invoice.get("posting_date") + due_date_raw = invoice.get("due_date") + + if not posting_date_raw or not due_date_raw: + return + + try: + posting_date = datetime.strptime(posting_date_raw, "%Y-%m-%d").date() + due_date = datetime.strptime(due_date_raw, "%Y-%m-%d").date() + except (ValueError, TypeError): + return + + delta = (due_date - posting_date).days + if delta < MIN_DUE_DATE_DAYS: + earliest = (posting_date + timedelta(days=MIN_DUE_DATE_DAYS)).strftime( + "%Y-%m-%d" + ) + result.issues.append( + ValidationIssue( + field="due_date", + message=( + f"**Due date too early**\n" + f"`{due_date_raw}` is only {delta} day(s) after posting date `{posting_date_raw}` (min {MIN_DUE_DATE_DAYS} days).\n" + f"→ Change **Due Date** to at least `{earliest}`" + ), + ) + ) diff --git a/packages/shared/src/five08/projects.py b/packages/shared/src/five08/projects.py index e67400d4..ad7c84e3 100644 --- a/packages/shared/src/five08/projects.py +++ b/packages/shared/src/five08/projects.py @@ -344,8 +344,13 @@ def list_dashboard_projects( viewer_emails: list[str] | None = None, include_all: bool = False, limit: int = 100, + include_roster: bool = True, ) -> list[dict[str, Any]]: - """Return project cache rows shaped for the operations dashboard.""" + """Return project cache rows shaped for the operations dashboard. + + Set include_roster=False to skip the roster-members query — useful for + lightweight access checks where only project IDs are needed. + """ where = [] params: list[Any] = [] normalized_query = (query or "").strip() @@ -444,7 +449,7 @@ def list_dashboard_projects( members_by_project: dict[str, list[dict[str, Any]]] = { project_id: [] for project_id in project_ids } - if project_ids: + if include_roster and project_ids: cursor.execute( """ SELECT @@ -527,7 +532,9 @@ def list_dashboard_projects( "customer", shaped.get("customer"), ) - shaped["roster_members"] = members_by_project.get(project_id, []) + shaped["roster_members"] = ( + members_by_project.get(project_id, []) if include_roster else [] + ) shaped["roster_count"] = len(shaped["roster_members"]) result.append(shaped) return result diff --git a/tests/unit/test_erpnext_client.py b/tests/unit/test_erpnext_client.py index 20b0bf72..5e0daa39 100644 --- a/tests/unit/test_erpnext_client.py +++ b/tests/unit/test_erpnext_client.py @@ -688,3 +688,144 @@ def update_project( client.remove_project_user("PROJ-001", "row-2") assert client.updated is False + + +# --------------------------------------------------------------------------- +# ERPNextClient invoice method tests +# --------------------------------------------------------------------------- + + +VALID_SALES_INVOICE: dict[str, Any] = { + "name": "TEST-SINV-0001", + "docstatus": 0, + "project": "TEST-PROJ-001", + "cost_center": "Projects - TEST", + "posting_date": "2026-01-01", + "due_date": "2026-02-01", + "items": [ + { + "idx": 1, + "project": "TEST-PROJ-001", + "cost_center": "Projects - TEST", + } + ], +} + + +def test_get_invoice_returns_invoice_dict() -> None: + client = FakeERPNextClient({"data": VALID_SALES_INVOICE}) + result = client.get_invoice("Sales Invoice", "TEST-SINV-0001") + assert result is not None + assert result["name"] == "TEST-SINV-0001" + + +def test_get_invoice_raises_on_unexpected_payload() -> None: + client = FakeERPNextClient({"data": []}) + with pytest.raises(ERPNextAPIError, match="unexpected payload"): + client.get_invoice("Sales Invoice", "TEST-SINV-0001") + + +def test_search_invoices_returns_list() -> None: + client = FakeERPNextClient({"data": [{"name": "TEST-SINV-0001"}]}) + assert client.search_invoices("Sales Invoice") == [{"name": "TEST-SINV-0001"}] + + +def test_search_invoices_returns_empty_list_when_data_missing() -> None: + client = FakeERPNextClient({}) + assert client.search_invoices("Sales Invoice") == [] + + +def test_search_invoices_includes_query_filter_when_given() -> None: + captured: dict[str, Any] = {} + + class CaptureClient(FakeERPNextClient): + def request( + self, + method: str, + path: str, + *, + params: dict[str, Any] | None = None, + payload: dict[str, Any] | None = None, + ) -> dict[str, Any]: + captured["params"] = params + return {"data": []} + + CaptureClient({}).search_invoices("Sales Invoice", query="TEST-SINV") + + filters = json.loads(captured["params"]["filters"]) + assert ["Sales Invoice", "name", "like", "%TEST-SINV%"] in filters + + +def test_search_invoices_fetches_expected_fields() -> None: + captured: dict[str, Any] = {} + + class CaptureClient(FakeERPNextClient): + def request( + self, + method: str, + path: str, + *, + params: dict[str, Any] | None = None, + payload: dict[str, Any] | None = None, + ) -> dict[str, Any]: + captured["params"] = params + return {"data": []} + + CaptureClient({}).search_invoices("Sales Invoice") + + fields = json.loads(captured["params"]["fields"]) + assert "name" in fields + assert "docstatus" in fields + assert "posting_date" in fields + assert "owner" in fields + + +def test_search_invoices_omits_or_filters_when_unscoped() -> None: + captured: dict[str, Any] = {} + + class CaptureClient(FakeERPNextClient): + def request( + self, + method: str, + path: str, + *, + params: dict[str, Any] | None = None, + payload: dict[str, Any] | None = None, + ) -> dict[str, Any]: + captured["params"] = params + return {"data": []} + + CaptureClient({}).search_invoices("Sales Invoice") + + assert "or_filters" not in captured["params"] + + +def test_search_invoices_scopes_to_owners_and_projects() -> None: + captured: dict[str, Any] = {} + + class CaptureClient(FakeERPNextClient): + def request( + self, + method: str, + path: str, + *, + params: dict[str, Any] | None = None, + payload: dict[str, Any] | None = None, + ) -> dict[str, Any]: + captured["params"] = params + return {"data": []} + + CaptureClient({}).search_invoices( + "Sales Invoice", + owners=["member@example.com"], + projects=["TEST-PROJ-001", "TEST-PROJ-002"], + ) + + or_filters = json.loads(captured["params"]["or_filters"]) + assert ["Sales Invoice", "owner", "in", ["member@example.com"]] in or_filters + assert [ + "Sales Invoice", + "project", + "in", + ["TEST-PROJ-001", "TEST-PROJ-002"], + ] in or_filters diff --git a/tests/unit/test_erpnext_validation.py b/tests/unit/test_erpnext_validation.py new file mode 100644 index 00000000..d8bd7516 --- /dev/null +++ b/tests/unit/test_erpnext_validation.py @@ -0,0 +1,147 @@ +from __future__ import annotations + +from typing import Any + +from five08.erpnext_validation import validate_invoice + + +VALID_SALES_INVOICE: dict[str, Any] = { + "name": "TEST-SINV-0001", + "docstatus": 0, + "project": "TEST-PROJ-001", + "cost_center": "Projects - TEST", + "posting_date": "2026-01-01", + "due_date": "2026-02-01", + "items": [ + { + "idx": 1, + "project": "TEST-PROJ-001", + "cost_center": "Projects - TEST", + } + ], +} + +VALID_PURCHASE_INVOICE: dict[str, Any] = { + "name": "TEST-PINV-0001", + "docstatus": 0, + "project": "TEST-PROJ-001", + "cost_center": "Projects - TEST", + "posting_date": "2026-01-01", + "due_date": "2026-02-01", + "items": [ + { + "idx": 1, + "project": "TEST-PROJ-001", + "cost_center": "Projects - TEST", + } + ], +} + + +def test_valid_sales_invoice_passes() -> None: + assert validate_invoice(VALID_SALES_INVOICE, "Sales Invoice").passed + + +def test_valid_purchase_invoice_passes() -> None: + assert validate_invoice(VALID_PURCHASE_INVOICE, "Purchase Invoice").passed + + +def test_cost_center_mismatch_flagged() -> None: + invoice = {**VALID_SALES_INVOICE, "cost_center": "Main - TEST"} + result = validate_invoice(invoice, "Sales Invoice") + assert not result.passed + assert any("Cost Center" in i.message for i in result.issues) + + +def test_null_items_does_not_crash() -> None: + invoice = {**VALID_SALES_INVOICE, "items": None} + assert validate_invoice(invoice, "Sales Invoice").passed + + +def test_no_project_skips_cost_center_check() -> None: + invoice = { + **VALID_SALES_INVOICE, + "project": None, + "cost_center": "Main - TEST", + "items": [{"idx": 1, "project": None, "cost_center": "Main - TEST"}], + } + assert validate_invoice(invoice, "Sales Invoice").passed + + +def test_line_item_project_mismatch_flagged_even_when_invoice_cost_center_empty() -> ( + None +): + invoice = { + **VALID_SALES_INVOICE, + "cost_center": "", + "items": [{"idx": 1, "project": "TEST-PROJ-999", "cost_center": ""}], + } + result = validate_invoice(invoice, "Sales Invoice") + assert not result.passed + assert any( + "Line item #1" in i.message and "Project" in i.message for i in result.issues + ) + + +def test_line_item_cost_center_mismatch_flagged() -> None: + invoice = { + **VALID_SALES_INVOICE, + "items": [{"idx": 1, "project": "TEST-PROJ-001", "cost_center": "Main - TEST"}], + } + result = validate_invoice(invoice, "Sales Invoice") + assert not result.passed + assert any( + "Line item #1" in i.message and "Cost Center" in i.message + for i in result.issues + ) + + +def test_line_item_project_mismatch_flagged() -> None: + invoice = { + **VALID_SALES_INVOICE, + "items": [ + {"idx": 1, "project": "TEST-PROJ-999", "cost_center": "Projects - TEST"} + ], + } + result = validate_invoice(invoice, "Sales Invoice") + assert not result.passed + assert any( + "Line item #1" in i.message and "Project" in i.message for i in result.issues + ) + + +def test_due_date_too_soon_flagged() -> None: + invoice = {**VALID_PURCHASE_INVOICE, "due_date": "2026-01-10"} + result = validate_invoice(invoice, "Purchase Invoice") + assert not result.passed + assert any("Due date" in i.message for i in result.issues) + + +def test_due_date_exactly_29_days_passes() -> None: + invoice = { + **VALID_PURCHASE_INVOICE, + "posting_date": "2026-01-01", + "due_date": "2026-01-30", + } + assert validate_invoice(invoice, "Purchase Invoice").passed + + +def test_due_date_non_string_value_skipped() -> None: + invoice = {**VALID_PURCHASE_INVOICE, "posting_date": 20260101, "due_date": 20260102} + assert validate_invoice(invoice, "Purchase Invoice").passed + + +def test_due_date_not_checked_for_sales_invoice() -> None: + invoice = {**VALID_SALES_INVOICE, "due_date": "2026-01-02"} + assert validate_invoice(invoice, "Sales Invoice").passed + + +def test_multiple_issues_reported() -> None: + invoice = { + **VALID_PURCHASE_INVOICE, + "cost_center": "Main - TEST", + "due_date": "2026-01-02", + "items": [{"idx": 1, "project": "TEST-PROJ-001", "cost_center": "Main - TEST"}], + } + result = validate_invoice(invoice, "Purchase Invoice") + assert len(result.issues) >= 2 diff --git a/tests/unit/test_invoices_cog.py b/tests/unit/test_invoices_cog.py new file mode 100644 index 00000000..80df1a57 --- /dev/null +++ b/tests/unit/test_invoices_cog.py @@ -0,0 +1,299 @@ +"""Unit tests for the invoices Discord cog.""" + +from unittest.mock import AsyncMock, Mock, patch + +import pytest + +from five08.discord_bot.cogs.invoices import InvoicesCog +from five08.clients.erpnext import ERPNextAPIError + + +VALID_INVOICE = { + "name": "TEST-SINV-0001", + "docstatus": 0, + "owner": "test-user@example.com", + "project": "TEST-PROJ-001", + "cost_center": "Projects - TEST", + "posting_date": "2026-01-01", + "due_date": "2026-02-01", + "items": [{"idx": 1, "project": "TEST-PROJ-001", "cost_center": "Projects - TEST"}], +} + + +def _make_interaction(*, role_names: list[str], user_id: int) -> AsyncMock: + interaction = AsyncMock() + interaction.response = AsyncMock() + interaction.response.defer = AsyncMock() + interaction.followup = AsyncMock() + interaction.followup.send = AsyncMock() + interaction.namespace = Mock() + interaction.namespace.doctype = "Sales Invoice" + roles = [] + for role_name in role_names: + role = Mock() + role.name = role_name + roles.append(role) + interaction.user = Mock() + interaction.user.roles = roles + interaction.user.id = user_id + return interaction + + +@pytest.fixture +def mock_interaction() -> AsyncMock: + """A privileged (Steering Committee) caller with full invoice access.""" + return _make_interaction(role_names=["Steering Committee"], user_id=1001) + + +@pytest.fixture +def mock_member_interaction() -> AsyncMock: + """A non-privileged caller subject to owner/project access rules.""" + return _make_interaction(role_names=[], user_id=2002) + + +@pytest.fixture +def mock_doctype() -> Mock: + choice = Mock() + choice.value = "Sales Invoice" + return choice + + +@pytest.fixture +def cog() -> InvoicesCog: + with patch("five08.discord_bot.cogs.invoices.ERPNextClient"): + return InvoicesCog(Mock()) + + +# --------------------------------------------------------------------------- +# validate_invoice_command tests +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_validate_invoice_passes(cog, mock_interaction, mock_doctype): + cog.client.get_invoice = Mock(return_value=VALID_INVOICE) + await cog.validate_invoice_command.callback( + cog, mock_interaction, mock_doctype, "TEST-SINV-0001" + ) + embed = mock_interaction.followup.send.call_args.kwargs["embed"] + assert "No issues found" in embed.title + + +@pytest.mark.asyncio +async def test_validate_invoice_shows_invoice_info_field( + cog, mock_interaction, mock_doctype +): + cog.client.get_invoice = Mock(return_value=VALID_INVOICE) + await cog.validate_invoice_command.callback( + cog, mock_interaction, mock_doctype, "TEST-SINV-0001" + ) + embed = mock_interaction.followup.send.call_args.kwargs["embed"] + field_names = [f.name for f in embed.fields] + assert "Invoice Info" in field_names + + +@pytest.mark.asyncio +async def test_validate_invoice_not_found(cog, mock_interaction, mock_doctype): + cog.client.get_invoice = Mock(return_value=None) + await cog.validate_invoice_command.callback( + cog, mock_interaction, mock_doctype, "DOES-NOT-EXIST" + ) + sent = mock_interaction.followup.send.call_args.args[0] + assert "not found" in sent + + +@pytest.mark.asyncio +async def test_validate_invoice_api_error(cog, mock_interaction, mock_doctype): + cog.client.get_invoice = Mock(side_effect=ERPNextAPIError("connection failed")) + await cog.validate_invoice_command.callback( + cog, mock_interaction, mock_doctype, "TEST-SINV-0001" + ) + sent = mock_interaction.followup.send.call_args.args[0] + assert "Failed to fetch" in sent + + +@pytest.mark.asyncio +async def test_validate_invoice_issues_truncated_under_discord_limit( + cog, mock_interaction, mock_doctype +): + cog.client.get_invoice = Mock(return_value=VALID_INVOICE) + with patch("five08.discord_bot.cogs.invoices.validate_invoice") as mock_validate: + result = Mock() + result.passed = False + result.issues = [Mock(message="x" * 2000)] + mock_validate.return_value = result + await cog.validate_invoice_command.callback( + cog, mock_interaction, mock_doctype, "TEST-SINV-0001" + ) + embed = mock_interaction.followup.send.call_args.kwargs["embed"] + issues_field = next(f for f in embed.fields if f.name == "Issues") + assert len(issues_field.value) <= 1024 + + +# --------------------------------------------------------------------------- +# validate_invoice_command authorization tests +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_validate_invoice_denied_without_erp_identity( + cog, mock_member_interaction, mock_doctype +): + cog.client.get_invoice = Mock() + with patch( + "five08.discord_bot.cogs.invoices.project_viewer_emails_for_discord", + return_value=[], + ): + await cog.validate_invoice_command.callback( + cog, mock_member_interaction, mock_doctype, "TEST-SINV-0001" + ) + sent = mock_member_interaction.followup.send.call_args.args[0] + assert "Steering Committee" in sent + cog.client.get_invoice.assert_not_called() + + +@pytest.mark.asyncio +async def test_validate_invoice_allowed_for_invoice_owner( + cog, mock_member_interaction, mock_doctype +): + owned = {**VALID_INVOICE, "owner": "member@example.com"} + cog.client.get_invoice = Mock(return_value=owned) + with ( + patch( + "five08.discord_bot.cogs.invoices.project_viewer_emails_for_discord", + return_value=["member@example.com"], + ), + patch( + "five08.discord_bot.cogs.invoices.list_dashboard_projects", + return_value=[], + ), + ): + await cog.validate_invoice_command.callback( + cog, mock_member_interaction, mock_doctype, "TEST-SINV-0001" + ) + embed = mock_member_interaction.followup.send.call_args.kwargs["embed"] + assert "No issues found" in embed.title + + +@pytest.mark.asyncio +async def test_validate_invoice_allowed_for_project_member( + cog, mock_member_interaction, mock_doctype +): + others = {**VALID_INVOICE, "owner": "someone-else@example.com"} + cog.client.get_invoice = Mock(return_value=others) + with ( + patch( + "five08.discord_bot.cogs.invoices.project_viewer_emails_for_discord", + return_value=["member@example.com"], + ), + patch( + "five08.discord_bot.cogs.invoices.list_dashboard_projects", + return_value=[{"erpnext_project_id": "TEST-PROJ-001"}], + ), + ): + await cog.validate_invoice_command.callback( + cog, mock_member_interaction, mock_doctype, "TEST-SINV-0001" + ) + embed = mock_member_interaction.followup.send.call_args.kwargs["embed"] + assert "No issues found" in embed.title + + +@pytest.mark.asyncio +async def test_validate_invoice_denied_for_unrelated_invoice( + cog, mock_member_interaction, mock_doctype +): + unrelated = { + **VALID_INVOICE, + "owner": "someone-else@example.com", + "project": "OTHER-PROJ", + } + cog.client.get_invoice = Mock(return_value=unrelated) + with ( + patch( + "five08.discord_bot.cogs.invoices.project_viewer_emails_for_discord", + return_value=["member@example.com"], + ), + patch( + "five08.discord_bot.cogs.invoices.list_dashboard_projects", + return_value=[{"erpnext_project_id": "TEST-PROJ-001"}], + ), + ): + await cog.validate_invoice_command.callback( + cog, mock_member_interaction, mock_doctype, "TEST-SINV-0001" + ) + sent = mock_member_interaction.followup.send.call_args.args[0] + assert "don't have access" in sent + + +# --------------------------------------------------------------------------- +# invoice_name_autocomplete tests +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_autocomplete_returns_choices(cog, mock_interaction): + cog.client.search_invoices = Mock( + return_value=[ + { + "name": "TEST-SINV-0001", + "docstatus": 0, + "owner": "test-user@example.com", + "posting_date": "2026-01-01", + }, + ] + ) + choices = await cog.invoice_name_autocomplete(mock_interaction, "TEST-SINV") + assert len(choices) == 1 + assert choices[0].value == "TEST-SINV-0001" + + +@pytest.mark.asyncio +async def test_autocomplete_returns_empty_on_api_error(cog, mock_interaction): + cog.client.search_invoices = Mock(side_effect=ERPNextAPIError("timeout")) + choices = await cog.invoice_name_autocomplete(mock_interaction, "TEST-SINV") + assert choices == [] + + +@pytest.mark.asyncio +async def test_autocomplete_returns_empty_when_no_doctype(cog, mock_interaction): + mock_interaction.namespace.doctype = None + choices = await cog.invoice_name_autocomplete(mock_interaction, "") + assert choices == [] + + +@pytest.mark.asyncio +async def test_autocomplete_returns_empty_without_erp_identity( + cog, mock_member_interaction +): + cog.client.search_invoices = Mock() + with patch( + "five08.discord_bot.cogs.invoices.project_viewer_emails_for_discord", + return_value=[], + ): + choices = await cog.invoice_name_autocomplete(mock_member_interaction, "TEST") + assert choices == [] + cog.client.search_invoices.assert_not_called() + + +@pytest.mark.asyncio +async def test_autocomplete_scopes_to_owner_and_projects_for_member( + cog, mock_member_interaction +): + cog.client.search_invoices = Mock(return_value=[]) + with ( + patch( + "five08.discord_bot.cogs.invoices.project_viewer_emails_for_discord", + return_value=["member@example.com"], + ), + patch( + "five08.discord_bot.cogs.invoices.list_dashboard_projects", + return_value=[ + {"erpnext_project_id": "PROJ-1"}, + {"erpnext_project_id": None}, + ], + ), + ): + await cog.invoice_name_autocomplete(mock_member_interaction, "TEST") + kwargs = cog.client.search_invoices.call_args.kwargs + assert kwargs["owners"] == ["member@example.com"] + assert kwargs["projects"] == ["PROJ-1"]