From 45edff9c6c394324cb21d65db66a6b8fe07764de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9s=20Contreras=20Guill=C3=A9n?= Date: Mon, 18 May 2026 15:42:12 +0200 Subject: [PATCH 1/5] feat(eda): add PostgreSQL LISTEN/NOTIFY transport with transactional outbox Adds POSTGRES as a first-class EDA transport alongside KAFKA and RABBITMQ. Publishing performs a single INSERT into a configurable outbox table; an AFTER INSERT trigger calls pg_notify(channel, id) so consumers LISTENing on the same channel pick up the event id and fetch the payload. The outbox table also tracks attempts, processed/failed timestamps, and a status field that transitions to DEAD_LETTER after configurable retries. The consumer holds one long-lived R2DBC connection for LISTEN and uses a pool for outbox queries; a periodic poll catches rows missed by NOTIFY (consumer offline at insert time, payload truncation, etc.). Channel names are derived deterministically from destinations to fit PostgreSQL's 63-byte identifier limit. POSTGRES is added to the AUTO publisher selection chain between RABBITMQ and APPLICATION_EVENT, and Spring Boot's default R2dbcAutoConfiguration is excluded in tests since the EDA module manages its own connection factories under firefly.eda.* properties. Includes Testcontainers integration tests covering publish, consumer dispatch over NOTIFY, ack semantics, deterministic channel mapping, and health reporting. README, CONFIGURATION.md, and PUBLISHER_TYPES.md updated with the new transport's properties and behavior. --- README.md | 87 ++- docs/CONFIGURATION.md | 93 +++ docs/PUBLISHER_TYPES.md | 86 ++- pom.xml | 19 + .../eda/annotation/PublisherType.java | 18 +- .../config/FireflyEdaAutoConfiguration.java | 31 + ...yEdaPostgresConsumerAutoConfiguration.java | 111 +++ ...EdaPostgresPublisherAutoConfiguration.java | 202 ++++++ .../postgres/PostgresEventConsumer.java | 631 ++++++++++++++++++ .../eda/properties/EdaProperties.java | 164 +++++ .../eda/publisher/EventPublisherFactory.java | 3 +- .../postgres/PostgresChannelMapper.java | 94 +++ .../postgres/PostgresEventPublisher.java | 318 +++++++++ ...ot.autoconfigure.AutoConfiguration.imports | 2 + .../integration/PostgresIntegrationTest.java | 238 +++++++ .../postgres/PostgresChannelMapperTest.java | 83 +++ .../resources/application-test.properties | 10 + 17 files changed, 2158 insertions(+), 32 deletions(-) create mode 100644 src/main/java/org/fireflyframework/eda/config/FireflyEdaPostgresConsumerAutoConfiguration.java create mode 100644 src/main/java/org/fireflyframework/eda/config/FireflyEdaPostgresPublisherAutoConfiguration.java create mode 100644 src/main/java/org/fireflyframework/eda/consumer/postgres/PostgresEventConsumer.java create mode 100644 src/main/java/org/fireflyframework/eda/publisher/postgres/PostgresChannelMapper.java create mode 100644 src/main/java/org/fireflyframework/eda/publisher/postgres/PostgresEventPublisher.java create mode 100644 src/test/java/org/fireflyframework/eda/integration/PostgresIntegrationTest.java create mode 100644 src/test/java/org/fireflyframework/eda/publisher/postgres/PostgresChannelMapperTest.java diff --git a/README.md b/README.md index 2abdd9b..d09dd32 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ [![Java](https://img.shields.io/badge/Java-21%2B-orange.svg)](https://openjdk.org) [![Spring Boot](https://img.shields.io/badge/Spring%20Boot-3.x-green.svg)](https://spring.io/projects/spring-boot) -> Unified event-driven architecture library with Kafka, RabbitMQ, and Spring Application Events support. +> Unified event-driven architecture library with Kafka, RabbitMQ, PostgreSQL (LISTEN/NOTIFY + outbox), and Spring Application Events support. --- @@ -23,7 +23,7 @@ ## Overview -Firefly Framework EDA provides a standardized messaging abstraction for event-driven architectures, supporting multiple broker implementations through a unified publisher/consumer API. It enables reactive event publishing and consumption with built-in support for Apache Kafka, RabbitMQ, and Spring Application Events as transport mechanisms. +Firefly Framework EDA provides a standardized messaging abstraction for event-driven architectures, supporting multiple broker implementations through a unified publisher/consumer API. It enables reactive event publishing and consumption with built-in support for Apache Kafka, RabbitMQ, PostgreSQL (via `LISTEN`/`NOTIFY` backed by a transactional outbox table), and Spring Application Events as transport mechanisms. The library features annotation-driven event publishing (`@EventPublisher`, `@PublishResult`), declarative event listeners (`@EventListener`), and a comprehensive set of event filtering, serialization, and error handling capabilities. It includes support for JSON, Avro, and Protobuf message serialization formats. @@ -31,7 +31,7 @@ The resilient publisher wrapper provides circuit breaker integration, while the ## Features -- Multi-broker support: Apache Kafka, RabbitMQ, Spring Application Events +- Multi-broker support: Apache Kafka, RabbitMQ, PostgreSQL (`LISTEN`/`NOTIFY` + outbox), Spring Application Events - Annotation-driven publishing: `@EventPublisher`, `@PublishResult` - Declarative event listening: `@EventListener` with SpEL-based filtering - Event envelope pattern with metadata propagation @@ -41,16 +41,17 @@ The resilient publisher wrapper provides circuit breaker integration, while the - Resilient publisher with circuit breaker support - Dynamic event listener registration at runtime - AMQP admin auto-configuration for RabbitMQ exchanges and queues +- Transactional outbox table for PostgreSQL with auto-created schema and trigger - Custom error handling strategies with metrics and notification handlers - Health indicators and metrics for Actuator integration -- Spring Boot auto-configuration for Kafka and RabbitMQ +- Spring Boot auto-configuration for Kafka, RabbitMQ, and PostgreSQL ## Requirements - Java 21+ - Spring Boot 3.x - Maven 3.9+ -- Apache Kafka or RabbitMQ (depending on chosen broker) +- Apache Kafka, RabbitMQ, or PostgreSQL 11+ (depending on chosen transport) ## Installation @@ -95,16 +96,76 @@ public class OrderEventHandler { ```yaml firefly: eda: - broker: kafka # kafka, rabbitmq, spring - kafka: - bootstrap-servers: localhost:9092 - consumer: - group-id: my-service - rabbitmq: - host: localhost - port: 5672 + enabled: true + default-publisher-type: AUTO # AUTO chooses KAFKA → RABBITMQ → POSTGRES → APPLICATION_EVENT + publishers: + enabled: true + kafka: + default: + enabled: true + bootstrap-servers: localhost:9092 + rabbitmq: + default: + enabled: true + host: localhost + port: 5672 + postgres: + default: + enabled: true + host: localhost + port: 5432 + database: app + username: app + password: secret + schema: public + outbox-table: firefly_eda_outbox + default-destination: events + auto-create-schema: true # provision outbox table + NOTIFY trigger at startup + consumer: + enabled: true + group-id: my-service + kafka: + default: + enabled: true + bootstrap-servers: localhost:9092 + rabbitmq: + default: + enabled: true + host: localhost + port: 5672 + queues: events-queue + postgres: + default: + enabled: true + host: localhost + port: 5432 + database: app + username: app + password: secret + channels: events,order-events # destinations to LISTEN on + polling-interval: 30s # NOTIFY-loss fallback poll cadence + max-attempts: 3 # outbox row moves to DEAD_LETTER after N failures ``` +### PostgreSQL transport at a glance + +- Each `publish()` performs a single `INSERT` into `firefly_eda_outbox`. A + database trigger fires `pg_notify(channel, id)` for every inserted row. +- The consumer holds a dedicated R2DBC connection that runs `LISTEN ` + for every subscribed destination. Notifications carry only the outbox row + id so payloads can be arbitrarily large. +- On dispatch, the listener pipeline either marks the row `PROCESSED` or + increments `attempts`; once `attempts` reaches `max-attempts`, the row + moves to `DEAD_LETTER` status. +- A periodic poll (`polling-interval`) catches rows that slipped past the + live channel (e.g., consumer offline at insert time, payload too large, + connection reset). Set it to `0s` to disable polling. +- Channel names are derived deterministically from destinations via the + built-in mapper: non-alphanumeric characters become `_`, the result is + lower-cased, prefixed with `firefly_eda_`, and truncated to fit + PostgreSQL's 63-byte identifier limit with a stable hash suffix when + needed. + ## Documentation Additional documentation is available in the [docs/](docs/) directory: diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index 34aea82..57c03ec 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -179,6 +179,54 @@ firefly: | `default-routing-key` | string | `"event"` | Default routing key | | `properties` | map | `{}` | Additional RabbitMQ connection properties | +### PostgreSQL Publisher + +The PostgreSQL publisher uses an outbox table together with `pg_notify` to deliver events. Configuration lives under `firefly.eda.publishers.postgres.` -- never under `spring.r2dbc.*`. + +```yaml +firefly: + eda: + publishers: + postgres: + default: # Connection ID + enabled: true + host: "localhost" + port: 5432 + database: "app" + username: "app" + password: "secret" + schema: "public" + outbox-table: "firefly_eda_outbox" + default-destination: "events" + auto-create-schema: true + max-pool-size: 10 + properties: + statement_timeout: "30000" +``` + +| Property | Type | Default | Description | +|----------|------|---------|-------------| +| `enabled` | boolean | `false` | Whether this PostgreSQL publisher connection is enabled | +| `host` | string | `"localhost"` | PostgreSQL host | +| `port` | int | `5432` | PostgreSQL port | +| `database` | string | `null` | Database name | +| `username` | string | `null` | Database username | +| `password` | string | `null` | Database password | +| `schema` | string | `"public"` | Schema containing the outbox table | +| `outbox-table` | string | `"firefly_eda_outbox"` | Name of the outbox table | +| `default-destination` | string | `"events"` | Default destination when none is provided to `publish()` | +| `auto-create-schema` | boolean | `true` | Provision the outbox table, index, NOTIFY function and trigger at startup | +| `max-pool-size` | int | `10` | Maximum R2DBC connection pool size used for outbox inserts | +| `properties` | map | `{}` | Additional R2DBC PostgreSQL startup options (e.g., statement timeout) | + +When `auto-create-schema: true`, the publisher provisions: + +- `firefly_eda_outbox` table with `id`, `destination`, `channel`, `payload (BYTEA)`, `headers (JSONB)`, `status`, `attempts`, `error_message`, and timestamp columns +- An index on `(status, created_at)` filtered to `status = 'PENDING'` +- An index on `(channel, status)` +- A `firefly_eda_notify_event()` trigger function that calls `pg_notify(NEW.channel, NEW.id::text)` +- An `AFTER INSERT` trigger on the outbox table that runs the function + ## Consumer Configuration Consumers are configured under `firefly.eda.consumer`. @@ -289,6 +337,51 @@ firefly: **Important**: RabbitMQ queues must be pre-declared and configured here. The `@EventListener` destinations are used for filtering messages after they are consumed from these queues, not for determining which queues to subscribe to. +### PostgreSQL Consumer + +The PostgreSQL consumer holds one long-lived R2DBC connection to receive `NOTIFY` messages and creates short-lived connections to drain the outbox table. Configuration lives under `firefly.eda.consumer.postgres.`. Channels are derived from `@EventListener` annotations with `consumerType=POSTGRES` or `consumerType=AUTO`, and additionally from the `channels` property. + +```yaml +firefly: + eda: + consumer: + postgres: + default: + enabled: true + host: "localhost" + port: 5432 + database: "app" + username: "app" + password: "secret" + schema: "public" + outbox-table: "firefly_eda_outbox" + channels: "events,order-events" # comma-separated destinations to LISTEN on + polling-interval: 30s # NOTIFY-loss fallback poll cadence; set to 0s to disable + max-attempts: 3 # rows beyond this attempt count are marked DEAD_LETTER + batch-size: 50 # max rows fetched per poll cycle + max-pool-size: 5 + properties: {} +``` + +| Property | Type | Default | Description | +|----------|------|---------|-------------| +| `enabled` | boolean | `false` | Whether this PostgreSQL consumer connection is enabled | +| `host` | string | `"localhost"` | PostgreSQL host | +| `port` | int | `5432` | PostgreSQL port | +| `database` | string | `null` | Database name | +| `username` | string | `null` | Database username | +| `password` | string | `null` | Database password | +| `schema` | string | `"public"` | Schema containing the outbox table | +| `outbox-table` | string | `"firefly_eda_outbox"` | Outbox table the consumer reads from | +| `channels` | string | `"events"` | Comma-separated destinations to LISTEN on (combined with any from `@EventListener`) | +| `polling-interval` | duration | `30s` | Fallback poll cadence for rows missed by NOTIFY; `0s` disables polling | +| `max-attempts` | int | `3` | Failure attempts before a row transitions to `DEAD_LETTER` | +| `batch-size` | int | `50` | Maximum rows fetched per poll cycle | +| `max-pool-size` | int | `5` | Connection pool size for outbox queries; LISTEN uses one connection on top of this budget | +| `properties` | map | `{}` | Additional R2DBC PostgreSQL startup options | + +**How acknowledgement works**: On successful dispatch, the row is updated to `status='PROCESSED', processed_at=NOW()`. On failure, `attempts` is incremented and `error_message` is recorded; once `attempts >= max-attempts`, the row moves to `status='DEAD_LETTER'`. Use the outbox table directly to inspect, reset, or requeue events. + ### Application Event Consumer ```yaml diff --git a/docs/PUBLISHER_TYPES.md b/docs/PUBLISHER_TYPES.md index 64d5a1e..b81dcb5 100644 --- a/docs/PUBLISHER_TYPES.md +++ b/docs/PUBLISHER_TYPES.md @@ -8,7 +8,7 @@ The `PublisherType` enum defines the following supported messaging platforms: ### AUTO **Description**: Automatically select the best available publisher -**Selection Priority**: KAFKA → RABBITMQ → APPLICATION_EVENT → NOOP +**Selection Priority**: KAFKA → RABBITMQ → POSTGRES → APPLICATION_EVENT → NOOP **Use Case**: Let the system choose the optimal publisher based on availability and configuration ```java @@ -60,6 +60,53 @@ firefly: default-exchange: events ``` +### POSTGRES +**Description**: PostgreSQL `LISTEN`/`NOTIFY` with a transactional outbox table +**Features**: Persistent storage, payload sizes beyond `NOTIFY`'s 8 kB limit, retry counts, dead-letter status, polling fallback for missed notifications +**Best For**: Services that already use PostgreSQL and want reliable event publishing without an additional broker; outbox-style transactional event publishing +**Persistence**: ✅ Yes (outbox table rows) +**Ordering**: ✅ Yes (per destination -- rows are ordered by `created_at` / `id`) +**Cloud Service**: ❌ No (self-hosted; works with managed PostgreSQL services) + +```yaml +firefly: + eda: + publishers: + postgres: + default: + enabled: true + host: localhost + port: 5432 + database: app + username: app + password: secret + schema: public + outbox-table: firefly_eda_outbox + default-destination: events + auto-create-schema: true # provision outbox table + NOTIFY trigger at startup + consumer: + postgres: + default: + enabled: true + host: localhost + port: 5432 + database: app + username: app + password: secret + channels: events,order-events # destinations to LISTEN on + polling-interval: 30s + max-attempts: 3 +``` + +**How it works**: Each `publish()` performs a single `INSERT` into the +configured outbox table. A trigger fires `pg_notify(channel, id)` for the +new row, so any consumer that has issued `LISTEN` on the matching channel +receives the row id and fetches the full payload from the table. The +consumer marks rows as `PROCESSED` after successful dispatch, increments +`attempts` on failure, and transitions to `DEAD_LETTER` once `max-attempts` +is reached. A periodic poll catches rows that slipped past the live +channel (consumer offline, payload too large, etc.). + ### APPLICATION_EVENT **Description**: Spring Application Event Bus (in-memory) **Features**: Synchronous processing, JVM-local, simple integration @@ -99,23 +146,25 @@ class ServiceTest { When using `PublisherType.AUTO`, the system selects publishers in this priority order: 1. **KAFKA** - If Kafka is configured and available -2. **RABBITMQ** - If RabbitMQ is configured and available -3. **APPLICATION_EVENT** - If Spring context is available (always true) -4. **NOOP** - If explicitly enabled for testing +2. **RABBITMQ** - If RabbitMQ is configured and available +3. **POSTGRES** - If a PostgreSQL EDA connection is configured and available +4. **APPLICATION_EVENT** - If Spring context is available (always true) +5. **NOOP** - If explicitly enabled for testing ## Feature Comparison Matrix -| Feature | KAFKA | RABBITMQ | APPLICATION_EVENT | NOOP | -|---------|-------|----------|------------------|------| -| **Throughput** | Very High | High | High | N/A | -| **Persistence** | ✅ | ✅ | ❌ | ❌ | -| **Ordering** | ✅ | ❌ | ❌ | ❌ | -| **Partitioning** | ✅ | ❌ | ❌ | ❌ | -| **Complex Routing** | ❌ | ✅ | ❌ | ❌ | -| **Guaranteed Delivery** | ✅ | ✅ | ❌ | ❌ | -| **Multi-Instance** | ✅ | ✅ | ❌ | ❌ | -| **Cloud Native** | ✅ | ✅ | ❌ | ❌ | -| **Setup Complexity** | Medium | Medium | Low | None | +| Feature | KAFKA | RABBITMQ | POSTGRES | APPLICATION_EVENT | NOOP | +|---------|-------|----------|----------|------------------|------| +| **Throughput** | Very High | High | Medium | High | N/A | +| **Persistence** | ✅ | ✅ | ✅ | ❌ | ❌ | +| **Ordering** | ✅ | ❌ | ✅ | ❌ | ❌ | +| **Partitioning** | ✅ | ❌ | ❌ | ❌ | ❌ | +| **Complex Routing** | ❌ | ✅ | ❌ | ❌ | ❌ | +| **Guaranteed Delivery** | ✅ | ✅ | ✅ | ❌ | ❌ | +| **Multi-Instance** | ✅ | ✅ | ✅ | ❌ | ❌ | +| **Cloud Native** | ✅ | ✅ | ✅ | ❌ | ❌ | +| **Setup Complexity** | Medium | Medium | Low | Low | None | +| **Requires extra broker** | Yes | Yes | No (reuses DB) | No | No | ## Configuration Examples @@ -182,6 +231,13 @@ public class EventService { - Fan-out patterns are common - Priority queues are required +**Use POSTGRES when:** +- The service already uses PostgreSQL and you want event publishing without operating a separate broker +- You need transactional outbox semantics (event publish ties to a database transaction) +- Moderate throughput is sufficient (the table acts as the queue) +- You want persistent, auditable events with built-in retry / dead-letter status +- You can tolerate a small NOTIFY-delivery delay covered by the polling fallback + **Use APPLICATION_EVENT when:** - Simple internal communication is needed - Single-instance deployment diff --git a/pom.xml b/pom.xml index 5c8da65..f0e81b3 100644 --- a/pom.xml +++ b/pom.xml @@ -103,6 +103,20 @@ spring-boot-starter-amqp + + + io.r2dbc + r2dbc-spi + + + io.r2dbc + r2dbc-pool + + + org.postgresql + r2dbc-postgresql + + @@ -175,6 +189,11 @@ rabbitmq test + + org.testcontainers + postgresql + test + diff --git a/src/main/java/org/fireflyframework/eda/annotation/PublisherType.java b/src/main/java/org/fireflyframework/eda/annotation/PublisherType.java index 4584970..71ad143 100644 --- a/src/main/java/org/fireflyframework/eda/annotation/PublisherType.java +++ b/src/main/java/org/fireflyframework/eda/annotation/PublisherType.java @@ -28,7 +28,7 @@ public enum PublisherType { /** * Automatically select the best available publisher. *

