Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions docs/examples/patterns/migrations_with_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from pathlib import Path

__all__ = ("test_migrations_with_schema",)


def test_migrations_with_schema(tmp_path: Path) -> None:
    """Apply a migration with unqualified DDL using separate object and tracker schemas.

    A DuckDB database under ``tmp_path`` is configured with ``default_schema`` set to
    ``app_schema`` and ``version_table_schema`` set to ``admin_schema``. After
    ``upgrade()`` runs, ``information_schema.tables`` is queried to check that ``users``
    is listed under ``app_schema`` and ``schema_versions`` under ``admin_schema``.

    Args:
        tmp_path: Directory that receives the database file and the migrations directory.
    """
    # start-example
    from sqlspec.adapters.duckdb import DuckDBConfig
    from sqlspec.migrations.commands import SyncMigrationCommands

    migration_dir = tmp_path / "migrations"
    db_path = tmp_path / "app.duckdb"

    # Migration module written to disk; its DDL never names a schema.
    migration_source = '''"""Create users."""


def up():
    """Create an unqualified table in app_schema."""
    return ["CREATE TABLE users (id INTEGER PRIMARY KEY, name VARCHAR NOT NULL)"]


def down():
    """Drop the unqualified table from app_schema."""
    return ["DROP TABLE IF EXISTS users"]
'''

    # Same lookup is used for both the migrated table and the tracker table.
    table_lookup_sql = """
                SELECT table_name
                FROM information_schema.tables
                WHERE table_schema = ? AND table_name = ?
                """

    migration_settings = {
        "script_location": str(migration_dir),
        "version_table_name": "schema_versions",
        "default_schema": "app_schema",
        "version_table_schema": "admin_schema",
    }
    config = DuckDBConfig(connection_config={"database": str(db_path)}, migration_config=migration_settings)

    try:
        # Both schemas are created up front, before any migration command runs.
        with config.provide_session() as session:
            session.execute("CREATE SCHEMA app_schema")
            session.execute("CREATE SCHEMA admin_schema")

        migrator = SyncMigrationCommands(config)
        migrator.init(str(migration_dir), package=True)

        migration_file = migration_dir / "0001_create_users.py"
        migration_file.write_text(migration_source)

        migrator.upgrade()

        with config.provide_session() as session:
            found_users = session.select_value(table_lookup_sql, ("app_schema", "users"))
            found_tracker = session.select_value(table_lookup_sql, ("admin_schema", "schema_versions"))

        assert found_users == "users"
        assert found_tracker == "schema_versions"
    finally:
        if config.connection_instance:
            config.close_pool()
    # end-example
62 changes: 62 additions & 0 deletions docs/usage/migrations.rst
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,68 @@ specific extension, ``migration_config["include_extensions"]`` to opt in
explicitly by extension name, or ``migration_config["enabled"] = False`` to
disable migrations entirely for a database config.

Configuring a Default Schema
----------------------------

Use ``migration_config["default_schema"]`` when migration SQL should run
against a pre-existing schema or dataset without qualifying every table in each
migration file. SQLSpec validates the schema before creating the tracker table
or applying DDL, then configures the migration session before each migration is
executed.

Use ``migration_config["version_table_schema"]`` when the migration tracker
table should live somewhere different from the objects managed by migrations.
If ``version_table_schema`` is not set, the tracker schema resolves to
``default_schema``. If neither field is set, the tracker table is unqualified and
uses the adapter's normal default namespace.

.. code-block:: python

from sqlspec.adapters.asyncpg import AsyncpgConfig

config = AsyncpgConfig(
connection_config={"dsn": "postgresql://localhost/app"},
migration_config={
"script_location": "migrations/postgres",
"version_table_name": "schema_versions",
"default_schema": "app_schema",
"version_table_schema": "admin_schema",
},
)

The operator must create the target schema or dataset before running
migrations. The migration role also needs the database-specific privileges to
create objects there. For PostgreSQL, that usually means ``USAGE`` and
``CREATE`` on the target schema, plus permission to create or update the
tracker table.

