From ae3b34c40c4e766aeed8bff1518de598f4ee5100 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9s=20Contreras=20Guill=C3=A9n?= Date: Tue, 19 May 2026 11:28:47 +0200 Subject: [PATCH] fix(eda): per-group advisory lock makes Postgres adapter multi-worker safe The Postgres EDA adapter used a per-group cursor without any row-level claim, so when multiple replicas shared a consumer group, every replica read the same `last_event_id`, fetched the same outbox rows, and dispatched every event N times in parallel. Cursor-advance was unconditional too (`WHERE last_event_id < $1` only blocks going backwards), so the duplicate dispatch was silent. Wrap `_drain` in `pg_try_advisory_lock($group_key)` where `$group_key` is a deterministic SHA-256 fold of the consumer-group name into a signed bigint. Whoever holds the lock drains; everyone else returns and waits for the next NOTIFY / poll tick. Session-level lock auto-releases on connection death, so a crashed worker never zombies the group. Adds 3 unit tests for `_group_lock_key` (determinism, distinctness, signed-bigint range). The dispatch-exactly-once behaviour against a real Postgres + 2 concurrent bus instances is covered by an integration test in the consuming service. --- src/pyfly/eda/adapters/postgres.py | 46 ++++++++++++++++++++++++++++ tests/eda/test_postgres_event_bus.py | 23 +++++++++++++- 2 files changed, 68 insertions(+), 1 deletion(-) diff --git a/src/pyfly/eda/adapters/postgres.py b/src/pyfly/eda/adapters/postgres.py index af0f626..c63279d 100644 --- a/src/pyfly/eda/adapters/postgres.py +++ b/src/pyfly/eda/adapters/postgres.py @@ -54,6 +54,7 @@ import asyncio import fnmatch +import hashlib import json import logging import re @@ -76,6 +77,23 @@ def _quote_ident(name: str) -> str: return name +def _group_lock_key(group: str) -> int: + """Stable signed-64-bit key for ``pg_try_advisory_lock``. + + Postgres advisory locks take a single ``bigint``. We hash the + consumer-group name with SHA-256 and fold the first 8 bytes into a + signed 64-bit integer (the wire-format Postgres expects) so two + workers configured with the same group land on the same lock key + deterministically across replicas and restarts. + """ + digest = hashlib.sha256(group.encode("utf-8")).digest() + raw = int.from_bytes(digest[:8], byteorder="big", signed=False) + # Fold into signed 64-bit range. + if raw >= 2**63: + raw -= 2**64 + return raw + + def _normalise_dsn(dsn: str) -> str: """Strip SQLAlchemy dialect markers so asyncpg can parse the URL.""" for marker in ("postgresql+asyncpg://", "postgresql+psycopg://", "postgres+asyncpg://"): @@ -266,6 +284,34 @@ async def _drain(self) -> None: # be silently dropped. if not self._handlers: return + # Per-group advisory lock makes the drainer single-writer even + # when multiple replicas share the same consumer_group. Whoever + # holds the lock advances the cursor; everyone else returns and + # picks up on the next NOTIFY / poll. Session-level lock auto- + # releases on connection close, so a crashed worker never zombies + # the lock. + lock_key = _group_lock_key(self._group) + async with self._pool.acquire() as lock_conn: + got_lock = await lock_conn.fetchval( + "SELECT pg_try_advisory_lock($1)", lock_key + ) + if not got_lock: + return + try: + await self._drain_with_lock() + finally: + try: + await lock_conn.fetchval( + "SELECT pg_advisory_unlock($1)", lock_key + ) + except Exception: + logger.debug( + "pg_advisory_unlock raised; lock will release on conn close", + exc_info=True, + ) + + async def _drain_with_lock(self) -> None: + """Drain loop body; only invoked while holding the group's advisory lock.""" while not self._closed: async with self._pool.acquire() as conn: offset = await conn.fetchval( diff --git a/tests/eda/test_postgres_event_bus.py b/tests/eda/test_postgres_event_bus.py index cf7f223..8422a45 100644 --- a/tests/eda/test_postgres_event_bus.py +++ b/tests/eda/test_postgres_event_bus.py @@ -10,7 +10,12 @@ import pytest -from pyfly.eda.adapters.postgres import PostgresEventBus, _normalise_dsn, _quote_ident +from pyfly.eda.adapters.postgres import ( + PostgresEventBus, + _group_lock_key, + _normalise_dsn, + _quote_ident, +) from pyfly.eda.ports.outbound import EventPublisher @@ -62,3 +67,19 @@ def test_normalise_dsn_applied_in_constructor(self) -> None: ) assert bus._dsn == "postgresql://idp:idp@pg:5432/flydesk_idp" assert bus._listen_dsn == "postgresql://idp:idp@pg:5432/flydesk_idp" + + +class TestGroupLockKey: + """``_group_lock_key`` -- deterministic, signed-64-bit advisory lock.""" + + def test_same_group_yields_same_key(self) -> None: + assert _group_lock_key("flydocs-workers") == _group_lock_key("flydocs-workers") + + def test_different_groups_yield_different_keys(self) -> None: + assert _group_lock_key("flydocs-workers") != _group_lock_key("flydocs-bbox-workers") + + def test_fits_in_signed_bigint(self) -> None: + """Postgres advisory locks take ``bigint`` (signed 64-bit).""" + for group in ("a", "flydocs-workers", "very-long-group-name-with-suffix"): + key = _group_lock_key(group) + assert -(2**63) <= key < 2**63