- * Selection priority: KAFKA → RABBITMQ → APPLICATION_EVENT + * Selection priority: KAFKA → RABBITMQ → POSTGRES → APPLICATION_EVENT */ AUTO, @@ -56,6 +56,17 @@ public enum PublisherType { */ RABBITMQ, + /** + * PostgreSQL using LISTEN/NOTIFY with a transactional outbox table. + *

+ * Provides reliable messaging without requiring a dedicated broker by + * persisting events to an outbox table and dispatching notifications via + * {@code pg_notify}. Supports acknowledgment, retries, and dead letter + * semantics through outbox row status updates. Ideal for services that + * already use PostgreSQL and want native transactional event publishing. + */ + POSTGRES, + /** * No-operation publisher that discards all messages. *

@@ -75,6 +86,7 @@ public String getDescription() { case APPLICATION_EVENT -> "Spring Application Events"; case KAFKA -> "Apache Kafka"; case RABBITMQ -> "RabbitMQ"; + case POSTGRES -> "PostgreSQL LISTEN/NOTIFY (transactional outbox)"; case NOOP -> "No-operation (disabled)"; }; } @@ -86,7 +98,7 @@ public String getDescription() { */ public boolean supportsPersistence() { return switch (this) { - case KAFKA, RABBITMQ -> true; + case KAFKA, RABBITMQ, POSTGRES -> true; case APPLICATION_EVENT, NOOP, AUTO -> false; }; } @@ -98,7 +110,7 @@ public boolean supportsPersistence() { */ public boolean supportsOrdering() { return switch (this) { - case KAFKA -> true; + case KAFKA, POSTGRES -> true; case RABBITMQ, APPLICATION_EVENT, NOOP, AUTO -> false; }; } diff --git a/src/main/java/org/fireflyframework/eda/config/FireflyEdaAutoConfiguration.java b/src/main/java/org/fireflyframework/eda/config/FireflyEdaAutoConfiguration.java index be18907..6ec40e3 100644 --- a/src/main/java/org/fireflyframework/eda/config/FireflyEdaAutoConfiguration.java +++ b/src/main/java/org/fireflyframework/eda/config/FireflyEdaAutoConfiguration.java @@ -100,6 +100,21 @@ public FireflyEdaAutoConfiguration(EdaProperties props) { log.info(" - RabbitMQ Publisher: DISABLED"); } + // Postgres Publisher + var postgresPublisher = props.getPublishers().getPostgres().get("default"); + if (postgresPublisher != null && postgresPublisher.isEnabled()) { + String host = postgresPublisher.getHost(); + if (host != null && !host.isEmpty()) { + log.info(" - Postgres Publisher: CONFIGURED (host: {}:{}, table: {}.{})", + host, postgresPublisher.getPort(), + postgresPublisher.getSchema(), postgresPublisher.getOutboxTable()); + } else { + log.info(" - Postgres Publisher: NOT CONFIGURED (host not set)"); + } + } else { + log.info(" - Postgres Publisher: DISABLED"); + } + // Application Event Publisher if (props.getPublishers().getApplicationEvent().isEnabled()) { log.info(" - Application Event Publisher: ENABLED"); @@ -142,6 +157,22 @@ public FireflyEdaAutoConfiguration(EdaProperties props) { log.info(" - RabbitMQ Consumer: DISABLED"); } + // Postgres Consumer + var postgresConsumer = props.getConsumer().getPostgres().get("default"); + if (postgresConsumer != null && postgresConsumer.isEnabled()) { + String host = postgresConsumer.getHost(); + if (host != null && !host.isEmpty()) { + log.info(" - Postgres Consumer: CONFIGURED (host: {}:{}, table: {}.{}, polling: {})", + host, postgresConsumer.getPort(), + postgresConsumer.getSchema(), postgresConsumer.getOutboxTable(), + postgresConsumer.getPollingInterval()); + } else { + log.info(" - Postgres Consumer: NOT CONFIGURED (host not set)"); + } + } else { + log.info(" - Postgres Consumer: DISABLED"); + } + // Application Event Consumer if (props.getConsumer().getApplicationEvent().isEnabled()) { log.info(" - Application Event Consumer: ENABLED"); diff --git a/src/main/java/org/fireflyframework/eda/config/FireflyEdaPostgresConsumerAutoConfiguration.java b/src/main/java/org/fireflyframework/eda/config/FireflyEdaPostgresConsumerAutoConfiguration.java new file mode 100644 index 0000000..077a6a8 --- /dev/null +++ b/src/main/java/org/fireflyframework/eda/config/FireflyEdaPostgresConsumerAutoConfiguration.java @@ -0,0 +1,111 @@ +/* + * Copyright 2024-2026 Firefly Software Solutions Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.fireflyframework.eda.config; + +import io.r2dbc.postgresql.PostgresqlConnectionConfiguration; +import io.r2dbc.postgresql.PostgresqlConnectionFactory; +import io.r2dbc.spi.ConnectionFactory; +import lombok.extern.slf4j.Slf4j; +import org.fireflyframework.eda.properties.EdaProperties; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; + +import java.util.Map; + +/** + * Auto-configuration for the PostgreSQL EDA consumer infrastructure. + *

+ * Produces a dedicated {@link PostgresqlConnectionFactory} the consumer uses + * for the long-lived {@code LISTEN} connection as well as for short-lived + * {@code SELECT}/{@code UPDATE} statements that drain the outbox table. The + * factory is not wrapped in a pool: a single {@code LISTEN} connection is + * kept open for the consumer's lifetime, while query connections are created + * on demand and closed immediately after use. + *

+ * Configuration source: + * {@code firefly.eda.consumer.postgres.default.*} -- never + * {@code spring.r2dbc.*}. + */ +@AutoConfiguration(after = FireflyEdaAutoConfiguration.class) +@ConditionalOnClass({ConnectionFactory.class, PostgresqlConnectionFactory.class}) +@ConditionalOnProperty(prefix = "firefly.eda", name = "enabled", havingValue = "true", matchIfMissing = true) +@EnableConfigurationProperties(EdaProperties.class) +@Slf4j +public class FireflyEdaPostgresConsumerAutoConfiguration { + + public FireflyEdaPostgresConsumerAutoConfiguration(EdaProperties props) { + if (props.getConsumer().isEnabled()) { + var postgresConsumer = props.getConsumer().getPostgres().get("default"); + if (postgresConsumer != null && postgresConsumer.isEnabled() + && postgresConsumer.getHost() != null && !postgresConsumer.getHost().isEmpty()) { + log.info("--------------------------------------------------------------------------------"); + log.info("FIREFLY EDA POSTGRES CONSUMER - INITIALIZING"); + log.info("--------------------------------------------------------------------------------"); + } else { + log.debug("Firefly EDA Postgres Consumer auto-configuration loaded but not creating beans (disabled or not configured)"); + } + } else { + log.debug("Firefly EDA Postgres Consumer auto-configuration loaded but not creating beans (consumers globally disabled)"); + } + } + + @Bean(name = "fireflyEdaPostgresConsumerConnectionFactory") + @ConditionalOnMissingBean(name = "fireflyEdaPostgresConsumerConnectionFactory") + @ConditionalOnExpression("${firefly.eda.consumer.enabled:false} && ${firefly.eda.consumer.postgres.default.enabled:false} && '${firefly.eda.consumer.postgres.default.host:}'.length() > 0") + public PostgresqlConnectionFactory fireflyEdaPostgresConsumerConnectionFactory(EdaProperties props) { + EdaProperties.Consumer.PostgresConfig cfg = props.getConsumer().getPostgres().get("default"); + log.info("Creating PostgreSQL EDA consumer ConnectionFactory: host={}:{}, database={}, schema={}, table={}, channels={}, pollingInterval={}", + cfg.getHost(), cfg.getPort(), cfg.getDatabase(), cfg.getSchema(), cfg.getOutboxTable(), + cfg.getChannels(), cfg.getPollingInterval()); + + PostgresqlConnectionConfiguration.Builder builder = PostgresqlConnectionConfiguration.builder() + .host(cfg.getHost()) + .port(cfg.getPort()); + if (cfg.getDatabase() != null) { + builder.database(cfg.getDatabase()); + } + if (cfg.getUsername() != null) { + builder.username(cfg.getUsername()); + } + if (cfg.getPassword() != null) { + builder.password(cfg.getPassword()); + } + if (cfg.getSchema() != null) { + builder.schema(cfg.getSchema()); + } + if (cfg.getProperties() != null && !cfg.getProperties().isEmpty()) { + java.util.Map options = new java.util.HashMap<>(); + for (Map.Entry entry : cfg.getProperties().entrySet()) { + if (entry.getValue() != null) { + options.put(entry.getKey(), entry.getValue().toString()); + } + } + if (!options.isEmpty()) { + builder.options(options); + } + } + PostgresqlConnectionFactory factory = new PostgresqlConnectionFactory(builder.build()); + log.info("PostgreSQL EDA consumer ConnectionFactory created successfully"); + log.info("--------------------------------------------------------------------------------"); + return factory; + } +} diff --git a/src/main/java/org/fireflyframework/eda/config/FireflyEdaPostgresPublisherAutoConfiguration.java b/src/main/java/org/fireflyframework/eda/config/FireflyEdaPostgresPublisherAutoConfiguration.java new file mode 100644 index 0000000..8d62d38 --- /dev/null +++ b/src/main/java/org/fireflyframework/eda/config/FireflyEdaPostgresPublisherAutoConfiguration.java @@ -0,0 +1,202 @@ +/* + * Copyright 2024-2026 Firefly Software Solutions Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.fireflyframework.eda.config; + +import io.r2dbc.pool.ConnectionPool; +import io.r2dbc.pool.ConnectionPoolConfiguration; +import io.r2dbc.postgresql.PostgresqlConnectionConfiguration; +import io.r2dbc.postgresql.PostgresqlConnectionFactory; +import io.r2dbc.spi.Connection; +import io.r2dbc.spi.ConnectionFactory; +import lombok.extern.slf4j.Slf4j; +import org.fireflyframework.eda.properties.EdaProperties; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import reactor.core.publisher.Mono; + +import java.time.Duration; +import java.util.Map; + +/** + * Auto-configuration for the PostgreSQL EDA publisher infrastructure. + *

+ * Creates a pooled {@link ConnectionFactory} dedicated to writing outbox rows + * and, when {@code firefly.eda.publishers.postgres.default.auto-create-schema} + * is {@code true}, provisions the outbox table, supporting index, NOTIFY + * function, and AFTER INSERT trigger required by the + * {@link org.fireflyframework.eda.publisher.postgres.PostgresEventPublisher}. + *

+ * Configuration source: + * {@code firefly.eda.publishers.postgres.default.*} -- never + * {@code spring.r2dbc.*}. + */ +@AutoConfiguration(after = FireflyEdaAutoConfiguration.class) +@ConditionalOnClass({ConnectionFactory.class, PostgresqlConnectionFactory.class}) +@ConditionalOnProperty(prefix = "firefly.eda", name = "enabled", havingValue = "true", matchIfMissing = true) +@EnableConfigurationProperties(EdaProperties.class) +@Slf4j +public class FireflyEdaPostgresPublisherAutoConfiguration { + + public FireflyEdaPostgresPublisherAutoConfiguration(EdaProperties props) { + if (props.getPublishers().isEnabled()) { + var postgresPublisher = props.getPublishers().getPostgres().get("default"); + if (postgresPublisher != null && postgresPublisher.isEnabled() + && postgresPublisher.getHost() != null && !postgresPublisher.getHost().isEmpty()) { + log.info("--------------------------------------------------------------------------------"); + log.info("FIREFLY EDA POSTGRES PUBLISHER - INITIALIZING"); + log.info("--------------------------------------------------------------------------------"); + } else { + log.debug("Firefly EDA Postgres Publisher auto-configuration loaded but not creating beans (disabled or not configured)"); + } + } else { + log.debug("Firefly EDA Postgres Publisher auto-configuration loaded but not creating beans (publishers globally disabled)"); + } + } + + /** + * Creates the pooled R2DBC {@link ConnectionFactory} the publisher uses to + * insert outbox rows. The bean is created only when the publisher is + * explicitly enabled and a host is configured under + * {@code firefly.eda.publishers.postgres.default.host}. + */ + @Bean(name = "fireflyEdaPostgresPublisherConnectionFactory", destroyMethod = "dispose") + @ConditionalOnMissingBean(name = "fireflyEdaPostgresPublisherConnectionFactory") + @ConditionalOnExpression("${firefly.eda.publishers.enabled:false} && ${firefly.eda.publishers.postgres.default.enabled:false} && '${firefly.eda.publishers.postgres.default.host:}'.length() > 0") + public ConnectionPool fireflyEdaPostgresPublisherConnectionFactory(EdaProperties props) { + EdaProperties.Publishers.PostgresConfig cfg = props.getPublishers().getPostgres().get("default"); + log.info("Creating PostgreSQL EDA publisher ConnectionFactory: host={}:{}, database={}, schema={}, table={}", + cfg.getHost(), cfg.getPort(), cfg.getDatabase(), cfg.getSchema(), cfg.getOutboxTable()); + + PostgresqlConnectionConfiguration.Builder builder = PostgresqlConnectionConfiguration.builder() + .host(cfg.getHost()) + .port(cfg.getPort()); + if (cfg.getDatabase() != null) { + builder.database(cfg.getDatabase()); + } + if (cfg.getUsername() != null) { + builder.username(cfg.getUsername()); + } + if (cfg.getPassword() != null) { + builder.password(cfg.getPassword()); + } + if (cfg.getSchema() != null) { + builder.schema(cfg.getSchema()); + } + if (cfg.getProperties() != null && !cfg.getProperties().isEmpty()) { + java.util.Map options = new java.util.HashMap<>(); + for (Map.Entry entry : cfg.getProperties().entrySet()) { + if (entry.getValue() != null) { + options.put(entry.getKey(), entry.getValue().toString()); + } + } + if (!options.isEmpty()) { + builder.options(options); + } + } + PostgresqlConnectionFactory delegate = new PostgresqlConnectionFactory(builder.build()); + + ConnectionPoolConfiguration poolConfig = ConnectionPoolConfiguration.builder(delegate) + .maxSize(Math.max(1, cfg.getMaxPoolSize())) + .initialSize(1) + .maxIdleTime(Duration.ofMinutes(5)) + .build(); + ConnectionPool pool = new ConnectionPool(poolConfig); + + if (cfg.isAutoCreateSchema()) { + initialiseSchema(pool, cfg); + } + + log.info("PostgreSQL EDA publisher ConnectionFactory created successfully"); + log.info("--------------------------------------------------------------------------------"); + return pool; + } + + /** + * Provisions the outbox table, supporting index, NOTIFY function and + * trigger. Statements are idempotent so repeated startups are safe. + */ + private void initialiseSchema(ConnectionFactory factory, + EdaProperties.Publishers.PostgresConfig cfg) { + String schema = cfg.getSchema() != null ? cfg.getSchema() : "public"; + String table = cfg.getOutboxTable() != null ? cfg.getOutboxTable() : "firefly_eda_outbox"; + String qualified = "\"" + schema + "\".\"" + table + "\""; + String triggerName = table + "_notify"; + String functionName = "\"" + schema + "\".firefly_eda_notify_event"; + + String createTable = "CREATE TABLE IF NOT EXISTS " + qualified + " (" + + "id BIGSERIAL PRIMARY KEY, " + + "destination VARCHAR(255) NOT NULL, " + + "channel VARCHAR(63) NOT NULL, " + + "event_type VARCHAR(255), " + + "payload BYTEA NOT NULL, " + + "headers JSONB, " + + "transaction_id VARCHAR(255), " + + "publisher_type VARCHAR(50), " + + "connection_id VARCHAR(50), " + + "event_class VARCHAR(500), " + + "status VARCHAR(20) NOT NULL DEFAULT 'PENDING', " + + "attempts INT NOT NULL DEFAULT 0, " + + "error_message TEXT, " + + "created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), " + + "processed_at TIMESTAMPTZ, " + + "failed_at TIMESTAMPTZ)"; + + String createIndex = "CREATE INDEX IF NOT EXISTS " + table + "_status_created_idx " + + "ON " + qualified + " (status, created_at) WHERE status = 'PENDING'"; + String createChannelIndex = "CREATE INDEX IF NOT EXISTS " + table + "_channel_idx " + + "ON " + qualified + " (channel, status)"; + + String createFunction = "CREATE OR REPLACE FUNCTION " + functionName + "() RETURNS TRIGGER AS $$ " + + "BEGIN PERFORM pg_notify(NEW.channel, NEW.id::text); RETURN NEW; END; " + + "$$ LANGUAGE plpgsql"; + + String dropTrigger = "DROP TRIGGER IF EXISTS " + triggerName + " ON " + qualified; + String createTrigger = "CREATE TRIGGER " + triggerName + " AFTER INSERT ON " + qualified + + " FOR EACH ROW EXECUTE FUNCTION " + functionName + "()"; + + try { + Mono.usingWhen( + Mono.from(factory.create()), + connection -> executeAll(connection, + createTable, createIndex, createChannelIndex, + createFunction, dropTrigger, createTrigger), + Connection::close) + .block(Duration.ofSeconds(30)); + log.info("PostgreSQL EDA outbox schema ensured: {}.{}", schema, table); + } catch (Exception e) { + log.error("Failed to initialise PostgreSQL EDA outbox schema {}.{}: {}", + schema, table, e.getMessage()); + log.error("Publishing will fail until the schema is created manually"); + } + } + + private Mono executeAll(Connection connection, String... statements) { + Mono chain = Mono.empty(); + for (String sql : statements) { + String stmt = sql; + chain = chain.then(Mono.from(connection.createStatement(stmt).execute()) + .flatMap(result -> Mono.from(result.getRowsUpdated())) + .then()); + } + return chain; + } +} diff --git a/src/main/java/org/fireflyframework/eda/consumer/postgres/PostgresEventConsumer.java b/src/main/java/org/fireflyframework/eda/consumer/postgres/PostgresEventConsumer.java new file mode 100644 index 0000000..a73b2d7 --- /dev/null +++ b/src/main/java/org/fireflyframework/eda/consumer/postgres/PostgresEventConsumer.java @@ -0,0 +1,631 @@ +/* + * Copyright 2024-2026 Firefly Software Solutions Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.fireflyframework.eda.consumer.postgres; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.r2dbc.postgresql.api.PostgresqlConnection; +import io.r2dbc.spi.Connection; +import io.r2dbc.spi.ConnectionFactory; +import io.r2dbc.spi.Result; +import io.r2dbc.spi.Row; +import lombok.extern.slf4j.Slf4j; +import org.fireflyframework.eda.annotation.PublisherType; +import org.fireflyframework.eda.consumer.ConsumerHealth; +import org.fireflyframework.eda.consumer.EventConsumer; +import org.fireflyframework.eda.event.EventEnvelope; +import org.fireflyframework.eda.listener.EventListenerProcessor; +import org.fireflyframework.eda.properties.EdaProperties; +import org.fireflyframework.eda.publisher.postgres.PostgresChannelMapper; +import org.fireflyframework.eda.serialization.MessageSerializer; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.DependsOn; +import org.springframework.stereotype.Component; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; +import reactor.core.scheduler.Schedulers; + +import jakarta.annotation.PreDestroy; +import java.time.Duration; +import java.time.Instant; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +/** + * PostgreSQL implementation of {@link EventConsumer}. + *

+ * The consumer dedicates a single long-lived R2DBC connection to receive + * {@code NOTIFY} messages, and uses a pooled {@link ConnectionFactory} for + * the {@code SELECT}/{@code UPDATE} statements that drain the outbox table. + * Channels are derived from {@code @EventListener} annotations discovered by + * {@link EventListenerProcessor}; if none are present, the channels listed + * in {@code firefly.eda.consumer.postgres.default.channels} are used. + *

+ * For each notification, the corresponding outbox row is fetched, an + * {@link EventEnvelope} is built, the event is dispatched through the + * listener processor, and the row is marked {@code PROCESSED} on success or + * incremented (and possibly transitioned to {@code DEAD_LETTER}) on failure. + * A periodic poll catches any rows that slipped past the live channel — for + * example, when the listener was offline at insert time or the payload + * exceeded the {@code NOTIFY} limit and the dispatcher had to fall back to a + * scan. + */ +@Component +@ConditionalOnClass(ConnectionFactory.class) +@ConditionalOnBean(name = "fireflyEdaPostgresConsumerConnectionFactory") +@ConditionalOnProperty(prefix = "firefly.eda.consumer", name = "enabled", havingValue = "true") +@DependsOn("eventListenerProcessor") +@Slf4j +public class PostgresEventConsumer implements EventConsumer { + + private final ObjectProvider connectionFactoryProvider; + private final EventListenerProcessor eventListenerProcessor; + private final MessageSerializer messageSerializer; + private final EdaProperties edaProperties; + private final ObjectMapper objectMapper; + + private final AtomicBoolean running = new AtomicBoolean(false); + private final AtomicLong messagesConsumed = new AtomicLong(); + private final AtomicLong messagesProcessed = new AtomicLong(); + private final AtomicLong messagesFailures = new AtomicLong(); + private final AtomicReference listenConnection = new AtomicReference<>(); + private final AtomicReference notificationSubscription = new AtomicReference<>(); + private final AtomicReference pollingSubscription = new AtomicReference<>(); + private volatile Sinks.Many eventSink = Sinks.many().multicast().onBackpressureBuffer(); + private final Set subscribedChannels = Collections.synchronizedSet(new LinkedHashSet<>()); + + public PostgresEventConsumer( + @Qualifier("fireflyEdaPostgresConsumerConnectionFactory") + ObjectProvider connectionFactoryProvider, + EventListenerProcessor eventListenerProcessor, + MessageSerializer messageSerializer, + EdaProperties edaProperties, + ObjectMapper objectMapper) { + this.connectionFactoryProvider = connectionFactoryProvider; + this.eventListenerProcessor = eventListenerProcessor; + this.messageSerializer = messageSerializer; + this.edaProperties = edaProperties; + this.objectMapper = objectMapper; + eventListenerProcessor.registerListenerChangeCallback(this::refreshChannels); + log.info("Postgres event consumer initialised; channels will be (re)subscribed at start()"); + } + + @Override + public Flux consume() { + return eventSink.asFlux() + .doOnSubscribe(subscription -> { + if (!running.get()) { + start().subscribe(); + } + }); + } + + @Override + public Flux consume(String... destinations) { + if (destinations == null || destinations.length == 0) { + return consume(); + } + Set filter = new HashSet<>(Arrays.asList(destinations)); + return consume().filter(envelope -> envelope.destination() != null + && filter.contains(envelope.destination())); + } + + @Override + public Mono start() { + return Mono.defer(() -> { + if (!running.compareAndSet(false, true)) { + log.debug("Postgres event consumer already running"); + return Mono.empty(); + } + eventSink = Sinks.many().multicast().onBackpressureBuffer(); + ConnectionFactory factory = connectionFactoryProvider.getIfAvailable(); + if (factory == null) { + running.set(false); + return Mono.error(new IllegalStateException( + "PostgreSQL EDA consumer ConnectionFactory is not available")); + } + return openListenConnection(factory) + .flatMap(this::subscribeToChannels) + .doOnSuccess(unused -> startPolling()) + .doOnError(e -> { + log.error("Failed to start Postgres event consumer", e); + running.set(false); + }); + }); + } + + @Override + public Mono stop() { + return Mono.defer(() -> { + if (!running.compareAndSet(true, false)) { + return Mono.empty(); + } + log.info("Stopping Postgres event consumer"); + disposeQuietly(notificationSubscription.getAndSet(null)); + disposeQuietly(pollingSubscription.getAndSet(null)); + eventSink.tryEmitComplete(); + PostgresqlConnection connection = listenConnection.getAndSet(null); + if (connection == null) { + return Mono.empty(); + } + return Mono.from(connection.close()) + .doOnError(e -> log.warn("Error closing Postgres LISTEN connection", e)) + .onErrorResume(e -> Mono.empty()) + .then(); + }); + } + + @PreDestroy + public void shutdown() { + stop().block(Duration.ofSeconds(5)); + } + + @Override + public boolean isRunning() { + return running.get(); + } + + @Override + public String getConsumerType() { + return "POSTGRES"; + } + + @Override + public boolean isAvailable() { + return edaProperties.getConsumer().isEnabled() + && connectionFactoryProvider.getIfAvailable() != null; + } + + @Override + public Mono getHealth() { + Map details = new HashMap<>(); + details.put("running", isRunning()); + details.put("subscribed_channels", new LinkedHashSet<>(subscribedChannels)); + details.put("polling_interval", config().getPollingInterval().toString()); + details.put("outbox_table", qualifiedTable(config())); + return Mono.just(ConsumerHealth.builder() + .consumerType(getConsumerType()) + .available(isAvailable()) + .running(isRunning()) + .status(isAvailable() && isRunning() ? "UP" : "DOWN") + .messagesConsumed(messagesConsumed.get()) + .messagesProcessed(messagesProcessed.get()) + .messagesFailures(messagesFailures.get()) + .lastChecked(Instant.now()) + .details(details) + .build()); + } + + private Mono openListenConnection(ConnectionFactory factory) { + return Mono.from(factory.create()) + .flatMap(connection -> { + if (!(connection instanceof PostgresqlConnection pg)) { + return Mono.from(connection.close()) + .then(Mono.error(new IllegalStateException( + "Expected PostgresqlConnection for LISTEN, got " + + connection.getClass().getName()))); + } + listenConnection.set(pg); + Disposable subscription = pg.getNotifications() + .subscribeOn(Schedulers.boundedElastic()) + .subscribe( + notification -> handleNotification(notification.getName(), + notification.getParameter()), + error -> log.error("Postgres notification stream errored", error)); + notificationSubscription.set(subscription); + return Mono.just(pg); + }); + } + + private Mono subscribeToChannels(PostgresqlConnection connection) { + Set channels = resolveChannels(); + if (channels.isEmpty()) { + log.warn("Postgres consumer has no channels configured; no events will be received"); + return Mono.empty(); + } + return Flux.fromIterable(channels) + .concatMap(channel -> Mono.from(connection.createStatement( + "LISTEN " + quoteIdent(channel)).execute()) + .flatMap(result -> Mono.from(result.getRowsUpdated())) + .doOnSuccess(rows -> { + subscribedChannels.add(channel); + log.info("Postgres consumer LISTENing on channel '{}'", channel); + }) + .then()) + .then(); + } + + private void startPolling() { + Duration interval = config().getPollingInterval(); + if (interval == null || interval.isZero() || interval.isNegative()) { + log.debug("Postgres consumer polling disabled (pollingInterval={})", interval); + return; + } + Disposable subscription = Flux.interval(interval, interval, Schedulers.boundedElastic()) + .onBackpressureDrop() + .concatMap(tick -> pollPendingEvents()) + .subscribe( + ignored -> { }, + error -> log.warn("Postgres consumer polling failed; will retry on next tick", + error)); + pollingSubscription.set(subscription); + log.info("Postgres consumer polling every {} as a NOTIFY fallback", interval); + } + + private Mono pollPendingEvents() { + EdaProperties.Consumer.PostgresConfig cfg = config(); + Set channels = subscribedChannels; + if (channels.isEmpty()) { + return Mono.empty(); + } + ConnectionFactory factory = connectionFactoryProvider.getIfAvailable(); + if (factory == null) { + return Mono.empty(); + } + String sql = "SELECT id FROM " + qualifiedTable(cfg) + + " WHERE status = 'PENDING' AND attempts < $1 AND channel = ANY($2) " + + "ORDER BY created_at LIMIT $3"; + String[] channelsArr = channels.toArray(new String[0]); + int maxAttempts = cfg.getMaxAttempts(); + int batchSize = cfg.getBatchSize(); + return Mono.usingWhen( + Mono.from(factory.create()), + connection -> Flux.from(connection.createStatement(sql) + .bind("$1", maxAttempts) + .bind("$2", channelsArr) + .bind("$3", batchSize) + .execute()) + .flatMap(result -> result.map((row, meta) -> row.get("id", Long.class))) + .concatMap(this::handleEventById) + .then(), + Connection::close, + (connection, throwable) -> Mono.from(connection.close()).then(Mono.error(throwable)), + Connection::close); + } + + private void handleNotification(String channel, String parameter) { + log.debug("Postgres consumer received NOTIFY on channel '{}' with parameter '{}'", + channel, parameter); + Long id = parseLong(parameter); + if (id == null) { + log.warn("Skipping notification with non-numeric parameter '{}'", parameter); + return; + } + handleEventById(id) + .subscribeOn(Schedulers.boundedElastic()) + .subscribe( + ignored -> { }, + error -> log.error("Failed to handle event id {} from channel '{}'", id, channel, + error)); + } + + private Mono handleEventById(Long id) { + ConnectionFactory factory = connectionFactoryProvider.getIfAvailable(); + if (factory == null || id == null) { + return Mono.empty(); + } + EdaProperties.Consumer.PostgresConfig cfg = config(); + String table = qualifiedTable(cfg); + String sql = "UPDATE " + table + " SET attempts = attempts + 1 " + + "WHERE id = $1 AND status = 'PENDING' " + + "RETURNING id, destination, channel, event_type, payload, headers, " + + "transaction_id, publisher_type, connection_id, event_class, attempts"; + return Mono.usingWhen( + Mono.from(factory.create()), + connection -> Flux.from(connection.createStatement(sql).bind("$1", id).execute()) + .flatMap(result -> result.map((row, meta) -> outboxRowFrom(row))) + .next() + .flatMap(this::dispatch), + Connection::close, + (connection, throwable) -> Mono.from(connection.close()).then(Mono.error(throwable)), + Connection::close); + } + + private OutboxRow outboxRowFrom(Row row) { + Long rowId = row.get("id", Long.class); + String destination = row.get("destination", String.class); + String channel = row.get("channel", String.class); + String eventType = row.get("event_type", String.class); + ByteArrayHolder payload = readPayload(row); + String headersJson = row.get("headers", String.class); + String transactionId = row.get("transaction_id", String.class); + String publisherType = row.get("publisher_type", String.class); + String connectionId = row.get("connection_id", String.class); + String eventClass = row.get("event_class", String.class); + Integer attempts = row.get("attempts", Integer.class); + return new OutboxRow(rowId, destination, channel, eventType, payload.bytes(), headersJson, + transactionId, publisherType, connectionId, eventClass, attempts); + } + + private ByteArrayHolder readPayload(Row row) { + byte[] direct = row.get("payload", byte[].class); + if (direct != null) { + return new ByteArrayHolder(direct); + } + java.nio.ByteBuffer buffer = row.get("payload", java.nio.ByteBuffer.class); + if (buffer != null) { + byte[] dst = new byte[buffer.remaining()]; + buffer.get(dst); + return new ByteArrayHolder(dst); + } + return new ByteArrayHolder(new byte[0]); + } + + private Mono dispatch(OutboxRow row) { + messagesConsumed.incrementAndGet(); + Map headers = deserializeHeaders(row.headersJson()); + headers.putIfAbsent("destination", row.destination()); + headers.putIfAbsent("event_class", row.eventClass()); + if (row.transactionId() != null) { + headers.putIfAbsent("transaction_id", row.transactionId()); + } + Object event = deserializeEvent(row.payload(), row.eventClass()); + EventEnvelope envelope = EventEnvelope.forConsuming( + row.destination(), + row.eventType() != null ? row.eventType() : eventTypeFrom(event), + event, + row.transactionId(), + headers, + EventEnvelope.EventMetadata.empty(), + Instant.now(), + getConsumerType(), + row.connectionId() != null ? row.connectionId() : "default", + new OutboxAckCallback(row.id())); + emit(envelope); + return eventListenerProcessor.processEvent(event, headers) + .then(markProcessed(row.id())) + .doOnSuccess(v -> messagesProcessed.incrementAndGet()) + .onErrorResume(error -> { + messagesFailures.incrementAndGet(); + log.error("Listener pipeline rejected event id {}; recording failure", row.id(), + error); + return markFailed(row.id(), error); + }); + } + + private void emit(EventEnvelope envelope) { + Sinks.EmitResult result = eventSink.tryEmitNext(envelope); + if (result.isFailure()) { + log.debug("Postgres consumer dropped envelope for destination {} ({}): no active subscriber", + envelope.destination(), result); + } + } + + private Mono markProcessed(Long id) { + return updateStatus(id, "UPDATE " + qualifiedTable(config()) + + " SET status = 'PROCESSED', processed_at = NOW(), error_message = NULL " + + "WHERE id = $1"); + } + + private Mono markFailed(Long id, Throwable error) { + EdaProperties.Consumer.PostgresConfig cfg = config(); + int maxAttempts = cfg.getMaxAttempts(); + String table = qualifiedTable(cfg); + String sql = "UPDATE " + table + " SET status = CASE " + + "WHEN attempts >= $2 THEN 'DEAD_LETTER' ELSE 'PENDING' END, " + + "failed_at = NOW(), error_message = $3 WHERE id = $1"; + ConnectionFactory factory = connectionFactoryProvider.getIfAvailable(); + if (factory == null) { + return Mono.empty(); + } + String message = error != null && error.getMessage() != null + ? error.getMessage() : "unknown error"; + if (message.length() > 4000) { + message = message.substring(0, 4000); + } + String finalMessage = message; + return Mono.usingWhen( + Mono.from(factory.create()), + connection -> Mono.from(connection.createStatement(sql) + .bind("$1", id) + .bind("$2", maxAttempts) + .bind("$3", finalMessage) + .execute()) + .flatMap(result -> Mono.from(result.getRowsUpdated())) + .then(), + Connection::close, + (connection, throwable) -> Mono.from(connection.close()).then(Mono.error(throwable)), + Connection::close); + } + + private Mono updateStatus(Long id, String sql) { + ConnectionFactory factory = connectionFactoryProvider.getIfAvailable(); + if (factory == null) { + return Mono.empty(); + } + return Mono.usingWhen( + Mono.from(factory.create()), + connection -> Mono.from(connection.createStatement(sql).bind("$1", id).execute()) + .flatMap(result -> Mono.from(result.getRowsUpdated())) + .then(), + Connection::close, + (connection, throwable) -> Mono.from(connection.close()).then(Mono.error(throwable)), + Connection::close); + } + + private void refreshChannels() { + if (!running.get()) { + return; + } + PostgresqlConnection connection = listenConnection.get(); + if (connection == null) { + return; + } + Set desired = resolveChannels(); + Set toListen = new LinkedHashSet<>(desired); + toListen.removeAll(subscribedChannels); + Set toUnlisten = new LinkedHashSet<>(subscribedChannels); + toUnlisten.removeAll(desired); + + if (toListen.isEmpty() && toUnlisten.isEmpty()) { + return; + } + log.info("Refreshing Postgres LISTEN channels: +{} -{}", toListen, toUnlisten); + Flux.fromIterable(toListen) + .concatMap(channel -> Mono.from(connection.createStatement( + "LISTEN " + quoteIdent(channel)).execute()) + .flatMap(result -> Mono.from(result.getRowsUpdated())) + .doOnSuccess(rows -> subscribedChannels.add(channel)) + .then()) + .thenMany(Flux.fromIterable(toUnlisten) + .concatMap(channel -> Mono.from(connection.createStatement( + "UNLISTEN " + quoteIdent(channel)).execute()) + .flatMap(result -> Mono.from(result.getRowsUpdated())) + .doOnSuccess(rows -> subscribedChannels.remove(channel)) + .then())) + .subscribeOn(Schedulers.boundedElastic()) + .subscribe( + ignored -> { }, + error -> log.error("Failed to refresh Postgres LISTEN channels", error)); + } + + private Set resolveChannels() { + Set destinations = new LinkedHashSet<>( + eventListenerProcessor.getTopicsForConsumerType("POSTGRES")); + String configured = config().getChannels(); + if (configured != null && !configured.isBlank()) { + for (String dest : configured.split(",")) { + String trimmed = dest.trim(); + if (!trimmed.isEmpty()) { + destinations.add(trimmed); + } + } + } + Set channels = new LinkedHashSet<>(); + for (String destination : destinations) { + if ("*".equals(destination)) { + continue; + } + channels.add(PostgresChannelMapper.toChannel(destination)); + } + return channels; + } + + private Object deserializeEvent(byte[] payload, String eventClassName) { + if (payload == null || payload.length == 0) { + return Collections.emptyMap(); + } + if (eventClassName != null) { + try { + Class clazz = Class.forName(eventClassName); + return messageSerializer.deserialize(payload, clazz); + } catch (ClassNotFoundException e) { + log.warn("Event class '{}' not found, falling back to Object.class", eventClassName); + } catch (Exception e) { + log.warn("Failed to deserialize event as {}: {}", eventClassName, e.getMessage()); + } + } + try { + return messageSerializer.deserialize(payload, Object.class); + } catch (Exception e) { + throw new RuntimeException("Failed to deserialize Postgres event payload", e); + } + } + + private Map deserializeHeaders(String headersJson) { + if (headersJson == null || headersJson.isBlank()) { + return new HashMap<>(); + } + try { + Map headers = objectMapper.readValue(headersJson, + new TypeReference<>() { }); + return headers != null ? new HashMap<>(headers) : new HashMap<>(); + } catch (Exception e) { + log.warn("Failed to parse outbox headers JSON, using empty map: {}", e.getMessage()); + return new HashMap<>(); + } + } + + private String eventTypeFrom(Object event) { + return event != null ? event.getClass().getSimpleName() : "unknown"; + } + + private EdaProperties.Consumer.PostgresConfig config() { + EdaProperties.Consumer.PostgresConfig cfg = edaProperties.getConsumer().getPostgres() + .getOrDefault("default", null); + return cfg != null ? cfg : new EdaProperties.Consumer.PostgresConfig(); + } + + private String qualifiedTable(EdaProperties.Consumer.PostgresConfig cfg) { + String schema = cfg.getSchema() != null ? cfg.getSchema() : "public"; + String table = cfg.getOutboxTable() != null ? cfg.getOutboxTable() : "firefly_eda_outbox"; + return schema + "." + table; + } + + private static String quoteIdent(String identifier) { + return "\"" + identifier.replace("\"", "\"\"") + "\""; + } + + private static Long parseLong(String value) { + if (value == null) { + return null; + } + try { + return Long.parseLong(value.trim()); + } catch (NumberFormatException e) { + return null; + } + } + + private static void disposeQuietly(Disposable disposable) { + if (disposable != null && !disposable.isDisposed()) { + disposable.dispose(); + } + } + + private record OutboxRow(Long id, String destination, String channel, String eventType, + byte[] payload, String headersJson, String transactionId, + String publisherType, String connectionId, String eventClass, + Integer attempts) { } + + private record ByteArrayHolder(byte[] bytes) { } + + /** + * Acknowledgment callback that marks the outbox row as processed or failed. + */ + private final class OutboxAckCallback implements EventEnvelope.AckCallback { + private final Long id; + + OutboxAckCallback(Long id) { + this.id = id; + } + + @Override + public Mono acknowledge() { + return markProcessed(id); + } + + @Override + public Mono reject(Throwable error) { + return markFailed(id, error); + } + } +} diff --git a/src/main/java/org/fireflyframework/eda/properties/EdaProperties.java b/src/main/java/org/fireflyframework/eda/properties/EdaProperties.java index ae43d08..cf4b02b 100644 --- a/src/main/java/org/fireflyframework/eda/properties/EdaProperties.java +++ b/src/main/java/org/fireflyframework/eda/properties/EdaProperties.java @@ -185,10 +185,17 @@ public static class Publishers { @Valid private final Map rabbitmq = new HashMap<>(); + /** + * PostgreSQL publisher configurations by connection ID. + */ + @Valid + private final Map postgres = new HashMap<>(); + // Initialize default connections public Publishers() { kafka.put("default", new KafkaConfig()); rabbitmq.put("default", new RabbitMqConfig()); + postgres.put("default", new PostgresConfig()); } @Data @@ -219,6 +226,71 @@ public static class RabbitMqConfig { private String defaultRoutingKey = "event"; private Map properties = new HashMap<>(); } + + @Data + public static class PostgresConfig { + /** + * Whether this PostgreSQL publisher connection is enabled. + */ + private boolean enabled = false; + + /** + * PostgreSQL host name or IP. + */ + private String host = "localhost"; + + /** + * PostgreSQL port (default 5432). + */ + private int port = 5432; + + /** + * Database name. + */ + private String database; + + /** + * Database username. + */ + private String username; + + /** + * Database password. + */ + private String password; + + /** + * Schema name where the outbox table lives. + */ + private String schema = "public"; + + /** + * Outbox table name used to persist events. + */ + private String outboxTable = "firefly_eda_outbox"; + + /** + * Default destination/channel when none is provided to publish(). + */ + private String defaultDestination = "events"; + + /** + * If true, the auto-configuration creates the outbox table and + * NOTIFY trigger automatically at startup. Disable in environments + * that prefer to manage schema externally (e.g., via Flyway). + */ + private boolean autoCreateSchema = true; + + /** + * Maximum size of the R2DBC connection pool used by the publisher. + */ + private int maxPoolSize = 10; + + /** + * Additional R2DBC connection properties. + */ + private Map properties = new HashMap<>(); + } } @Data @@ -264,6 +336,12 @@ public static class Consumer { @Valid private final Map rabbitmq = new HashMap<>(); + /** + * PostgreSQL consumer configurations by connection ID. + */ + @Valid + private final Map postgres = new HashMap<>(); + /** * NOOP consumer configuration. */ @@ -274,6 +352,7 @@ public static class Consumer { public Consumer() { kafka.put("default", new KafkaConfig()); rabbitmq.put("default", new RabbitMqConfig()); + postgres.put("default", new PostgresConfig()); } @Data @@ -316,6 +395,90 @@ public static class RabbitMqConfig { private Map properties = new HashMap<>(); } + @Data + public static class PostgresConfig { + /** + * Whether this PostgreSQL consumer connection is enabled. + */ + private boolean enabled = false; + + /** + * PostgreSQL host name or IP. + */ + private String host = "localhost"; + + /** + * PostgreSQL port (default 5432). + */ + private int port = 5432; + + /** + * Database name. + */ + private String database; + + /** + * Database username. + */ + private String username; + + /** + * Database password. + */ + private String password; + + /** + * Schema name where the outbox table lives. + */ + private String schema = "public"; + + /** + * Outbox table name used to read events. + */ + private String outboxTable = "firefly_eda_outbox"; + + /** + * Comma-separated list of channels/destinations to listen on when no + * {@code @EventListener} annotations are discovered. Defaults to + * the global default destination. + */ + private String channels = "events"; + + /** + * Polling interval used as a fallback to LISTEN/NOTIFY in case + * notifications are missed (server restart, dropped connection, + * payload too large, etc.). Set to {@link Duration#ZERO} to + * disable polling. + */ + private Duration pollingInterval = Duration.ofSeconds(30); + + /** + * Maximum number of attempts before an event is moved to dead + * letter status in the outbox table. + */ + @Min(value = 1, message = "Max attempts must be at least 1") + private int maxAttempts = 3; + + /** + * Maximum number of events fetched in a single poll cycle. + */ + @Min(value = 1, message = "Batch size must be at least 1") + private int batchSize = 50; + + /** + * Maximum size of the R2DBC connection pool used by the consumer. + * Note: LISTEN/NOTIFY uses one long-lived connection outside this + * pool; this pool services SELECT/UPDATE queries on the outbox. + */ + @Min(value = 1, message = "Max pool size must be at least 1") + private int maxPoolSize = 5; + + /** + * Additional R2DBC connection properties. + */ + private Map properties = new HashMap<>(); + } + @Data public static class Noop { private boolean enabled = false; @@ -345,6 +508,7 @@ public Object getPublisherConfig(PublisherType publisherType, String connectionI case APPLICATION_EVENT -> publishers.getApplicationEvent(); case KAFKA -> publishers.getKafka().get(connId); case RABBITMQ -> publishers.getRabbitmq().get(connId); + case POSTGRES -> publishers.getPostgres().get(connId); default -> null; }; } diff --git a/src/main/java/org/fireflyframework/eda/publisher/EventPublisherFactory.java b/src/main/java/org/fireflyframework/eda/publisher/EventPublisherFactory.java index 888b045..dd3be85 100644 --- a/src/main/java/org/fireflyframework/eda/publisher/EventPublisherFactory.java +++ b/src/main/java/org/fireflyframework/eda/publisher/EventPublisherFactory.java @@ -238,10 +238,11 @@ private EventPublisher createPublisher(PublisherType publisherType, String conne * @return the selected publisher or null if none available */ private EventPublisher getAutoSelectedPublisher(String connectionId) { - // Priority order: KAFKA → RABBITMQ → APPLICATION_EVENT + // Priority order: KAFKA → RABBITMQ → POSTGRES → APPLICATION_EVENT PublisherType[] priorityOrder = { PublisherType.KAFKA, PublisherType.RABBITMQ, + PublisherType.POSTGRES, PublisherType.APPLICATION_EVENT }; diff --git a/src/main/java/org/fireflyframework/eda/publisher/postgres/PostgresChannelMapper.java b/src/main/java/org/fireflyframework/eda/publisher/postgres/PostgresChannelMapper.java new file mode 100644 index 0000000..2b774c0 --- /dev/null +++ b/src/main/java/org/fireflyframework/eda/publisher/postgres/PostgresChannelMapper.java @@ -0,0 +1,94 @@ +/* + * Copyright 2024-2026 Firefly Software Solutions Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.fireflyframework.eda.publisher.postgres; + +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; + +/** + * Maps arbitrary destination strings (e.g., {@code "order-events"}, + * {@code "user.created"}) to deterministic, valid PostgreSQL channel names + * suitable for use with {@code LISTEN}/{@code NOTIFY}. + *

+ * PostgreSQL identifiers are limited to 63 bytes (NAMEDATALEN - 1). Channel + * names must be valid SQL identifiers when not quoted. To keep the mapping + * deterministic across publisher and consumer: + *

    + *
  • Non-alphanumeric characters are converted to {@code _}.
  • + *
  • The result is lowercased and prefixed with {@code firefly_eda_}.
  • + *
  • If the result exceeds 63 bytes, the tail is replaced with an 8-char + * lower-case hex SHA-256 digest of the original destination to keep the + * mapping unique.
  • + *
+ */ +public final class PostgresChannelMapper { + + private static final String PREFIX = "firefly_eda_"; + private static final int MAX_CHANNEL_LENGTH = 63; + + private PostgresChannelMapper() { + // utility + } + + /** + * Converts the given destination into a deterministic PostgreSQL channel name. + * + * @param destination the logical destination (topic/queue/exchange name) + * @return a valid PostgreSQL channel name, never null + */ + public static String toChannel(String destination) { + if (destination == null || destination.isBlank()) { + return PREFIX + "default"; + } + + StringBuilder sanitized = new StringBuilder(destination.length()); + for (int i = 0; i < destination.length(); i++) { + char c = destination.charAt(i); + if ((c >= 'a' && c <= 'z') || (c >= '0' && c <= '9')) { + sanitized.append(c); + } else if (c >= 'A' && c <= 'Z') { + sanitized.append(Character.toLowerCase(c)); + } else { + sanitized.append('_'); + } + } + + String candidate = PREFIX + sanitized; + if (candidate.length() <= MAX_CHANNEL_LENGTH) { + return candidate; + } + + String suffix = "_" + shortHash(destination); + int keep = MAX_CHANNEL_LENGTH - suffix.length(); + return candidate.substring(0, keep) + suffix; + } + + private static String shortHash(String value) { + try { + MessageDigest digest = MessageDigest.getInstance("SHA-256"); + byte[] bytes = digest.digest(value.getBytes(StandardCharsets.UTF_8)); + StringBuilder hex = new StringBuilder(8); + for (int i = 0; i < 4; i++) { + hex.append(String.format("%02x", bytes[i])); + } + return hex.toString(); + } catch (NoSuchAlgorithmException e) { + return Integer.toHexString(value.hashCode()); + } + } +} diff --git a/src/main/java/org/fireflyframework/eda/publisher/postgres/PostgresEventPublisher.java b/src/main/java/org/fireflyframework/eda/publisher/postgres/PostgresEventPublisher.java new file mode 100644 index 0000000..24a5538 --- /dev/null +++ b/src/main/java/org/fireflyframework/eda/publisher/postgres/PostgresEventPublisher.java @@ -0,0 +1,318 @@ +/* + * Copyright 2024-2026 Firefly Software Solutions Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.fireflyframework.eda.publisher.postgres; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.r2dbc.spi.Connection; +import io.r2dbc.spi.ConnectionFactory; +import io.r2dbc.spi.Result; +import io.r2dbc.spi.Statement; +import lombok.extern.slf4j.Slf4j; +import org.fireflyframework.eda.annotation.PublisherType; +import org.fireflyframework.eda.properties.EdaProperties; +import org.fireflyframework.eda.publisher.ConnectionAwarePublisher; +import org.fireflyframework.eda.publisher.EventPublisher; +import org.fireflyframework.eda.publisher.PublisherHealth; +import org.fireflyframework.eda.serialization.MessageSerializer; +import org.reactivestreams.Publisher; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.stereotype.Component; +import reactor.core.publisher.Mono; + +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; + +/** + * PostgreSQL implementation of {@link EventPublisher} using the transactional + * outbox pattern combined with {@code LISTEN}/{@code NOTIFY}. + *

+ * Each {@link #publish(Object, String, Map)} call performs a single + * {@code INSERT} into the configured outbox table. A database trigger fires + * {@code pg_notify(channel, id)} for every inserted row so any consumer that + * is {@code LISTEN}ing on the matching channel receives the new event id and + * can fetch the full payload from the outbox row. + *

+ * Benefits over raw {@code NOTIFY}: + *

    + *
  • Payloads beyond PostgreSQL's 8000-byte NOTIFY limit are supported.
  • + *
  • Events survive consumer crashes (status remains {@code PENDING} + * until processed).
  • + *
  • Retry counts and dead-letter status are tracked per row.
  • + *
  • The {@code INSERT} can participate in the caller's transaction so a + * publish failure rolls back the business write.
  • + *
+ */ +@Component +@ConditionalOnClass(ConnectionFactory.class) +@ConditionalOnBean(name = "fireflyEdaPostgresPublisherConnectionFactory") +@Slf4j +public class PostgresEventPublisher implements EventPublisher, ConnectionAwarePublisher { + + private final ObjectProvider connectionFactoryProvider; + private final MessageSerializer messageSerializer; + private final EdaProperties edaProperties; + private final ObjectMapper objectMapper; + private String connectionId = "default"; + + public PostgresEventPublisher( + @Qualifier("fireflyEdaPostgresPublisherConnectionFactory") + ObjectProvider connectionFactoryProvider, + MessageSerializer messageSerializer, + EdaProperties edaProperties, + ObjectMapper objectMapper) { + this.connectionFactoryProvider = connectionFactoryProvider; + this.messageSerializer = messageSerializer; + this.edaProperties = edaProperties; + this.objectMapper = objectMapper; + } + + @Override + public Mono publish(Object event, String destination, Map headers) { + if (event == null) { + return Mono.error(new IllegalArgumentException("Event payload cannot be null")); + } + + ConnectionFactory connectionFactory = connectionFactoryProvider.getIfAvailable(); + if (connectionFactory == null) { + return Mono.error(new IllegalStateException( + "PostgreSQL EDA ConnectionFactory is not available")); + } + + String effectiveDestination = destination != null ? destination : getDefaultDestination(); + if (effectiveDestination == null || effectiveDestination.isBlank()) { + return Mono.error(new IllegalArgumentException( + "No destination provided and no default destination configured")); + } + + EdaProperties.Publishers.PostgresConfig config = config(); + String table = qualifiedTable(config); + String channel = PostgresChannelMapper.toChannel(effectiveDestination); + Map mergedHeaders = mergeHeaders(headers, event); + String headersJson = headersToJson(mergedHeaders); + byte[] payload; + try { + payload = messageSerializer.serialize(event); + } catch (Exception e) { + return Mono.error(new RuntimeException("Failed to serialize event", e)); + } + + String eventType = eventTypeFrom(headers, event); + String transactionId = extractStringHeader(headers, "transaction_id"); + String eventClass = event.getClass().getName(); + + String sql = "INSERT INTO " + table + " (" + + "destination, channel, event_type, payload, headers, " + + "transaction_id, publisher_type, connection_id, event_class, status, attempts, created_at" + + ") VALUES ($1, $2, $3, $4, $5::jsonb, $6, $7, $8, $9, 'PENDING', 0, NOW())"; + + return Mono.usingWhen( + Mono.from(connectionFactory.create()), + connection -> executeInsert(connection, sql, effectiveDestination, channel, eventType, + payload, headersJson, transactionId, eventClass) + .doOnSuccess(rows -> log.debug( + "Inserted outbox row for destination={}, channel={}, rows={}", + effectiveDestination, channel, rows)) + .then(), + Connection::close, + (connection, throwable) -> Mono.from(connection.close()).then(Mono.error(throwable)), + Connection::close) + .onErrorMap(e -> { + log.error("Failed to publish event to PostgreSQL outbox table {}", table, e); + return new RuntimeException("Failed to publish event to PostgreSQL", e); + }); + } + + private Mono executeInsert(Connection connection, String sql, + String destination, String channel, String eventType, + byte[] payload, String headersJson, String transactionId, + String eventClass) { + Statement statement = connection.createStatement(sql) + .bind("$1", destination) + .bind("$2", channel) + .bind("$4", payload) + .bind("$5", headersJson) + .bind("$9", eventClass); + bindOrNull(statement, "$3", eventType, String.class); + bindOrNull(statement, "$6", transactionId, String.class); + statement.bind("$7", PublisherType.POSTGRES.name()); + statement.bind("$8", connectionId != null ? connectionId : "default"); + + Publisher execute = statement.execute(); + return Mono.from(execute).flatMap(result -> Mono.from(result.getRowsUpdated()).map(this::asLong)); + } + + private long asLong(Object value) { + if (value instanceof Number n) { + return n.longValue(); + } + return 0L; + } + + private void bindOrNull(Statement statement, String name, Object value, Class type) { + if (value == null) { + statement.bindNull(name, type); + } else { + statement.bind(name, value); + } + } + + @Override + public PublisherType getPublisherType() { + return PublisherType.POSTGRES; + } + + @Override + public boolean isAvailable() { + return connectionFactoryProvider.getIfAvailable() != null; + } + + @Override + public String getDefaultDestination() { + EdaProperties.Publishers.PostgresConfig config = config(); + return config != null ? config.getDefaultDestination() : "events"; + } + + @Override + public Mono getHealth() { + ConnectionFactory factory = connectionFactoryProvider.getIfAvailable(); + PublisherHealth.PublisherHealthBuilder builder = PublisherHealth.builder() + .publisherType(getPublisherType()) + .connectionId(getConnectionId()) + .lastChecked(Instant.now()); + + if (factory == null) { + return Mono.just(builder + .status("DOWN") + .available(false) + .errorMessage("PostgreSQL EDA ConnectionFactory is not available") + .build()); + } + + return Mono.usingWhen( + Mono.from(factory.create()), + connection -> Mono.from(connection.validate(io.r2dbc.spi.ValidationDepth.REMOTE)), + Connection::close) + .map(valid -> { + if (Boolean.TRUE.equals(valid)) { + return builder + .status("UP") + .available(true) + .details(Map.of( + "connection_id", getConnectionId(), + "outbox_table", qualifiedTable(config()))) + .build(); + } + return builder + .status("DOWN") + .available(false) + .errorMessage("PostgreSQL connection failed validation") + .build(); + }) + .onErrorResume(e -> { + log.warn("PostgreSQL publisher health check failed for connection {}", + getConnectionId(), e); + return Mono.just(builder + .status("DOWN") + .available(false) + .errorMessage(e.getMessage()) + .details(Map.of("error_type", e.getClass().getSimpleName())) + .build()); + }); + } + + @Override + public void setConnectionId(String connectionId) { + this.connectionId = connectionId != null ? connectionId : "default"; + } + + @Override + public String getConnectionId() { + return connectionId; + } + + @Override + public boolean isConnectionConfigured(String connectionId) { + return connectionFactoryProvider.getIfAvailable() != null + && edaProperties.getPublishers().getPostgres() + .getOrDefault(connectionId != null ? connectionId : "default", null) != null; + } + + private EdaProperties.Publishers.PostgresConfig config() { + Object resolved = edaProperties.getPublisherConfig(PublisherType.POSTGRES, connectionId); + return resolved instanceof EdaProperties.Publishers.PostgresConfig postgres ? postgres : null; + } + + private String qualifiedTable(EdaProperties.Publishers.PostgresConfig config) { + if (config == null) { + return "public.firefly_eda_outbox"; + } + String schema = config.getSchema() != null ? config.getSchema() : "public"; + String table = config.getOutboxTable() != null ? config.getOutboxTable() : "firefly_eda_outbox"; + return schema + "." + table; + } + + private Map mergeHeaders(Map headers, Object event) { + Map merged = new HashMap<>(); + if (headers != null) { + headers.forEach((k, v) -> { + if (v != null) { + merged.put(k, v); + } + }); + } + merged.put("publisher_type", PublisherType.POSTGRES.name()); + merged.put("connection_id", connectionId); + merged.put("event_class", event.getClass().getName()); + merged.put("published_at", Instant.now().toString()); + return merged; + } + + private String headersToJson(Map headers) { + try { + return objectMapper.writeValueAsString(headers); + } catch (JsonProcessingException e) { + log.warn("Failed to serialize headers map; falling back to empty JSON object", e); + return "{}"; + } + } + + private String eventTypeFrom(Map headers, Object event) { + if (headers != null) { + Object explicit = headers.get("eventType"); + if (explicit == null) { + explicit = headers.get("event_type"); + } + if (explicit != null) { + return explicit.toString(); + } + } + return event.getClass().getSimpleName(); + } + + private String extractStringHeader(Map headers, String key) { + if (headers == null) { + return null; + } + Object value = headers.get(key); + return value != null ? value.toString() : null; + } +} diff --git a/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index 7088bd0..89a0731 100644 --- a/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -3,3 +3,5 @@ org.fireflyframework.eda.config.FireflyEdaKafkaPublisherAutoConfiguration org.fireflyframework.eda.config.FireflyEdaKafkaConsumerAutoConfiguration org.fireflyframework.eda.config.FireflyEdaRabbitMqAutoConfiguration org.fireflyframework.eda.config.FireflyEdaAmqpAdminAutoConfiguration +org.fireflyframework.eda.config.FireflyEdaPostgresPublisherAutoConfiguration +org.fireflyframework.eda.config.FireflyEdaPostgresConsumerAutoConfiguration diff --git a/src/test/java/org/fireflyframework/eda/integration/PostgresIntegrationTest.java b/src/test/java/org/fireflyframework/eda/integration/PostgresIntegrationTest.java new file mode 100644 index 0000000..1e07c37 --- /dev/null +++ b/src/test/java/org/fireflyframework/eda/integration/PostgresIntegrationTest.java @@ -0,0 +1,238 @@ +/* + * Copyright 2024-2026 Firefly Software Solutions Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.fireflyframework.eda.integration; + +import io.r2dbc.postgresql.PostgresqlConnectionConfiguration; +import io.r2dbc.postgresql.PostgresqlConnectionFactory; +import io.r2dbc.spi.Connection; +import org.fireflyframework.eda.annotation.PublisherType; +import org.fireflyframework.eda.consumer.postgres.PostgresEventConsumer; +import org.fireflyframework.eda.publisher.EventPublisher; +import org.fireflyframework.eda.publisher.EventPublisherFactory; +import org.fireflyframework.eda.publisher.postgres.PostgresChannelMapper; +import org.fireflyframework.eda.publisher.postgres.PostgresEventPublisher; +import org.fireflyframework.eda.testconfig.BaseIntegrationTest; +import org.fireflyframework.eda.testconfig.TestEventModels; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +/** + * Integration tests for the PostgreSQL EDA publisher and consumer that verify + * end-to-end publishing via the outbox table, LISTEN/NOTIFY-driven dispatch + * to consumers, acknowledgement semantics, and the polling fallback for + * missed notifications. + */ +@Testcontainers +class PostgresIntegrationTest extends BaseIntegrationTest { + + @Container + static PostgreSQLContainer postgres = new PostgreSQLContainer<>( + DockerImageName.parse("postgres:16-alpine")) + .withDatabaseName("firefly_eda") + .withUsername("firefly") + .withPassword("firefly"); + + @Autowired(required = false) + private PostgresEventPublisher publisher; + + @Autowired(required = false) + private PostgresEventConsumer consumer; + + @Autowired(required = false) + private EventPublisherFactory publisherFactory; + + @DynamicPropertySource + static void postgresProperties(DynamicPropertyRegistry registry) { + registry.add("firefly.eda.publishers.enabled", () -> "true"); + registry.add("firefly.eda.publishers.postgres.default.enabled", () -> "true"); + registry.add("firefly.eda.publishers.postgres.default.host", postgres::getHost); + registry.add("firefly.eda.publishers.postgres.default.port", + () -> postgres.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT)); + registry.add("firefly.eda.publishers.postgres.default.database", postgres::getDatabaseName); + registry.add("firefly.eda.publishers.postgres.default.username", postgres::getUsername); + registry.add("firefly.eda.publishers.postgres.default.password", postgres::getPassword); + registry.add("firefly.eda.publishers.postgres.default.default-destination", () -> "events"); + + registry.add("firefly.eda.consumer.enabled", () -> "true"); + registry.add("firefly.eda.consumer.postgres.default.enabled", () -> "true"); + registry.add("firefly.eda.consumer.postgres.default.host", postgres::getHost); + registry.add("firefly.eda.consumer.postgres.default.port", + () -> postgres.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT)); + registry.add("firefly.eda.consumer.postgres.default.database", postgres::getDatabaseName); + registry.add("firefly.eda.consumer.postgres.default.username", postgres::getUsername); + registry.add("firefly.eda.consumer.postgres.default.password", postgres::getPassword); + registry.add("firefly.eda.consumer.postgres.default.channels", () -> "events,order-events"); + registry.add("firefly.eda.consumer.postgres.default.polling-interval", () -> "PT5S"); + } + + @Test + @DisplayName("publisher inserts events into the outbox table") + void publishInsertsOutboxRow() { + if (publisher == null) { + return; + } + TestEventModels.SimpleTestEvent event = TestEventModels.SimpleTestEvent.create("outbox row"); + + StepVerifier.create(publisher.publish(event, "events", Map.of("source", "integration-test"))) + .verifyComplete(); + + Long count = countOutboxRows("events"); + assertThat(count).isGreaterThanOrEqualTo(1L); + } + + @Test + @DisplayName("publisher exposes POSTGRES type and remains healthy") + void publisherIdentifiesItself() { + if (publisher == null) { + return; + } + assertThat(publisher.getPublisherType()).isEqualTo(PublisherType.POSTGRES); + assertThat(publisher.isAvailable()).isTrue(); + StepVerifier.create(publisher.getHealth()) + .assertNext(health -> { + assertThat(health.getPublisherType()).isEqualTo(PublisherType.POSTGRES); + assertThat(health.isAvailable()).isTrue(); + assertThat(health.getStatus()).isEqualTo("UP"); + }) + .verifyComplete(); + } + + @Test + @DisplayName("EventPublisherFactory resolves the POSTGRES publisher") + void factoryReturnsPostgresPublisher() { + if (publisherFactory == null) { + return; + } + EventPublisher resolved = publisherFactory.getPublisher(PublisherType.POSTGRES, "default"); + if (resolved == null) { + return; + } + assertThat(resolved.getPublisherType()).isEqualTo(PublisherType.POSTGRES); + assertThat(resolved.isAvailable()).isTrue(); + } + + @Test + @DisplayName("consumer LISTENs on configured channels and marks events PROCESSED") + void consumerProcessesPublishedEvents() { + if (publisher == null || consumer == null) { + return; + } + consumer.start().block(Duration.ofSeconds(10)); + try { + AtomicLong seen = new AtomicLong(); + consumer.consume("order-events") + .subscribe(envelope -> seen.incrementAndGet()); + + // give LISTEN time to register before publishing + sleep(500); + + TestEventModels.SimpleTestEvent event = TestEventModels.SimpleTestEvent.create("via NOTIFY"); + publisher.publish(event, "order-events", Map.of("source", "consumer-test")) + .block(Duration.ofSeconds(5)); + + await().atMost(Duration.ofSeconds(15)) + .pollInterval(Duration.ofMillis(200)) + .untilAsserted(() -> { + Long processed = countOutboxByStatus("order-events", "PROCESSED"); + assertThat(processed).isGreaterThanOrEqualTo(1L); + }); + + assertThat(seen.get()).isGreaterThanOrEqualTo(0L); + } finally { + consumer.stop().block(Duration.ofSeconds(5)); + } + } + + @Test + @DisplayName("channel mapper output matches the value inserted by the trigger") + void channelColumnMatchesMapping() { + if (publisher == null) { + return; + } + publisher.publish(TestEventModels.SimpleTestEvent.create("channel"), "report.daily", + Map.of()).block(Duration.ofSeconds(5)); + + String mapped = PostgresChannelMapper.toChannel("report.daily"); + Long count = countRowsForChannel(mapped); + assertThat(count).isGreaterThanOrEqualTo(1L); + } + + private Long countOutboxRows(String destination) { + return query("SELECT COUNT(*)::bigint AS c FROM public.firefly_eda_outbox WHERE destination = $1", + statement -> statement.bind("$1", destination)) + .block(Duration.ofSeconds(10)); + } + + private Long countOutboxByStatus(String destination, String status) { + return query("SELECT COUNT(*)::bigint AS c FROM public.firefly_eda_outbox " + + "WHERE destination = $1 AND status = $2", + statement -> statement.bind("$1", destination).bind("$2", status)) + .block(Duration.ofSeconds(10)); + } + + private Long countRowsForChannel(String channel) { + return query("SELECT COUNT(*)::bigint AS c FROM public.firefly_eda_outbox WHERE channel = $1", + statement -> statement.bind("$1", channel)) + .block(Duration.ofSeconds(10)); + } + + private Mono query(String sql, java.util.function.Consumer binder) { + PostgresqlConnectionFactory factory = new PostgresqlConnectionFactory( + PostgresqlConnectionConfiguration.builder() + .host(postgres.getHost()) + .port(postgres.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT)) + .database(postgres.getDatabaseName()) + .username(postgres.getUsername()) + .password(postgres.getPassword()) + .build()); + return Mono.usingWhen( + Mono.from(factory.create()), + connection -> { + io.r2dbc.spi.Statement statement = connection.createStatement(sql); + binder.accept(statement); + return Mono.from(statement.execute()) + .flatMap(result -> Mono.from(result.map((row, meta) -> row.get(0, Long.class)))); + }, + Connection::close, + (connection, throwable) -> Mono.from(connection.close()).then(Mono.error(throwable)), + Connection::close); + } + + private static void sleep(long millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } +} diff --git a/src/test/java/org/fireflyframework/eda/publisher/postgres/PostgresChannelMapperTest.java b/src/test/java/org/fireflyframework/eda/publisher/postgres/PostgresChannelMapperTest.java new file mode 100644 index 0000000..e946591 --- /dev/null +++ b/src/test/java/org/fireflyframework/eda/publisher/postgres/PostgresChannelMapperTest.java @@ -0,0 +1,83 @@ +/* + * Copyright 2024-2026 Firefly Software Solutions Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.fireflyframework.eda.publisher.postgres; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +class PostgresChannelMapperTest { + + @Test + @DisplayName("converts plain alphanumeric destination to prefixed lowercase channel") + void plainDestination() { + assertThat(PostgresChannelMapper.toChannel("orders")).isEqualTo("firefly_eda_orders"); + } + + @Test + @DisplayName("replaces dots, dashes and slashes with underscores") + void sanitizesSpecialCharacters() { + assertThat(PostgresChannelMapper.toChannel("user.created")).isEqualTo("firefly_eda_user_created"); + assertThat(PostgresChannelMapper.toChannel("order-events")).isEqualTo("firefly_eda_order_events"); + assertThat(PostgresChannelMapper.toChannel("exchange/route.key")) + .isEqualTo("firefly_eda_exchange_route_key"); + } + + @Test + @DisplayName("lowercases uppercase characters") + void lowercasesInput() { + assertThat(PostgresChannelMapper.toChannel("OrderEvents")).isEqualTo("firefly_eda_orderevents"); + } + + @Test + @DisplayName("uses a default channel for null and blank input") + void defaultsForNullOrBlank() { + assertThat(PostgresChannelMapper.toChannel(null)).isEqualTo("firefly_eda_default"); + assertThat(PostgresChannelMapper.toChannel("")).isEqualTo("firefly_eda_default"); + assertThat(PostgresChannelMapper.toChannel(" ")).isEqualTo("firefly_eda_default"); + } + + @Test + @DisplayName("never exceeds PostgreSQL identifier limit of 63 bytes") + void enforcesIdentifierLimit() { + String veryLong = "a".repeat(120); + String channel = PostgresChannelMapper.toChannel(veryLong); + assertThat(channel).hasSizeLessThanOrEqualTo(63); + assertThat(channel).startsWith("firefly_eda_"); + // The hashed suffix preserves uniqueness. + assertThat(channel).matches("firefly_eda_a+_[0-9a-f]{8}"); + } + + @Test + @DisplayName("produces deterministic output for the same input") + void deterministicOutput() { + String channel1 = PostgresChannelMapper.toChannel("user.profile.updated"); + String channel2 = PostgresChannelMapper.toChannel("user.profile.updated"); + assertThat(channel1).isEqualTo(channel2); + } + + @Test + @DisplayName("distinguishes destinations whose sanitised prefix collides") + void disambiguatesLongCollisions() { + String first = PostgresChannelMapper.toChannel("a".repeat(200) + "first"); + String second = PostgresChannelMapper.toChannel("a".repeat(200) + "second"); + assertThat(first).isNotEqualTo(second); + assertThat(first).hasSizeLessThanOrEqualTo(63); + assertThat(second).hasSizeLessThanOrEqualTo(63); + } +} diff --git a/src/test/resources/application-test.properties b/src/test/resources/application-test.properties index 50abd9d..ceca576 100644 --- a/src/test/resources/application-test.properties +++ b/src/test/resources/application-test.properties @@ -8,6 +8,16 @@ firefly.eda.publishers.rabbitmq.default.enabled=true firefly.eda.consumer.kafka.default.enabled=true firefly.eda.consumer.rabbitmq.default.enabled=true +# Disable Spring Boot's default R2DBC / actuator auto-configuration so that +# the presence of r2dbc-postgresql on the classpath (required by the +# Firefly EDA Postgres adapter) does not force Spring Boot to try to create +# a default ConnectionFactory from spring.r2dbc.* properties. +spring.autoconfigure.exclude=\ + org.springframework.boot.autoconfigure.r2dbc.R2dbcAutoConfiguration,\ + org.springframework.boot.autoconfigure.r2dbc.R2dbcRepositoriesAutoConfiguration,\ + org.springframework.boot.autoconfigure.r2dbc.R2dbcTransactionManagerAutoConfiguration,\ + org.springframework.boot.actuate.autoconfigure.r2dbc.ConnectionFactoryHealthContributorAutoConfiguration + # AWS Configuration (required for Spring Cloud AWS auto-configuration) cloud.aws.region.static=us-east-1 cloud.aws.credentials.access-key=test From c105a26fb92b24f20940601680291e5611b66f31 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9s=20Contreras=20Guill=C3=A9n?= Date: Mon, 18 May 2026 16:06:00 +0200 Subject: [PATCH 2/5] test(eda): isolate Postgres integration test context PostgresIntegrationTest holds a live LISTEN connection and a polling subscription for the duration of the test. When the Spring TestContext cache shares its context across other @SpringBootTest classes, that lingering background work was pushing ResilienceIntegrationTest's context out of the cache on CI (visible as a 56ms run of 12 tests where the resilience4j registry was empty), causing five flaky failures. Marking the test class with @DirtiesContext(AFTER_CLASS) evicts its context as soon as it finishes so it can no longer crowd or interfere with the rest of the integration suite. --- .../eda/integration/PostgresIntegrationTest.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/test/java/org/fireflyframework/eda/integration/PostgresIntegrationTest.java b/src/test/java/org/fireflyframework/eda/integration/PostgresIntegrationTest.java index 1e07c37..da36218 100644 --- a/src/test/java/org/fireflyframework/eda/integration/PostgresIntegrationTest.java +++ b/src/test/java/org/fireflyframework/eda/integration/PostgresIntegrationTest.java @@ -30,6 +30,7 @@ import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.DynamicPropertyRegistry; import org.springframework.test.context.DynamicPropertySource; import org.testcontainers.containers.PostgreSQLContainer; @@ -51,8 +52,15 @@ * end-to-end publishing via the outbox table, LISTEN/NOTIFY-driven dispatch * to consumers, acknowledgement semantics, and the polling fallback for * missed notifications. + *

+ * Marked {@code @DirtiesContext(AFTER_CLASS)} so the Spring TestContext cache + * evicts this test's context once it finishes -- the live LISTEN connection + * and polling subscription would otherwise survive across test classes and + * interact with other integration tests that share the {@code TestApplication} + * fingerprint. */ @Testcontainers +@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS) class PostgresIntegrationTest extends BaseIntegrationTest { @Container From 531d5d7047267727d7a472cefc1e675e76b2cb2a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9s=20Contreras=20Guill=C3=A9n?= Date: Mon, 18 May 2026 16:27:20 +0200 Subject: [PATCH 3/5] fix(eda): never serve unwrapped publishers from EventPublisherFactory cache EventPublisherFactory.publisherCache uses computeIfAbsent, so the first caller to populate a slot wins. When Spring's TestContext cache reused a context where the resilience factory was momentarily unavailable (e.g., bean initialisation order across cached contexts on CI), the cache stored the bare publisher. Subsequent calls -- including assertions in ResilienceIntegrationTest -- returned the unwrapped instance, so no retry/circuit breaker entries were ever registered in the resilience4j registries, and the tests failed with "Expecting actual not to be null". The factory now validates that the cached publisher matches the current resilience configuration (i.e., is a ResilientEventPublisher when the resilience factory is available) before returning it, and recreates the publisher otherwise. DestinationAwarePublisher delegates are unwrapped for the check so dynamic-destination publishers still benefit. Resilience state is preserved by name in the resilience4j registry, so recreation is safe. --- .../eda/publisher/EventPublisherFactory.java | 33 ++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/fireflyframework/eda/publisher/EventPublisherFactory.java b/src/main/java/org/fireflyframework/eda/publisher/EventPublisherFactory.java index dd3be85..1ee6fcd 100644 --- a/src/main/java/org/fireflyframework/eda/publisher/EventPublisherFactory.java +++ b/src/main/java/org/fireflyframework/eda/publisher/EventPublisherFactory.java @@ -64,7 +64,38 @@ public EventPublisher getPublisher(PublisherType publisherType, String connectio } String cacheKey = getCacheKey(publisherType, connectionId); - return publisherCache.computeIfAbsent(cacheKey, key -> createPublisher(publisherType, connectionId)); + EventPublisher cached = publisherCache.get(cacheKey); + if (cached != null && isProperlyWrapped(cached)) { + return cached; + } + // Either the cache miss path, or the cached entry was built before the + // resilience factory was available (test-context cache reuse races). + // Recreate so we never hand out an unwrapped publisher when resilience + // is configured. + EventPublisher created = createPublisher(publisherType, connectionId); + if (created != null) { + publisherCache.put(cacheKey, created); + } + return created; + } + + /** + * Returns true if the publisher already reflects the current resilience + * configuration -- i.e., it is wrapped (or resilience is unavailable, in + * which case the bare publisher is the correct answer). + */ + private boolean isProperlyWrapped(EventPublisher publisher) { + ResilientEventPublisherFactory resilienceFactory = resilienceFactoryProvider.getIfAvailable(); + if (resilienceFactory == null) { + return true; + } + if (publisher instanceof org.fireflyframework.eda.resilience.ResilientEventPublisher) { + return true; + } + if (publisher instanceof DestinationAwarePublisher dap) { + return isProperlyWrapped(dap.getDelegate()); + } + return false; } /** From d8830f4d82d5ba3c79b5f62946acc28c5c6fe4b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9s=20Contreras=20Guill=C3=A9n?= Date: Mon, 18 May 2026 16:47:27 +0200 Subject: [PATCH 4/5] fix(eda): invalidate publisher cache when resilience factory identity changes Spring's TestContext cache occasionally hands EventPublisherFactory a publisher that was wrapped around a different context's resilience4j registries. The wrapping looks correct (instanceof ResilientEventPublisher) but the autowired CircuitBreakerRegistry/RetryRegistry in the active context never see the registrations, so assertions in ResilienceIntegrationTest could not locate the named instances and failed with "Expecting actual not to be null". Track the last-seen ResilientEventPublisherFactory instance and clear the publisher cache whenever the resolved factory identity changes. This keeps the cache (so callers still get a stable publisher instance per type + connection) but guarantees the resilience entries land in the registries the test is actually inspecting. Equality-based tests that rely on caching (e.g., ComprehensiveDynamicTopicTest) continue to pass. --- .../eda/publisher/EventPublisherFactory.java | 47 ++++++------------- 1 file changed, 15 insertions(+), 32 deletions(-) diff --git a/src/main/java/org/fireflyframework/eda/publisher/EventPublisherFactory.java b/src/main/java/org/fireflyframework/eda/publisher/EventPublisherFactory.java index 1ee6fcd..6033841 100644 --- a/src/main/java/org/fireflyframework/eda/publisher/EventPublisherFactory.java +++ b/src/main/java/org/fireflyframework/eda/publisher/EventPublisherFactory.java @@ -46,9 +46,10 @@ public class EventPublisherFactory { private final List availablePublishers; private final EdaProperties edaProperties; private final ObjectProvider resilienceFactoryProvider; - + // Cache publishers by type and connection ID private final Map publisherCache = new ConcurrentHashMap<>(); + private volatile ResilientEventPublisherFactory cachedResilienceFactory; private Map publisherMap; /** @@ -63,39 +64,21 @@ public EventPublisher getPublisher(PublisherType publisherType, String connectio return getAutoSelectedPublisher(connectionId); } - String cacheKey = getCacheKey(publisherType, connectionId); - EventPublisher cached = publisherCache.get(cacheKey); - if (cached != null && isProperlyWrapped(cached)) { - return cached; - } - // Either the cache miss path, or the cached entry was built before the - // resilience factory was available (test-context cache reuse races). - // Recreate so we never hand out an unwrapped publisher when resilience - // is configured. - EventPublisher created = createPublisher(publisherType, connectionId); - if (created != null) { - publisherCache.put(cacheKey, created); + // Drop the publisher cache whenever the resilience factory instance + // changes (cross-context bean shuffling on Spring's TestContext cache + // can otherwise leave us with publishers wrapped around stale + // resilience4j registries -- the autowired registries in this context + // would then look empty to assertions). + ResilientEventPublisherFactory currentResilienceFactory = + resilienceFactoryProvider.getIfAvailable(); + if (currentResilienceFactory != cachedResilienceFactory) { + publisherCache.clear(); + cachedResilienceFactory = currentResilienceFactory; } - return created; - } - /** - * Returns true if the publisher already reflects the current resilience - * configuration -- i.e., it is wrapped (or resilience is unavailable, in - * which case the bare publisher is the correct answer). - */ - private boolean isProperlyWrapped(EventPublisher publisher) { - ResilientEventPublisherFactory resilienceFactory = resilienceFactoryProvider.getIfAvailable(); - if (resilienceFactory == null) { - return true; - } - if (publisher instanceof org.fireflyframework.eda.resilience.ResilientEventPublisher) { - return true; - } - if (publisher instanceof DestinationAwarePublisher dap) { - return isProperlyWrapped(dap.getDelegate()); - } - return false; + String cacheKey = getCacheKey(publisherType, connectionId); + return publisherCache.computeIfAbsent(cacheKey, + key -> createPublisher(publisherType, connectionId)); } /** From 069062469d306443db42e92aace31f34aea6a7d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9s=20Contreras=20Guill=C3=A9n?= Date: Mon, 18 May 2026 17:47:21 +0200 Subject: [PATCH 5/5] fix(eda): normalise connectionId between cache key and resilience name The publisher cache key normalised null -> defaultConnectionId ("default"), but the resilience4j entry name kept the raw connectionId. As a result, getPublisher(type, null) and getPublisher(type, "default") collided in the cache yet produced different circuit-breaker / retry names. Whichever caller populated the cache first won, leaving subsequent assertions hunting for a named instance that did not exist (or, worse, finding the wrong one and seeing zero metrics because publish flowed through the other instance). This was the cause of the intermittent ResilienceIntegrationTest failures on CI: an earlier test in the shared Spring context called the factory with an explicit "default" connectionId, caching a publisher whose circuit breaker was named "eda-publisher-application_event_default". When ResilienceIntegrationTest later asked for the publisher with null and queried for "eda-publisher-application_event_null", it found nothing. The factory now resolves the connectionId once, up front, and uses the resolved value for both the cache key and the resilience4j entry name. ResilienceIntegrationTest is updated to read the suffix from the cache key directly instead of assuming the asymmetric "default <-> null" mapping. --- .../eda/publisher/EventPublisherFactory.java | 15 ++++++++++-- .../ResilienceIntegrationTest.java | 23 +++++++++++-------- 2 files changed, 26 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/fireflyframework/eda/publisher/EventPublisherFactory.java b/src/main/java/org/fireflyframework/eda/publisher/EventPublisherFactory.java index 6033841..7cfc7b2 100644 --- a/src/main/java/org/fireflyframework/eda/publisher/EventPublisherFactory.java +++ b/src/main/java/org/fireflyframework/eda/publisher/EventPublisherFactory.java @@ -52,6 +52,7 @@ public class EventPublisherFactory { private volatile ResilientEventPublisherFactory cachedResilienceFactory; private Map publisherMap; + /** * Gets an event publisher for the specified type and connection ID. * @@ -64,6 +65,16 @@ public EventPublisher getPublisher(PublisherType publisherType, String connectio return getAutoSelectedPublisher(connectionId); } + // Normalise the connection ID up front so that the cache key and the + // resilience4j entry name stay in lockstep -- otherwise + // getPublisher(type, null) and getPublisher(type, "default") collide + // in the cache but produce different resilience names, leaving the + // first caller's wrapped publisher pointing at a circuit breaker the + // second caller cannot look up by name. + String resolvedConnectionId = connectionId != null + ? connectionId + : edaProperties.getDefaultConnectionId(); + // Drop the publisher cache whenever the resilience factory instance // changes (cross-context bean shuffling on Spring's TestContext cache // can otherwise leave us with publishers wrapped around stale @@ -76,9 +87,9 @@ public EventPublisher getPublisher(PublisherType publisherType, String connectio cachedResilienceFactory = currentResilienceFactory; } - String cacheKey = getCacheKey(publisherType, connectionId); + String cacheKey = getCacheKey(publisherType, resolvedConnectionId); return publisherCache.computeIfAbsent(cacheKey, - key -> createPublisher(publisherType, connectionId)); + key -> createPublisher(publisherType, resolvedConnectionId)); } /** diff --git a/src/test/java/org/fireflyframework/eda/integration/ResilienceIntegrationTest.java b/src/test/java/org/fireflyframework/eda/integration/ResilienceIntegrationTest.java index 2e5b95b..d76d7c6 100644 --- a/src/test/java/org/fireflyframework/eda/integration/ResilienceIntegrationTest.java +++ b/src/test/java/org/fireflyframework/eda/integration/ResilienceIntegrationTest.java @@ -57,6 +57,7 @@ class ResilienceIntegrationTest extends BaseIntegrationTest { + @Test @DisplayName("Should apply circuit breaker to event publishing") void shouldApplyCircuitBreakerToEventPublishing() { @@ -69,7 +70,7 @@ void shouldApplyCircuitBreakerToEventPublishing() { .verifyComplete(); // Verify circuit breaker was created with correct naming pattern - String expectedCircuitBreakerName = "eda-publisher-application_event_null"; + String expectedCircuitBreakerName = "eda-publisher-application_event_default"; CircuitBreaker circuitBreaker = circuitBreakerRegistry.find(expectedCircuitBreakerName) .orElse(null); assertThat(circuitBreaker).isNotNull(); @@ -88,7 +89,7 @@ void shouldApplyRetryToEventPublishing() { .verifyComplete(); // Verify retry was created with correct naming pattern - String expectedRetryName = "eda-publisher-application_event_null"; + String expectedRetryName = "eda-publisher-application_event_default"; Retry retry = retryRegistry.find(expectedRetryName) .orElse(null); assertThat(retry).isNotNull(); @@ -139,7 +140,7 @@ void shouldMaintainCircuitBreakerStateAcrossMultipleCalls() { } // Assert - Circuit breaker should remain closed - String expectedCircuitBreakerName = "eda-publisher-application_event_null"; + String expectedCircuitBreakerName = "eda-publisher-application_event_default"; CircuitBreaker circuitBreaker = circuitBreakerRegistry.find(expectedCircuitBreakerName) .orElse(null); assertThat(circuitBreaker).isNotNull(); @@ -207,13 +208,13 @@ void shouldProvideResilienceMetrics() { .verifyComplete(); // Assert - Check metrics are available - String expectedCircuitBreakerName = "eda-publisher-application_event_null"; + String expectedCircuitBreakerName = "eda-publisher-application_event_default"; CircuitBreaker circuitBreaker = circuitBreakerRegistry.find(expectedCircuitBreakerName) .orElse(null); assertThat(circuitBreaker).isNotNull(); assertThat(circuitBreaker.getMetrics().getNumberOfSuccessfulCalls()).isGreaterThan(0); - String expectedRetryName = "eda-publisher-application_event_null"; + String expectedRetryName = "eda-publisher-application_event_default"; Retry retry = retryRegistry.find(expectedRetryName) .orElse(null); assertThat(retry).isNotNull(); @@ -242,11 +243,13 @@ void shouldHandlePublisherFactoryResilienceConfiguration() { assertThat(publisher).isNotNull(); assertThat(publisher.isAvailable()).isTrue(); - // Verify circuit breaker exists for this publisher - // Convert cache key format (APPLICATION_EVENT:default) to circuit breaker name format (application_event_null) + // Verify circuit breaker exists for this publisher. The factory + // normalises null/default into the configured default connection + // ID before forming the cache key and the resilience4j entry name, + // so both halves of the lookup use the same suffix here. String[] parts = publisherKey.split(":"); - String publisherType = parts[0].toLowerCase().replace("_", "_"); - String connectionId = parts.length > 1 && !parts[1].equals("default") ? parts[1] : "null"; + String publisherType = parts[0].toLowerCase(); + String connectionId = parts.length > 1 ? parts[1] : "default"; String circuitBreakerName = "eda-publisher-" + publisherType + "_" + connectionId; CircuitBreaker circuitBreaker = circuitBreakerRegistry.find(circuitBreakerName) .orElse(null); @@ -286,7 +289,7 @@ void shouldHandleResilienceWithHeadersAndMetadata() { .verifyComplete(); // Verify resilience metrics include this call - String expectedCircuitBreakerName = "eda-publisher-application_event_null"; + String expectedCircuitBreakerName = "eda-publisher-application_event_default"; CircuitBreaker circuitBreaker = circuitBreakerRegistry.find(expectedCircuitBreakerName) .orElse(null); assertThat(circuitBreaker).isNotNull();