diff --git a/changelog/151.added.md b/changelog/151.added.md new file mode 100644 index 00000000..c770976f --- /dev/null +++ b/changelog/151.added.md @@ -0,0 +1 @@ +Add `infrahubctl schema export` command to export schemas from Infrahub. diff --git a/docs/docs/infrahubctl/infrahubctl-schema.mdx b/docs/docs/infrahubctl/infrahubctl-schema.mdx index 7d569cc4..bee685b1 100644 --- a/docs/docs/infrahubctl/infrahubctl-schema.mdx +++ b/docs/docs/infrahubctl/infrahubctl-schema.mdx @@ -17,6 +17,7 @@ $ infrahubctl schema [OPTIONS] COMMAND [ARGS]... **Commands**: * `check`: Check if schema files are valid and what... +* `export`: Export the schema from Infrahub as YAML... * `load`: Load one or multiple schema files into... ## `infrahubctl schema check` @@ -40,6 +41,25 @@ $ infrahubctl schema check [OPTIONS] SCHEMAS... * `--config-file TEXT`: [env var: INFRAHUBCTL_CONFIG; default: infrahubctl.toml] * `--help`: Show this message and exit. +## `infrahubctl schema export` + +Export the schema from Infrahub as YAML files, one per namespace. + +**Usage**: + +```console +$ infrahubctl schema export [OPTIONS] +``` + +**Options**: + +* `--directory PATH`: Directory path to store schema files [default: (dynamic)] +* `--branch TEXT`: Branch from which to export the schema +* `--namespace TEXT`: Namespace(s) to export (default: all user-defined) +* `--debug / --no-debug`: [default: no-debug] +* `--config-file TEXT`: [env var: INFRAHUBCTL_CONFIG; default: infrahubctl.toml] +* `--help`: Show this message and exit. + ## `infrahubctl schema load` Load one or multiple schema files into Infrahub. diff --git a/infrahub_sdk/ctl/schema.py b/infrahub_sdk/ctl/schema.py index 5a977b59..2179da7d 100644 --- a/infrahub_sdk/ctl/schema.py +++ b/infrahub_sdk/ctl/schema.py @@ -2,6 +2,7 @@ import asyncio import time +from datetime import datetime, timezone from pathlib import Path from typing import TYPE_CHECKING, Any @@ -211,3 +212,49 @@ def _display_schema_warnings(console: Console, warnings: list[SchemaWarning]) -> console.print( f"[yellow] {warning.type.value}: {warning.message} [{', '.join([kind.display for kind in warning.kinds])}]" ) + + +def _default_export_directory() -> Path: + timestamp = datetime.now(timezone.utc).astimezone().strftime("%Y%m%d-%H%M%S") + return Path(f"infrahub-schema-export-{timestamp}") + + +@app.command() +@catch_exception(console=console) +async def export( + directory: Path = typer.Option(_default_export_directory, help="Directory path to store schema files"), + branch: str = typer.Option(None, help="Branch from which to export the schema"), + namespace: list[str] = typer.Option([], help="Namespace(s) to export (default: all user-defined)"), + debug: bool = False, + _: str = CONFIG_PARAM, +) -> None: + """Export the schema from Infrahub as YAML files, one per namespace.""" + init_logging(debug=debug) + + client = initialize_client() + user_schemas = await client.schema.export( + branch=branch, + namespaces=namespace or None, + ) + + if not user_schemas.namespaces: + console.print("[yellow]No user-defined schema found to export.") + return + + directory.mkdir(parents=True, exist_ok=True) + + for ns, data in sorted(user_schemas.namespaces.items()): + payload: dict[str, Any] = {"version": "1.0"} + if data.generics: + payload["generics"] = data.generics + if data.nodes: + payload["nodes"] = data.nodes + + output_file = directory / f"{ns.lower()}.yml" + output_file.write_text( + yaml.dump(payload, default_flow_style=False, sort_keys=False, allow_unicode=True), + encoding="utf-8", + ) + console.print(f"[green] Exported namespace '{ns}' to {output_file}") + + console.print(f"[green] Schema exported to {directory}") diff --git a/infrahub_sdk/schema/__init__.py b/infrahub_sdk/schema/__init__.py index 3e61ad2a..557e76f3 100644 --- a/infrahub_sdk/schema/__init__.py +++ b/infrahub_sdk/schema/__init__.py @@ -22,6 +22,7 @@ ) from ..graphql import Mutation from ..queries import SCHEMA_HASH_SYNC_STATUS +from .export import RESTRICTED_NAMESPACES, NamespaceExport, SchemaExport, schema_to_export_dict from .main import ( AttributeSchema, AttributeSchemaAPI, @@ -54,6 +55,7 @@ "BranchSupportType", "GenericSchema", "GenericSchemaAPI", + "NamespaceExport", "NodeSchema", "NodeSchemaAPI", "ProfileSchemaAPI", @@ -61,9 +63,11 @@ "RelationshipKind", "RelationshipSchema", "RelationshipSchemaAPI", + "SchemaExport", "SchemaRoot", "SchemaRootAPI", "TemplateSchemaAPI", + "schema_to_export_dict", ] @@ -118,6 +122,47 @@ def __init__(self, client: InfrahubClient | InfrahubClientSync) -> None: self.client = client self.cache = {} + @staticmethod + def _build_export_schemas( + schema_nodes: MutableMapping[str, MainSchemaTypesAPI], + namespaces: list[str] | None = None, + ) -> SchemaExport: + """Organize fetched schemas into a per-namespace export structure. + + Filters out system types (Profile/Template) and restricted namespaces + (see :data:`RESTRICTED_NAMESPACES`), and optionally limits to specific + namespaces. If the caller requests restricted namespaces they are + silently excluded and a :func:`warnings.warn` is emitted. + + Returns: + A :class:`SchemaExport` containing user-defined schemas by namespace. + """ + if namespaces: + restricted = set(namespaces) & set(RESTRICTED_NAMESPACES) + if restricted: + warnings.warn( + f"Restricted namespace(s) {sorted(restricted)} requested but will be excluded from export", + stacklevel=3, + ) + + ns_map: dict[str, NamespaceExport] = {} + for schema in schema_nodes.values(): + if isinstance(schema, (ProfileSchemaAPI, TemplateSchemaAPI)): + continue + if schema.namespace in RESTRICTED_NAMESPACES: + continue + if namespaces and schema.namespace not in namespaces: + continue + ns = schema.namespace + if ns not in ns_map: + ns_map[ns] = NamespaceExport() + schema_dict = schema_to_export_dict(schema) + if isinstance(schema, GenericSchemaAPI): + ns_map[ns].generics.append(schema_dict) + else: + ns_map[ns].nodes.append(schema_dict) + return SchemaExport(namespaces=ns_map) + def validate(self, data: dict[str, Any]) -> None: SchemaRoot(**data) @@ -497,6 +542,32 @@ async def fetch( return branch_schema.nodes + async def export( + self, + branch: str | None = None, + namespaces: list[str] | None = None, + ) -> SchemaExport: + """Export user-defined schemas organized by namespace. + + Fetches schemas from the server, filters out system types and + restricted namespaces (see :data:`RESTRICTED_NAMESPACES`), and returns + a :class:`SchemaExport` object with per-namespace data. Restricted + namespaces such as ``Core`` and ``Builtin`` are always excluded even if + explicitly listed in *namespaces*; a warning is emitted when this + happens. + + Args: + branch: Branch to export from. Defaults to default_branch. + namespaces: Optional list of namespaces to include. If empty/None, + all user-defined namespaces are exported. + + Returns: + A :class:`SchemaExport` containing user-defined schemas by namespace. + """ + branch = branch or self.client.default_branch + schema_nodes = await self.fetch(branch=branch, namespaces=namespaces, populate_cache=False) + return self._build_export_schemas(schema_nodes=schema_nodes, namespaces=namespaces) + async def get_graphql_schema(self, branch: str | None = None) -> str: """Get the GraphQL schema as a string. @@ -739,6 +810,32 @@ def fetch( return branch_schema.nodes + def export( + self, + branch: str | None = None, + namespaces: list[str] | None = None, + ) -> SchemaExport: + """Export user-defined schemas organized by namespace. + + Fetches schemas from the server, filters out system types and + restricted namespaces (see :data:`RESTRICTED_NAMESPACES`), and returns + a :class:`SchemaExport` object with per-namespace data. Restricted + namespaces such as ``Core`` and ``Builtin`` are always excluded even if + explicitly listed in *namespaces*; a warning is emitted when this + happens. + + Args: + branch: Branch to export from. Defaults to default_branch. + namespaces: Optional list of namespaces to include. If empty/None, + all user-defined namespaces are exported. + + Returns: + A :class:`SchemaExport` containing user-defined schemas by namespace. + """ + branch = branch or self.client.default_branch + schema_nodes = self.fetch(branch=branch, namespaces=namespaces, populate_cache=False) + return self._build_export_schemas(schema_nodes=schema_nodes, namespaces=namespaces) + def get_graphql_schema(self, branch: str | None = None) -> str: """Get the GraphQL schema as a string. diff --git a/infrahub_sdk/schema/export.py b/infrahub_sdk/schema/export.py new file mode 100644 index 00000000..d5a09c77 --- /dev/null +++ b/infrahub_sdk/schema/export.py @@ -0,0 +1,120 @@ +from __future__ import annotations + +from typing import Any + +from pydantic import BaseModel, Field + +from .main import GenericSchemaAPI, NodeSchemaAPI + + +class NamespaceExport(BaseModel): + """Export data for a single namespace.""" + + nodes: list[dict[str, Any]] = Field(default_factory=list) + generics: list[dict[str, Any]] = Field(default_factory=list) + + +class SchemaExport(BaseModel): + """Result of a schema export, organized by namespace.""" + + namespaces: dict[str, NamespaceExport] = Field(default_factory=dict) + + def to_dict(self) -> dict[str, dict[str, list[dict[str, Any]]]]: + """Convert to plain dict for YAML serialization.""" + return {ns: data.model_dump(exclude_defaults=True) for ns, data in self.namespaces.items()} + + +# Namespaces reserved by the Infrahub server — mirrored from +# backend/infrahub/core/constants/__init__.py in the opsmill/infrahub repo. +RESTRICTED_NAMESPACES: list[str] = [ + "Account", + "Branch", + "Builtin", + "Core", + "Deprecated", + "Diff", + "Infrahub", + "Internal", + "Lineage", + "Schema", + "Profile", + "Template", +] + +_SCHEMA_EXPORT_EXCLUDE: set[str] = {"hash", "hierarchy", "used_by", "id", "state"} +# branch is inherited from the node and need not be repeated on each field +_FIELD_EXPORT_EXCLUDE: set[str] = {"inherited", "allow_override", "hierarchical", "id", "state", "branch"} + +# Attribute field values that match schema loading defaults — omitted for cleaner output +_ATTR_EXPORT_DEFAULTS: dict[str, Any] = { + "read_only": False, + "optional": False, +} + +# Relationship field values that match schema loading defaults — omitted for cleaner output +_REL_EXPORT_DEFAULTS: dict[str, Any] = { + "direction": "bidirectional", + "on_delete": "no-action", + "cardinality": "many", + "optional": True, + "min_count": 0, + "max_count": 0, + "read_only": False, +} + +# Relationship kinds that Infrahub generates automatically — never user-defined +_AUTO_GENERATED_REL_KINDS: frozenset[str] = frozenset({"Group", "Profile", "Hierarchy"}) + + +def schema_to_export_dict(schema: NodeSchemaAPI | GenericSchemaAPI) -> dict[str, Any]: + """Convert an API schema object to an export-ready dict (omits API-internal fields).""" + data = schema.model_dump(exclude=_SCHEMA_EXPORT_EXCLUDE, exclude_none=True) + + # Pop attrs/rels so they can be re-inserted last for better readability + data.pop("attributes", None) + data.pop("relationships", None) + + # Generics with Hierarchy relationships were defined with `hierarchical: true`. + # Restore that flag and drop the auto-generated rels so the schema round-trips cleanly. + if isinstance(schema, GenericSchemaAPI) and any( + rel.kind == "Hierarchy" for rel in schema.relationships if not rel.inherited + ): + data["hierarchical"] = True + + # Strip uniqueness_constraints that are auto-generated from `unique: true` attributes + # (single-field entries of the form ["__value"]). User-defined multi-field + # constraints are preserved. + unique_attr_suffixes = {f"{attr.name}__value" for attr in schema.attributes if attr.unique} + user_constraints = [ + c + for c in (data.pop("uniqueness_constraints", None) or []) + if not (len(c) == 1 and c[0] in unique_attr_suffixes) + ] + if user_constraints: + data["uniqueness_constraints"] = user_constraints + + attributes = [ + { + k: v + for k, v in attr.model_dump(exclude=_FIELD_EXPORT_EXCLUDE, exclude_none=True).items() + if k not in _ATTR_EXPORT_DEFAULTS or v != _ATTR_EXPORT_DEFAULTS[k] + } + for attr in schema.attributes + if not attr.inherited + ] + if attributes: + data["attributes"] = attributes + + relationships = [ + { + k: v + for k, v in rel.model_dump(exclude=_FIELD_EXPORT_EXCLUDE, exclude_none=True).items() + if k not in _REL_EXPORT_DEFAULTS or v != _REL_EXPORT_DEFAULTS[k] + } + for rel in schema.relationships + if not rel.inherited and rel.kind not in _AUTO_GENERATED_REL_KINDS + ] + if relationships: + data["relationships"] = relationships + + return data diff --git a/tests/unit/sdk/test_schema_export.py b/tests/unit/sdk/test_schema_export.py new file mode 100644 index 00000000..ed2814e4 --- /dev/null +++ b/tests/unit/sdk/test_schema_export.py @@ -0,0 +1,269 @@ +from __future__ import annotations + +import warnings +from typing import TYPE_CHECKING, Any + +import pytest + +from infrahub_sdk.schema import ( + GenericSchemaAPI, + InfrahubSchemaBase, + NodeSchemaAPI, + ProfileSchemaAPI, + SchemaExport, + TemplateSchemaAPI, +) + +if TYPE_CHECKING: + from pytest_httpx import HTTPXMock + + from tests.unit.sdk.conftest import BothClients + +client_types = ["standard", "sync"] + +# --------------------------------------------------------------------------- +# Minimal schema API response builders (reused from ctl tests) +# --------------------------------------------------------------------------- + +_BASE_NODE: dict[str, Any] = { + "id": None, + "state": "present", + "hash": None, + "hierarchy": None, + "label": None, + "description": None, + "include_in_menu": None, + "menu_placement": None, + "display_label": None, + "display_labels": None, + "human_friendly_id": None, + "icon": None, + "uniqueness_constraints": None, + "documentation": None, + "order_by": None, + "inherit_from": [], + "branch": "aware", + "default_filter": None, + "generate_profile": None, + "generate_template": None, + "parent": None, + "children": None, + "attributes": [], + "relationships": [], +} + +_BASE_GENERIC: dict[str, Any] = { + "id": None, + "state": "present", + "hash": None, + "used_by": [], + "label": None, + "description": None, + "include_in_menu": None, + "menu_placement": None, + "display_label": None, + "display_labels": None, + "human_friendly_id": None, + "icon": None, + "uniqueness_constraints": None, + "documentation": None, + "order_by": None, + "attributes": [], + "relationships": [], +} + + +def _make_node_schema(namespace: str, name: str) -> NodeSchemaAPI: + return NodeSchemaAPI(**{**_BASE_NODE, "namespace": namespace, "name": name}) + + +def _make_generic_schema(namespace: str, name: str) -> GenericSchemaAPI: + return GenericSchemaAPI(**{**_BASE_GENERIC, "namespace": namespace, "name": name}) + + +def _make_profile_schema(namespace: str, name: str) -> ProfileSchemaAPI: + return ProfileSchemaAPI( + **{ + **_BASE_NODE, + "namespace": namespace, + "name": name, + } + ) + + +def _make_template_schema(namespace: str, name: str) -> TemplateSchemaAPI: + return TemplateSchemaAPI( + **{ + **_BASE_NODE, + "namespace": namespace, + "name": name, + } + ) + + +# --------------------------------------------------------------------------- +# _build_export_schemas tests +# --------------------------------------------------------------------------- + + +class TestBuildExportSchemas: + def test_separates_nodes_and_generics(self) -> None: + schema_nodes = { + "InfraDevice": _make_node_schema("Infra", "Device"), + "InfraInterface": _make_generic_schema("Infra", "Interface"), + } + result = InfrahubSchemaBase._build_export_schemas(schema_nodes) + assert isinstance(result, SchemaExport) + assert "Infra" in result.namespaces + assert len(result.namespaces["Infra"].nodes) == 1 + assert len(result.namespaces["Infra"].generics) == 1 + assert result.namespaces["Infra"].nodes[0]["name"] == "Device" + assert result.namespaces["Infra"].generics[0]["name"] == "Interface" + + def test_groups_by_namespace(self) -> None: + schema_nodes = { + "InfraDevice": _make_node_schema("Infra", "Device"), + "DcimRack": _make_node_schema("Dcim", "Rack"), + } + result = InfrahubSchemaBase._build_export_schemas(schema_nodes) + assert set(result.namespaces.keys()) == {"Infra", "Dcim"} + + def test_filters_profiles_and_templates(self) -> None: + schema_nodes = { + "InfraDevice": _make_node_schema("Infra", "Device"), + "ProfileInfraDevice": _make_profile_schema("Profile", "InfraDevice"), + "TemplateInfraDevice": _make_template_schema("Template", "InfraDevice"), + } + result = InfrahubSchemaBase._build_export_schemas(schema_nodes) + assert "Infra" in result.namespaces + assert "Profile" not in result.namespaces + assert "Template" not in result.namespaces + + def test_filters_restricted_namespaces(self) -> None: + schema_nodes = { + "CoreRepository": _make_node_schema("Core", "Repository"), + "BuiltinTag": _make_node_schema("Builtin", "Tag"), + "InfraDevice": _make_node_schema("Infra", "Device"), + } + result = InfrahubSchemaBase._build_export_schemas(schema_nodes) + assert "Core" not in result.namespaces + assert "Builtin" not in result.namespaces + assert "Infra" in result.namespaces + + def test_namespace_filter(self) -> None: + schema_nodes = { + "InfraDevice": _make_node_schema("Infra", "Device"), + "DcimRack": _make_node_schema("Dcim", "Rack"), + } + result = InfrahubSchemaBase._build_export_schemas(schema_nodes, namespaces=["Infra"]) + assert "Infra" in result.namespaces + assert "Dcim" not in result.namespaces + + def test_empty_when_no_user_schemas(self) -> None: + schema_nodes = { + "CoreRepository": _make_node_schema("Core", "Repository"), + } + result = InfrahubSchemaBase._build_export_schemas(schema_nodes) + assert result.namespaces == {} + + def test_warns_on_restricted_namespaces(self) -> None: + schema_nodes = { + "InfraDevice": _make_node_schema("Infra", "Device"), + } + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + result = InfrahubSchemaBase._build_export_schemas(schema_nodes, namespaces=["Infra", "Core"]) + assert len(w) == 1 + assert "Core" in str(w[0].message) + assert "Infra" in result.namespaces + + def test_to_dict(self) -> None: + schema_nodes = { + "InfraDevice": _make_node_schema("Infra", "Device"), + "InfraInterface": _make_generic_schema("Infra", "Interface"), + } + result = InfrahubSchemaBase._build_export_schemas(schema_nodes) + as_dict = result.to_dict() + assert isinstance(as_dict, dict) + assert "Infra" in as_dict + assert isinstance(as_dict["Infra"], dict) + assert len(as_dict["Infra"]["nodes"]) == 1 + assert len(as_dict["Infra"]["generics"]) == 1 + + +# --------------------------------------------------------------------------- +# Integration tests for export() method on client.schema +# --------------------------------------------------------------------------- + + +def _schema_response( + nodes: list[dict] | None = None, + generics: list[dict] | None = None, + profiles: list[dict] | None = None, + templates: list[dict] | None = None, +) -> dict: + return { + "main": "aabbccdd", + "nodes": nodes or [], + "generics": generics or [], + "profiles": profiles or [], + "templates": templates or [], + } + + +def _make_node_dict(namespace: str, name: str) -> dict[str, Any]: + return {**_BASE_NODE, "namespace": namespace, "name": name} + + +def _make_generic_dict(namespace: str, name: str) -> dict[str, Any]: + return {**_BASE_GENERIC, "namespace": namespace, "name": name} + + +@pytest.mark.parametrize("client_type", client_types) +async def test_export_returns_user_schemas(httpx_mock: HTTPXMock, clients: BothClients, client_type: str) -> None: + response = _schema_response( + nodes=[_make_node_dict("Infra", "Device"), _make_node_dict("Dcim", "Rack")], + generics=[_make_generic_dict("Infra", "GenericInterface")], + ) + httpx_mock.add_response(method="GET", url="http://mock/api/schema?branch=main", json=response) + + if client_type == "standard": + result = await clients.standard.schema.export(branch="main") + else: + result = clients.sync.schema.export(branch="main") + + assert isinstance(result, SchemaExport) + assert "Infra" in result.namespaces + assert "Dcim" in result.namespaces + assert len(result.namespaces["Infra"].nodes) == 1 + assert len(result.namespaces["Infra"].generics) == 1 + assert len(result.namespaces["Dcim"].nodes) == 1 + + +@pytest.mark.parametrize("client_type", client_types) +async def test_export_with_namespace_filter(httpx_mock: HTTPXMock, clients: BothClients, client_type: str) -> None: + response = _schema_response( + nodes=[_make_node_dict("Infra", "Device")], + ) + httpx_mock.add_response(method="GET", url="http://mock/api/schema?branch=main&namespaces=Infra", json=response) + + if client_type == "standard": + result = await clients.standard.schema.export(branch="main", namespaces=["Infra"]) + else: + result = clients.sync.schema.export(branch="main", namespaces=["Infra"]) + + assert "Infra" in result.namespaces + assert "Dcim" not in result.namespaces + + +@pytest.mark.parametrize("client_type", client_types) +async def test_export_empty_when_only_restricted(httpx_mock: HTTPXMock, clients: BothClients, client_type: str) -> None: + response = _schema_response(nodes=[_make_node_dict("Core", "Repository")]) + httpx_mock.add_response(method="GET", url="http://mock/api/schema?branch=main", json=response) + + if client_type == "standard": + result = await clients.standard.schema.export(branch="main") + else: + result = clients.sync.schema.export(branch="main") + + assert result.namespaces == {}