Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog/151.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add `infrahubctl schema export` command to export schemas from Infrahub.
20 changes: 20 additions & 0 deletions docs/docs/infrahubctl/infrahubctl-schema.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ $ infrahubctl schema [OPTIONS] COMMAND [ARGS]...
**Commands**:

* `check`: Check if schema files are valid and what...
* `export`: Export the schema from Infrahub as YAML...
* `load`: Load one or multiple schema files into...

## `infrahubctl schema check`
Expand All @@ -40,6 +41,25 @@ $ infrahubctl schema check [OPTIONS] SCHEMAS...
* `--config-file TEXT`: [env var: INFRAHUBCTL_CONFIG; default: infrahubctl.toml]
* `--help`: Show this message and exit.

## `infrahubctl schema export`

Export the schema from Infrahub as YAML files, one per namespace.

**Usage**:

```console
$ infrahubctl schema export [OPTIONS]
```

**Options**:

* `--directory PATH`: Directory path to store schema files [default: (dynamic)]
* `--branch TEXT`: Branch from which to export the schema
* `--namespace TEXT`: Namespace(s) to export (default: all user-defined)
* `--debug / --no-debug`: [default: no-debug]
* `--config-file TEXT`: [env var: INFRAHUBCTL_CONFIG; default: infrahubctl.toml]
* `--help`: Show this message and exit.

## `infrahubctl schema load`

Load one or multiple schema files into Infrahub.
Expand Down
47 changes: 47 additions & 0 deletions infrahub_sdk/ctl/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import asyncio
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import TYPE_CHECKING, Any

Expand Down Expand Up @@ -211,3 +212,49 @@ def _display_schema_warnings(console: Console, warnings: list[SchemaWarning]) ->
console.print(
f"[yellow] {warning.type.value}: {warning.message} [{', '.join([kind.display for kind in warning.kinds])}]"
)


def _default_export_directory() -> Path:
timestamp = datetime.now(timezone.utc).astimezone().strftime("%Y%m%d-%H%M%S")
return Path(f"infrahub-schema-export-{timestamp}")


@app.command()
@catch_exception(console=console)
async def export(
    directory: Path = typer.Option(_default_export_directory, help="Directory path to store schema files"),
    branch: str = typer.Option(None, help="Branch from which to export the schema"),
    namespace: list[str] = typer.Option([], help="Namespace(s) to export (default: all user-defined)"),
    debug: bool = False,
    _: str = CONFIG_PARAM,
) -> None:
    """Export the schema from Infrahub as YAML files, one per namespace."""
    init_logging(debug=debug)

    client = initialize_client()
    # An empty --namespace list means "export everything user-defined".
    exported = await client.schema.export(branch=branch, namespaces=namespace or None)

    if not exported.namespaces:
        console.print("[yellow]No user-defined schema found to export.")
        return

    directory.mkdir(parents=True, exist_ok=True)

    # One YAML document per namespace, written as <namespace>.yml.
    for ns_name in sorted(exported.namespaces):
        ns_data = exported.namespaces[ns_name]
        document: dict[str, Any] = {"version": "1.0"}
        if ns_data.generics:
            document["generics"] = ns_data.generics
        if ns_data.nodes:
            document["nodes"] = ns_data.nodes

        output_file = directory / f"{ns_name.lower()}.yml"
        serialized = yaml.dump(document, default_flow_style=False, sort_keys=False, allow_unicode=True)
        output_file.write_text(serialized, encoding="utf-8")
        console.print(f"[green] Exported namespace '{ns_name}' to {output_file}")

    console.print(f"[green] Schema exported to {directory}")
97 changes: 97 additions & 0 deletions infrahub_sdk/schema/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
)
from ..graphql import Mutation
from ..queries import SCHEMA_HASH_SYNC_STATUS
from .export import RESTRICTED_NAMESPACES, NamespaceExport, SchemaExport, schema_to_export_dict
from .main import (
AttributeSchema,
AttributeSchemaAPI,
Expand Down Expand Up @@ -54,16 +55,19 @@
"BranchSupportType",
"GenericSchema",
"GenericSchemaAPI",
"NamespaceExport",
"NodeSchema",
"NodeSchemaAPI",
"ProfileSchemaAPI",
"RelationshipCardinality",
"RelationshipKind",
"RelationshipSchema",
"RelationshipSchemaAPI",
"SchemaExport",
"SchemaRoot",
"SchemaRootAPI",
"TemplateSchemaAPI",
"schema_to_export_dict",
]


Expand Down Expand Up @@ -118,6 +122,47 @@ def __init__(self, client: InfrahubClient | InfrahubClientSync) -> None:
self.client = client
self.cache = {}