Adapter support:

.. list-table::
:header-rows: 1

* - Adapter
- Behavior
* - ``asyncpg``, ``psycopg``, ``psqlpy``, ADBC PostgreSQL
- Uses PostgreSQL ``search_path`` and validates ``information_schema.schemata``.
* - ``oracledb``
- Uses ``ALTER SESSION SET CURRENT_SCHEMA`` and validates Oracle users.
* - ``duckdb``
- Uses ``SET search_path`` and validates ``information_schema.schemata``.
* - ``bigquery``
- Treats schemas as datasets and sets the BigQuery job ``default_dataset``.
* - ``sqlite``, ``aiosqlite``, ``asyncmy``
- Accept the setting as an explicit no-op and log that default schemas are unsupported.
* - ADBC SQL Server
- Accepts the setting as a no-op; configure the default schema at the user or login level. Other non-PostgreSQL ADBC drivers also accept the setting as a no-op.

Example with unqualified DDL:

.. literalinclude:: /examples/patterns/migrations_with_schema.py
:language: python
:start-after: # start-example
:end-before: # end-example

Logging and Echo Controls
-------------------------

Expand Down
3 changes: 3 additions & 0 deletions sqlspec/adapters/adbc/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,9 @@ def __init__(
observability_config=observability_config,
**kwargs,
)
object.__setattr__(
self, "supports_migration_schemas", is_postgres_dialect(resolve_dialect_from_config(self.connection_config))
)

def create_connection(self) -> AdbcConnection:
"""Create and return a new connection using the specified driver.
Expand Down
35 changes: 35 additions & 0 deletions sqlspec/adapters/adbc/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from sqlspec.core import SQL, StatementConfig, build_arrow_result_from_table, get_cache_config, register_driver_profile
from sqlspec.driver import BaseSyncExceptionHandler, SyncDriverAdapterBase
from sqlspec.exceptions import DatabaseConnectionError, SQLSpecError
from sqlspec.migrations.utils import quote_migration_identifier
from sqlspec.utils.logging import get_logger
from sqlspec.utils.module_loader import ensure_pyarrow
from sqlspec.utils.serializers import to_json
Expand Down Expand Up @@ -282,6 +283,40 @@ def rollback(self) -> None:
msg = f"Failed to rollback transaction: {e}"
raise SQLSpecError(msg) from e

def set_migration_session_schema(self, schema: str) -> None:
    """Point migration SQL at ``schema`` when the connection is ADBC PostgreSQL.

    On PostgreSQL the schema name is quoted with ``quote_migration_identifier`` and
    placed first in ``search_path``, ahead of ``"$user"`` and ``public``. For any other
    dialect the base implementation is invoked and a debug message records that the
    setting was ignored; SQL Server dialect names get a dedicated message.

    Args:
        schema: Name of the schema migrations should default to.
    """
    if self._is_postgres:
        search_path_head = quote_migration_identifier(schema)
        with self.with_cursor(self.connection) as cursor:
            cursor.execute(f'SET search_path TO {search_path_head}, "$user", public')
        return

    # Non-PostgreSQL dialects: defer to the base class, then log why nothing was changed.
    super().set_migration_session_schema(schema)
    if self._dialect_name not in {"mssql", "sqlserver", "tsql"}:
        logger.debug("%s driver does not support default schemas; ignoring default_schema=%r", "ADBC", schema)
        return
    logger.debug(
        "SQL Server schema support not yet implemented for ADBC; configure default schema at the "
        "user/login level; ignoring default_schema=%r",
        schema,
    )

def has_schema(self, schema: str) -> bool:
    """Report whether ``schema`` exists, checking the database only on ADBC PostgreSQL.

    On PostgreSQL, ``information_schema.schemata`` is queried for a row whose
    ``schema_name`` equals ``schema``. For any other dialect the base implementation is
    invoked, a debug message is logged, and the schema is accepted unconditionally.

    Args:
        schema: Schema name to look up.

    Returns:
        ``True`` for non-PostgreSQL dialects; otherwise whether the lookup returned a row.
    """
    if not self._is_postgres:
        super().has_schema(schema)
        if self._dialect_name not in {"mssql", "sqlserver", "tsql"}:
            logger.debug("%s driver does not support default schemas; accepting default_schema=%r", "ADBC", schema)
        else:
            logger.debug(
                "SQL Server schema support not yet implemented for ADBC; configure default schema at the "
                "user/login level; accepting default_schema=%r",
                schema,
            )
        return True

    schema_lookup = "SELECT 1 FROM information_schema.schemata WHERE schema_name = $1"
    with self.with_cursor(self.connection) as cursor:
        cursor.execute(schema_lookup, parameters=[schema])
        matched_row = cursor.fetchone()
        return matched_row is not None

def with_cursor(self, connection: "AdbcConnection") -> "AdbcCursor":
"""Create context manager for cursor.

