fix(eda): per-group advisory lock makes Postgres adapter multi-worker safe #13
Merged
… safe

The Postgres EDA adapter used a per-group cursor without any row-level claim, so when multiple replicas shared a consumer group, every replica read the same `last_event_id`, fetched the same outbox rows, and dispatched every event N times in parallel. Cursor-advance was unconditional too (`WHERE last_event_id < $1` only blocks going backwards), so the duplicate dispatch was silent.

Wrap `_drain` in `pg_try_advisory_lock($group_key)` where `$group_key` is a deterministic SHA-256 fold of the consumer-group name into a signed bigint. Whoever holds the lock drains; everyone else returns and waits for the next NOTIFY / poll tick. Session-level lock auto-releases on connection death, so a crashed worker never zombies the group.

Adds 3 unit tests for `_group_lock_key` (determinism, distinctness, signed-bigint range). The dispatch-exactly-once behaviour against a real Postgres + 2 concurrent bus instances is covered by an integration test in the consuming service.
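The try-lock wrapper described above could look roughly like the sketch below. It is not the adapter's actual code: `drain_if_leader` is a hypothetical name, the connection is assumed to expose an asyncpg-style `fetchval(sql, *args)` coroutine, and the explicit unlock after a successful drain is an assumption about how the session-level lock is handed back between ticks.

```python
from typing import Awaitable, Callable, Optional


async def drain_if_leader(
    conn,  # assumed: an asyncpg-style connection with an async fetchval()
    group_key: int,
    drain: Callable[[], Awaitable[int]],
) -> Optional[int]:
    """Run `drain` only if this session wins the group's advisory lock.

    Returns whatever `drain` returns, or None when another session
    already holds the lock for this consumer group.
    """
    # pg_try_advisory_lock never blocks: it returns true or false at once.
    got_lock = await conn.fetchval("SELECT pg_try_advisory_lock($1)", group_key)
    if not got_lock:
        # Another replica is draining; wait for the next NOTIFY / poll tick.
        return None
    try:
        return await drain()
    finally:
        # Session-level locks survive transaction end, so release explicitly.
        # If the connection dies first, Postgres releases the lock itself.
        await conn.fetchval("SELECT pg_advisory_unlock($1)", group_key)
```

Using the non-blocking `pg_try_advisory_lock` rather than `pg_advisory_lock` means losing replicas do not queue up behind the holder; they simply skip this tick.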
ancongui added a commit that referenced this pull request on May 19, 2026
Bumps version from 26.5.4 → 26.5.5 and adds the CHANGELOG entry for the per-group ``pg_try_advisory_lock`` fix on ``pyfly.eda.adapters.postgres.PostgresEventBus._drain``. The fix itself shipped in #13 (commit a4c7eb5). This commit cuts the release.

Co-authored-by: Andrés Contreras Guillén <ancongui@Andress-MacBook-Pro.local>
Summary
- Wrap `PostgresEventBus._drain` in `pg_try_advisory_lock($group_key)` so concurrent replicas sharing a consumer group can't all drain in parallel. Whoever holds the lock dispatches; the others return and wait for the next NOTIFY / poll.
- Add `_group_lock_key()`: deterministic SHA-256 → signed-bigint hash of the consumer-group name (the wire format `pg_advisory_lock` requires).

Why
The adapter is single-consumer-per-group by design (`pyfly_eda_offsets.last_event_id` is a shared cursor). Before this change, scaling worker replicas in a consuming service meant every replica read the same offset and dispatched every event N times in parallel. The `WHERE last_event_id < $1` guard on the cursor-advance UPDATE only prevents going backwards; it doesn't prevent duplicate dispatch when two replicas read the same offset concurrently.

Kafka and Redis Streams adapters were already safe (their brokers handle competitive consumption). Postgres was the outlier and is the default for services that already use Postgres for persistence.
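To make the race concrete, here is a small in-memory simulation of the interleaving described above. It is an illustration only: the `billing` group name, the three-event outbox, and the `simulate` helper are all made up, and a plain variable stands in for the Postgres advisory lock.

```python
from typing import Dict, List, Optional, Tuple


def advance_cursor(offsets: Dict[str, int], group: str, new_id: int) -> None:
    # Mirrors `UPDATE ... SET last_event_id = $1 WHERE last_event_id < $1`:
    # the guard only refuses to move the cursor backwards.
    if offsets[group] < new_id:
        offsets[group] = new_id


def simulate(replicas: int, use_lock: bool) -> List[int]:
    outbox = [1, 2, 3]            # event ids waiting in the outbox
    offsets = {"billing": 0}      # shared per-group cursor
    delivered: List[int] = []
    lock_holder: Optional[int] = None

    # Step 1: all replicas wake on the same NOTIFY and read the cursor
    # before any of them has advanced it.
    active: List[Tuple[int, int]] = []
    for replica in range(replicas):
        if use_lock:
            if lock_holder is not None:
                continue          # try-lock failed: skip this tick
            lock_holder = replica
        active.append((replica, offsets["billing"]))

    # Step 2: each active replica dispatches what it saw, then advances.
    for _replica, seen in active:
        batch = [event for event in outbox if event > seen]
        delivered.extend(batch)
        if batch:
            advance_cursor(offsets, "billing", batch[-1])
    return delivered


print(simulate(replicas=2, use_lock=False))  # [1, 2, 3, 1, 2, 3]
print(simulate(replicas=2, use_lock=True))   # [1, 2, 3]
```

Without the lock, the second replica's cursor update is a silent no-op, yet its dispatches have already happened; with the lock, only one replica ever reaches the dispatch step.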
Test plan
- Unit tests for `_group_lock_key`: same group → same key, different groups → different keys, fits in signed bigint.
- Integration test (in the consuming service): 2 `PostgresEventBus` instances, same group, 20 events published → exactly 20 deliveries across the pair (no duplicates).
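For readers who want to see what the three unit-test properties amount to, below is a minimal sketch of a key function that satisfies them. It assumes the "fold" simply takes the first 8 bytes of the SHA-256 digest as a signed big-endian integer; the real `_group_lock_key` may fold the digest differently, and the group names used here are hypothetical.

```python
import hashlib


def group_lock_key(group: str) -> int:
    """Map a consumer-group name to a signed 64-bit advisory-lock key."""
    digest = hashlib.sha256(group.encode("utf-8")).digest()
    # Assumed fold: first 8 bytes, interpreted as a signed big-endian int,
    # which always fits Postgres's bigint range.
    return int.from_bytes(digest[:8], byteorder="big", signed=True)


# The three properties from the test plan:
assert group_lock_key("orders") == group_lock_key("orders")       # determinism
assert group_lock_key("orders") != group_lock_key("billing")      # distinctness
assert -(2**63) <= group_lock_key("orders") < 2**63                # bigint range
```

A cryptographic hash is used here instead of Python's built-in `hash()` because the latter is salted per process for strings, so different replicas would compute different keys for the same group.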