Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions src/pyfly/eda/adapters/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@

import asyncio
import fnmatch
import hashlib
import json
import logging
import re
Expand All @@ -76,6 +77,23 @@ def _quote_ident(name: str) -> str:
return name


def _group_lock_key(group: str) -> int:
"""Stable signed-64-bit key for ``pg_try_advisory_lock``.

Postgres advisory locks take a single ``bigint``. We hash the
consumer-group name with SHA-256 and fold the first 8 bytes into a
signed 64-bit integer (the wire-format Postgres expects) so two
workers configured with the same group land on the same lock key
deterministically across replicas and restarts.
"""
digest = hashlib.sha256(group.encode("utf-8")).digest()
raw = int.from_bytes(digest[:8], byteorder="big", signed=False)
# Fold into signed 64-bit range.
if raw >= 2**63:
raw -= 2**64
return raw


def _normalise_dsn(dsn: str) -> str:
"""Strip SQLAlchemy dialect markers so asyncpg can parse the URL."""
for marker in ("postgresql+asyncpg://", "postgresql+psycopg://", "postgres+asyncpg://"):
Expand Down Expand Up @@ -266,6 +284,34 @@ async def _drain(self) -> None:
# be silently dropped.
if not self._handlers:
return
# Per-group advisory lock makes the drainer single-writer even
# when multiple replicas share the same consumer_group. Whoever
# holds the lock advances the cursor; everyone else returns and
# picks up on the next NOTIFY / poll. Session-level lock auto-
# releases on connection close, so a crashed worker never zombies
# the lock.
lock_key = _group_lock_key(self._group)
async with self._pool.acquire() as lock_conn:
got_lock = await lock_conn.fetchval(
"SELECT pg_try_advisory_lock($1)", lock_key
)
if not got_lock:
return
try:
await self._drain_with_lock()
finally:
try:
await lock_conn.fetchval(
"SELECT pg_advisory_unlock($1)", lock_key
)
except Exception:
logger.debug(
"pg_advisory_unlock raised; lock will release on conn close",
exc_info=True,
)

async def _drain_with_lock(self) -> None:
"""Drain loop body; only invoked while holding the group's advisory lock."""
while not self._closed:
async with self._pool.acquire() as conn:
offset = await conn.fetchval(
Expand Down
23 changes: 22 additions & 1 deletion tests/eda/test_postgres_event_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@

import pytest

from pyfly.eda.adapters.postgres import PostgresEventBus, _normalise_dsn, _quote_ident
from pyfly.eda.adapters.postgres import (
PostgresEventBus,
_group_lock_key,
_normalise_dsn,
_quote_ident,
)
from pyfly.eda.ports.outbound import EventPublisher


Expand Down Expand Up @@ -62,3 +67,19 @@ def test_normalise_dsn_applied_in_constructor(self) -> None:
)
assert bus._dsn == "postgresql://idp:idp@pg:5432/flydesk_idp"
assert bus._listen_dsn == "postgresql://idp:idp@pg:5432/flydesk_idp"


class TestGroupLockKey:
"""``_group_lock_key`` -- deterministic, signed-64-bit advisory lock."""

def test_same_group_yields_same_key(self) -> None:
assert _group_lock_key("flydocs-workers") == _group_lock_key("flydocs-workers")

def test_different_groups_yield_different_keys(self) -> None:
assert _group_lock_key("flydocs-workers") != _group_lock_key("flydocs-bbox-workers")

def test_fits_in_signed_bigint(self) -> None:
"""Postgres advisory locks take ``bigint`` (signed 64-bit)."""
for group in ("a", "flydocs-workers", "very-long-group-name-with-suffix"):
key = _group_lock_key(group)
assert -(2**63) <= key < 2**63
Loading