Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 12 additions & 23 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ dependencies = [
"sqlglot~=30.4.2",
"tenacity",
"time-machine",
"json-stream"
"json-stream",
]
classifiers = [
"Intended Audience :: Developers",
Expand All @@ -42,10 +42,7 @@ classifiers = [
athena = ["PyAthena[Pandas]"]
azuresql = ["pymssql"]
azuresql-odbc = ["pyodbc>=5.0.0"]
bigquery = [
"google-cloud-bigquery[pandas]",
"google-cloud-bigquery-storage"
]
bigquery = ["google-cloud-bigquery[pandas]", "google-cloud-bigquery-storage"]
# bigframes has to be separate to support environments with an older google-cloud-bigquery pin
# this is because that pin pulls in an older bigframes and the bigframes team
# pinned an older SQLGlot which is incompatible with SQLMesh
Expand All @@ -71,6 +68,7 @@ dev = [
"dbt-redshift",
"dbt-trino",
"Faker",
"feldera",
"google-auth",
"google-cloud-bigquery",
"google-cloud-bigquery-storage",
Expand Down Expand Up @@ -109,6 +107,7 @@ dbt = ["dbt-core<2"]
dlt = ["dlt"]
duckdb = []
fabric = ["pyodbc>=5.0.0"]
feldera = ["feldera"]
gcppostgres = ["cloud-sql-python-connector[pg8000]>=1.8.0"]
github = ["PyGithub>=2.6.0"]
motherduck = ["duckdb>=1.3.2"]
Expand Down Expand Up @@ -183,11 +182,7 @@ no_implicit_optional = true
disallow_untyped_defs = true

[[tool.mypy.overrides]]
module = [
"examples.*.macros.*",
"tests.*",
"sqlmesh.migrations.*"
]
module = ["examples.*.macros.*", "tests.*", "sqlmesh.migrations.*"]
disallow_untyped_defs = false
# Sometimes it's helpful to use types within an "untyped" function because it allows IDE assistance
# Unfortunately this causes MyPy to print an annoying 'By default the bodies of untyped functions are not checked'
Expand Down Expand Up @@ -225,9 +220,10 @@ module = [
"bs4.*",
"pydantic_core.*",
"dlt.*",
"feldera.*",
"bigframes.*",
"json_stream.*",
"duckdb.*"
"duckdb.*",
]
ignore_missing_imports = true

Expand Down Expand Up @@ -259,6 +255,7 @@ markers = [
"clickhouse_cloud: test for Clickhouse (cloud mode)",
"databricks: test for Databricks",
"duckdb: test for DuckDB",
"feldera: test for Feldera",
"fabric: test for Fabric",
"motherduck: test for MotherDuck",
"mssql: test for MSSQL",
Expand All @@ -274,36 +271,28 @@ markers = [

# Other
"set_default_connection",
"registry_isolation"
"registry_isolation",
]
addopts = "-n 0 --dist=loadgroup"
asyncio_default_fixture_loop_scope = "session"
log_cli = false # Set this to true to enable logging during tests
log_cli_format = "%(asctime)s.%(msecs)03d %(filename)s:%(lineno)d %(levelname)s %(message)s"
log_cli_level = "INFO"
filterwarnings = [
"ignore:The localize method is no longer necessary, as this time zone supports the fold attribute"
"ignore:The localize method is no longer necessary, as this time zone supports the fold attribute",
]
reruns_delay = 10

[tool.ruff]
line-length = 100

[tool.ruff.lint]
select = [
"F401",
"RET505",
"T100",
]
select = ["F401", "RET505", "T100"]
extend-select = ["TID"]


[tool.ruff.lint.flake8-tidy-imports]
banned-module-level-imports = [
"duckdb",
"numpy",
"pandas",
]
banned-module-level-imports = ["duckdb", "numpy", "pandas"]

# Bans imports from sqlmesh.lsp in files outside of sqlmesh/lsp
[tool.ruff.lint.flake8-tidy-imports.banned-api]
Expand Down
55 changes: 55 additions & 0 deletions sqlmesh/core/config/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
# Do not support row-level operations
"spark",
"trino",
"feldera",
# Nullable types are problematic
"clickhouse",
}
Expand Down Expand Up @@ -2342,6 +2343,60 @@ def init(cursor: t.Any) -> None:
return init


class FelderaConnectionConfig(ConnectionConfig):
"""Feldera connection configuration.

Args:
host: The Feldera API base URL.
api_key: The optional Feldera API key.
pipeline_name: The name of the backing Feldera pipeline.
compilation_profile: The Feldera compilation profile to use during deploys.
workers: The number of workers in the Feldera runtime config.
timeout: The timeout, in seconds, for Feldera API operations.
"""

host: str = "http://localhost:8080"
api_key: t.Optional[str] = None
pipeline_name: str
compilation_profile: str = "dev"
workers: int = 4
timeout: int = 300

type_: t.Literal["feldera"] = Field(alias="type", default="feldera")
DIALECT: t.ClassVar[t.Literal["feldera"]] = "feldera"
DISPLAY_NAME: t.ClassVar[t.Literal["Feldera"]] = "Feldera"
DISPLAY_ORDER: t.ClassVar[t.Literal[18]] = 18

concurrent_tasks: int = 1
register_comments: t.Literal[False] = False
pre_ping: t.Literal[False] = False

_engine_import_validator = _get_engine_import_validator("feldera", "feldera")

@property
def _connection_kwargs_keys(self) -> t.Set[str]:
return {
"host",
"api_key",
"pipeline_name",
"workers",
"compilation_profile",
"timeout",
}

@property
def _engine_adapter(self) -> t.Type[EngineAdapter]:
from sqlmesh.core.engine_adapter.feldera import FelderaEngineAdapter

return FelderaEngineAdapter

@property
def _connection_factory(self) -> t.Callable:
from sqlmesh.engines.feldera.db_api import connect

return connect


_CONNECTION_CONFIG_EXCLUDE: t.Set[t.Type[ConnectionConfig]] = {
ConnectionConfig, # type: ignore[type-abstract]
BaseDuckDBConnectionConfig, # type: ignore[type-abstract]
Expand Down
2 changes: 2 additions & 0 deletions sqlmesh/core/engine_adapter/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from sqlmesh.core.engine_adapter.athena import AthenaEngineAdapter
from sqlmesh.core.engine_adapter.risingwave import RisingwaveEngineAdapter
from sqlmesh.core.engine_adapter.fabric import FabricEngineAdapter
from sqlmesh.core.engine_adapter.feldera import FelderaEngineAdapter

DIALECT_TO_ENGINE_ADAPTER = {
"hive": SparkEngineAdapter,
Expand All @@ -37,6 +38,7 @@
"athena": AthenaEngineAdapter,
"risingwave": RisingwaveEngineAdapter,
"fabric": FabricEngineAdapter,
"feldera": FelderaEngineAdapter,
}

DIALECT_ALIASES = {
Expand Down
Loading