Skip to content

feat(eda): add PostgreSQL LISTEN/NOTIFY transport with transactional outbox#14

Merged
ancongui merged 5 commits into
developfrom
feat/postgres-eda-adapter
May 18, 2026
Merged

feat(eda): add PostgreSQL LISTEN/NOTIFY transport with transactional outbox#14
ancongui merged 5 commits into
developfrom
feat/postgres-eda-adapter

Conversation

@ancongui
Copy link
Copy Markdown
Contributor

Summary

  • Adds POSTGRES as a first-class EDA transport, sitting between RABBITMQ and APPLICATION_EVENT in the AUTO selection chain.
  • Publishing inserts into a configurable outbox table; an AFTER INSERT trigger calls pg_notify(channel, id) so consumers LISTENing on the same channel receive the row id and fetch the payload. The outbox row tracks attempts, processed/failed timestamps, and a status field that transitions to DEAD_LETTER after the configured retry budget.
  • Consumer holds one long-lived R2DBC connection for LISTEN and uses a pool for outbox queries; a periodic poll catches rows missed by NOTIFY (consumer offline at insert time, dropped connection, etc.). Channel names are derived deterministically from destinations to fit PostgreSQL's 63-byte identifier limit (lower-case + firefly_eda_ prefix + optional hash suffix).
  • Spring Boot's default R2dbcAutoConfiguration is excluded in tests because the EDA module manages its own factories under firefly.eda.* and never reads from spring.r2dbc.*.

Key changes

  • PublisherType.POSTGRES added; supportsPersistence/supportsOrdering updated; EventPublisherFactory.getAutoSelectedPublisher priority becomes KAFKA → RABBITMQ → POSTGRES → APPLICATION_EVENT.
  • EdaProperties.Publishers.PostgresConfig and EdaProperties.Consumer.PostgresConfig added; default connection wired in; getPublisherConfig(POSTGRES, ...) covered.
  • New publisher (PostgresEventPublisher) and consumer (PostgresEventConsumer) under org.fireflyframework.eda.publisher.postgres / org.fireflyframework.eda.consumer.postgres.
  • New auto-configurations (FireflyEdaPostgresPublisherAutoConfiguration, FireflyEdaPostgresConsumerAutoConfiguration) registered via AutoConfiguration.imports. Publisher auto-config also provisions the outbox table, supporting indexes, NOTIFY function, and trigger when auto-create-schema: true (default).
  • R2DBC PostgreSQL dependencies pulled in (io.r2dbc:r2dbc-spi, io.r2dbc:r2dbc-pool, org.postgresql:r2dbc-postgresql) plus testcontainers:postgresql for tests.
  • Docs updated: README, docs/CONFIGURATION.md, docs/PUBLISHER_TYPES.md.

Test plan

  • mvn test -Dtest=PostgresChannelMapperTest (7 tests)
  • mvn test -Dtest=PostgresIntegrationTest (5 tests, Testcontainers PostgreSQL 16)
  • mvn test full suite (294 tests, 0 failures, 0 errors, 26 pre-existing skips)
  • Verify CI builds green on this branch.

Andrés Contreras Guillén added 5 commits May 18, 2026 15:42
…outbox

Adds POSTGRES as a first-class EDA transport alongside KAFKA and RABBITMQ.
Publishing performs a single INSERT into a configurable outbox table; an
AFTER INSERT trigger calls pg_notify(channel, id) so consumers LISTENing on
the same channel pick up the event id and fetch the payload. The outbox
table also tracks attempts, processed/failed timestamps, and a status field
that transitions to DEAD_LETTER after configurable retries.

The consumer holds one long-lived R2DBC connection for LISTEN and uses a
pool for outbox queries; a periodic poll catches rows missed by NOTIFY
(consumer offline at insert time, payload truncation, etc.). Channel names
are derived deterministically from destinations to fit PostgreSQL's 63-byte
identifier limit. POSTGRES is added to the AUTO publisher selection chain
between RABBITMQ and APPLICATION_EVENT, and Spring Boot's default
R2dbcAutoConfiguration is excluded in tests since the EDA module manages
its own connection factories under firefly.eda.* properties.

