Skip to content

fix(eda): per-group advisory lock makes Postgres adapter multi-worker safe#13

Merged
ancongui merged 1 commit into
mainfrom
fix/eda-postgres-multi-worker-safety
May 19, 2026
Merged

fix(eda): per-group advisory lock makes Postgres adapter multi-worker safe#13
ancongui merged 1 commit into
mainfrom
fix/eda-postgres-multi-worker-safety

Conversation

@ancongui
Copy link
Copy Markdown
Contributor

Summary

  • Wrap PostgresEventBus._drain in pg_try_advisory_lock($group_key) so concurrent replicas sharing a consumer group can't all drain in parallel. Whoever holds the lock dispatches; the others return and wait for the next NOTIFY / poll.
  • Add _group_lock_key(): deterministic SHA-256 → signed-bigint hash of the consumer-group name (the wire format pg_advisory_lock requires).
  • Session-level lock auto-releases on connection close, so a crashed worker never zombies the group.

Why

The adapter is single-consumer-per-group by design (pyfly_eda_offsets.last_event_id is a shared cursor). Before this change, scaling worker replicas in a consuming service meant every replica read the same offset and dispatched every event N times in parallel. The WHERE last_event_id < $1 guard on the cursor-advance UPDATE only prevents going backwards — it doesn't prevent duplicate dispatch when two replicas read the same offset concurrently.

Kafka and Redis Streams adapters were already safe (their brokers handle competitive consumption). Postgres was the outlier and is the default for services that already use Postgres for persistence.

Test plan

  • 3 new unit tests for _group_lock_key: same group → same key, different groups → different keys, fits in signed bigint.
  • Existing 39 pyfly EDA tests still pass (42 total).
  • Real-Postgres integration test in the consuming service: two PostgresEventBus instances, same group, 20 events published → exactly 20 deliveries across the pair (no duplicates).

… safe

The Postgres EDA adapter used a per-group cursor without any row-level
claim, so when multiple replicas shared a consumer group, every replica
read the same `last_event_id`, fetched the same outbox rows, and
dispatched every event N times in parallel. Cursor-advance was
unconditional too (`WHERE last_event_id < $1` only blocks going
backwards), so the duplicate dispatch was silent.

Wrap `_drain` in `pg_try_advisory_lock($group_key)` where `$group_key` is
a deterministic SHA-256 fold of the consumer-group name into a signed
bigint. Whoever holds the lock drains; everyone else returns and waits
for the next NOTIFY / poll tick. Session-level lock auto-releases on
connection death, so a crashed worker never zombies the group.

Adds 3 unit tests for `_group_lock_key` (determinism, distinctness,
signed-bigint range). The dispatch-exactly-once behaviour against a
real Postgres + 2 concurrent bus instances is covered by an integration
test in the consuming service.
@ancongui ancongui merged commit a4c7eb5 into main May 19, 2026
2 of 4 checks passed
@ancongui ancongui deleted the fix/eda-postgres-multi-worker-safety branch May 19, 2026 09:30
ancongui added a commit that referenced this pull request May 19, 2026
Bumps version from 26.5.4 → 26.5.5 and adds the CHANGELOG entry for
the per-group ``pg_try_advisory_lock`` fix on
``pyfly.eda.adapters.postgres.PostgresEventBus._drain``.

The fix itself shipped in #13 (commit a4c7eb5). This commit cuts the
release.

Co-authored-by: Andrés Contreras Guillén <ancongui@Andress-MacBook-Pro.local>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant