Merged
17 changes: 17 additions & 0 deletions docs/anomalies.md
@@ -24,6 +24,12 @@ qualytics anomalies list --datastore-id 1 --status Active
# Filter by date range
qualytics anomalies list --datastore-id 1 \
--start-date 2026-01-01 --end-date 2026-01-31

# Only anomalies whose source records were enriched
qualytics anomalies list --datastore-id 1 --source-enriched

# Only anomalies whose source records were NOT enriched
qualytics anomalies list --datastore-id 1 --no-source-enriched
```

## Inspecting an Anomaly
@@ -40,8 +46,19 @@ qualytics anomalies update --id 42 --status Acknowledged

# Bulk update
qualytics anomalies update --ids 42,43,44 --status Active

# Set assignees on a single anomaly
qualytics anomalies update --id 42 --status Active --assignee-ids "7,12"

# Clear assignees (empty string)
qualytics anomalies update --id 42 --status Active --assignee-ids ""

# Bulk-assign a reviewer
qualytics anomalies update --ids 42,43,44 --status Active --assignee-ids "7"
```

User IDs come from `qualytics users list`.
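
The three `--assignee-ids` cases shown in the examples above (flag omitted, empty string, comma-separated list) can be sketched in Python as follows. This is a minimal sketch rather than the CLI's own code: `parse_assignee_ids` is a hypothetical name, and the whitespace handling is an assumption, since the CLI delegates splitting to its `_parse_comma_list` helper, which is not shown in this change.

```python
from __future__ import annotations


def parse_assignee_ids(raw: str | None) -> list[int] | None:
    """Hypothetical helper: map the raw flag value to an assignee list."""
    if raw is None:
        # Flag omitted: return None so the caller leaves assignees untouched.
        return None
    if raw == "":
        # Empty string: an explicit empty list clears all assignees.
        return []
    # Assumed splitting rule: comma-separated, surrounding whitespace ignored.
    return [int(part.strip()) for part in raw.split(",") if part.strip()]
```

Returning `None` versus `[]` is what lets a caller tell "do not touch assignees" apart from "remove every assignee".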

## Archiving

Archive anomalies with a resolution status:
23 changes: 23 additions & 0 deletions docs/checks.md
@@ -81,6 +81,29 @@ qualytics checks create --datastore-id 1 --file checks.yaml
| `tags` | list[string] | No | Tags for filtering |
| `status` | string | No | `Active` or `Draft` (default: Active) |

## Ownership and Assignment

Each check can have an owner and a default anomaly assignee. Set them via the
CLI flags `--owner-id` and `--default-anomaly-assignee-id` on `checks create`
or `checks update`. Pass `0` on `update` to clear an existing value.

User IDs are environment-specific, so ownership is **not** carried in the
portable export YAML: `checks export` strips these fields, and
`checks import` does not restore them. Apply ownership at import time,
separately for each environment.

```bash
# Apply a single owner to every check in a bulk import
qualytics checks create --datastore-id 1 --file checks.yaml --owner-id 7

# Update a check's assignee
qualytics checks update --id 42 --file check.yaml --default-anomaly-assignee-id 12

# Clear ownership on update
qualytics checks update --id 42 --file check.yaml --owner-id 0
```

User IDs come from `qualytics users list`.
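
The precedence and clearing rules for `--owner-id` and `--default-anomaly-assignee-id` on `checks update` can be sketched as below. It is a sketch under assumptions: `resolve_user_field` is a hypothetical name, and the logic follows the description above (the flag wins over the file value, `0` clears) rather than reproducing the CLI source.

```python
from __future__ import annotations


def resolve_user_field(
    flag_value: int | None, file_value: int | None
) -> tuple[bool, int | None]:
    """Hypothetical helper returning (include_in_payload, value_to_send)."""
    # A CLI flag, when given, takes precedence over the value in the file.
    effective = flag_value if flag_value is not None else file_value
    if effective is None:
        # Neither source set the field: leave it out of the payload.
        return (False, None)
    # 0 is the "clear" sentinel and is sent as None; any other ID passes through.
    return (True, effective if effective else None)
```

For example, `resolve_user_field(0, 7)` yields `(True, None)`: the field is sent, and it clears the existing owner even though the file names user 7.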

## Export and Import

### Export checks
26 changes: 26 additions & 0 deletions docs/connections.md
@@ -45,6 +45,32 @@ qualytics connections create \
--parameters '{"role": "ANALYST", "warehouse": "COMPUTE_WH"}'
```

### IAM Role authentication (S3, Athena, Redshift)

S3, Athena, and Redshift connections can authenticate via an IAM Role instead
of static credentials. Use `--authentication-type IAM_ROLE` together with
`--role-arn` (and optionally `--external-id`).

```bash
# S3 with IAM Role (alternative to --access-key / --secret-key)
qualytics connections create \
--type s3 \
--name s3-prod \
--uri s3://my-bucket \
--authentication-type IAM_ROLE \
--role-arn arn:aws:iam::123456789012:role/QualyticsReader \
--external-id my-external-id

# Athena with IAM Role (alternative to --username / --password)
qualytics connections create \
--type athena \
--name athena-prod \
--authentication-type IAM_ROLE \
--role-arn arn:aws:iam::123456789012:role/QualyticsAthena
```

Default authentication is `SHARED_KEY` for S3 and `BASIC` for Athena/Redshift.

## Listing and Retrieving

```bash
20 changes: 13 additions & 7 deletions docs/operations.md
@@ -1,13 +1,13 @@
# Operations

Operations are the data processing workflows in Qualytics. The standard lifecycle is: **sync** (discover containers) then **profile** (infer checks) then **scan** (detect anomalies).
Operations are the data processing workflows in Qualytics. The standard lifecycle is: **sync** (discover containers) then **profile** (generate AI managed checks) then **scan** (detect anomalies).

## Commands

| Command | Description |
|---------|-------------|
| `operations sync` | Trigger a sync operation (discover containers) |
| `operations profile` | Trigger a profile operation (infer quality checks) |
| `operations profile` | Trigger a profile operation (generate AI managed checks) |
| `operations scan` | Trigger a scan operation (detect anomalies) |
| `operations materialize` | Trigger a materialize operation (computed containers) |
| `operations export` | Trigger an export operation (anomalies, checks, profiles) |
@@ -25,16 +25,16 @@ qualytics operations sync --datastore-id 1

Discovers tables, views, and files in the datastore and creates container records.

### 2. Profile (infer checks)
### 2. Profile (generate AI managed checks)

```bash
qualytics operations profile --datastore-id 1

# With inference threshold (higher = more checks inferred)
qualytics operations profile --datastore-id 1 --inference-threshold 3
# AI Effort levels: off, low, medium, high, xhigh, max
qualytics operations profile --datastore-id 1 --ai-effort high
```

Profiles container data to infer quality checks based on statistical analysis.
Profiles container data and generates AI managed checks based on statistical analysis.

### 3. Scan (detect anomalies)

@@ -46,9 +46,15 @@ qualytics operations scan --datastore-id 1 --container-names "orders,customers"

# Incremental scan (only new/updated records)
qualytics operations scan --datastore-id 1 --incremental

# Disable auto-resolution of passed anomalies (server default is on)
qualytics operations scan --datastore-id 1 --no-auto-resolve-passed-anomalies
```

Runs quality checks against the data and detects anomalies.
Runs quality checks against the data and detects anomalies. When a scan
completes, open anomalies whose fingerprints no longer fail are auto-resolved
unless `--no-auto-resolve-passed-anomalies` is passed. Auto-resolution is
silently disabled for incremental scans regardless of the flag.
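
The auto-resolution rule in the paragraph above reduces to a two-input decision. The function below is only an illustration of that rule as documented: `auto_resolve_applies` is a hypothetical name, and the actual decision is made by the server, not the CLI.

```python
def auto_resolve_applies(incremental: bool, no_auto_resolve: bool) -> bool:
    """Hypothetical predicate: will this scan auto-resolve passed anomalies?"""
    if incremental:
        # Documented behavior: incremental scans never auto-resolve,
        # regardless of the flag.
        return False
    # Full scans auto-resolve unless --no-auto-resolve-passed-anomalies is given.
    return not no_auto_resolve
```

Only a full scan run without the opt-out flag returns `True`.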

## Running in Background

3 changes: 3 additions & 0 deletions qualytics/api/anomalies.py
@@ -19,6 +19,7 @@ def list_anomalies(
archived: str | None = None,
sort_created: str | None = None,
sort_weight: str | None = None,
source_enriched: bool | None = None,
page: int = 1,
size: int = 100,
) -> dict:
@@ -53,6 +54,8 @@ def list_anomalies(
params["sort_created"] = sort_created
if sort_weight:
params["sort_weight"] = sort_weight
if source_enriched is not None:
params["source_enriched"] = str(source_enriched).lower()
response = client.get("anomalies", params=params)
return response.json()
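
The tri-state handling of `source_enriched` in the hunk above can be exercised in isolation. The sketch below copies only the two added lines into a hypothetical `build_params` function so the mapping from the Python value to the query string is easy to see; it makes no claim about how the server interprets the parameter.

```python
from __future__ import annotations


def build_params(source_enriched: bool | None = None) -> dict[str, str]:
    """Hypothetical stand-in for the parameter-building part of list_anomalies."""
    params: dict[str, str] = {}
    if source_enriched is not None:
        # True -> "true", False -> "false"; None adds no filter at all.
        params["source_enriched"] = str(source_enriched).lower()
    return params
```

The check is `is not None` rather than a truthiness test, so `--no-source-enriched` (`False`) still produces a filter instead of being dropped.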

33 changes: 32 additions & 1 deletion qualytics/api/client.py
@@ -1,9 +1,33 @@
"""Centralized API client for the Qualytics controlplane."""

import os

import requests
import urllib3
from rich import print

DEFAULT_TIMEOUT = 30


def _resolve_timeout(config: dict | None) -> int:
    """Resolve request timeout from env var, config, or default.

    Precedence: ``QUALYTICS_TIMEOUT`` env var → ``timeout`` in config → 30.
    """
    env_value = os.environ.get("QUALYTICS_TIMEOUT")
    if env_value:
        try:
            parsed = int(env_value)
            if parsed > 0:
                return parsed
        except ValueError:
            pass
    if config:
        cfg_value = config.get("timeout")
        if isinstance(cfg_value, int) and cfg_value > 0:
            return cfg_value
    return DEFAULT_TIMEOUT


class QualyticsAPIError(Exception):
"""Base exception for API errors."""
@@ -51,7 +75,7 @@ def __init__(
base_url: str,
token: str,
ssl_verify: bool = True,
timeout: int = 30,
timeout: int = DEFAULT_TIMEOUT,
):
self.base_url = base_url.rstrip("/")
if not self.base_url.endswith("/"):
@@ -109,6 +133,12 @@ def _request(self, method: str, path: str, **kwargs) -> requests.Response:
"If your server runs plain HTTP, use http:// instead of https:// "
"in your URL (e.g. qualytics init --url http://localhost:8000)."
)
except requests.exceptions.Timeout:
raise ConnectionError(
f"Request to {url} timed out after {kwargs['timeout']}s. "
"Increase the timeout via QUALYTICS_TIMEOUT=<seconds> "
"or set 'timeout' in ~/.qualytics/config.yaml."
)
except requests.exceptions.ConnectionError:
raise ConnectionError(
f"Could not connect to {url}. "
@@ -180,4 +210,5 @@ def get_client(config: dict | None = None) -> QualyticsClient:
base_url=base_url,
token=token,
ssl_verify=ssl_verify,
timeout=_resolve_timeout(config),
)
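
The timeout precedence introduced in this file (environment variable, then config, then the default of 30) can be checked without touching the real environment. The sketch below restates the same logic in a hypothetical `resolve_timeout` function that takes the environment as an explicit mapping; the name and the extra parameter are assumptions made for testability, not part of the change.

```python
from __future__ import annotations

from collections.abc import Mapping

DEFAULT_TIMEOUT = 30


def resolve_timeout(env: Mapping[str, str], config: dict | None) -> int:
    """Hypothetical, environment-injectable restatement of the precedence rule."""
    env_value = env.get("QUALYTICS_TIMEOUT")
    if env_value:
        try:
            parsed = int(env_value)
            if parsed > 0:
                return parsed  # a valid positive env value wins outright
        except ValueError:
            pass  # non-numeric env value: fall through to the config
    if config:
        cfg_value = config.get("timeout")
        if isinstance(cfg_value, int) and cfg_value > 0:
            return cfg_value
    return DEFAULT_TIMEOUT
```

Passing `os.environ` as `env` gives the same lookup as the original. Invalid or non-positive values are skipped silently at each level, so `QUALYTICS_TIMEOUT=abc` with `timeout: 60` in the config resolves to 60. Because `bool` is a subclass of `int` in Python, a config entry of `timeout: true` would also pass the `isinstance` check.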
24 changes: 23 additions & 1 deletion qualytics/cli/anomalies.py
@@ -78,6 +78,11 @@ def anomalies_list(
end_date: str | None = typer.Option(
None, "--end-date", help="End date (YYYY-MM-DD)"
),
source_enriched: bool | None = typer.Option(
None,
"--source-enriched/--no-source-enriched",
help="Filter by source-record enrichment status (omit flag for no filter)",
),
fmt: OutputFormat = typer.Option(
OutputFormat.YAML, "--format", help="Output format: yaml or json"
),
@@ -116,6 +121,7 @@ def anomalies_list(
start_date=start_date,
end_date=end_date,
archived=archived,
source_enriched=source_enriched,
)

print(f"[green]Found {len(all_anomalies)} anomalies.[/green]")
@@ -142,6 +148,11 @@ def anomalies_update(
None, "--description", help="Update description"
),
tags: str | None = typer.Option(None, "--tags", help="Comma-separated tag names"),
assignee_ids: str | None = typer.Option(
None,
"--assignee-ids",
help='Comma-separated assignee user IDs (e.g. "1,2"). Empty string clears assignees.',
),
):
"""Update anomaly status (Active or Acknowledged)."""
if not anomaly_id and not ids:
@@ -156,6 +167,12 @@ def anomalies_update(
)
raise typer.Exit(code=1)

parsed_assignees: list[int] | None = None
if assignee_ids is not None:
parsed_assignees = (
[int(x) for x in _parse_comma_list(assignee_ids)] if assignee_ids else []
)

client = get_client()

if anomaly_id and not ids:
@@ -165,6 +182,8 @@ def anomalies_update(
payload["description"] = description
if tags:
payload["tags"] = _parse_comma_list(tags)
if parsed_assignees is not None:
payload["assignee_ids"] = parsed_assignees
result = update_anomaly(client, anomaly_id, payload)
print(f"[green]Anomaly {result['id']} updated to '{status}'.[/green]")
else:
@@ -175,7 +194,10 @@ def anomalies_update(
if ids:
id_list.extend(int(x) for x in _parse_comma_list(ids))

items = [{"id": aid, "status": status} for aid in id_list]
item_template: dict = {"status": status}
if parsed_assignees is not None:
item_template["assignee_ids"] = parsed_assignees
items = [{"id": aid, **item_template} for aid in id_list]
bulk_update_anomalies(client, items)
print(f"[green]Updated {len(id_list)} anomalies to '{status}'.[/green]")

5 changes: 5 additions & 0 deletions qualytics/cli/auth.py
@@ -183,13 +183,18 @@ def auth_status():
)
ssl_label = "[green]enabled[/green]" if ssl_verify else "[yellow]disabled[/yellow]"

from ..api.client import _resolve_timeout

timeout = _resolve_timeout(config)

print(f"[bold]{host}[/bold]")
print(f" URL: {url}")
print(f" Status: {status_icon}")
print(f" Token: {masked_token}")
if expiry_line:
print(f" Expiry: {expiry_line}")
print(f" SSL Verification: {ssl_label}")
print(f" Request timeout: {timeout}s")
print(f" Config file: {CONFIG_PATH}")

if not token_valid:
39 changes: 39 additions & 0 deletions qualytics/cli/checks.py
@@ -59,6 +59,16 @@ def checks_create(
file: str = typer.Option(
..., "--file", "-f", help="YAML/JSON file with check definition(s)"
),
owner_id: int | None = typer.Option(
None,
"--owner-id",
help="Apply this owner user ID to every check in the batch (overrides file)",
),
default_anomaly_assignee_id: int | None = typer.Option(
None,
"--default-anomaly-assignee-id",
help="Apply this default anomaly assignee user ID to every check (overrides file)",
),
):
"""Create quality checks from a file (single or bulk)."""
client = get_client()
@@ -102,6 +112,10 @@ def checks_create(
failed += 1
continue
try:
if owner_id is not None:
check["owner_id"] = owner_id
if default_anomaly_assignee_id is not None:
check["default_anomaly_assignee_id"] = default_anomaly_assignee_id
payload = _build_create_payload(check, container_id)
result = create_quality_check(client, payload)
print(
@@ -198,6 +212,16 @@ def checks_update(
file: str = typer.Option(
..., "--file", "-f", help="YAML/JSON file with updated check definition"
),
owner_id: int | None = typer.Option(
None,
"--owner-id",
help="Owner user ID (overrides file). Pass 0 to clear.",
),
default_anomaly_assignee_id: int | None = typer.Option(
None,
"--default-anomaly-assignee-id",
help="Default anomaly assignee user ID (overrides file). Pass 0 to clear.",
),
):
"""Update a quality check from a file."""
client = get_client()
@@ -214,6 +238,21 @@ def checks_update(
"status": data.get("status", "Active"),
}

# CLI flag (if given) takes precedence over the file value.
# Pass through to the same normalization the service uses (0 → clear).
effective_owner = owner_id if owner_id is not None else data.get("owner_id")
effective_assignee = (
default_anomaly_assignee_id
if default_anomaly_assignee_id is not None
else data.get("default_anomaly_assignee_id")
)
if effective_owner is not None:
payload["owner_id"] = effective_owner if effective_owner else None
if effective_assignee is not None:
payload["default_anomaly_assignee_id"] = (
effective_assignee if effective_assignee else None
)

result = update_quality_check(client, check_id, payload)
print(f"[green]Quality check {result['id']} updated successfully.[/green]")