Expand Down
14 changes: 14 additions & 0 deletions sqlspec/adapters/aiosqlite/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from sqlspec.core import ArrowResult, get_cache_config, register_driver_profile
from sqlspec.driver import AsyncDriverAdapterBase, BaseAsyncExceptionHandler
from sqlspec.exceptions import SQLSpecError
from sqlspec.utils.logging import get_logger

if TYPE_CHECKING:
from sqlspec.adapters.aiosqlite._typing import AiosqliteConnection
Expand All @@ -47,6 +48,8 @@
SQLITE_IOERR_CODE = 10
SQLITE_MISMATCH_CODE = 20

logger = get_logger(__name__)


class AiosqliteExceptionHandler(BaseAsyncExceptionHandler):
"""Async context manager for handling aiosqlite database exceptions.
Expand Down Expand Up @@ -181,6 +184,17 @@ async def rollback(self) -> None:
msg = f"Failed to rollback transaction: {e}"
raise SQLSpecError(msg) from e

async def set_migration_session_schema(self, schema: str) -> None:
    """Accept ``schema`` without changing the aiosqlite session.

    The base implementation is awaited first; a debug message then records that the
    requested default schema was ignored.

    Args:
        schema: Default schema requested by the migration configuration.
    """
    await super().set_migration_session_schema(schema)
    adapter_name = "aiosqlite"
    logger.debug("%s driver does not support default schemas; ignoring default_schema=%r", adapter_name, schema)

async def has_schema(self, schema: str) -> bool:
    """Accept any ``schema`` because SQLite has no separate schema namespace.

    The base implementation is awaited first; a debug message then records that the
    schema was accepted without a lookup.

    Args:
        schema: Schema name supplied by the migration configuration.

    Returns:
        Always ``True``.
    """
    await super().has_schema(schema)
    adapter_name = "aiosqlite"
    logger.debug("%s driver does not support default schemas; accepting default_schema=%r", adapter_name, schema)
    return True

def with_cursor(self, connection: "AiosqliteConnection") -> "AiosqliteCursor":
    """Wrap ``connection`` in an ``AiosqliteCursor`` async context manager.

    Args:
        connection: Connection the cursor wrapper is built around.

    Returns:
        A new ``AiosqliteCursor`` for ``connection``.
    """
    cursor_context = AiosqliteCursor(connection)
    return cursor_context
Expand Down
11 changes: 11 additions & 0 deletions sqlspec/adapters/asyncmy/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,17 @@ async def rollback(self) -> None:
msg = f"Failed to rollback MySQL transaction: {e}"
raise SQLSpecError(msg) from e

async def set_migration_session_schema(self, schema: str) -> None:
    """Accept ``schema`` without changing the asyncmy session.

    The base implementation is awaited first; a debug message then records that the
    requested default schema was ignored.

    Args:
        schema: Default schema requested by the migration configuration.
    """
    await super().set_migration_session_schema(schema)
    adapter_name = "asyncmy"
    logger.debug("%s driver does not support default schemas; ignoring default_schema=%r", adapter_name, schema)

async def has_schema(self, schema: str) -> bool:
    """Accept any ``schema`` because asyncmy does not manage migration default schemas.

    The base implementation is awaited first; a debug message then records that the
    schema was accepted without a lookup.

    Args:
        schema: Schema name supplied by the migration configuration.

    Returns:
        Always ``True``.
    """
    await super().has_schema(schema)
    adapter_name = "asyncmy"
    logger.debug("%s driver does not support default schemas; accepting default_schema=%r", adapter_name, schema)
    return True

def with_cursor(self, connection: "AsyncmyConnection") -> "AsyncmyCursor":
"""Create cursor context manager for the connection.

