feat(eda): add PostgreSQL LISTEN/NOTIFY transport with transactional outbox #14
Merged
added 5 commits
May 18, 2026 15:42
…outbox

Adds POSTGRES as a first-class EDA transport alongside KAFKA and RABBITMQ. Publishing performs a single INSERT into a configurable outbox table; an AFTER INSERT trigger calls pg_notify(channel, id) so consumers LISTENing on the same channel pick up the event id and fetch the payload. The outbox table also tracks attempts, processed/failed timestamps, and a status field that transitions to DEAD_LETTER after configurable retries.

The consumer holds one long-lived R2DBC connection for LISTEN and uses a pool for outbox queries; a periodic poll catches rows missed by NOTIFY (consumer offline at insert time, payload truncation, etc.). Channel names are derived deterministically from destinations to fit PostgreSQL's 63-byte identifier limit.

POSTGRES is added to the AUTO publisher selection chain between RABBITMQ and APPLICATION_EVENT, and Spring Boot's default R2dbcAutoConfiguration is excluded in tests since the EDA module manages its own connection factories under firefly.eda.* properties.

Includes Testcontainers integration tests covering publish, consumer dispatch over NOTIFY, ack semantics, deterministic channel mapping, and health reporting. README, CONFIGURATION.md, and PUBLISHER_TYPES.md updated with the new transport's properties and behavior.
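The deterministic channel mapping described in this commit could look roughly like the sketch below. It is not the PR's `PostgresChannelMapper`: the class name, the sanitising rule, the SHA-256 choice, and the 8-character suffix length are all assumptions made for illustration. Only the 63-byte limit, the lower-casing, the `firefly_eda_` prefix, and the optional hash suffix come from the PR text.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Locale;

/** Hypothetical sketch of a destination-to-channel mapping; names are illustrative. */
final class ChannelNameSketch {
    static final String PREFIX = "firefly_eda_";
    static final int MAX_BYTES = 63;      // PostgreSQL identifier limit
    static final int HASH_HEX_CHARS = 8;  // assumed suffix length

    static String toChannel(String destination) {
        // Lower-case and replace anything outside [a-z0-9_], so the result is
        // pure ASCII and its char count equals its byte count.
        String sanitized = destination.toLowerCase(Locale.ROOT).replaceAll("[^a-z0-9_]", "_");
        String candidate = PREFIX + sanitized;
        if (candidate.length() <= MAX_BYTES) {
            return candidate;
        }
        // Too long: truncate and append a hash of the original destination so
        // two long destinations sharing a prefix still map to different channels.
        String suffix = "_" + shortHash(destination);
        return candidate.substring(0, MAX_BYTES - suffix.length()) + suffix;
    }

    static String shortHash(String value) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(value.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (int i = 0; i < HASH_HEX_CHARS / 2; i++) {
                hex.append(String.format("%02x", digest[i]));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 is required on every Java platform", e);
        }
    }
}
```

Under these assumptions, `ChannelNameSketch.toChannel("Orders.Created")` yields `firefly_eda_orders_created`. One limitation of this particular sketch: short destinations that differ only in replaced characters (for example `orders.created` and `orders_created`) map to the same channel, because the hash is appended only when truncation is needed.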
PostgresIntegrationTest holds a live LISTEN connection and a polling subscription for the duration of the test. When the Spring TestContext cache shares its context across other @SpringBootTest classes, that lingering background work was pushing ResilienceIntegrationTest's context out of the cache on CI (visible as a 56ms run of 12 tests where the resilience4j registry was empty), causing five flaky failures. Marking the test class with @DirtiesContext(AFTER_CLASS) evicts its context as soon as it finishes so it can no longer crowd or interfere with the rest of the integration suite.
… cache

EventPublisherFactory.publisherCache uses computeIfAbsent, so the first caller to populate a slot wins. When Spring's TestContext cache reused a context where the resilience factory was momentarily unavailable (e.g., bean initialisation order across cached contexts on CI), the cache stored the bare publisher. Subsequent calls -- including assertions in ResilienceIntegrationTest -- returned the unwrapped instance, so no retry/circuit breaker entries were ever registered in the resilience4j registries, and the tests failed with "Expecting actual not to be null".

The factory now validates that the cached publisher matches the current resilience configuration (i.e., is a ResilientEventPublisher when the resilience factory is available) before returning it, and recreates the publisher otherwise. DestinationAwarePublisher delegates are unwrapped for the check so dynamic-destination publishers still benefit. Resilience state is preserved by name in the resilience4j registry, so recreation is safe.
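The validate-before-return behaviour described above can be sketched in isolation as follows. This is a simplified model, not the factory's real code: `Publisher`, `BarePublisher`, `ResilientPublisher`, and the boolean availability flag are hypothetical stand-ins, and the `DestinationAwarePublisher` unwrapping step is omitted.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical model of a publisher cache that re-validates entries on read. */
final class ValidatingPublisherCacheSketch {
    interface Publisher {}

    static final class BarePublisher implements Publisher {}

    static final class ResilientPublisher implements Publisher {
        final Publisher delegate;
        ResilientPublisher(Publisher delegate) { this.delegate = delegate; }
    }

    private final Map<String, Publisher> cache = new ConcurrentHashMap<>();

    // Stand-in for "is the resilience factory bean currently available?"
    private volatile boolean resilienceAvailable;

    void setResilienceAvailable(boolean available) {
        this.resilienceAvailable = available;
    }

    Publisher getPublisher(String cacheKey) {
        // compute() re-evaluates the slot atomically on every call, whereas
        // computeIfAbsent() would keep whatever the first caller stored.
        return cache.compute(cacheKey, (key, cached) ->
                cached != null && matchesCurrentConfig(cached) ? cached : create());
    }

    private boolean matchesCurrentConfig(Publisher publisher) {
        // A cached entry is valid only if its wrapping agrees with the current setup.
        return (publisher instanceof ResilientPublisher) == resilienceAvailable;
    }

    private Publisher create() {
        Publisher bare = new BarePublisher();
        return resilienceAvailable ? new ResilientPublisher(bare) : bare;
    }
}
```

In this model a bare publisher cached while resilience was unavailable is replaced by a wrapped one on the next lookup, and repeated lookups under an unchanged configuration keep returning the same instance.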
… changes

Spring's TestContext cache occasionally hands EventPublisherFactory a publisher that was wrapped around a different context's resilience4j registries. The wrapping looks correct (instanceof ResilientEventPublisher) but the autowired CircuitBreakerRegistry/RetryRegistry in the active context never see the registrations, so assertions in ResilienceIntegrationTest could not locate the named instances and failed with "Expecting actual not to be null".

Track the last-seen ResilientEventPublisherFactory instance and clear the publisher cache whenever the resolved factory identity changes. This keeps the cache (so callers still get a stable publisher instance per type + connection) but guarantees the resilience entries land in the registries the test is actually inspecting. Equality-based tests that rely on caching (e.g., ComprehensiveDynamicTopicTest) continue to pass.
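A minimal sketch of the identity-tracking idea, assuming a hypothetical `ResilienceFactory` interface in place of `ResilientEventPublisherFactory` and plain `Object` publishers. The real factory resolves the resilience factory itself rather than taking it as a parameter; the parameter is only there to keep the sketch self-contained.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model: invalidate cached publishers when the factory identity changes. */
final class FactoryIdentityCacheSketch {
    /** Stand-in for the resilience factory; wraps a publisher for the given key. */
    interface ResilienceFactory {
        Object wrap(String publisherKey);
    }

    private final Map<String, Object> publisherCache = new HashMap<>();
    private ResilienceFactory lastSeenFactory;

    synchronized Object getPublisher(String key, ResilienceFactory currentFactory) {
        // Reference comparison on purpose: a different factory instance means
        // different registries, even if the wrapper class looks identical.
        if (currentFactory != lastSeenFactory) {
            publisherCache.clear();
            lastSeenFactory = currentFactory;
        }
        // Within one factory identity, callers keep getting the same instance.
        return publisherCache.computeIfAbsent(key, currentFactory::wrap);
    }
}
```

The design choice here is to clear the whole cache rather than individual entries, since every cached publisher was wrapped against the previous factory's registries.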
The publisher cache key normalised null -> defaultConnectionId ("default"),
but the resilience4j entry name kept the raw connectionId. As a result,
getPublisher(type, null) and getPublisher(type, "default") collided in the
cache yet produced different circuit-breaker / retry names. Whichever caller
populated the cache first won, leaving subsequent assertions hunting for a
named instance that did not exist (or, worse, finding the wrong one and
seeing zero metrics because publish flowed through the other instance).
This was the cause of the intermittent ResilienceIntegrationTest failures
on CI: an earlier test in the shared Spring context called the factory with
an explicit "default" connectionId, caching a publisher whose circuit
breaker was named "eda-publisher-application_event_default". When
ResilienceIntegrationTest later asked for the publisher with null and
queried for "eda-publisher-application_event_null", it found nothing.
The factory now resolves the connectionId once, up front, and uses the
resolved value for both the cache key and the resilience4j entry name.
ResilienceIntegrationTest is updated to read the suffix from the cache key
directly instead of assuming the asymmetric "default <-> null" mapping.
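The fix in this commit amounts to resolving the connection id once and deriving both names from it. A minimal sketch, assuming a hypothetical `type:connectionId` cache-key format and a resilience name pattern inferred from the names quoted above (`eda-publisher-application_event_default`):

```java
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical model: one resolved connection id feeds both the cache key and the resilience name. */
final class ConnectionIdResolutionSketch {
    static final String DEFAULT_CONNECTION_ID = "default";

    /** Stand-in for a cached publisher; only carries the two derived names. */
    static final class NamedPublisher {
        final String cacheKey;
        final String resilienceName;
        NamedPublisher(String cacheKey, String resilienceName) {
            this.cacheKey = cacheKey;
            this.resilienceName = resilienceName;
        }
    }

    private final Map<String, NamedPublisher> cache = new ConcurrentHashMap<>();

    NamedPublisher getPublisher(String publisherType, String connectionId) {
        // Resolve once, up front; never use the raw (possibly null) id again.
        String resolved = connectionId == null ? DEFAULT_CONNECTION_ID : connectionId;
        String cacheKey = publisherType + ":" + resolved; // assumed key format
        String resilienceName =
                "eda-publisher-" + publisherType.toLowerCase(Locale.ROOT) + "_" + resolved;
        return cache.computeIfAbsent(cacheKey, key -> new NamedPublisher(key, resilienceName));
    }
}
```

With this shape, `getPublisher("APPLICATION_EVENT", null)` and `getPublisher("APPLICATION_EVENT", "default")` return the same cached instance, and its resilience name is `eda-publisher-application_event_default` in both cases.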
Summary
- Adds `POSTGRES` as a first-class EDA transport, sitting between `RABBITMQ` and `APPLICATION_EVENT` in the `AUTO` selection chain.
- Publishing performs a single INSERT into a configurable outbox table; an `AFTER INSERT` trigger calls `pg_notify(channel, id)` so consumers `LISTEN`ing on the same channel receive the row id and fetch the payload. The outbox row tracks attempts, processed/failed timestamps, and a status field that transitions to `DEAD_LETTER` after the configured retry budget.
- The consumer holds one long-lived R2DBC connection for `LISTEN` and uses a pool for outbox queries; a periodic poll catches rows missed by `NOTIFY` (consumer offline at insert time, dropped connection, etc.). Channel names are derived deterministically from destinations to fit PostgreSQL's 63-byte identifier limit (lower-case + `firefly_eda_` prefix + optional hash suffix).
- Spring Boot's default `R2dbcAutoConfiguration` is excluded in tests because the EDA module manages its own factories under `firefly.eda.*` and never reads from `spring.r2dbc.*`.

Key changes
- `PublisherType.POSTGRES` added; `supportsPersistence`/`supportsOrdering` updated; `EventPublisherFactory.getAutoSelectedPublisher` priority becomes `KAFKA → RABBITMQ → POSTGRES → APPLICATION_EVENT`.
- `EdaProperties.Publishers.PostgresConfig` and `EdaProperties.Consumer.PostgresConfig` added; default connection wired in; `getPublisherConfig(POSTGRES, ...)` covered.
- Publisher (`PostgresEventPublisher`) and consumer (`PostgresEventConsumer`) under `org.fireflyframework.eda.publisher.postgres` / `org.fireflyframework.eda.consumer.postgres`.
- Auto-configurations (`FireflyEdaPostgresPublisherAutoConfiguration`, `FireflyEdaPostgresConsumerAutoConfiguration`) registered via `AutoConfiguration.imports`. Publisher auto-config also provisions the outbox table, supporting indexes, NOTIFY function, and trigger when `auto-create-schema: true` (default).
- R2DBC dependencies (`io.r2dbc:r2dbc-spi`, `io.r2dbc:r2dbc-pool`, `org.postgresql:r2dbc-postgresql`) plus `testcontainers:postgresql` for tests.
- Docs updated: README, `docs/CONFIGURATION.md`, `docs/PUBLISHER_TYPES.md`.

Test plan
- `mvn test -Dtest=PostgresChannelMapperTest` (7 tests)
- `mvn test -Dtest=PostgresIntegrationTest` (5 tests, Testcontainers PostgreSQL 16)
- `mvn test` full suite (294 tests, 0 failures, 0 errors, 26 pre-existing skips)