@staticmethod
def _build_export_schemas(
    schema_nodes: MutableMapping[str, MainSchemaTypesAPI],
    namespaces: list[str] | None = None,
) -> SchemaExport:
    """Group fetched schemas by namespace for export.

    Profile/Template system types and restricted namespaces (see
    :data:`RESTRICTED_NAMESPACES`) are always dropped. When *namespaces*
    is given, only those namespaces are kept; any restricted namespace
    that was explicitly requested is still excluded and a
    :func:`warnings.warn` is emitted.

    Returns:
        A :class:`SchemaExport` containing user-defined schemas by namespace.
    """
    requested = set(namespaces) if namespaces else set()
    if requested:
        blocked = requested & set(RESTRICTED_NAMESPACES)
        if blocked:
            warnings.warn(
                f"Restricted namespace(s) {sorted(blocked)} requested but will be excluded from export",
                stacklevel=3,
            )

    grouped: dict[str, NamespaceExport] = {}
    for schema in schema_nodes.values():
        # Profiles and templates are server-generated, never user-defined.
        if isinstance(schema, (ProfileSchemaAPI, TemplateSchemaAPI)):
            continue
        ns = schema.namespace
        if ns in RESTRICTED_NAMESPACES:
            continue
        if requested and ns not in requested:
            continue
        bucket = grouped.setdefault(ns, NamespaceExport())
        exported = schema_to_export_dict(schema)
        if isinstance(schema, GenericSchemaAPI):
            bucket.generics.append(exported)
        else:
            bucket.nodes.append(exported)
    return SchemaExport(namespaces=grouped)

def validate(self, data: dict[str, Any]) -> None:
SchemaRoot(**data)

Expand Down Expand Up @@ -497,6 +542,32 @@ async def fetch(

return branch_schema.nodes

async def export(
    self,
    branch: str | None = None,
    namespaces: list[str] | None = None,
) -> SchemaExport:
    """Export user-defined schemas grouped by namespace.

    Fetches the schema from the server (without populating the local
    cache), then filters out system types and restricted namespaces (see
    :data:`RESTRICTED_NAMESPACES`). Restricted namespaces such as ``Core``
    and ``Builtin`` stay excluded even when explicitly listed in
    *namespaces*; a warning is emitted in that case.

    Args:
        branch: Branch to export from. Defaults to default_branch.
        namespaces: Optional list of namespaces to include. If empty/None,
            all user-defined namespaces are exported.

    Returns:
        A :class:`SchemaExport` containing user-defined schemas by namespace.
    """
    target_branch = branch or self.client.default_branch
    fetched = await self.fetch(branch=target_branch, namespaces=namespaces, populate_cache=False)
    return self._build_export_schemas(schema_nodes=fetched, namespaces=namespaces)

async def get_graphql_schema(self, branch: str | None = None) -> str:
"""Get the GraphQL schema as a string.

Expand Down Expand Up @@ -739,6 +810,32 @@ def fetch(

return branch_schema.nodes

def export(
    self,
    branch: str | None = None,
    namespaces: list[str] | None = None,
) -> SchemaExport:
    """Export user-defined schemas grouped by namespace (sync variant).

    Fetches the schema from the server (without populating the local
    cache), then filters out system types and restricted namespaces (see
    :data:`RESTRICTED_NAMESPACES`). Restricted namespaces such as ``Core``
    and ``Builtin`` stay excluded even when explicitly listed in
    *namespaces*; a warning is emitted in that case.

    Args:
        branch: Branch to export from. Defaults to default_branch.
        namespaces: Optional list of namespaces to include. If empty/None,
            all user-defined namespaces are exported.

    Returns:
        A :class:`SchemaExport` containing user-defined schemas by namespace.
    """
    target_branch = branch or self.client.default_branch
    fetched = self.fetch(branch=target_branch, namespaces=namespaces, populate_cache=False)
    return self._build_export_schemas(schema_nodes=fetched, namespaces=namespaces)

def get_graphql_schema(self, branch: str | None = None) -> str:
"""Get the GraphQL schema as a string.

Expand Down
120 changes: 120 additions & 0 deletions infrahub_sdk/schema/export.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
from __future__ import annotations

from typing import Any

from pydantic import BaseModel, Field

from .main import GenericSchemaAPI, NodeSchemaAPI


class NamespaceExport(BaseModel):
    """Export data for a single namespace."""

    # Node schema dicts for this namespace, ready for YAML serialization.
    nodes: list[dict[str, Any]] = Field(default_factory=list)
    # Generic schema dicts for this namespace, ready for YAML serialization.
    generics: list[dict[str, Any]] = Field(default_factory=list)


class SchemaExport(BaseModel):
    """Result of a schema export, keyed by namespace name."""

    namespaces: dict[str, NamespaceExport] = Field(default_factory=dict)

    def to_dict(self) -> dict[str, dict[str, list[dict[str, Any]]]]:
        """Convert to plain dict for YAML serialization."""
        plain: dict[str, dict[str, list[dict[str, Any]]]] = {}
        for ns_name, ns_data in self.namespaces.items():
            # exclude_defaults drops empty nodes/generics lists from the output.
            plain[ns_name] = ns_data.model_dump(exclude_defaults=True)
        return plain


# Namespaces reserved by the Infrahub server — mirrored from
# backend/infrahub/core/constants/__init__.py in the opsmill/infrahub repo.
# Schemas in these namespaces are never exported, even when explicitly requested.
RESTRICTED_NAMESPACES: list[str] = [
    "Account",
    "Branch",
    "Builtin",
    "Core",
    "Deprecated",
    "Diff",
    "Infrahub",
    "Internal",
    "Lineage",
    "Schema",
    "Profile",
    "Template",
]

# Top-level schema fields that are API-internal and never user-authored.
_SCHEMA_EXPORT_EXCLUDE: set[str] = {"hash", "hierarchy", "used_by", "id", "state"}
# branch is inherited from the node and need not be repeated on each field
_FIELD_EXPORT_EXCLUDE: set[str] = {"inherited", "allow_override", "hierarchical", "id", "state", "branch"}

# Attribute field values that match schema loading defaults — omitted for cleaner output
_ATTR_EXPORT_DEFAULTS: dict[str, Any] = {
    "read_only": False,
    "optional": False,
}

# Relationship field values that match schema loading defaults — omitted for cleaner output
_REL_EXPORT_DEFAULTS: dict[str, Any] = {
    "direction": "bidirectional",
    "on_delete": "no-action",
    "cardinality": "many",
    "optional": True,
    "min_count": 0,
    "max_count": 0,
    "read_only": False,
}

# Relationship kinds that Infrahub generates automatically — never user-defined
_AUTO_GENERATED_REL_KINDS: frozenset[str] = frozenset({"Group", "Profile", "Hierarchy"})


def schema_to_export_dict(schema: NodeSchemaAPI | GenericSchemaAPI) -> dict[str, Any]:
    """Convert an API schema object to an export-ready dict (omits API-internal fields)."""

    def _strip_defaults(dumped: dict[str, Any], defaults: dict[str, Any]) -> dict[str, Any]:
        # Keep a key only when it has no known default or its value differs from it.
        return {key: value for key, value in dumped.items() if key not in defaults or value != defaults[key]}

    data = schema.model_dump(exclude=_SCHEMA_EXPORT_EXCLUDE, exclude_none=True)

    # Remove attrs/rels here; they are re-appended at the end so they read last.
    data.pop("attributes", None)
    data.pop("relationships", None)

    # A generic carrying a non-inherited Hierarchy relationship was declared
    # with `hierarchical: true`. Restore that flag (the auto-generated rels are
    # dropped below) so the exported schema round-trips cleanly.
    if isinstance(schema, GenericSchemaAPI):
        own_rel_kinds = {rel.kind for rel in schema.relationships if not rel.inherited}
        if "Hierarchy" in own_rel_kinds:
            data["hierarchical"] = True

    # uniqueness_constraints of the single-field form ["<attr>__value"] are
    # auto-generated from `unique: true` attributes; keep only the user-defined
    # multi-field constraints.
    auto_constraints = {f"{attr.name}__value" for attr in schema.attributes if attr.unique}
    kept_constraints = []
    for constraint in data.pop("uniqueness_constraints", None) or []:
        if len(constraint) == 1 and constraint[0] in auto_constraints:
            continue
        kept_constraints.append(constraint)
    if kept_constraints:
        data["uniqueness_constraints"] = kept_constraints

    attributes = []
    for attr in schema.attributes:
        if attr.inherited:
            continue
        dumped = attr.model_dump(exclude=_FIELD_EXPORT_EXCLUDE, exclude_none=True)
        attributes.append(_strip_defaults(dumped, _ATTR_EXPORT_DEFAULTS))
    if attributes:
        data["attributes"] = attributes

    relationships = []
    for rel in schema.relationships:
        # Skip inherited rels and kinds Infrahub generates automatically.
        if rel.inherited or rel.kind in _AUTO_GENERATED_REL_KINDS:
            continue
        dumped = rel.model_dump(exclude=_FIELD_EXPORT_EXCLUDE, exclude_none=True)
        relationships.append(_strip_defaults(dumped, _REL_EXPORT_DEFAULTS))
    if relationships:
        data["relationships"] = relationships

    return data
Loading