Expand Down
1 change: 1 addition & 0 deletions sqlspec/adapters/asyncpg/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ class AsyncpgConfig(AsyncDatabaseConfig[AsyncpgConnection, "Pool[Record]", Async
driver_type: "ClassVar[type[AsyncpgDriver]]" = AsyncpgDriver
connection_type: "ClassVar[type[AsyncpgConnection]]" = type(AsyncpgConnection) # type: ignore[assignment]
supports_transactional_ddl: "ClassVar[bool]" = True
supports_migration_schemas: "ClassVar[bool]" = True
supports_native_arrow_export: "ClassVar[bool]" = True
supports_native_arrow_import: "ClassVar[bool]" = True
supports_native_parquet_export: "ClassVar[bool]" = True
Expand Down
12 changes: 12 additions & 0 deletions sqlspec/adapters/asyncpg/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
describe_stack_statement,
)
from sqlspec.exceptions import SQLSpecError, StackExecutionError
from sqlspec.migrations.utils import quote_migration_identifier
from sqlspec.utils.logging import get_logger
from sqlspec.utils.type_guards import has_sqlstate

Expand Down Expand Up @@ -228,6 +229,17 @@ async def rollback(self) -> None:
msg = f"Failed to rollback async transaction: {e}"
raise SQLSpecError(msg) from e

async def set_migration_session_schema(self, schema: str) -> None:
    """Put ``schema`` first in the PostgreSQL ``search_path`` for migration SQL.

    The schema name is quoted with ``quote_migration_identifier`` and followed by
    ``"$user"`` and ``public`` in the search path.

    Args:
        schema: Name of the schema migrations should default to.
    """
    # NOTE(review): PostgreSQL applies SET LOCAL only until the end of the current
    # transaction — presumably this is called inside the migration transaction; confirm.
    search_path_sql = f'SET LOCAL search_path TO {quote_migration_identifier(schema)}, "$user", public'
    await self.connection.execute(search_path_sql)

async def has_schema(self, schema: str) -> bool:
    """Report whether ``schema`` is listed in ``information_schema.schemata``.

    Args:
        schema: Schema name to look up.

    Returns:
        ``True`` when the lookup yields a truthy value, ``False`` otherwise.
    """
    schema_lookup = "SELECT 1 FROM information_schema.schemata WHERE schema_name = $1"
    lookup_result = await self.connection.fetchval(schema_lookup, schema)
    return bool(lookup_result)

def with_cursor(self, connection: "AsyncpgConnection") -> "AsyncpgCursor":
    """Wrap ``connection`` in an ``AsyncpgCursor`` context manager.

    Args:
        connection: Connection the cursor wrapper is built around.

    Returns:
        A new ``AsyncpgCursor`` for ``connection``.
    """
    cursor_context = AsyncpgCursor(connection)
    return cursor_context
Expand Down
1 change: 1 addition & 0 deletions sqlspec/adapters/bigquery/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ class BigQueryConfig(NoPoolSyncConfig[BigQueryConnection, BigQueryDriver]):
driver_type: ClassVar[type[BigQueryDriver]] = BigQueryDriver
connection_type: "ClassVar[type[BigQueryConnection]]" = BigQueryConnection
supports_transactional_ddl: ClassVar[bool] = False
supports_migration_schemas: ClassVar[bool] = True
supports_native_parquet_import: ClassVar[bool] = True
supports_native_arrow_export: ClassVar[bool] = True
supports_native_parquet_export: ClassVar[bool] = True
Expand Down
37 changes: 37 additions & 0 deletions sqlspec/adapters/bigquery/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
from google.cloud.bigquery import LoadJobConfig, QueryJob, QueryJobConfig
from google.cloud.exceptions import GoogleCloudError
from sqlglot import exp
from sqlglot.generator import _DISPATCH_CACHE # pyright: ignore[reportPrivateUsage]
from sqlglot.generators.bigquery import BigQueryGenerator

from sqlspec.core import (
DriverParameterProfile,
Expand Down Expand Up @@ -76,6 +78,41 @@
COLUMN_CACHE_MAX_SIZE = 256


# Install SQLSpec's column-constraint rendering overrides on BigQueryGenerator once.
# The marker attribute set at the end stops a later evaluation of this block from
# wrapping the already-wrapped methods.
if not getattr(BigQueryGenerator, "_sqlspec_constraint_rendering", False):
    # Capture the stock implementations before they are replaced below.
    _original_primary_key_column_constraint_sql = BigQueryGenerator.primarykeycolumnconstraint_sql
    _original_column_def_sql = BigQueryGenerator.columndef_sql

    def _primary_key_column_constraint_sql(self: BigQueryGenerator, expression: exp.PrimaryKeyColumnConstraint) -> str:
        """Render a primary-key column constraint, appending ``NOT ENFORCED`` when absent."""
        base_sql = _original_primary_key_column_constraint_sql(self, expression)
        already_marked = "NOT ENFORCED" in base_sql.upper()
        return base_sql if already_marked else f"{base_sql} NOT ENFORCED"

    def _column_constraint_sort_key(item: tuple[int, exp.ColumnConstraint]) -> tuple[int, int]:
        """Return ``(rank, original_index)`` used to order a column's constraints.

        Default-value constraints rank 1, not-null constraints rank 2, and every other
        kind ranks 0; the index preserves the original order within a rank.
        """
        position, column_constraint = item
        constraint_kind = column_constraint.args.get("kind")
        rank = 0
        if isinstance(constraint_kind, exp.DefaultColumnConstraint):
            rank = 1
        elif isinstance(constraint_kind, exp.NotNullColumnConstraint):
            rank = 2
        return rank, position

    def _column_def_sql(self: BigQueryGenerator, expression: exp.ColumnDef, sep: str = " ") -> str:
        """Render a column definition with its constraints reordered by sort key.

        When the column has constraints, a copy of the expression receives the reordered
        list, so the caller's expression keeps its original ``constraints`` argument.
        """
        constraints = expression.args.get("constraints")
        if not constraints:
            return _original_column_def_sql(self, expression, sep=sep)
        working_copy = expression.copy()
        ranked_pairs = sorted(enumerate(constraints), key=_column_constraint_sort_key)
        # The reordered list holds the constraint nodes read from the incoming expression.
        working_copy.set("constraints", [pair[1] for pair in ranked_pairs])
        return _original_column_def_sql(self, working_copy, sep=sep)

    BigQueryGenerator.primarykeycolumnconstraint_sql = _primary_key_column_constraint_sql  # type: ignore[assignment, method-assign]
    BigQueryGenerator.columndef_sql = _column_def_sql  # type: ignore[assignment, method-assign]
    BigQueryGenerator._sqlspec_constraint_rendering = True  # type: ignore[attr-defined]
    # Drop any cached dispatch entry for the generator class after swapping its methods.
    _DISPATCH_CACHE.pop(BigQueryGenerator, None)


def _identity(value: Any) -> Any:
return value

Expand Down
Loading
Loading