diff --git a/docs/tutorial/sharing_pool.md b/docs/tutorial/sharing_pool.md new file mode 100644 index 0000000..80ed43a --- /dev/null +++ b/docs/tutorial/sharing_pool.md @@ -0,0 +1,59 @@ +--- +title: Sharing a Connection Pool +--- + +By default, each component (broker, result backend, schedule source) creates and manages its own connection pool. If you are integrating taskiq-postgres into an application that already maintains a pool — or if you simply want to reduce the total number of database connections — you can pass a single pool to all three components. + +## How it works + +`PsycopgBroker`, `PsycopgResultBackend`, and `PsycopgScheduleSource` each accept an optional `pool` (or `write_pool`) keyword argument. When a pool is provided: + +- The component sets `_owns_pool = False` and uses the pool as-is. +- `startup()` opens the pool if it is not yet open, but will **not** close it on `shutdown()`. +- Lifecycle management (opening, closing) is your responsibility. + +## Basic example + +```python +import asyncio + +from psycopg import AsyncConnection, AsyncRawCursor +from psycopg_pool import AsyncConnectionPool +from taskiq import TaskiqScheduler + +from taskiq_pg.psycopg import PsycopgBroker, PsycopgResultBackend, PsycopgScheduleSource + +DSN = "postgres://user:password@localhost:5432/mydb" + +async def main() -> None: + # Create one pool shared by all components. + pool = AsyncConnectionPool(conninfo=DSN, open=False) + # A dedicated connection is required by the broker for LISTEN/NOTIFY. + read_conn = await AsyncConnection.connect( + conninfo=DSN, autocommit=True, cursor_factory=AsyncRawCursor + ) + + broker = PsycopgBroker( + write_pool=pool, + read_connection=read_conn, + ).with_result_backend( + PsycopgResultBackend(pool=pool) + ) + + schedule_source = PsycopgScheduleSource(broker=broker, pool=pool) + scheduler = TaskiqScheduler(broker=broker, sources=[schedule_source]) + + await broker.startup() + # ... run your application ... + await broker.shutdown() + + # Close shared resources after all components have shut down. + await read_conn.close() + await pool.close() + + +if __name__ == "__main__": + asyncio.run(main()) +``` + +You can see fully working example inside repository in `examples/example_with_shared_pool.py`. diff --git a/examples/example_with_shared_pool.py b/examples/example_with_shared_pool.py new file mode 100644 index 0000000..5b7a381 --- /dev/null +++ b/examples/example_with_shared_pool.py @@ -0,0 +1,70 @@ +""" +How to run: + + 1) Run worker in one terminal: + uv run taskiq worker examples.example_with_shared_pool:get_broker --workers 1 + + 2) Run this script in another terminal: + uv run python -m examples.example_with_shared_pool +""" + +import asyncio + +from psycopg import AsyncConnection, AsyncRawCursor +from psycopg_pool import AsyncConnectionPool +from taskiq import async_shared_broker + +from taskiq_pg.psycopg import PsycopgBroker, PsycopgResultBackend + + +DSN = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres" + + +def create_pool() -> AsyncConnectionPool: + return AsyncConnectionPool(conninfo=DSN, open=False, timeout=5) + + +async def create_connection() -> AsyncConnection: + return await AsyncConnection.connect( + conninfo=DSN, + autocommit=True, + cursor_factory=AsyncRawCursor, + ) + + +def make_broker(pool: AsyncConnectionPool, connection: AsyncConnection) -> PsycopgBroker: + broker = PsycopgBroker( + write_pool=pool, + read_connection=connection, + ).with_result_backend(PsycopgResultBackend(pool=pool)) + async_shared_broker.default_broker(broker) + return broker + + +@async_shared_broker.task("solve_all_problems") +async def best_task_ever() -> None: + """Solve all problems in the world.""" + await asyncio.sleep(2) + print("All problems are solved!") + + +def get_broker() -> PsycopgBroker: + """Sync factory used by the taskiq worker CLI.""" + pool = create_pool() + connection = asyncio.run(create_connection()) + return make_broker(pool, connection) + + +async def main() -> None: + pool = create_pool() + connection = await create_connection() + broker = make_broker(pool, connection) + + await broker.startup() + task = await best_task_ever.kiq() + print(await task.wait_result()) + await broker.shutdown() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/mkdocs.yml b/mkdocs.yml index 1d3c8a5..1b61af2 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -11,6 +11,7 @@ nav: - Tutorial: - tutorial/result_backend.md - tutorial/schedule_source.md + - tutorial/sharing_pool.md - tutorial/common_issues.md - API: - reference.md diff --git a/pyproject.toml b/pyproject.toml index cae898b..78646c3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,7 +11,6 @@ classifiers = [ "Intended Audience :: Developers", "Intended Audience :: Information Technology", "Framework :: AsyncIO", - "License :: OSI Approved :: MIT License", "Programming Language :: Python", "Programming Language :: Python :: 3", "Programming Language :: Python :: 3 :: Only", @@ -61,6 +60,7 @@ dev = [ {include-group = "test"}, {include-group = "docs"}, "prek>=0.2.19", + "ty>=0.0.34", ] lint = [ "ruff>=0.14.8", @@ -73,11 +73,11 @@ lint = [ ] test = [ "polyfactory>=3.1.0", - "pytest>=9.0.1", + "pytest>=9.0.2", "pytest-asyncio>=1.3.0", "pytest-cov>=7.0.0", # for database in tests - "sqlalchemy-utils>=0.42.0", + "sqlalchemy-utils>=0.42.1", # for faster asyncio loop in tests "uvloop>=0.22.1", ] @@ -87,7 +87,7 @@ docs = [ ] [build-system] -requires = ["uv_build>=0.9,<0.10"] +requires = ["uv_build>=0.11,<0.12"] build-backend = "uv_build" [tool.uv.build-backend] @@ -145,6 +145,9 @@ ignore = [ "D203", # with D211 "D212", # with D213 "COM812", # with formatter + + "EM101", + "TRY003", ] [tool.ruff.lint.per-file-ignores] @@ -183,6 +186,9 @@ convention = "google" known-local-folder = ["taskiq_pg"] lines-after-imports = 2 +[tool.ruff.lint.pylint] +max-args = 9 + [tool.mypy] python_version = "3.10" modules = "taskiq_pg" diff --git a/src/taskiq_pg/_internal/broker.py b/src/taskiq_pg/_internal/broker.py index ed1afab..4eb6e1b 100644 --- a/src/taskiq_pg/_internal/broker.py +++ b/src/taskiq_pg/_internal/broker.py @@ -1,5 +1,3 @@ -from __future__ import annotations - import abc import typing as tp @@ -16,7 +14,7 @@ class BasePostgresBroker(AsyncBroker, abc.ABC): """Base class for Postgres brokers.""" - def __init__( # noqa: PLR0913 + def __init__( self, dsn: str | tp.Callable[[], str] = "postgresql://postgres:postgres@localhost:5432/postgres", result_backend: AsyncResultBackend[_T] | None = None, @@ -39,7 +37,6 @@ def __init__( # noqa: PLR0913 max_retry_attempts: Maximum number of message processing attempts. read_kwargs: Additional arguments for read connection creation. write_kwargs: Additional arguments for write pool creation. - """ super().__init__( result_backend=result_backend, diff --git a/src/taskiq_pg/_internal/result_backend.py b/src/taskiq_pg/_internal/result_backend.py index f2cdf14..ea3a5e0 100644 --- a/src/taskiq_pg/_internal/result_backend.py +++ b/src/taskiq_pg/_internal/result_backend.py @@ -31,7 +31,6 @@ def __init__( field_for_task_id: type of the field to store task_id. serializer: serializer class to serialize/deserialize result from task. connect_kwargs: additional arguments for creating connection pool. - """ self._dsn: tp.Final = dsn self.keep_results: tp.Final = keep_results diff --git a/src/taskiq_pg/_internal/schedule_source.py b/src/taskiq_pg/_internal/schedule_source.py index fa1f887..4c438b6 100644 --- a/src/taskiq_pg/_internal/schedule_source.py +++ b/src/taskiq_pg/_internal/schedule_source.py @@ -1,18 +1,13 @@ -from __future__ import annotations - import typing as tp import uuid from logging import getLogger from pydantic import ValidationError from taskiq import ScheduleSource +from taskiq.abc.broker import AsyncBroker from taskiq.scheduler.scheduled_task import ScheduledTask -if tp.TYPE_CHECKING: - from taskiq.abc.broker import AsyncBroker - - logger = getLogger("taskiq_pg") @@ -37,7 +32,6 @@ def __init__( broker: The TaskIQ broker instance to use for finding and managing tasks. Required if startup_schedule is provided. **connect_kwargs: Additional keyword arguments passed to the database connection pool. - """ self._broker: tp.Final = broker self._dsn: tp.Final = dsn diff --git a/src/taskiq_pg/aiopg/result_backend.py b/src/taskiq_pg/aiopg/result_backend.py index be92413..28a79ce 100644 --- a/src/taskiq_pg/aiopg/result_backend.py +++ b/src/taskiq_pg/aiopg/result_backend.py @@ -2,6 +2,7 @@ from aiopg import Pool, create_pool from taskiq import TaskiqResult +from taskiq.abc.serializer import TaskiqSerializer from taskiq.depends.progress_tracker import TaskProgress from taskiq_pg import exceptions @@ -13,19 +14,82 @@ class AiopgResultBackend(BasePostgresResultBackend): """Result backend for TaskIQ based on Aiopg.""" _database_pool: Pool + _owns_pool: bool + + @tp.overload + def __init__( + self, + dsn: tp.Callable[[], str] | str | None = ..., + keep_results: bool = ..., + table_name: str = ..., + field_for_task_id: tp.Literal["VarChar", "Text", "Uuid"] = ..., + serializer: TaskiqSerializer | None = ..., + *, + pool: None = ..., + **connect_kwargs: tp.Any, + ) -> None: ... + + @tp.overload + def __init__( + self, + dsn: tp.Callable[[], str] | str | None = ..., + keep_results: bool = ..., + table_name: str = ..., + field_for_task_id: tp.Literal["VarChar", "Text", "Uuid"] = ..., + serializer: TaskiqSerializer | None = ..., + *, + pool: Pool, + ) -> None: ... + + def __init__( + self, + dsn: tp.Callable[[], str] | str | None = "postgres://postgres:postgres@localhost:5432/postgres", + keep_results: bool = True, + table_name: str = "taskiq_results", + field_for_task_id: tp.Literal["VarChar", "Text", "Uuid"] = "VarChar", + serializer: TaskiqSerializer | None = None, + *, + pool: Pool | None = None, + **connect_kwargs: tp.Any, + ) -> None: + """ + Construct a new AiopgResultBackend. + + Args: + dsn: PostgreSQL connection string or callable. Can be None if pool is provided. + keep_results: Whether to keep results after reading. + table_name: Table to store results in. + field_for_task_id: Column type for task_id. + serializer: Serializer for task results. + pool: An existing connection pool to reuse. + **connect_kwargs: Extra kwargs for connection pool creation. + """ + self._owns_pool = True + if pool is not None: + self._owns_pool = False + self._database_pool = pool + + super().__init__( + dsn=dsn, + keep_results=keep_results, + table_name=table_name, + field_for_task_id=field_for_task_id, + serializer=serializer, + **connect_kwargs, + ) async def startup(self) -> None: """ Initialize the result backend. - Construct new connection pool - and create new table for results if not exists. + Construct new connection pool (if not provided externally) and create new table for results if not exists. """ try: - self._database_pool = await create_pool( - self.dsn, - **self.connect_kwargs, - ) + if self._owns_pool: + self._database_pool = await create_pool( + self.dsn, + **self.connect_kwargs, + ) async with self._database_pool.acquire() as connection, connection.cursor() as cursor: await cursor.execute( @@ -50,8 +114,8 @@ async def startup(self) -> None: raise exceptions.DatabaseConnectionError(str(error)) from error async def shutdown(self) -> None: - """Close the connection pool.""" - if getattr(self, "_database_pool", None) is not None: + """Close the connection pool if created by this backend.""" + if self._owns_pool and getattr(self, "_database_pool", None) is not None: self._database_pool.close() async def set_result( diff --git a/src/taskiq_pg/aiopg/schedule_source.py b/src/taskiq_pg/aiopg/schedule_source.py index ad757f5..2fc6434 100644 --- a/src/taskiq_pg/aiopg/schedule_source.py +++ b/src/taskiq_pg/aiopg/schedule_source.py @@ -1,7 +1,8 @@ +import typing as tp from logging import getLogger from aiopg import Pool, create_pool -from taskiq import ScheduledTask +from taskiq import AsyncBroker, ScheduledTask from taskiq_pg import exceptions from taskiq_pg._internal import BasePostgresScheduleSource @@ -21,6 +22,59 @@ class AiopgScheduleSource(BasePostgresScheduleSource): """Schedule source that uses aiopg to store schedules in PostgreSQL.""" _database_pool: Pool + _owns_pool: bool + + @tp.overload + def __init__( + self, + broker: AsyncBroker, + dsn: str | tp.Callable[[], str] = ..., + table_name: str = ..., + *, + pool: None = ..., + **connect_kwargs: tp.Any, + ) -> None: ... + + @tp.overload + def __init__( + self, + broker: AsyncBroker, + dsn: str | tp.Callable[[], str] = ..., + table_name: str = ..., + *, + pool: Pool, + ) -> None: ... + + def __init__( + self, + broker: AsyncBroker, + dsn: str | tp.Callable[[], str] = "postgresql://postgres:postgres@localhost:5432/postgres", + table_name: str = "taskiq_schedules", + *, + pool: Pool | None = None, + **connect_kwargs: tp.Any, + ) -> None: + """ + Construct a new AiopgScheduleSource. + + Args: + broker: The TaskIQ broker instance. + dsn: PostgreSQL connection string or callable. Ignored in pool mode. + table_name: Table to store schedules in. + pool: An existing connection pool to reuse. + **connect_kwargs: Extra kwargs for connection pool creation. + """ + self._owns_pool = True + if pool is not None: + self._owns_pool = False + self._database_pool = pool + + super().__init__( + broker=broker, + dsn=dsn, + table_name=table_name, + **connect_kwargs, + ) async def _update_schedules_on_startup(self, schedules: list[ScheduledTask]) -> None: """Update schedules in the database on startup: truncate table and insert new ones.""" @@ -42,14 +96,15 @@ async def startup(self) -> None: """ Initialize the schedule source. - Construct new connection pool, create new table for schedules if not exists + Construct new connection pool (if not provided externally), create new table for schedules if not exists and fill table with schedules from task labels. """ try: - self._database_pool = await create_pool( - dsn=self.dsn, - **self._connect_kwargs, - ) + if self._owns_pool: + self._database_pool = await create_pool( + dsn=self.dsn, + **self._connect_kwargs, + ) async with self._database_pool.acquire() as connection, connection.cursor() as cursor: await cursor.execute(CREATE_SCHEDULES_TABLE_QUERY.format(self._table_name)) scheduled_tasks_for_creation = self.extract_scheduled_tasks_from_broker() @@ -58,11 +113,11 @@ async def startup(self) -> None: raise exceptions.DatabaseConnectionError(str(error)) from error async def shutdown(self) -> None: - """Close the connection pool.""" - if getattr(self, "_database_pool", None) is not None: + """Close the connection pool if it was created by this schedule source.""" + if self._owns_pool and getattr(self, "_database_pool", None) is not None: self._database_pool.close() - async def get_schedules(self) -> list["ScheduledTask"]: + async def get_schedules(self) -> list[ScheduledTask]: """Fetch schedules from the database.""" async with self._database_pool.acquire() as connection, connection.cursor() as cursor: await cursor.execute( @@ -87,7 +142,7 @@ async def get_schedules(self) -> list["ScheduledTask"]: ) return schedules - async def add_schedule(self, schedule: "ScheduledTask") -> None: + async def add_schedule(self, schedule: ScheduledTask) -> None: """ Add a new schedule. diff --git a/src/taskiq_pg/asyncpg/broker.py b/src/taskiq_pg/asyncpg/broker.py index a330ff6..3d77e9d 100644 --- a/src/taskiq_pg/asyncpg/broker.py +++ b/src/taskiq_pg/asyncpg/broker.py @@ -1,12 +1,11 @@ -from __future__ import annotations - import asyncio import json import logging import typing as tp +from collections.abc import AsyncGenerator, Callable import asyncpg -from taskiq import AckableMessage, BrokerMessage +from taskiq import AckableMessage, AsyncResultBackend, BrokerMessage from taskiq_pg._internal.broker import BasePostgresBroker from taskiq_pg.asyncpg.queries import ( @@ -17,29 +16,147 @@ ) -if tp.TYPE_CHECKING: - from collections.abc import AsyncGenerator - - logger = logging.getLogger("taskiq.asyncpg_broker") +_T = tp.TypeVar("_T") + class AsyncpgBroker(BasePostgresBroker): """Broker that uses asyncpg as driver and PostgreSQL with LISTEN/NOTIFY mechanism.""" - _read_conn: asyncpg.Connection[asyncpg.Record] | None = None - _write_pool: asyncpg.pool.Pool[asyncpg.Record] | None = None + _read_conn: asyncpg.Connection | None = None + _write_pool: asyncpg.pool.Pool | None = None + _owns_write_pool: bool + _owns_read_conn: bool + + @tp.overload + def __init__( + self, + dsn: str | Callable[[], str] = ..., + result_backend: AsyncResultBackend[_T] | None = ..., + task_id_generator: Callable[[], str] | None = ..., + channel_name: str = ..., + table_name: str = ..., + max_retry_attempts: int = ..., + read_kwargs: dict[str, tp.Any] | None = ..., + write_kwargs: dict[str, tp.Any] | None = ..., + *, + write_pool: None = ..., + read_connection: None = ..., + ) -> None: ... + + @tp.overload + def __init__( + self, + dsn: str | Callable[[], str] = ..., + result_backend: AsyncResultBackend[_T] | None = ..., + task_id_generator: Callable[[], str] | None = ..., + channel_name: str = ..., + table_name: str = ..., + max_retry_attempts: int = ..., + read_kwargs: dict[str, tp.Any] | None = ..., + write_kwargs: dict[str, tp.Any] | None = ..., + *, + write_pool: asyncpg.pool.Pool, + read_connection: None = ..., + ) -> None: ... + + @tp.overload + def __init__( + self, + dsn: str | Callable[[], str] = ..., + result_backend: AsyncResultBackend[_T] | None = ..., + task_id_generator: Callable[[], str] | None = ..., + channel_name: str = ..., + table_name: str = ..., + max_retry_attempts: int = ..., + read_kwargs: dict[str, tp.Any] | None = ..., + write_kwargs: dict[str, tp.Any] | None = ..., + *, + write_pool: None = ..., + read_connection: asyncpg.Connection, + ) -> None: ... + + @tp.overload + def __init__( + self, + dsn: str | Callable[[], str] = ..., + result_backend: AsyncResultBackend[_T] | None = ..., + task_id_generator: Callable[[], str] | None = ..., + channel_name: str = ..., + table_name: str = ..., + max_retry_attempts: int = ..., + read_kwargs: dict[str, tp.Any] | None = ..., + write_kwargs: dict[str, tp.Any] | None = ..., + *, + write_pool: asyncpg.pool.Pool, + read_connection: asyncpg.Connection, + ) -> None: ... + + def __init__( # noqa: PLR0913 + self, + dsn: str | Callable[[], str] = "postgresql://postgres:postgres@localhost:5432/postgres", + result_backend: AsyncResultBackend[_T] | None = None, + task_id_generator: Callable[[], str] | None = None, + channel_name: str = "taskiq", + table_name: str = "taskiq_messages", + max_retry_attempts: int = 5, + read_kwargs: dict[str, tp.Any] | None = None, + write_kwargs: dict[str, tp.Any] | None = None, + *, + write_pool: asyncpg.pool.Pool | None = None, + read_connection: asyncpg.Connection | None = None, + ) -> None: + """ + Construct a new AsyncpgBroker. + + Args: + dsn: PostgreSQL connection string or a callable returning one. + result_backend: Custom result backend. + task_id_generator: Custom task_id generator. + channel_name: Name of the LISTEN/NOTIFY channel. + table_name: Name of the table used to store messages. + max_retry_attempts: Maximum number of message processing attempts. + read_kwargs: Extra kwargs forwarded to `asyncpg.connect()`. + write_kwargs: Extra kwargs forwarded to `asyncpg.create_pool()`. + write_pool: An existing connection pool to reuse for writes. + read_connection: An existing connection to reuse for LISTEN. + """ + super().__init__( + dsn=dsn, + result_backend=result_backend, + task_id_generator=task_id_generator, + channel_name=channel_name, + table_name=table_name, + max_retry_attempts=max_retry_attempts, + read_kwargs=read_kwargs, + write_kwargs=write_kwargs, + ) + + self._owns_write_pool = True + if write_pool is not None: + self._write_pool = write_pool + self._owns_write_pool = False + + self._owns_read_conn = True + if read_connection is not None: + self._read_conn = read_connection + self._owns_read_conn = False async def startup(self) -> None: """Initialize the broker.""" await super().startup() - self._read_conn = await asyncpg.connect(self.dsn, **self.read_kwargs) - self._write_pool = await asyncpg.create_pool(self.dsn, **self.write_kwargs) + if not self._read_conn: + self._read_conn = await asyncpg.connect(self.dsn, **self.read_kwargs) + + if not self._write_pool: + self._write_pool = await asyncpg.create_pool(self.dsn, **self.write_kwargs) if self._read_conn is None: - msg = "_read_conn not initialized" - raise RuntimeError(msg) + raise RuntimeError("Read connection is not initialized") + if self._write_pool is None: + raise RuntimeError("Write pool is not initialized") async with self._write_pool.acquire() as conn: await conn.execute(CREATE_MESSAGE_TABLE_QUERY.format(self.table_name)) @@ -52,13 +169,14 @@ async def shutdown(self) -> None: await super().shutdown() if self._read_conn is not None: await self._read_conn.remove_listener(self.channel_name, self._notification_handler) - await self._read_conn.close() - if self._write_pool is not None: + if self._owns_read_conn: + await self._read_conn.close() + if self._write_pool is not None and self._owns_write_pool: await self._write_pool.close() def _notification_handler( self, - con_ref: asyncpg.Connection[asyncpg.Record] | asyncpg.pool.PoolConnectionProxy[asyncpg.Record], # noqa: ARG002 + con_ref: asyncpg.Connection | asyncpg.pool.PoolConnectionProxy, # noqa: ARG002 pid: int, # noqa: ARG002 channel: str, payload: object, diff --git a/src/taskiq_pg/asyncpg/result_backend.py b/src/taskiq_pg/asyncpg/result_backend.py index 008ce14..73573c4 100644 --- a/src/taskiq_pg/asyncpg/result_backend.py +++ b/src/taskiq_pg/asyncpg/result_backend.py @@ -2,6 +2,7 @@ import asyncpg from taskiq import TaskiqResult +from taskiq.abc.serializer import TaskiqSerializer from taskiq.compat import model_dump, model_validate from taskiq.depends.progress_tracker import TaskProgress @@ -13,18 +14,83 @@ class AsyncpgResultBackend(BasePostgresResultBackend): """Result backend for TaskIQ based on asyncpg.""" _database_pool: "asyncpg.Pool[tp.Any]" + _owns_pool: bool + + @tp.overload + def __init__( + self, + dsn: tp.Callable[[], str] | str | None = ..., + keep_results: bool = ..., + table_name: str = ..., + field_for_task_id: tp.Literal["VarChar", "Text", "Uuid"] = ..., + serializer: TaskiqSerializer | None = ..., + *, + pool: None = ..., + **connect_kwargs: tp.Any, + ) -> None: ... + + @tp.overload + def __init__( + self, + dsn: tp.Callable[[], str] | str | None = ..., + keep_results: bool = ..., + table_name: str = ..., + field_for_task_id: tp.Literal["VarChar", "Text", "Uuid"] = ..., + serializer: TaskiqSerializer | None = ..., + *, + pool: "asyncpg.Pool[tp.Any]", + ) -> None: ... + + def __init__( + self, + dsn: tp.Callable[[], str] | str | None = "postgres://postgres:postgres@localhost:5432/postgres", + keep_results: bool = True, + table_name: str = "taskiq_results", + field_for_task_id: tp.Literal["VarChar", "Text", "Uuid"] = "VarChar", + serializer: TaskiqSerializer | None = None, + *, + pool: "asyncpg.Pool[tp.Any] | None" = None, + **connect_kwargs: tp.Any, + ) -> None: + """ + Construct a new AsyncpgResultBackend. + + Args: + dsn: PostgreSQL connection string or callable. Can be None if pool is provided. + keep_results: Whether to keep results after reading. + table_name: Table to store results in. + field_for_task_id: Column type for task_id. + serializer: Serializer for task results. + pool: An existing connection pool to reuse. + **connect_kwargs: Extra kwargs for connection pool creation. + """ + self._owns_pool = True + if pool is not None: + self._owns_pool = False + self._database_pool = pool + + super().__init__( + dsn=dsn, + keep_results=keep_results, + table_name=table_name, + field_for_task_id=field_for_task_id, + serializer=serializer, + **connect_kwargs, + ) async def startup(self) -> None: """ Initialize the result backend. - Construct new connection pool and create new table for results if not exists. + Construct new connection pool (if not provided externally) and create new table + for results if not exists. """ - _database_pool = await asyncpg.create_pool( - dsn=self.dsn, - **self.connect_kwargs, - ) - self._database_pool = _database_pool + if self._owns_pool: + _database_pool = await asyncpg.create_pool( + dsn=self.dsn, + **self.connect_kwargs, + ) + self._database_pool = _database_pool await self._database_pool.execute( queries.CREATE_TABLE_QUERY.format( @@ -45,8 +111,8 @@ async def startup(self) -> None: ) async def shutdown(self) -> None: - """Close the connection pool.""" - if getattr(self, "_database_pool", None) is not None: + """Close the connection pool if it was created by this backend.""" + if self._owns_pool and getattr(self, "_database_pool", None) is not None: await self._database_pool.close() async def set_result( diff --git a/src/taskiq_pg/asyncpg/schedule_source.py b/src/taskiq_pg/asyncpg/schedule_source.py index 2e82a67..52c921f 100644 --- a/src/taskiq_pg/asyncpg/schedule_source.py +++ b/src/taskiq_pg/asyncpg/schedule_source.py @@ -1,8 +1,9 @@ import json +import typing as tp from logging import getLogger import asyncpg -from taskiq import ScheduledTask +from taskiq import AsyncBroker, ScheduledTask from taskiq_pg._internal import BasePostgresScheduleSource from taskiq_pg.asyncpg.queries import ( @@ -21,6 +22,59 @@ class AsyncpgScheduleSource(BasePostgresScheduleSource): """Schedule source that uses asyncpg to store schedules in PostgreSQL.""" _database_pool: "asyncpg.Pool[asyncpg.Record]" + _owns_pool: bool + + @tp.overload + def __init__( + self, + broker: AsyncBroker, + dsn: str | tp.Callable[[], str] = ..., + table_name: str = ..., + *, + pool: None = ..., + **connect_kwargs: tp.Any, + ) -> None: ... + + @tp.overload + def __init__( + self, + broker: AsyncBroker, + dsn: str | tp.Callable[[], str] = ..., + table_name: str = ..., + *, + pool: "asyncpg.Pool[asyncpg.Record]", + ) -> None: ... + + def __init__( + self, + broker: AsyncBroker, + dsn: str | tp.Callable[[], str] = "postgresql://postgres:postgres@localhost:5432/postgres", + table_name: str = "taskiq_schedules", + *, + pool: "asyncpg.Pool[asyncpg.Record] | None" = None, + **connect_kwargs: tp.Any, + ) -> None: + """ + Construct a new AsyncpgScheduleSource. + + Args: + broker: The TaskIQ broker instance. + dsn: PostgreSQL connection string or callable. Ignored in pool mode. + table_name: Table to store schedules in. + pool: An existing connection pool to reuse. + **connect_kwargs: Extra kwargs for connection pool creation. + """ + self._owns_pool = True + if pool is not None: + self._owns_pool = False + self._database_pool = pool + + super().__init__( + broker=broker, + dsn=dsn, + table_name=table_name, + **connect_kwargs, + ) async def _update_schedules_on_startup(self, schedules: list[ScheduledTask]) -> None: """Update schedules in the database on startup: truncate table and insert new ones.""" @@ -40,13 +94,14 @@ async def startup(self) -> None: """ Initialize the schedule source. - Construct new connection pool, create new table for schedules if not exists + Construct new connection pool (if not provided externally), create new table for schedules if not exists and fill table with schedules from task labels. """ - self._database_pool = await asyncpg.create_pool( - dsn=self.dsn, - **self._connect_kwargs, - ) + if self._owns_pool: + self._database_pool = await asyncpg.create_pool( + dsn=self.dsn, + **self._connect_kwargs, + ) await self._database_pool.execute( CREATE_SCHEDULES_TABLE_QUERY.format( self._table_name, @@ -56,8 +111,8 @@ async def startup(self) -> None: await self._update_schedules_on_startup(scheduled_tasks_for_creation) async def shutdown(self) -> None: - """Close the connection pool.""" - if getattr(self, "_database_pool", None) is not None: + """Close the connection pool if it created by this schedule source.""" + if self._owns_pool and getattr(self, "_database_pool", None) is not None: await self._database_pool.close() async def get_schedules(self) -> list["ScheduledTask"]: diff --git a/src/taskiq_pg/psqlpy/broker.py b/src/taskiq_pg/psqlpy/broker.py index 63a4d02..56713ad 100644 --- a/src/taskiq_pg/psqlpy/broker.py +++ b/src/taskiq_pg/psqlpy/broker.py @@ -1,14 +1,14 @@ import asyncio import logging import typing as tp -from collections.abc import AsyncGenerator +from collections.abc import AsyncGenerator, Callable from dataclasses import dataclass from datetime import datetime import psqlpy from psqlpy.exceptions import ConnectionExecuteError from psqlpy.extra_types import JSONB -from taskiq import AckableMessage, BrokerMessage +from taskiq import AckableMessage, AsyncResultBackend, BrokerMessage from taskiq_pg._internal.broker import BasePostgresBroker from taskiq_pg.psqlpy.queries import ( @@ -21,6 +21,8 @@ logger = logging.getLogger("taskiq.psqlpy_broker") +_T = tp.TypeVar("_T") + @dataclass class MessageRow: @@ -42,18 +44,138 @@ class PSQLPyBroker(BasePostgresBroker): _write_pool: psqlpy.ConnectionPool _listener: psqlpy.Listener _queue: asyncio.Queue + _owns_write_pool: bool + _owns_read_conn: bool + + @tp.overload + def __init__( + self, + dsn: str | Callable[[], str] = ..., + result_backend: AsyncResultBackend[_T] | None = ..., + task_id_generator: "Callable[[], str] | None" = ..., + channel_name: str = ..., + table_name: str = ..., + max_retry_attempts: int = ..., + read_kwargs: dict[str, tp.Any] | None = ..., + write_kwargs: dict[str, tp.Any] | None = ..., + *, + write_pool: None = ..., + read_connection: None = ..., + ) -> None: ... + + @tp.overload + def __init__( + self, + dsn: str | Callable[[], str] = ..., + result_backend: AsyncResultBackend[_T] | None = ..., + task_id_generator: Callable[[], str] | None = ..., + channel_name: str = ..., + table_name: str = ..., + max_retry_attempts: int = ..., + read_kwargs: dict[str, tp.Any] | None = ..., + write_kwargs: dict[str, tp.Any] | None = ..., + *, + write_pool: psqlpy.ConnectionPool, + read_connection: None = ..., + ) -> None: ... + + @tp.overload + def __init__( + self, + dsn: str | Callable[[], str] = ..., + result_backend: AsyncResultBackend[_T] | None = ..., + task_id_generator: Callable[[], str] | None = ..., + channel_name: str = ..., + table_name: str = ..., + max_retry_attempts: int = ..., + read_kwargs: dict[str, tp.Any] | None = ..., + write_kwargs: dict[str, tp.Any] | None = ..., + *, + write_pool: None = ..., + read_connection: psqlpy.Connection, + ) -> None: ... + + @tp.overload + def __init__( + self, + dsn: str | Callable[[], str] = ..., + result_backend: AsyncResultBackend[_T] | None = ..., + task_id_generator: Callable[[], str] | None = ..., + channel_name: str = ..., + table_name: str = ..., + max_retry_attempts: int = ..., + read_kwargs: dict[str, tp.Any] | None = ..., + write_kwargs: dict[str, tp.Any] | None = ..., + *, + write_pool: psqlpy.ConnectionPool, + read_connection: psqlpy.Connection, + ) -> None: ... + + def __init__( # noqa: PLR0913 + self, + dsn: str | Callable[[], str] = "postgresql://postgres:postgres@localhost:5432/postgres", + result_backend: AsyncResultBackend[_T] | None = None, + task_id_generator: Callable[[], str] | None = None, + channel_name: str = "taskiq", + table_name: str = "taskiq_messages", + max_retry_attempts: int = 5, + read_kwargs: dict[str, tp.Any] | None = None, + write_kwargs: dict[str, tp.Any] | None = None, + *, + write_pool: psqlpy.ConnectionPool | None = None, + read_connection: psqlpy.Connection | None = None, + ) -> None: + """ + Construct a new PSQLPyBroker. + + Args: + dsn: PostgreSQL connection string or callable. + result_backend: Custom result backend. + task_id_generator: Custom task_id generator. + channel_name: Name of the LISTEN/NOTIFY channel. + table_name: Name of the table used to store messages. + max_retry_attempts: Maximum number of message processing attempts. + read_kwargs: Extra kwargs forwarded to `psqlpy.connect()` + write_kwargs: Extra kwargs forwarded to `psqlpy.ConnectionPool()` + write_pool: An existing pool to reuse for writes. + read_connection: An existing connection to reuse for LISTEN. + """ + super().__init__( + dsn=dsn, + result_backend=result_backend, + task_id_generator=task_id_generator, + channel_name=channel_name, + table_name=table_name, + max_retry_attempts=max_retry_attempts, + read_kwargs=read_kwargs, + write_kwargs=write_kwargs, + ) + + self._owns_write_pool = True + if write_pool is not None: + self._write_pool = write_pool + self._owns_write_pool = False + + self._owns_read_conn = True + if read_connection is not None: + self._read_conn = read_connection + self._owns_read_conn = False async def startup(self) -> None: """Initialize the broker.""" await super().startup() - self._read_conn = await psqlpy.connect( - dsn=self.dsn, - **self.read_kwargs, - ) - self._write_pool = psqlpy.ConnectionPool( - dsn=self.dsn, - **self.write_kwargs, - ) + + if self._owns_read_conn: + self._read_conn = await psqlpy.connect( + dsn=self.dsn, + **self.read_kwargs, + ) + + if self._owns_write_pool: + self._write_pool = psqlpy.ConnectionPool( + dsn=self.dsn, + **self.write_kwargs, + ) # create messages table if it doesn't exist async with self._write_pool.acquire() as conn: @@ -70,9 +192,9 @@ async def startup(self) -> None: async def shutdown(self) -> None: """Close all connections on shutdown.""" await super().shutdown() - if self._read_conn is not None: + if self._read_conn is not None and self._owns_read_conn: self._read_conn.close() - if self._write_pool is not None: + if self._write_pool is not None and self._owns_write_pool: self._write_pool.close() if self._listener is not None: self._listener.abort_listen() diff --git a/src/taskiq_pg/psqlpy/result_backend.py b/src/taskiq_pg/psqlpy/result_backend.py index d836aee..ad9ed69 100644 --- a/src/taskiq_pg/psqlpy/result_backend.py +++ b/src/taskiq_pg/psqlpy/result_backend.py @@ -3,6 +3,7 @@ from psqlpy import ConnectionPool from psqlpy.exceptions import BaseConnectionError from taskiq import TaskiqResult +from taskiq.abc.serializer import TaskiqSerializer from taskiq.compat import model_dump, model_validate from taskiq.depends.progress_tracker import TaskProgress @@ -15,18 +16,81 @@ class PSQLPyResultBackend(BasePostgresResultBackend): """Result backend for TaskIQ based on PSQLPy.""" _database_pool: ConnectionPool + _owns_pool: bool + + @tp.overload + def __init__( + self, + dsn: tp.Callable[[], str] | str | None = ..., + keep_results: bool = ..., + table_name: str = ..., + field_for_task_id: tp.Literal["VarChar", "Text", "Uuid"] = ..., + serializer: TaskiqSerializer | None = ..., + *, + pool: None = ..., + **connect_kwargs: tp.Any, + ) -> None: ... + + @tp.overload + def __init__( + self, + dsn: tp.Callable[[], str] | str | None = ..., + keep_results: bool = ..., + table_name: str = ..., + field_for_task_id: tp.Literal["VarChar", "Text", "Uuid"] = ..., + serializer: TaskiqSerializer | None = ..., + *, + pool: ConnectionPool, + ) -> None: ... + + def __init__( + self, + dsn: tp.Callable[[], str] | str | None = "postgres://postgres:postgres@localhost:5432/postgres", + keep_results: bool = True, + table_name: str = "taskiq_results", + field_for_task_id: tp.Literal["VarChar", "Text", "Uuid"] = "VarChar", + serializer: TaskiqSerializer | None = None, + *, + pool: ConnectionPool | None = None, + **connect_kwargs: tp.Any, + ) -> None: + """ + Construct a new PSQLPyResultBackend. + + Args: + dsn: PostgreSQL connection string or callable. Can be None if pool is provided. + keep_results: Whether to keep results after reading. + table_name: Table to store results in. + field_for_task_id: Column type for task_id. + serializer: Serializer for task results. + pool: An existing connection pool to reuse. + **connect_kwargs: Extra kwargs for connection pool creation. + """ + self._owns_pool = True + if pool is not None: + self._owns_pool = False + self._database_pool = pool + + super().__init__( + dsn=dsn, + keep_results=keep_results, + table_name=table_name, + field_for_task_id=field_for_task_id, + serializer=serializer, + **connect_kwargs, + ) async def startup(self) -> None: """ Initialize the result backend. - Construct new connection pool - and create new table for results if not exists. + Construct new connection pool (if not provided externally) and create new table for results if not exists. """ - self._database_pool = ConnectionPool( - dsn=self.dsn, - **self.connect_kwargs, - ) + if self._owns_pool: + self._database_pool = ConnectionPool( + dsn=self.dsn, + **self.connect_kwargs, + ) connection = await self._database_pool.connection() await connection.execute( querystring=queries.CREATE_TABLE_QUERY.format( @@ -47,8 +111,8 @@ async def startup(self) -> None: ) async def shutdown(self) -> None: - """Close the connection pool.""" - if getattr(self, "_database_pool", None) is not None: + """Close the connection pool if it was created by this result backend.""" + if self._owns_pool and getattr(self, "_database_pool", None) is not None: self._database_pool.close() async def set_result( diff --git a/src/taskiq_pg/psqlpy/schedule_source.py b/src/taskiq_pg/psqlpy/schedule_source.py index 8476976..0114c60 100644 --- a/src/taskiq_pg/psqlpy/schedule_source.py +++ b/src/taskiq_pg/psqlpy/schedule_source.py @@ -1,9 +1,10 @@ +import typing as tp import uuid from logging import getLogger from psqlpy import ConnectionPool from psqlpy.extra_types import JSONB -from taskiq import ScheduledTask +from taskiq import AsyncBroker, ScheduledTask from taskiq_pg._internal import BasePostgresScheduleSource from taskiq_pg.psqlpy.queries import ( @@ -22,6 +23,60 @@ class PSQLPyScheduleSource(BasePostgresScheduleSource): """Schedule source that uses psqlpy to store schedules in PostgreSQL.""" _database_pool: ConnectionPool + _owns_pool: bool + + @tp.overload + def __init__( + self, + broker: AsyncBroker, + dsn: "str | tp.Callable[[], str]" = ..., + table_name: str = ..., + *, + pool: None = ..., + **connect_kwargs: tp.Any, + ) -> None: ... + + @tp.overload + def __init__( + self, + broker: AsyncBroker, + dsn: "str | tp.Callable[[], str]" = ..., + table_name: str = ..., + *, + pool: ConnectionPool, + ) -> None: ... + + def __init__( + self, + broker: AsyncBroker, + dsn: "str | tp.Callable[[], str]" = "postgresql://postgres:postgres@localhost:5432/postgres", + table_name: str = "taskiq_schedules", + *, + pool: ConnectionPool | None = None, + **connect_kwargs: tp.Any, + ) -> None: + """ + Construct a new PSQLPyScheduleSource. + + Args: + broker: The TaskIQ broker instance. + dsn: PostgreSQL connection string or callable. Ignored in pool mode. + table_name: Table to store schedules in. + pool: An existing connection pool to reuse. + **connect_kwargs: Extra kwargs for connection pool creation. + """ + if pool is not None: + self._owns_pool = False + self._database_pool = pool + else: + self._owns_pool = True + + super().__init__( + broker=broker, + dsn=dsn, + table_name=table_name, + **connect_kwargs, + ) async def _update_schedules_on_startup(self, schedules: list[ScheduledTask]) -> None: """Update schedules in the database on startup: truncate table and insert new ones.""" @@ -50,13 +105,14 @@ async def startup(self) -> None: """ Initialize the schedule source. - Construct new connection pool, create new table for schedules if not exists - and fill table with schedules from task labels. + Construct new connection pool (if not provided externally), create new table for + schedules if not exists and fill table with schedules from task labels. """ - self._database_pool = ConnectionPool( - dsn=self.dsn, - **self._connect_kwargs, - ) + if self._owns_pool: + self._database_pool = ConnectionPool( + dsn=self.dsn, + **self._connect_kwargs, + ) async with self._database_pool.acquire() as connection: await connection.execute( CREATE_SCHEDULES_TABLE_QUERY.format( @@ -67,8 +123,8 @@ async def startup(self) -> None: await self._update_schedules_on_startup(scheduled_tasks_for_creation) async def shutdown(self) -> None: - """Close the connection pool.""" - if getattr(self, "_database_pool", None) is not None: + """Close the connection pool if was created by this schedule source.""" + if self._owns_pool and getattr(self, "_database_pool", None) is not None: self._database_pool.close() async def get_schedules(self) -> list["ScheduledTask"]: diff --git a/src/taskiq_pg/psycopg/broker.py b/src/taskiq_pg/psycopg/broker.py index ae481f1..cd49f4f 100644 --- a/src/taskiq_pg/psycopg/broker.py +++ b/src/taskiq_pg/psycopg/broker.py @@ -2,13 +2,13 @@ import json import logging import typing as tp -from collections.abc import AsyncGenerator +from collections.abc import AsyncGenerator, Callable from contextlib import suppress import psycopg from psycopg import AsyncConnection, AsyncRawCursor, sql from psycopg_pool import AsyncConnectionPool -from taskiq import AckableMessage, BrokerMessage +from taskiq import AckableMessage, AsyncResultBackend, BrokerMessage from taskiq_pg._internal.broker import BasePostgresBroker from taskiq_pg.psycopg.queries import ( @@ -21,6 +21,8 @@ logger = logging.getLogger("taskiq.psycopg_broker") +_T = tp.TypeVar("_T") + class PsycopgBroker(BasePostgresBroker): """Broker that uses PostgreSQL and psycopg with LISTEN/NOTIFY.""" @@ -28,22 +30,144 @@ class PsycopgBroker(BasePostgresBroker): _read_conn: AsyncConnection _write_pool: AsyncConnectionPool _notifies_iter: tp.AsyncIterator[tp.Any] + _owns_write_pool: bool + _owns_read_conn: bool + + @tp.overload + def __init__( + self, + dsn: str | Callable[[], str] = ..., + result_backend: AsyncResultBackend[_T] | None = ..., + task_id_generator: Callable[[], str] | None = ..., + channel_name: str = ..., + table_name: str = ..., + max_retry_attempts: int = ..., + read_kwargs: dict[str, tp.Any] | None = ..., + write_kwargs: dict[str, tp.Any] | None = ..., + *, + write_pool: None = ..., + read_connection: None = ..., + ) -> None: ... + + @tp.overload + def __init__( + self, + dsn: str | Callable[[], str] = ..., + result_backend: AsyncResultBackend[_T] | None = ..., + task_id_generator: Callable[[], str] | None = ..., + channel_name: str = ..., + table_name: str = ..., + max_retry_attempts: int = ..., + read_kwargs: dict[str, tp.Any] | None = ..., + write_kwargs: dict[str, tp.Any] | None = ..., + *, + write_pool: AsyncConnectionPool, + read_connection: None = ..., + ) -> None: ... + + @tp.overload + def __init__( + self, + dsn: str | Callable[[], str] = ..., + result_backend: AsyncResultBackend[_T] | None = ..., + task_id_generator: Callable[[], str] | None = ..., + channel_name: str = ..., + table_name: str = ..., + max_retry_attempts: int = ..., + read_kwargs: dict[str, tp.Any] | None = ..., + write_kwargs: dict[str, tp.Any] | None = ..., + *, + write_pool: None = ..., + read_connection: AsyncConnection, + ) -> None: ... + + @tp.overload + def __init__( + self, + dsn: str | Callable[[], str] = ..., + result_backend: AsyncResultBackend[_T] | None = ..., + task_id_generator: Callable[[], str] | None = ..., + channel_name: str = ..., + table_name: str = ..., + max_retry_attempts: int = ..., + read_kwargs: dict[str, tp.Any] | None = ..., + write_kwargs: dict[str, tp.Any] | None = ..., + *, + write_pool: AsyncConnectionPool, + read_connection: AsyncConnection, + ) -> None: ... + + def __init__( # noqa: PLR0913 + self, + dsn: str | Callable[[], str] = "postgresql://postgres:postgres@localhost:5432/postgres", + result_backend: AsyncResultBackend[_T] | None = None, + task_id_generator: Callable[[], str] | None = None, + channel_name: str = "taskiq", + table_name: str = "taskiq_messages", + max_retry_attempts: int = 5, + read_kwargs: dict[str, tp.Any] | None = None, + write_kwargs: dict[str, tp.Any] | None = None, + *, + write_pool: AsyncConnectionPool | None = None, + read_connection: AsyncConnection | None = None, + ) -> None: + """ + Construct a new PsycopgBroker. + + Args: + dsn: PostgreSQL connection string or callable. + result_backend: Custom result backend. + task_id_generator: Custom task_id generator. + channel_name: Name of the LISTEN/NOTIFY channel. + table_name: Name of the table used to store messages. + max_retry_attempts: Maximum number of message processing attempts. + read_kwargs: Extra kwargs forwarded to `AsyncConnection.connect()`. + write_kwargs: Extra kwargs forwarded to `AsyncConnectionPool()`. + write_pool: An existing connection pool to reuse for writes. + read_connection: An existing connection to reuse for LISTEN. + """ + super().__init__( + dsn=dsn, + result_backend=result_backend, + task_id_generator=task_id_generator, + channel_name=channel_name, + table_name=table_name, + max_retry_attempts=max_retry_attempts, + read_kwargs=read_kwargs, + write_kwargs=write_kwargs, + ) + + self._owns_write_pool = True + if write_pool is not None: + self._write_pool = write_pool + self._owns_write_pool = False + + self._owns_read_conn = True + if read_connection is not None: + self._read_conn = read_connection + self._owns_read_conn = False async def startup(self) -> None: """Initialize the broker.""" await super().startup() - self._read_conn = await AsyncConnection.connect( - conninfo=self.dsn, - **self.read_kwargs, - autocommit=True, - cursor_factory=AsyncRawCursor, - ) - self._write_pool = AsyncConnectionPool( - conninfo=self.dsn if self.dsn is not None else "", - open=False, - **self.write_kwargs, - ) - await self._write_pool.open() + + if self._owns_read_conn: + self._read_conn = await AsyncConnection.connect( + conninfo=self.dsn, + **self.read_kwargs, + autocommit=True, + cursor_factory=AsyncRawCursor, + ) + + if self._owns_write_pool: + self._write_pool = AsyncConnectionPool( + conninfo=self.dsn if self.dsn is not None else "", + open=False, + **self.write_kwargs, + ) + + if self._write_pool.closed: + await self._write_pool.open() async with self._write_pool.connection() as connection, connection.cursor() as cursor: await cursor.execute(sql.SQL(CREATE_MESSAGE_TABLE_QUERY).format(sql.Identifier(self.table_name))) @@ -57,10 +181,10 @@ async def shutdown(self) -> None: if self._notifies_iter is not None: with suppress(RuntimeError): # RuntimeError: aclose(): asynchronous generator is already running await self._notifies_iter.aclose() # type: ignore[attr-defined] - if self._read_conn is not None: - await self._read_conn.notifies().aclose() - await self._read_conn.close() - if self._write_pool is not None: + if self._read_conn is not None and self._owns_read_conn: + await self._read_conn.notifies().aclose() + await self._read_conn.close() + if self._write_pool is not None and self._owns_write_pool: await self._write_pool.close() async def kick(self, message: BrokerMessage) -> None: diff --git a/src/taskiq_pg/psycopg/result_backend.py b/src/taskiq_pg/psycopg/result_backend.py index 940f668..5aa391b 100644 --- a/src/taskiq_pg/psycopg/result_backend.py +++ b/src/taskiq_pg/psycopg/result_backend.py @@ -3,6 +3,7 @@ from psycopg import sql from psycopg_pool import AsyncConnectionPool from taskiq import TaskiqResult +from taskiq.abc.serializer import TaskiqSerializer from taskiq.compat import model_dump, model_validate from taskiq.depends.progress_tracker import TaskProgress @@ -15,20 +16,87 @@ class PsycopgResultBackend(BasePostgresResultBackend): """Result backend for TaskIQ based on psycopg.""" _database_pool: AsyncConnectionPool + _owns_pool: bool + + @tp.overload + def __init__( + self, + dsn: tp.Callable[[], str] | str | None = ..., + keep_results: bool = ..., + table_name: str = ..., + field_for_task_id: tp.Literal["VarChar", "Text", "Uuid"] = ..., + serializer: TaskiqSerializer | None = ..., + *, + pool: None = ..., + **connect_kwargs: tp.Any, + ) -> None: ... + + @tp.overload + def __init__( + self, + dsn: tp.Callable[[], str] | str | None = ..., + keep_results: bool = ..., + table_name: str = ..., + field_for_task_id: tp.Literal["VarChar", "Text", "Uuid"] = ..., + serializer: TaskiqSerializer | None = ..., + *, + pool: AsyncConnectionPool, + ) -> None: ... + + def __init__( + self, + dsn: tp.Callable[[], str] | str | None = "postgres://postgres:postgres@localhost:5432/postgres", + keep_results: bool = True, + table_name: str = "taskiq_results", + field_for_task_id: tp.Literal["VarChar", "Text", "Uuid"] = "VarChar", + serializer: TaskiqSerializer | None = None, + *, + pool: AsyncConnectionPool | None = None, + **connect_kwargs: tp.Any, + ) -> None: + """ + Construct a new PsycopgResultBackend. + + Args: + dsn: PostgreSQL connection string or callable. Can be None if pool is provided. + keep_results: Whether to keep results after reading. + table_name: Table to store results in. + field_for_task_id: Column type for task_id. + serializer: Serializer for task results. + pool: An existing connection pool to reuse. + **connect_kwargs: Extra kwargs for connection pool creation. + """ + self._owns_pool = True + if pool is not None: + self._owns_pool = False + self._database_pool = pool + + super().__init__( + dsn=dsn, + keep_results=keep_results, + table_name=table_name, + field_for_task_id=field_for_task_id, + serializer=serializer, + **connect_kwargs, + ) async def startup(self) -> None: """ Initialize the result backend. - Construct new connection pool - and create new table for results if not exists. + Construct new connection pool (if not provided externally) and create new table + for results if not exists. """ - self._database_pool = AsyncConnectionPool( - conninfo=self.dsn if self.dsn is not None else "", - open=False, - **self.connect_kwargs, - ) - await self._database_pool.open() + if self._owns_pool: + self._database_pool = AsyncConnectionPool( + conninfo=self.dsn if self.dsn is not None else "", + open=False, + **self.connect_kwargs, + ) + + if self._database_pool.closed: + await self._database_pool.open() + async with self._database_pool.connection() as connection, connection.cursor() as cursor: await cursor.execute( query=sql.SQL(queries.CREATE_TABLE_QUERY).format( @@ -49,8 +117,8 @@ async def startup(self) -> None: ) async def shutdown(self) -> None: - """Close the connection pool.""" - if getattr(self, "_database_pool", None) is not None: + """Close the connection pool (only if owned by this backend).""" + if self._owns_pool and getattr(self, "_database_pool", None) is not None: await self._database_pool.close() async def set_result( diff --git a/src/taskiq_pg/psycopg/schedule_source.py b/src/taskiq_pg/psycopg/schedule_source.py index e773c7b..418f021 100644 --- a/src/taskiq_pg/psycopg/schedule_source.py +++ b/src/taskiq_pg/psycopg/schedule_source.py @@ -1,9 +1,10 @@ +import typing as tp import uuid from logging import getLogger from psycopg import sql from psycopg_pool import AsyncConnectionPool -from taskiq import ScheduledTask +from taskiq import AsyncBroker, ScheduledTask from taskiq_pg._internal import BasePostgresScheduleSource from taskiq_pg.psycopg.queries import ( @@ -21,7 +22,61 @@ class PsycopgScheduleSource(BasePostgresScheduleSource): """Schedule source that uses psycopg to store schedules in PostgreSQL.""" - _database_pool: AsyncConnectionPool + _database_pool: "AsyncConnectionPool" + _owns_pool: bool + + @tp.overload + def __init__( + self, + broker: AsyncBroker, + dsn: str | tp.Callable[[], str] = ..., + table_name: str = ..., + *, + pool: None = ..., + **connect_kwargs: tp.Any, + ) -> None: ... + + @tp.overload + def __init__( + self, + broker: AsyncBroker, + dsn: str | tp.Callable[[], str] = ..., + table_name: str = ..., + *, + pool: AsyncConnectionPool, + ) -> None: ... + + def __init__( + self, + broker: AsyncBroker, + dsn: str | tp.Callable[[], str] = "postgresql://postgres:postgres@localhost:5432/postgres", + table_name: str = "taskiq_schedules", + *, + pool: AsyncConnectionPool | None = None, + **connect_kwargs: tp.Any, + ) -> None: + """ + Construct a new PsycopgScheduleSource. + + Args: + broker: The TaskIQ broker instance. + dsn: PostgreSQL connection string or callable. Ignored in pool mode. + table_name: Table to store schedules in. + pool: An existing connection pool to reuse. + **connect_kwargs: Extra kwargs for connection pool creation. + + """ + self._owns_pool = True + if pool is not None: + self._owns_pool = False + self._database_pool = pool + + super().__init__( + broker=broker, + dsn=dsn, + table_name=table_name, + **connect_kwargs, + ) async def _update_schedules_on_startup(self, schedules: list[ScheduledTask]) -> None: """Update schedules in the database on startup: truncate table and insert new ones.""" @@ -46,15 +101,18 @@ async def startup(self) -> None: """ Initialize the schedule source. - Construct new connection pool, create new table for schedules if not exists - and fill table with schedules from task labels. + Construct new connection pool (if not provided externally), create new table for + schedules if not exists and fill table with schedules from task labels. """ - self._database_pool = AsyncConnectionPool( - conninfo=self.dsn if self.dsn is not None else "", - open=False, - **self._connect_kwargs, - ) - await self._database_pool.open() + if self._owns_pool: + self._database_pool = AsyncConnectionPool( + conninfo=self.dsn if self.dsn is not None else "", + open=False, + **self._connect_kwargs, + ) + + if self._database_pool.closed: + await self._database_pool.open() async with self._database_pool.connection() as connection, connection.cursor() as cursor: await cursor.execute( @@ -64,8 +122,8 @@ async def startup(self) -> None: await self._update_schedules_on_startup(scheduled_tasks_for_creation) async def shutdown(self) -> None: - """Close the connection pool.""" - if getattr(self, "_database_pool", None) is not None: + """Close the connection pool (only if owned by this source).""" + if self._owns_pool and getattr(self, "_database_pool", None) is not None: await self._database_pool.close() async def get_schedules(self) -> list["ScheduledTask"]: @@ -110,7 +168,7 @@ async def add_schedule(self, schedule: "ScheduledTask") -> None: schedule.model_dump_json( exclude={"schedule_id", "task_name"}, ), - ] + ], ) async def delete_schedule(self, schedule_id: str) -> None: diff --git a/tests/integration/test_shared_pool.py b/tests/integration/test_shared_pool.py new file mode 100644 index 0000000..696354b --- /dev/null +++ b/tests/integration/test_shared_pool.py @@ -0,0 +1,216 @@ +import uuid + +import asyncpg +import psqlpy +import pytest +from psycopg import AsyncConnection +from psycopg_pool import AsyncConnectionPool + +from taskiq_pg.asyncpg import AsyncpgBroker, AsyncpgResultBackend, AsyncpgScheduleSource +from taskiq_pg.psqlpy import PSQLPyBroker, PSQLPyResultBackend, PSQLPyScheduleSource +from taskiq_pg.psycopg import PsycopgBroker, PsycopgResultBackend, PsycopgScheduleSource + + +@pytest.mark.integration +async def test_asyncpg_shared_pool_not_closed_after_shutdown(pg_dsn: str) -> None: + """Pool stays usable after broker, result_backend and schedule_source shut down.""" + pool: asyncpg.Pool = await asyncpg.create_pool(dsn=pg_dsn) + + broker = AsyncpgBroker(write_pool=pool, read_connection=await asyncpg.connect(dsn=pg_dsn)) + result_backend = AsyncpgResultBackend( + dsn=pg_dsn, + table_name=f"taskiq_results_{uuid.uuid4().hex}", + pool=pool, + ) + schedule_source = AsyncpgScheduleSource( + broker=broker, + dsn=pg_dsn, + table_name=f"taskiq_schedules_{uuid.uuid4().hex}", + pool=pool, + ) + + await broker.startup() + await result_backend.startup() + await schedule_source.startup() + + await schedule_source.shutdown() + await result_backend.shutdown() + await broker.shutdown() + + # Pool must still be alive — execute a simple query on it + result = await pool.fetchval("SELECT 1") + assert result == 1 + + await pool.close() + + +@pytest.mark.integration +async def test_asyncpg_broker_uses_provided_write_pool(pg_dsn: str) -> None: + """Broker created with only write_pool (no read_connection) creates its own read conn.""" + pool: asyncpg.Pool = await asyncpg.create_pool(dsn=pg_dsn) + + broker = AsyncpgBroker(dsn=pg_dsn, write_pool=pool) + await broker.startup() + await broker.shutdown() + + # Pool must still be alive after broker shutdown + result = await pool.fetchval("SELECT 1") + assert result == 1 + + await pool.close() + + +@pytest.mark.integration +async def test_asyncpg_broker_uses_provided_read_connection(pg_dsn: str) -> None: + """Broker created with only read_connection (no write_pool) creates its own pool.""" + read_conn = await asyncpg.connect(dsn=pg_dsn) + + broker = AsyncpgBroker(dsn=pg_dsn, read_connection=read_conn) + await broker.startup() + await broker.shutdown() + + # Connection must still be alive after broker shutdown + result = await read_conn.fetchval("SELECT 1") + assert result == 1 + + await read_conn.close() + + +@pytest.mark.integration +async def test_psycopg_shared_pool_not_closed_after_shutdown(pg_dsn: str) -> None: + """Pool stays usable after broker, result_backend and schedule_source shut down.""" + pool = AsyncConnectionPool(conninfo=pg_dsn, open=False) + await pool.open() + + read_conn = await AsyncConnection.connect(conninfo=pg_dsn, autocommit=True) + broker = PsycopgBroker(write_pool=pool, read_connection=read_conn) + result_backend = PsycopgResultBackend( + dsn=pg_dsn, + table_name=f"taskiq_results_{uuid.uuid4().hex}", + pool=pool, + ) + schedule_source = PsycopgScheduleSource( + broker=broker, + dsn=pg_dsn, + table_name=f"taskiq_schedules_{uuid.uuid4().hex}", + pool=pool, + ) + + await broker.startup() + await result_backend.startup() + await schedule_source.startup() + + await schedule_source.shutdown() + await result_backend.shutdown() + await broker.shutdown() + + # Pool must still be alive + async with pool.connection() as conn: + result = await conn.execute("SELECT 1") + row = await result.fetchone() + assert row is not None + assert row[0] == 1 + + await pool.close() + + +@pytest.mark.integration +async def test_psycopg_broker_uses_provided_write_pool(pg_dsn: str) -> None: + """Broker created with only write_pool creates its own read connection.""" + pool = AsyncConnectionPool(conninfo=pg_dsn, open=False) + await pool.open() + + broker = PsycopgBroker(dsn=pg_dsn, write_pool=pool) + await broker.startup() + await broker.shutdown() + + async with pool.connection() as conn: + result = await conn.execute("SELECT 1") + row = await result.fetchone() + assert row is not None + assert row[0] == 1 + + await pool.close() + + +@pytest.mark.integration +async def test_psycopg_broker_uses_provided_read_connection(pg_dsn: str) -> None: + """Broker created with only read_connection creates its own pool.""" + read_conn = await AsyncConnection.connect(conninfo=pg_dsn, autocommit=True) + + broker = PsycopgBroker(dsn=pg_dsn, read_connection=read_conn) + await broker.startup() + await broker.shutdown() + + # Connection must still be alive + result = await read_conn.execute("SELECT 1") + row = await result.fetchone() + assert row is not None + assert row[0] == 1 + + await read_conn.close() + + +@pytest.mark.integration +async def test_psqlpy_shared_pool_not_closed_after_shutdown(pg_dsn: str) -> None: + """Pool stays usable after broker, result_backend and schedule_source shut down.""" + pool = psqlpy.ConnectionPool(dsn=pg_dsn) + read_conn = await psqlpy.connect(dsn=pg_dsn) + + broker = PSQLPyBroker(write_pool=pool, read_connection=read_conn) + result_backend = PSQLPyResultBackend( + dsn=pg_dsn, + table_name=f"taskiq_results_{uuid.uuid4().hex}", + pool=pool, + ) + schedule_source = PSQLPyScheduleSource( + broker=broker, + dsn=pg_dsn, + table_name=f"taskiq_schedules_{uuid.uuid4().hex}", + pool=pool, + ) + + await broker.startup() + await result_backend.startup() + await schedule_source.startup() + + await schedule_source.shutdown() + await result_backend.shutdown() + await broker.shutdown() + + # Pool must still be alive + async with pool.acquire() as conn: + result = await conn.fetch("SELECT 1 AS val") + assert result.result()[0]["val"] == 1 + + pool.close() + + +@pytest.mark.integration +async def test_psqlpy_broker_uses_provided_write_pool(pg_dsn: str) -> None: + """Broker created with only write_pool creates its own read connection.""" + pool = psqlpy.ConnectionPool(dsn=pg_dsn) + + broker = PSQLPyBroker(dsn=pg_dsn, write_pool=pool) + await broker.startup() + await broker.shutdown() + + async with pool.acquire() as conn: + result = await conn.fetch("SELECT 1 AS val") + assert result.result()[0]["val"] == 1 + + pool.close() + + +@pytest.mark.integration +async def test_psqlpy_broker_uses_provided_read_connection(pg_dsn: str) -> None: + """Broker created with only read_connection creates its own pool.""" + read_conn = await psqlpy.connect(dsn=pg_dsn) + + broker = PSQLPyBroker(dsn=pg_dsn, read_connection=read_conn) + await broker.startup() + await broker.shutdown() + + # Connection must still be alive — run a query on it + result = await read_conn.fetch("SELECT 1 AS val") + assert result.result()[0]["val"] == 1 diff --git a/uv.lock b/uv.lock index 21875f2..c061d3f 100644 --- a/uv.lock +++ b/uv.lock @@ -1922,7 +1922,7 @@ wheels = [ [[package]] name = "pytest" -version = "9.0.1" +version = "9.0.3" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "colorama", marker = "sys_platform == 'win32'" }, @@ -1933,9 +1933,9 @@ dependencies = [ { name = "pygments" }, { name = "tomli", marker = "python_full_version < '3.11'" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/07/56/f013048ac4bc4c1d9be45afd4ab209ea62822fb1598f40687e6bf45dcea4/pytest-9.0.1.tar.gz", hash = "sha256:3e9c069ea73583e255c3b21cf46b8d3c56f6e3a1a8f6da94ccb0fcf57b9d73c8", size = 1564125, upload-time = "2025-11-12T13:05:09.333Z" } +sdist = { url = "https://files.pythonhosted.org/packages/7d/0d/549bd94f1a0a402dc8cf64563a117c0f3765662e2e668477624baeec44d5/pytest-9.0.3.tar.gz", hash = "sha256:b86ada508af81d19edeb213c681b1d48246c1a91d304c6c81a427674c17eb91c", size = 1572165, upload-time = "2026-04-07T17:16:18.027Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/0b/8b/6300fb80f858cda1c51ffa17075df5d846757081d11ab4aa35cef9e6258b/pytest-9.0.1-py3-none-any.whl", hash = "sha256:67be0030d194df2dfa7b556f2e56fb3c3315bd5c8822c6951162b92b32ce7dad", size = 373668, upload-time = "2025-11-12T13:05:07.379Z" }, + { url = "https://files.pythonhosted.org/packages/d4/24/a372aaf5c9b7208e7112038812994107bc65a84cd00e0354a88c2c77a617/pytest-9.0.3-py3-none-any.whl", hash = "sha256:2c5efc453d45394fdd706ade797c0a81091eccd1d6e4bccfcd476e2b8e0ab5d9", size = 375249, upload-time = "2026-04-07T17:16:16.13Z" }, ] [[package]] @@ -2173,14 +2173,14 @@ wheels = [ [[package]] name = "sqlalchemy-utils" -version = "0.42.0" +version = "0.42.1" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "sqlalchemy" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/63/80/4e15fdcfc25a2226122bf316f0ebac86d840ab3fb38b38ca4cabc395865e/sqlalchemy_utils-0.42.0.tar.gz", hash = "sha256:6d1ecd3eed8b941f0faf8a531f5d5cee7cffa2598fcf8163de8c31c7a417a5e0", size = 130531, upload-time = "2025-08-30T18:43:41.904Z" } +sdist = { url = "https://files.pythonhosted.org/packages/0f/7d/eb9565b6a49426552a5bf5c57e7c239c506dc0e4e5315aec6d1e8241dc7c/sqlalchemy_utils-0.42.1.tar.gz", hash = "sha256:881f9cd9e5044dc8f827bccb0425ce2e55490ce44fc0bb848c55cc8ee44cc02e", size = 130789, upload-time = "2025-12-13T03:14:13.591Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/52/86/21e97809b017a4ebc88971eea335130782421851b0ed8dc3ab6126b479f1/sqlalchemy_utils-0.42.0-py3-none-any.whl", hash = "sha256:c8c0b7f00f4734f6f20e9a4d06b39d79d58c8629cba50924fcaeb20e28eb4f48", size = 91744, upload-time = "2025-08-30T18:43:40.199Z" }, + { url = "https://files.pythonhosted.org/packages/7c/25/7400c18c3ee97914cc99c90007795c00a4ec5b60c853b49db7ba24d11179/sqlalchemy_utils-0.42.1-py3-none-any.whl", hash = "sha256:243cfe1b3a1dae3c74118ae633f1d1e0ed8c787387bc33e556e37c990594ac80", size = 91761, upload-time = "2025-12-13T03:14:15.014Z" }, ] [[package]] @@ -2257,6 +2257,7 @@ dev = [ { name = "pytest-cov" }, { name = "ruff" }, { name = "sqlalchemy-utils" }, + { name = "ty" }, { name = "uvloop" }, { name = "zizmor" }, ] @@ -2301,11 +2302,12 @@ dev = [ { name = "mypy", specifier = ">=1.19.0" }, { name = "polyfactory", specifier = ">=3.1.0" }, { name = "prek", specifier = ">=0.2.19" }, - { name = "pytest", specifier = ">=9.0.1" }, + { name = "pytest", specifier = ">=9.0.2" }, { name = "pytest-asyncio", specifier = ">=1.3.0" }, { name = "pytest-cov", specifier = ">=7.0.0" }, { name = "ruff", specifier = ">=0.14.8" }, - { name = "sqlalchemy-utils", specifier = ">=0.42.0" }, + { name = "sqlalchemy-utils", specifier = ">=0.42.1" }, + { name = "ty", specifier = ">=0.0.34" }, { name = "uvloop", specifier = ">=0.22.1" }, { name = "zizmor", specifier = ">=1.18.0" }, ] @@ -2323,10 +2325,10 @@ lint = [ ] test = [ { name = "polyfactory", specifier = ">=3.1.0" }, - { name = "pytest", specifier = ">=9.0.1" }, + { name = "pytest", specifier = ">=9.0.2" }, { name = "pytest-asyncio", specifier = ">=1.3.0" }, { name = "pytest-cov", specifier = ">=7.0.0" }, - { name = "sqlalchemy-utils", specifier = ">=0.42.0" }, + { name = "sqlalchemy-utils", specifier = ">=0.42.1" }, { name = "uvloop", specifier = ">=0.22.1" }, ] @@ -2339,6 +2341,30 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/cf/db/ce8eda256fa131af12e0a76d481711abe4681b6923c27efb9a255c9e4594/tomli-2.0.2-py3-none-any.whl", hash = "sha256:2ebe24485c53d303f690b0ec092806a085f07af5a5aa1464f3931eec36caaa38", size = 13237, upload-time = "2024-10-02T10:46:11.806Z" }, ] +[[package]] +name = "ty" +version = "0.0.34" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/c4/69/e24eefe2c35c0fdbdec9b60e162727af669bb76d64d993d982eb67b24c38/ty-0.0.34.tar.gz", hash = "sha256:a6efe66b0f13c03a65e6c72ec9abfe2792e2fd063c74fa67e2c4930e29d661be", size = 5585933, upload-time = "2026-05-01T23:06:46.388Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/83/7b/8b85003d6639ef17a97dcbb31f4511cfe78f1c81a964470db100c8c883e7/ty-0.0.34-py3-none-linux_armv6l.whl", hash = "sha256:9ecc3d14f07a95a6ceb88e07f8e62358dbd37325d3d5bd56da7217ff1fef7fb8", size = 11067094, upload-time = "2026-05-01T23:06:21.133Z" }, + { url = "https://files.pythonhosted.org/packages/d7/25/b0098f65b020b015c40567c763fc66fffbec88b2ba6f584bca1e92f05ebb/ty-0.0.34-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:0dccffd8a9d02321cd2dee3249df205e26d62694e741f4eeca36b157fd8b419f", size = 10840909, upload-time = "2026-05-01T23:06:18.409Z" }, + { url = "https://files.pythonhosted.org/packages/e4/55/5e4adcf7d2a1006b844903b27cb81244a9b748d850433a46a6c21776c401/ty-0.0.34-py3-none-macosx_11_0_arm64.whl", hash = "sha256:b0ea47a2998e167ab3b21d2f4b5309a9cf33c297809f6d7e3e753252223174d0", size = 10279378, upload-time = "2026-05-01T23:06:37.962Z" }, + { url = "https://files.pythonhosted.org/packages/4d/91/f537dca0db8fe2558e8ab04d8941d687b384fcc1df5eb9023b2db75ac26c/ty-0.0.34-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:8b37da00b41a118a459ae56d8947e70651073fb33ebfbceb820e4a10b22d5023", size = 10817423, upload-time = "2026-05-01T23:06:26.247Z" }, + { url = "https://files.pythonhosted.org/packages/2c/c4/55a3ad1da2815af1009bdc1b8c90dc11a364cd314e4b48c5128ba9d38859/ty-0.0.34-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:81cbbb93c2342fe3de43e625d3a9eb149633e9f485e816ebf6395d08685355d8", size = 10851826, upload-time = "2026-05-01T23:06:24.198Z" }, + { url = "https://files.pythonhosted.org/packages/ce/8c/9c7606af22d73fb43ea4369472d9c66ece11231be73b0efe8e3c61655559/ty-0.0.34-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:4c5b4dea1594a021289e172582df9cde7089dce14b276fc650e7b212b1772e12", size = 11356318, upload-time = "2026-05-01T23:06:51.139Z" }, + { url = "https://files.pythonhosted.org/packages/20/54/bb423f663721ab4138b216425c6b55eaefd3a068243b24d6d8fe988f4e13/ty-0.0.34-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:030fb00aa2d2a5b5ae9d9183d574e0c82dae80566700a7490c43669d8ece40cd", size = 11902968, upload-time = "2026-05-01T23:06:35.82Z" }, + { url = "https://files.pythonhosted.org/packages/b6/22/01122b21ab6b534a2f618c6bbe5f1f7f49fd56f4b2ec8887cd6d40d08fb3/ty-0.0.34-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:5ae9555e24e36c63a8218e037a5a63f15579eb6aa94f41017e57cd41d335cfb5", size = 11548860, upload-time = "2026-05-01T23:06:42.155Z" }, + { url = "https://files.pythonhosted.org/packages/d1/50/86008b1392ec64bed1957bbcc7aaa43b466b50dfc91bb131841c21d7c5c3/ty-0.0.34-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:99eb23df9ed129fc26d1ab00d6f0b8dfe5253b09c2ac6abdb11523fa70d67f10", size = 11457097, upload-time = "2026-05-01T23:06:53.477Z" }, + { url = "https://files.pythonhosted.org/packages/92/3e/4558b2296963ba99c58d8409c57d7db4f3061b656c3613cb21c02c1ef4c2/ty-0.0.34-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:85de45382016eceae69e104815eb2cfa200787df104002e262a86cbd43ed2c02", size = 10798192, upload-time = "2026-05-01T23:06:40.004Z" }, + { url = "https://files.pythonhosted.org/packages/76/bf/650d24402be2ef678528d60caac1d9477a40fc37e3792ecef07834fd7a4a/ty-0.0.34-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:14cb575fb8fa5131f5129d100cfe23c1575d23faf5dfc5158432749a3e38c9b5", size = 10890390, upload-time = "2026-05-01T23:06:33.076Z" }, + { url = "https://files.pythonhosted.org/packages/5c/ef/ccd2ca13906079f7935fd7e067661b24233017f57d987d51d6a121d85bb5/ty-0.0.34-py3-none-musllinux_1_2_i686.whl", hash = "sha256:c6fc0b69d8450e6910ba9db34572b959b81329a97ae273c391f70e9fb6c1aade", size = 11031564, upload-time = "2026-05-01T23:06:55.812Z" }, + { url = "https://files.pythonhosted.org/packages/ba/2d/d27b72005b6f43599e3bcabab0d7135ac0c230b7a307bb99f9eea02c1cda/ty-0.0.34-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:30dfcec2f0fde3993f4f912ed0e057dcbebc8615299f610a4c2ddb7b5a3e1e06", size = 11553430, upload-time = "2026-05-01T23:06:31.096Z" }, + { url = "https://files.pythonhosted.org/packages/a7/12/20812e1ad930b8d4af70eebf19ad23cff6e31efcfa613ef884531fcdbaa1/ty-0.0.34-py3-none-win32.whl", hash = "sha256:97b77ddf007271b812a313a8f0a14929bc5590958433e1fb83ef585676f53342", size = 10436048, upload-time = "2026-05-01T23:06:49.108Z" }, + { url = "https://files.pythonhosted.org/packages/b0/6a/afa095c5987868fbda27c0f731146ac8e3d07b357adfa83daccaee5b1a16/ty-0.0.34-py3-none-win_amd64.whl", hash = "sha256:1f543968accb952705134028d1fda8656882787dbbc667ad4d6c3ba23791d604", size = 11462526, upload-time = "2026-05-01T23:06:28.514Z" }, + { url = "https://files.pythonhosted.org/packages/63/8f/bf041a06260d77662c0605e56dacfe90b786bf824cbe1aed238d15fe5e84/ty-0.0.34-py3-none-win_arm64.whl", hash = "sha256:ea09108cbcb16b6b06d7596312b433bf49681e78d30e4dc7fb3c1b248a95e09a", size = 10846945, upload-time = "2026-05-01T23:06:44.428Z" }, +] + [[package]] name = "typing-extensions" version = "4.15.0"