Includes Testcontainers integration tests covering publish, consumer
dispatch over NOTIFY, ack semantics, deterministic channel mapping, and
health reporting. README, CONFIGURATION.md, and PUBLISHER_TYPES.md updated
with the new transport's properties and behavior.
PostgresIntegrationTest holds a live LISTEN connection and a polling
subscription for the duration of the test. When the Spring TestContext
cache shares its context across other @SpringBootTest classes, that
lingering background work was pushing ResilienceIntegrationTest's context
out of the cache on CI (visible as a 56ms run of 12 tests where the
resilience4j registry was empty), causing five flaky failures.

Marking the test class with @DirtiesContext(AFTER_CLASS) evicts its
context as soon as it finishes so it can no longer crowd or interfere
with the rest of the integration suite.
… cache

EventPublisherFactory.publisherCache uses computeIfAbsent, so the first
caller to populate a slot wins. When Spring's TestContext cache reused a
context where the resilience factory was momentarily unavailable (e.g.,
bean initialisation order across cached contexts on CI), the cache stored
the bare publisher. Subsequent calls -- including assertions in
ResilienceIntegrationTest -- returned the unwrapped instance, so no
retry/circuit breaker entries were ever registered in the resilience4j
registries, and the tests failed with "Expecting actual not to be null".

The factory now validates that the cached publisher matches the current
resilience configuration (i.e., is a ResilientEventPublisher when the
resilience factory is available) before returning it, and recreates the
publisher otherwise. DestinationAwarePublisher delegates are unwrapped
for the check so dynamic-destination publishers still benefit. Resilience
state is preserved by name in the resilience4j registry, so recreation is
safe.
… changes

Spring's TestContext cache occasionally hands EventPublisherFactory a
publisher that was wrapped around a different context's resilience4j
registries. The wrapping looks correct (instanceof ResilientEventPublisher)
but the autowired CircuitBreakerRegistry/RetryRegistry in the active
context never see the registrations, so assertions in
ResilienceIntegrationTest could not locate the named instances and failed
with "Expecting actual not to be null".

Track the last-seen ResilientEventPublisherFactory instance and clear the
publisher cache whenever the resolved factory identity changes. This keeps
the cache (so callers still get a stable publisher instance per type +
connection) but guarantees the resilience entries land in the registries
the test is actually inspecting. Equality-based tests that rely on
caching (e.g., ComprehensiveDynamicTopicTest) continue to pass.
The publisher cache key normalised null -> defaultConnectionId ("default"),
but the resilience4j entry name kept the raw connectionId. As a result,
getPublisher(type, null) and getPublisher(type, "default") collided in the
cache yet produced different circuit-breaker / retry names. Whichever caller
populated the cache first won, leaving subsequent assertions hunting for a
named instance that did not exist (or, worse, finding the wrong one and
seeing zero metrics because publish flowed through the other instance).

This was the cause of the intermittent ResilienceIntegrationTest failures
on CI: an earlier test in the shared Spring context called the factory with
an explicit "default" connectionId, caching a publisher whose circuit
breaker was named "eda-publisher-application_event_default". When
ResilienceIntegrationTest later asked for the publisher with null and
queried for "eda-publisher-application_event_null", it found nothing.

The factory now resolves the connectionId once, up front, and uses the
resolved value for both the cache key and the resilience4j entry name.
ResilienceIntegrationTest is updated to read the suffix from the cache key
directly instead of assuming the asymmetric "default <-> null" mapping.
@ancongui ancongui merged commit 6e2ac15 into develop May 18, 2026
4 checks passed
@ancongui ancongui deleted the feat/postgres-eda-adapter branch May 18, 2026 15:54
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant