diff --git a/README.md b/README.md index c17f23b..5ee5365 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ [![Java](https://img.shields.io/badge/Java-21%2B-orange.svg)](https://openjdk.org) [![Spring Boot](https://img.shields.io/badge/Spring%20Boot-3.x-green.svg)](https://spring.io/projects/spring-boot) -> Unified event-driven architecture library with Kafka, RabbitMQ, and Spring Application Events support. +> Unified event-driven architecture library with Kafka, RabbitMQ, PostgreSQL (LISTEN/NOTIFY + outbox), and Spring Application Events support. --- @@ -23,7 +23,7 @@ ## Overview -Firefly Framework EDA provides a standardized messaging abstraction for event-driven architectures, supporting multiple broker implementations through a unified publisher/consumer API. It enables reactive event publishing and consumption with built-in support for Apache Kafka, RabbitMQ, and Spring Application Events as transport mechanisms. +Firefly Framework EDA provides a standardized messaging abstraction for event-driven architectures, supporting multiple broker implementations through a unified publisher/consumer API. It enables reactive event publishing and consumption with built-in support for Apache Kafka, RabbitMQ, PostgreSQL (via `LISTEN`/`NOTIFY` backed by a transactional outbox table), and Spring Application Events as transport mechanisms. The library features annotation-driven event publishing (`@EventPublisher`, `@PublishResult`), declarative event listeners (`@EventListener`), and a comprehensive set of event filtering, serialization, and error handling capabilities. It includes support for JSON, Avro, and Protobuf message serialization formats. @@ -31,7 +31,7 @@ The resilient publisher wrapper provides circuit breaker integration, while the ## Features -- Multi-broker support: Apache Kafka, RabbitMQ, Spring Application Events +- Multi-broker support: Apache Kafka, RabbitMQ, PostgreSQL (`LISTEN`/`NOTIFY` + outbox), Spring Application Events - Annotation-driven publishing: `@EventPublisher`, `@PublishResult` - Declarative event listening: `@EventListener` with SpEL-based filtering - Event envelope pattern with metadata propagation @@ -41,16 +41,17 @@ The resilient publisher wrapper provides circuit breaker integration, while the - Resilient publisher with circuit breaker support - Dynamic event listener registration at runtime - AMQP admin auto-configuration for RabbitMQ exchanges and queues +- Transactional outbox table for PostgreSQL with auto-created schema and trigger - Custom error handling strategies with metrics and notification handlers - Health indicators and metrics for Actuator integration -- Spring Boot auto-configuration for Kafka and RabbitMQ +- Spring Boot auto-configuration for Kafka, RabbitMQ, and PostgreSQL ## Requirements - Java 21+ - Spring Boot 3.x - Maven 3.9+ -- Apache Kafka or RabbitMQ (depending on chosen broker) +- Apache Kafka, RabbitMQ, or PostgreSQL 11+ (depending on chosen transport) ## Installation @@ -95,16 +96,76 @@ public class OrderEventHandler { ```yaml firefly: eda: - broker: kafka # kafka, rabbitmq, spring - kafka: - bootstrap-servers: localhost:9092 - consumer: - group-id: my-service - rabbitmq: - host: localhost - port: 5672 + enabled: true + default-publisher-type: AUTO # AUTO chooses KAFKA → RABBITMQ → POSTGRES → APPLICATION_EVENT + publishers: + enabled: true + kafka: + default: + enabled: true + bootstrap-servers: localhost:9092 + rabbitmq: + default: + enabled: true + host: localhost + port: 5672 + postgres: + default: + enabled: true + host: localhost + port: 5432 + database: app + username: app + password: secret + schema: public + outbox-table: firefly_eda_outbox + default-destination: events + auto-create-schema: true # provision outbox table + NOTIFY trigger at startup + consumer: + enabled: true + group-id: my-service + kafka: + default: + enabled: true + bootstrap-servers: localhost:9092 + rabbitmq: + default: + enabled: true + host: localhost + port: 5672 + queues: events-queue + postgres: + default: + enabled: true + host: localhost + port: 5432 + database: app + username: app + password: secret + channels: events,order-events # destinations to LISTEN on + polling-interval: 30s # NOTIFY-loss fallback poll cadence + max-attempts: 3 # outbox row moves to DEAD_LETTER after N failures ``` +### PostgreSQL transport at a glance + +- Each `publish()` performs a single `INSERT` into `firefly_eda_outbox`. A + database trigger fires `pg_notify(channel, id)` for every inserted row. +- The consumer holds a dedicated R2DBC connection that runs `LISTEN ` + for every subscribed destination. Notifications carry only the outbox row + id so payloads can be arbitrarily large. +- On dispatch, the listener pipeline either marks the row `PROCESSED` or + increments `attempts`; once `attempts` reaches `max-attempts`, the row + moves to `DEAD_LETTER` status. +- A periodic poll (`polling-interval`) catches rows that slipped past the + live channel (e.g., consumer offline at insert time, payload too large, + connection reset). Set it to `0s` to disable polling. +- Channel names are derived deterministically from destinations via the + built-in mapper: non-alphanumeric characters become `_`, the result is + lower-cased, prefixed with `firefly_eda_`, and truncated to fit + PostgreSQL's 63-byte identifier limit with a stable hash suffix when + needed. + ## Documentation Additional documentation is available in the [docs/](docs/) directory: diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index 34aea82..57c03ec 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -179,6 +179,54 @@ firefly: | `default-routing-key` | string | `"event"` | Default routing key | | `properties` | map | `{}` | Additional RabbitMQ connection properties | +### PostgreSQL Publisher + +The PostgreSQL publisher uses an outbox table together with `pg_notify` to deliver events. Configuration lives under `firefly.eda.publishers.postgres.` -- never under `spring.r2dbc.*`. + +```yaml +firefly: + eda: + publishers: + postgres: + default: # Connection ID + enabled: true + host: "localhost" + port: 5432 + database: "app" + username: "app" + password: "secret" + schema: "public" + outbox-table: "firefly_eda_outbox" + default-destination: "events" + auto-create-schema: true + max-pool-size: 10 + properties: + statement_timeout: "30000" +``` + +| Property | Type | Default | Description | +|----------|------|---------|-------------| +| `enabled` | boolean | `false` | Whether this PostgreSQL publisher connection is enabled | +| `host` | string | `"localhost"` | PostgreSQL host | +| `port` | int | `5432` | PostgreSQL port | +| `database` | string | `null` | Database name | +| `username` | string | `null` | Database username | +| `password` | string | `null` | Database password | +| `schema` | string | `"public"` | Schema containing the outbox table | +| `outbox-table` | string | `"firefly_eda_outbox"` | Name of the outbox table | +| `default-destination` | string | `"events"` | Default destination when none is provided to `publish()` | +| `auto-create-schema` | boolean | `true` | Provision the outbox table, index, NOTIFY function and trigger at startup | +| `max-pool-size` | int | `10` | Maximum R2DBC connection pool size used for outbox inserts | +| `properties` | map | `{}` | Additional R2DBC PostgreSQL startup options (e.g., statement timeout) | + +When `auto-create-schema: true`, the publisher provisions: + +- `firefly_eda_outbox` table with `id`, `destination`, `channel`, `payload (BYTEA)`, `headers (JSONB)`, `status`, `attempts`, `error_message`, and timestamp columns +- An index on `(status, created_at)` filtered to `status = 'PENDING'` +- An index on `(channel, status)` +- A `firefly_eda_notify_event()` trigger function that calls `pg_notify(NEW.channel, NEW.id::text)` +- An `AFTER INSERT` trigger on the outbox table that runs the function + ## Consumer Configuration Consumers are configured under `firefly.eda.consumer`. @@ -289,6 +337,51 @@ firefly: **Important**: RabbitMQ queues must be pre-declared and configured here. The `@EventListener` destinations are used for filtering messages after they are consumed from these queues, not for determining which queues to subscribe to. +### PostgreSQL Consumer + +The PostgreSQL consumer holds one long-lived R2DBC connection to receive `NOTIFY` messages and creates short-lived connections to drain the outbox table. Configuration lives under `firefly.eda.consumer.postgres.`. Channels are derived from `@EventListener` annotations with `consumerType=POSTGRES` or `consumerType=AUTO`, and additionally from the `channels` property. + +```yaml +firefly: + eda: + consumer: + postgres: + default: + enabled: true + host: "localhost" + port: 5432 + database: "app" + username: "app" + password: "secret" + schema: "public" + outbox-table: "firefly_eda_outbox" + channels: "events,order-events" # comma-separated destinations to LISTEN on + polling-interval: 30s # NOTIFY-loss fallback poll cadence; set to 0s to disable + max-attempts: 3 # rows beyond this attempt count are marked DEAD_LETTER + batch-size: 50 # max rows fetched per poll cycle + max-pool-size: 5 + properties: {} +``` + +| Property | Type | Default | Description | +|----------|------|---------|-------------| +| `enabled` | boolean | `false` | Whether this PostgreSQL consumer connection is enabled | +| `host` | string | `"localhost"` | PostgreSQL host | +| `port` | int | `5432` | PostgreSQL port | +| `database` | string | `null` | Database name | +| `username` | string | `null` | Database username | +| `password` | string | `null` | Database password | +| `schema` | string | `"public"` | Schema containing the outbox table | +| `outbox-table` | string | `"firefly_eda_outbox"` | Outbox table the consumer reads from | +| `channels` | string | `"events"` | Comma-separated destinations to LISTEN on (combined with any from `@EventListener`) | +| `polling-interval` | duration | `30s` | Fallback poll cadence for rows missed by NOTIFY; `0s` disables polling | +| `max-attempts` | int | `3` | Failure attempts before a row transitions to `DEAD_LETTER` | +| `batch-size` | int | `50` | Maximum rows fetched per poll cycle | +| `max-pool-size` | int | `5` | Connection pool size for outbox queries; LISTEN uses one connection on top of this budget | +| `properties` | map | `{}` | Additional R2DBC PostgreSQL startup options | + +**How acknowledgement works**: On successful dispatch, the row is updated to `status='PROCESSED', processed_at=NOW()`. On failure, `attempts` is incremented and `error_message` is recorded; once `attempts >= max-attempts`, the row moves to `status='DEAD_LETTER'`. Use the outbox table directly to inspect, reset, or requeue events. + ### Application Event Consumer ```yaml diff --git a/docs/PUBLISHER_TYPES.md b/docs/PUBLISHER_TYPES.md index 64d5a1e..b81dcb5 100644 --- a/docs/PUBLISHER_TYPES.md +++ b/docs/PUBLISHER_TYPES.md @@ -8,7 +8,7 @@ The `PublisherType` enum defines the following supported messaging platforms: ### AUTO **Description**: Automatically select the best available publisher -**Selection Priority**: KAFKA → RABBITMQ → APPLICATION_EVENT → NOOP +**Selection Priority**: KAFKA → RABBITMQ → POSTGRES → APPLICATION_EVENT → NOOP **Use Case**: Let the system choose the optimal publisher based on availability and configuration ```java @@ -60,6 +60,53 @@ firefly: default-exchange: events ``` +### POSTGRES +**Description**: PostgreSQL `LISTEN`/`NOTIFY` with a transactional outbox table +**Features**: Persistent storage, payload sizes beyond `NOTIFY`'s 8 kB limit, retry counts, dead-letter status, polling fallback for missed notifications +**Best For**: Services that already use PostgreSQL and want reliable event publishing without an additional broker; outbox-style transactional event publishing +**Persistence**: ✅ Yes (outbox table rows) +**Ordering**: ✅ Yes (per destination -- rows are ordered by `created_at` / `id`) +**Cloud Service**: ❌ No (self-hosted; works with managed PostgreSQL services) + +```yaml +firefly: + eda: + publishers: + postgres: + default: + enabled: true + host: localhost + port: 5432 + database: app + username: app + password: secret + schema: public + outbox-table: firefly_eda_outbox + default-destination: events + auto-create-schema: true # provision outbox table + NOTIFY trigger at startup + consumer: + postgres: + default: + enabled: true + host: localhost + port: 5432 + database: app + username: app + password: secret + channels: events,order-events # destinations to LISTEN on + polling-interval: 30s + max-attempts: 3 +``` + +**How it works**: Each `publish()` performs a single `INSERT` into the +configured outbox table. A trigger fires `pg_notify(channel, id)` for the +new row, so any consumer that has issued `LISTEN` on the matching channel +receives the row id and fetches the full payload from the table. The +consumer marks rows as `PROCESSED` after successful dispatch, increments +`attempts` on failure, and transitions to `DEAD_LETTER` once `max-attempts` +is reached. A periodic poll catches rows that slipped past the live +channel (consumer offline, payload too large, etc.). + ### APPLICATION_EVENT **Description**: Spring Application Event Bus (in-memory) **Features**: Synchronous processing, JVM-local, simple integration @@ -99,23 +146,25 @@ class ServiceTest { When using `PublisherType.AUTO`, the system selects publishers in this priority order: 1. **KAFKA** - If Kafka is configured and available -2. **RABBITMQ** - If RabbitMQ is configured and available -3. **APPLICATION_EVENT** - If Spring context is available (always true) -4. **NOOP** - If explicitly enabled for testing +2. **RABBITMQ** - If RabbitMQ is configured and available +3. **POSTGRES** - If a PostgreSQL EDA connection is configured and available +4. **APPLICATION_EVENT** - If Spring context is available (always true) +5. **NOOP** - If explicitly enabled for testing ## Feature Comparison Matrix -| Feature | KAFKA | RABBITMQ | APPLICATION_EVENT | NOOP | -|---------|-------|----------|------------------|------| -| **Throughput** | Very High | High | High | N/A | -| **Persistence** | ✅ | ✅ | ❌ | ❌ | -| **Ordering** | ✅ | ❌ | ❌ | ❌ | -| **Partitioning** | ✅ | ❌ | ❌ | ❌ | -| **Complex Routing** | ❌ | ✅ | ❌ | ❌ | -| **Guaranteed Delivery** | ✅ | ✅ | ❌ | ❌ | -| **Multi-Instance** | ✅ | ✅ | ❌ | ❌ | -| **Cloud Native** | ✅ | ✅ | ❌ | ❌ | -| **Setup Complexity** | Medium | Medium | Low | None | +| Feature | KAFKA | RABBITMQ | POSTGRES | APPLICATION_EVENT | NOOP | +|---------|-------|----------|----------|------------------|------| +| **Throughput** | Very High | High | Medium | High | N/A | +| **Persistence** | ✅ | ✅ | ✅ | ❌ | ❌ | +| **Ordering** | ✅ | ❌ | ✅ | ❌ | ❌ | +| **Partitioning** | ✅ | ❌ | ❌ | ❌ | ❌ | +| **Complex Routing** | ❌ | ✅ | ❌ | ❌ | ❌ | +| **Guaranteed Delivery** | ✅ | ✅ | ✅ | ❌ | ❌ | +| **Multi-Instance** | ✅ | ✅ | ✅ | ❌ | ❌ | +| **Cloud Native** | ✅ | ✅ | ✅ | ❌ | ❌ | +| **Setup Complexity** | Medium | Medium | Low | Low | None | +| **Requires extra broker** | Yes | Yes | No (reuses DB) | No | No | ## Configuration Examples @@ -182,6 +231,13 @@ public class EventService { - Fan-out patterns are common - Priority queues are required +**Use POSTGRES when:** +- The service already uses PostgreSQL and you want event publishing without operating a separate broker +- You need transactional outbox semantics (event publish ties to a database transaction) +- Moderate throughput is sufficient (the table acts as the queue) +- You want persistent, auditable events with built-in retry / dead-letter status +- You can tolerate a small NOTIFY-delivery delay covered by the polling fallback + **Use APPLICATION_EVENT when:** - Simple internal communication is needed - Single-instance deployment diff --git a/pom.xml b/pom.xml index a99f9db..f0e81b3 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ org.fireflyframework fireflyframework-parent - 26.04.01 + 26.05.05 fireflyframework-eda @@ -103,6 +103,20 @@ spring-boot-starter-amqp + + + io.r2dbc + r2dbc-spi + + + io.r2dbc + r2dbc-pool + + + org.postgresql + r2dbc-postgresql + + @@ -175,6 +189,11 @@ rabbitmq test + + org.testcontainers + postgresql + test + diff --git a/src/main/java/org/fireflyframework/eda/annotation/PublisherType.java b/src/main/java/org/fireflyframework/eda/annotation/PublisherType.java index 185ecb4..d517ce3 100644 --- a/src/main/java/org/fireflyframework/eda/annotation/PublisherType.java +++ b/src/main/java/org/fireflyframework/eda/annotation/PublisherType.java @@ -28,7 +28,7 @@ public enum PublisherType { /** * Automatically select the best available publisher. *

- * Selection priority: KAFKA → RABBITMQ → APPLICATION_EVENT + * Selection priority: KAFKA → RABBITMQ → POSTGRES → APPLICATION_EVENT */ AUTO, @@ -56,6 +56,17 @@ public enum PublisherType { */ RABBITMQ, + /** + * PostgreSQL using LISTEN/NOTIFY with a transactional outbox table. + *

+ * Provides reliable messaging without requiring a dedicated broker by + * persisting events to an outbox table and dispatching notifications via + * {@code pg_notify}. Supports acknowledgment, retries, and dead letter + * semantics through outbox row status updates. Ideal for services that + * already use PostgreSQL and want native transactional event publishing. + */ + POSTGRES, + /** * No-operation publisher that discards all messages. *

@@ -75,6 +86,7 @@ public String getDescription() { case APPLICATION_EVENT -> "Spring Application Events"; case KAFKA -> "Apache Kafka"; case RABBITMQ -> "RabbitMQ"; + case POSTGRES -> "PostgreSQL LISTEN/NOTIFY (transactional outbox)"; case NOOP -> "No-operation (disabled)"; }; } @@ -86,7 +98,7 @@ public String getDescription() { */ public boolean supportsPersistence() { return switch (this) { - case KAFKA, RABBITMQ -> true; + case KAFKA, RABBITMQ, POSTGRES -> true; case APPLICATION_EVENT, NOOP, AUTO -> false; }; } @@ -98,7 +110,7 @@ public boolean supportsPersistence() { */ public boolean supportsOrdering() { return switch (this) { - case KAFKA -> true; + case KAFKA, POSTGRES -> true; case RABBITMQ, APPLICATION_EVENT, NOOP, AUTO -> false; }; } diff --git a/src/main/java/org/fireflyframework/eda/config/FireflyEdaAutoConfiguration.java b/src/main/java/org/fireflyframework/eda/config/FireflyEdaAutoConfiguration.java index 0f8b653..1298582 100644 --- a/src/main/java/org/fireflyframework/eda/config/FireflyEdaAutoConfiguration.java +++ b/src/main/java/org/fireflyframework/eda/config/FireflyEdaAutoConfiguration.java @@ -17,7 +17,6 @@ package org.fireflyframework.eda.config; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import org.fireflyframework.eda.properties.EdaProperties; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.AutoConfiguration; @@ -26,6 +25,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Primary; import org.springframework.core.Ordered; /** @@ -100,6 +100,21 @@ public FireflyEdaAutoConfiguration(EdaProperties props) { log.info(" - RabbitMQ Publisher: DISABLED"); } + // Postgres Publisher + var postgresPublisher = props.getPublishers().getPostgres().get("default"); + if (postgresPublisher != null && postgresPublisher.isEnabled()) { + String host = postgresPublisher.getHost(); + if (host != null && !host.isEmpty()) { + log.info(" - Postgres Publisher: CONFIGURED (host: {}:{}, table: {}.{})", + host, postgresPublisher.getPort(), + postgresPublisher.getSchema(), postgresPublisher.getOutboxTable()); + } else { + log.info(" - Postgres Publisher: NOT CONFIGURED (host not set)"); + } + } else { + log.info(" - Postgres Publisher: DISABLED"); + } + // Application Event Publisher if (props.getPublishers().getApplicationEvent().isEnabled()) { log.info(" - Application Event Publisher: ENABLED"); @@ -142,6 +157,22 @@ public FireflyEdaAutoConfiguration(EdaProperties props) { log.info(" - RabbitMQ Consumer: DISABLED"); } + // Postgres Consumer + var postgresConsumer = props.getConsumer().getPostgres().get("default"); + if (postgresConsumer != null && postgresConsumer.isEnabled()) { + String host = postgresConsumer.getHost(); + if (host != null && !host.isEmpty()) { + log.info(" - Postgres Consumer: CONFIGURED (host: {}:{}, table: {}.{}, polling: {})", + host, postgresConsumer.getPort(), + postgresConsumer.getSchema(), postgresConsumer.getOutboxTable(), + postgresConsumer.getPollingInterval()); + } else { + log.info(" - Postgres Consumer: NOT CONFIGURED (host not set)"); + } + } else { + log.info(" - Postgres Consumer: DISABLED"); + } + // Application Event Consumer if (props.getConsumer().getApplicationEvent().isEnabled()) { log.info(" - Application Event Consumer: ENABLED"); @@ -158,19 +189,36 @@ public FireflyEdaAutoConfiguration(EdaProperties props) { } /** - * Provides a default ObjectMapper configured for EDA serialization. + * Provides the application's primary {@link ObjectMapper} bean. + *

+ * This is the general-purpose mapper consumed by controllers, EDA serializers, + * CQRS handlers, and any component that injects {@code ObjectMapper} without a + * qualifier. It is marked {@link Primary @Primary} so it wins disambiguation + * over module-specific qualified mappers (e.g. {@code cacheObjectMapper}, + * {@code orchestrationPersistenceObjectMapper}, {@code sagaObjectMapper}) that + * coexist in the Spring context for internal framework use. + *

+ * Built through Spring Boot's autoconfigured {@link Jackson2ObjectMapperBuilder}, + * so it honours every {@code spring.jackson.*} property the application sets in + * its {@code application.yaml} — most importantly + * {@code spring.jackson.serialization.write-dates-as-timestamps=false} which + * makes {@link java.time.Instant} / {@link java.time.LocalDateTime} fields + * serialize as ISO-8601 strings instead of numeric epochs in WebFlux responses. + * The builder also auto-registers every Jackson module on the classpath + * ({@code jackson-datatype-jsr310}, {@code jackson-datatype-jdk8}, + * {@code jackson-module-parameter-names}, …). *

- * This bean is only created if no other ObjectMapper bean exists in the context. - * It includes JavaTimeModule for proper Java 8 date/time serialization. + * The {@link ConditionalOnMissingBean @ConditionalOnMissingBean} guard preserves + * compatibility with applications that already declare their own primary mapper. * + * @param builder Spring Boot's autoconfigured {@code Jackson2ObjectMapperBuilder} * @return configured ObjectMapper instance */ @Bean + @Primary @ConditionalOnMissingBean - public ObjectMapper objectMapper() { - ObjectMapper mapper = new ObjectMapper(); - mapper.registerModule(new JavaTimeModule()); - return mapper; + public ObjectMapper objectMapper(org.springframework.http.converter.json.Jackson2ObjectMapperBuilder builder) { + return builder.build(); } } diff --git a/src/main/java/org/fireflyframework/eda/config/FireflyEdaPostgresConsumerAutoConfiguration.java b/src/main/java/org/fireflyframework/eda/config/FireflyEdaPostgresConsumerAutoConfiguration.java new file mode 100644 index 0000000..077a6a8 --- /dev/null +++ b/src/main/java/org/fireflyframework/eda/config/FireflyEdaPostgresConsumerAutoConfiguration.java @@ -0,0 +1,111 @@ +/* + * Copyright 2024-2026 Firefly Software Solutions Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.fireflyframework.eda.config; + +import io.r2dbc.postgresql.PostgresqlConnectionConfiguration; +import io.r2dbc.postgresql.PostgresqlConnectionFactory; +import io.r2dbc.spi.ConnectionFactory; +import lombok.extern.slf4j.Slf4j; +import org.fireflyframework.eda.properties.EdaProperties; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; + +import java.util.Map; + +/** + * Auto-configuration for the PostgreSQL EDA consumer infrastructure. + *

+ * Produces a dedicated {@link PostgresqlConnectionFactory} the consumer uses + * for the long-lived {@code LISTEN} connection as well as for short-lived + * {@code SELECT}/{@code UPDATE} statements that drain the outbox table. The + * factory is not wrapped in a pool: a single {@code LISTEN} connection is + * kept open for the consumer's lifetime, while query connections are created + * on demand and closed immediately after use. + *

+ * Configuration source: + * {@code firefly.eda.consumer.postgres.default.*} -- never + * {@code spring.r2dbc.*}. + */ +@AutoConfiguration(after = FireflyEdaAutoConfiguration.class) +@ConditionalOnClass({ConnectionFactory.class, PostgresqlConnectionFactory.class}) +@ConditionalOnProperty(prefix = "firefly.eda", name = "enabled", havingValue = "true", matchIfMissing = true) +@EnableConfigurationProperties(EdaProperties.class) +@Slf4j +public class FireflyEdaPostgresConsumerAutoConfiguration { + + public FireflyEdaPostgresConsumerAutoConfiguration(EdaProperties props) { + if (props.getConsumer().isEnabled()) { + var postgresConsumer = props.getConsumer().getPostgres().get("default"); + if (postgresConsumer != null && postgresConsumer.isEnabled() + && postgresConsumer.getHost() != null && !postgresConsumer.getHost().isEmpty()) { + log.info("--------------------------------------------------------------------------------"); + log.info("FIREFLY EDA POSTGRES CONSUMER - INITIALIZING"); + log.info("--------------------------------------------------------------------------------"); + } else { + log.debug("Firefly EDA Postgres Consumer auto-configuration loaded but not creating beans (disabled or not configured)"); + } + } else { + log.debug("Firefly EDA Postgres Consumer auto-configuration loaded but not creating beans (consumers globally disabled)"); + } + } + + @Bean(name = "fireflyEdaPostgresConsumerConnectionFactory") + @ConditionalOnMissingBean(name = "fireflyEdaPostgresConsumerConnectionFactory") + @ConditionalOnExpression("${firefly.eda.consumer.enabled:false} && ${firefly.eda.consumer.postgres.default.enabled:false} && '${firefly.eda.consumer.postgres.default.host:}'.length() > 0") + public PostgresqlConnectionFactory fireflyEdaPostgresConsumerConnectionFactory(EdaProperties props) { + EdaProperties.Consumer.PostgresConfig cfg = props.getConsumer().getPostgres().get("default"); + log.info("Creating PostgreSQL EDA consumer ConnectionFactory: host={}:{}, database={}, schema={}, table={}, channels={}, pollingInterval={}", + cfg.getHost(), cfg.getPort(), cfg.getDatabase(), cfg.getSchema(), cfg.getOutboxTable(), + cfg.getChannels(), cfg.getPollingInterval()); + + PostgresqlConnectionConfiguration.Builder builder = PostgresqlConnectionConfiguration.builder() + .host(cfg.getHost()) + .port(cfg.getPort()); + if (cfg.getDatabase() != null) { + builder.database(cfg.getDatabase()); + } + if (cfg.getUsername() != null) { + builder.username(cfg.getUsername()); + } + if (cfg.getPassword() != null) { + builder.password(cfg.getPassword()); + } + if (cfg.getSchema() != null) { + builder.schema(cfg.getSchema()); + } + if (cfg.getProperties() != null && !cfg.getProperties().isEmpty()) { + java.util.Map options = new java.util.HashMap<>(); + for (Map.Entry entry : cfg.getProperties().entrySet()) { + if (entry.getValue() != null) { + options.put(entry.getKey(), entry.getValue().toString()); + } + } + if (!options.isEmpty()) { + builder.options(options); + } + } + PostgresqlConnectionFactory factory = new PostgresqlConnectionFactory(builder.build()); + log.info("PostgreSQL EDA consumer ConnectionFactory created successfully"); + log.info("--------------------------------------------------------------------------------"); + return factory; + } +} diff --git a/src/main/java/org/fireflyframework/eda/config/FireflyEdaPostgresPublisherAutoConfiguration.java b/src/main/java/org/fireflyframework/eda/config/FireflyEdaPostgresPublisherAutoConfiguration.java new file mode 100644 index 0000000..8d62d38 --- /dev/null +++ b/src/main/java/org/fireflyframework/eda/config/FireflyEdaPostgresPublisherAutoConfiguration.java @@ -0,0 +1,202 @@ +/* + * Copyright 2024-2026 Firefly Software Solutions Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.fireflyframework.eda.config; + +import io.r2dbc.pool.ConnectionPool; +import io.r2dbc.pool.ConnectionPoolConfiguration; +import io.r2dbc.postgresql.PostgresqlConnectionConfiguration; +import io.r2dbc.postgresql.PostgresqlConnectionFactory; +import io.r2dbc.spi.Connection; +import io.r2dbc.spi.ConnectionFactory; +import lombok.extern.slf4j.Slf4j; +import org.fireflyframework.eda.properties.EdaProperties; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import reactor.core.publisher.Mono; + +import java.time.Duration; +import java.util.Map; + +/** + * Auto-configuration for the PostgreSQL EDA publisher infrastructure. + *

+ * Creates a pooled {@link ConnectionFactory} dedicated to writing outbox rows + * and, when {@code firefly.eda.publishers.postgres.default.auto-create-schema} + * is {@code true}, provisions the outbox table, supporting index, NOTIFY + * function, and AFTER INSERT trigger required by the + * {@link org.fireflyframework.eda.publisher.postgres.PostgresEventPublisher}. + *

+ * Configuration source: + * {@code firefly.eda.publishers.postgres.default.*} -- never + * {@code spring.r2dbc.*}. + */ +@AutoConfiguration(after = FireflyEdaAutoConfiguration.class) +@ConditionalOnClass({ConnectionFactory.class, PostgresqlConnectionFactory.class}) +@ConditionalOnProperty(prefix = "firefly.eda", name = "enabled", havingValue = "true", matchIfMissing = true) +@EnableConfigurationProperties(EdaProperties.class) +@Slf4j +public class FireflyEdaPostgresPublisherAutoConfiguration { + + public FireflyEdaPostgresPublisherAutoConfiguration(EdaProperties props) { + if (props.getPublishers().isEnabled()) { + var postgresPublisher = props.getPublishers().getPostgres().get("default"); + if (postgresPublisher != null && postgresPublisher.isEnabled() + && postgresPublisher.getHost() != null && !postgresPublisher.getHost().isEmpty()) { + log.info("--------------------------------------------------------------------------------"); + log.info("FIREFLY EDA POSTGRES PUBLISHER - INITIALIZING"); + log.info("--------------------------------------------------------------------------------"); + } else { + log.debug("Firefly EDA Postgres Publisher auto-configuration loaded but not creating beans (disabled or not configured)"); + } + } else { + log.debug("Firefly EDA Postgres Publisher auto-configuration loaded but not creating beans (publishers globally disabled)"); + } + } + + /** + * Creates the pooled R2DBC {@link ConnectionFactory} the publisher uses to + * insert outbox rows. The bean is created only when the publisher is + * explicitly enabled and a host is configured under + * {@code firefly.eda.publishers.postgres.default.host}. + */ + @Bean(name = "fireflyEdaPostgresPublisherConnectionFactory", destroyMethod = "dispose") + @ConditionalOnMissingBean(name = "fireflyEdaPostgresPublisherConnectionFactory") + @ConditionalOnExpression("${firefly.eda.publishers.enabled:false} && ${firefly.eda.publishers.postgres.default.enabled:false} && '${firefly.eda.publishers.postgres.default.host:}'.length() > 0") + public ConnectionPool fireflyEdaPostgresPublisherConnectionFactory(EdaProperties props) { + EdaProperties.Publishers.PostgresConfig cfg = props.getPublishers().getPostgres().get("default"); + log.info("Creating PostgreSQL EDA publisher ConnectionFactory: host={}:{}, database={}, schema={}, table={}", + cfg.getHost(), cfg.getPort(), cfg.getDatabase(), cfg.getSchema(), cfg.getOutboxTable()); + + PostgresqlConnectionConfiguration.Builder builder = PostgresqlConnectionConfiguration.builder() + .host(cfg.getHost()) + .port(cfg.getPort()); + if (cfg.getDatabase() != null) { + builder.database(cfg.getDatabase()); + } + if (cfg.getUsername() != null) { + builder.username(cfg.getUsername()); + } + if (cfg.getPassword() != null) { + builder.password(cfg.getPassword()); + } + if (cfg.getSchema() != null) { + builder.schema(cfg.getSchema()); + } + if (cfg.getProperties() != null && !cfg.getProperties().isEmpty()) { + java.util.Map options = new java.util.HashMap<>(); + for (Map.Entry entry : cfg.getProperties().entrySet()) { + if (entry.getValue() != null) { + options.put(entry.getKey(), entry.getValue().toString()); + } + } + if (!options.isEmpty()) { + builder.options(options); + } + } + PostgresqlConnectionFactory delegate = new PostgresqlConnectionFactory(builder.build()); + + ConnectionPoolConfiguration poolConfig = ConnectionPoolConfiguration.builder(delegate) + .maxSize(Math.max(1, cfg.getMaxPoolSize())) + .initialSize(1) + .maxIdleTime(Duration.ofMinutes(5)) + .build(); + ConnectionPool pool = new ConnectionPool(poolConfig); + + if (cfg.isAutoCreateSchema()) { + initialiseSchema(pool, cfg); + } + + log.info("PostgreSQL EDA publisher ConnectionFactory created successfully"); + log.info("--------------------------------------------------------------------------------"); + return pool; + } + + /** + * Provisions the outbox table, supporting index, NOTIFY function and + * trigger. Statements are idempotent so repeated startups are safe. + */ + private void initialiseSchema(ConnectionFactory factory, + EdaProperties.Publishers.PostgresConfig cfg) { + String schema = cfg.getSchema() != null ? cfg.getSchema() : "public"; + String table = cfg.getOutboxTable() != null ? cfg.getOutboxTable() : "firefly_eda_outbox"; + String qualified = "\"" + schema + "\".\"" + table + "\""; + String triggerName = table + "_notify"; + String functionName = "\"" + schema + "\".firefly_eda_notify_event"; + + String createTable = "CREATE TABLE IF NOT EXISTS " + qualified + " (" + + "id BIGSERIAL PRIMARY KEY, " + + "destination VARCHAR(255) NOT NULL, " + + "channel VARCHAR(63) NOT NULL, " + + "event_type VARCHAR(255), " + + "payload BYTEA NOT NULL, " + + "headers JSONB, " + + "transaction_id VARCHAR(255), " + + "publisher_type VARCHAR(50), " + + "connection_id VARCHAR(50), " + + "event_class VARCHAR(500), " + + "status VARCHAR(20) NOT NULL DEFAULT 'PENDING', " + + "attempts INT NOT NULL DEFAULT 0, " + + "error_message TEXT, " + + "created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), " + + "processed_at TIMESTAMPTZ, " + + "failed_at TIMESTAMPTZ)"; + + String createIndex = "CREATE INDEX IF NOT EXISTS " + table + "_status_created_idx " + + "ON " + qualified + " (status, created_at) WHERE status = 'PENDING'"; + String createChannelIndex = "CREATE INDEX IF NOT EXISTS " + table + "_channel_idx " + + "ON " + qualified + " (channel, status)"; + + String createFunction = "CREATE OR REPLACE FUNCTION " + functionName + "() RETURNS TRIGGER AS $$ " + + "BEGIN PERFORM pg_notify(NEW.channel, NEW.id::text); RETURN NEW; END; " + + "$$ LANGUAGE plpgsql"; + + String dropTrigger = "DROP TRIGGER IF EXISTS " + triggerName + " ON " + qualified; + String createTrigger = "CREATE TRIGGER " + triggerName + " AFTER INSERT ON " + qualified + + " FOR EACH ROW EXECUTE FUNCTION " + functionName + "()"; + + try { + Mono.usingWhen( + Mono.from(factory.create()), + connection -> executeAll(connection, + createTable, createIndex, createChannelIndex, + createFunction, dropTrigger, createTrigger), + Connection::close) + .block(Duration.ofSeconds(30)); + log.info("PostgreSQL EDA outbox schema ensured: {}.{}", schema, table); + } catch (Exception e) { + log.error("Failed to initialise PostgreSQL EDA outbox schema {}.{}: {}", + schema, table, e.getMessage()); + log.error("Publishing will fail until the schema is created manually"); + } + } + + private Mono executeAll(Connection connection, String... statements) { + Mono chain = Mono.empty(); + for (String sql : statements) { + String stmt = sql; + chain = chain.then(Mono.from(connection.createStatement(stmt).execute()) + .flatMap(result -> Mono.from(result.getRowsUpdated())) + .then()); + } + return chain; + } +} diff --git a/src/main/java/org/fireflyframework/eda/consumer/postgres/PostgresEventConsumer.java b/src/main/java/org/fireflyframework/eda/consumer/postgres/PostgresEventConsumer.java new file mode 100644 index 0000000..a73b2d7 --- /dev/null +++ b/src/main/java/org/fireflyframework/eda/consumer/postgres/PostgresEventConsumer.java @@ -0,0 +1,631 @@ +/* + * Copyright 2024-2026 Firefly Software Solutions Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.fireflyframework.eda.consumer.postgres; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.r2dbc.postgresql.api.PostgresqlConnection; +import io.r2dbc.spi.Connection; +import io.r2dbc.spi.ConnectionFactory; +import io.r2dbc.spi.Result; +import io.r2dbc.spi.Row; +import lombok.extern.slf4j.Slf4j; +import org.fireflyframework.eda.annotation.PublisherType; +import org.fireflyframework.eda.consumer.ConsumerHealth; +import org.fireflyframework.eda.consumer.EventConsumer; +import org.fireflyframework.eda.event.EventEnvelope; +import org.fireflyframework.eda.listener.EventListenerProcessor; +import org.fireflyframework.eda.properties.EdaProperties; +import org.fireflyframework.eda.publisher.postgres.PostgresChannelMapper; +import org.fireflyframework.eda.serialization.MessageSerializer; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.DependsOn; +import org.springframework.stereotype.Component; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; +import reactor.core.scheduler.Schedulers; + +import jakarta.annotation.PreDestroy; +import java.time.Duration; +import java.time.Instant; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +/** + * PostgreSQL implementation of {@link EventConsumer}. + *

+ * The consumer dedicates a single long-lived R2DBC connection to receive + * {@code NOTIFY} messages, and uses a pooled {@link ConnectionFactory} for + * the {@code SELECT}/{@code UPDATE} statements that drain the outbox table. + * Channels are derived from {@code @EventListener} annotations discovered by + * {@link EventListenerProcessor}; if none are present, the channels listed + * in {@code firefly.eda.consumer.postgres.default.channels} are used. + *

+ * For each notification, the corresponding outbox row is fetched, an + * {@link EventEnvelope} is built, the event is dispatched through the + * listener processor, and the row is marked {@code PROCESSED} on success or + * incremented (and possibly transitioned to {@code DEAD_LETTER}) on failure. + * A periodic poll catches any rows that slipped past the live channel — for + * example, when the listener was offline at insert time or the payload + * exceeded the {@code NOTIFY} limit and the dispatcher had to fall back to a + * scan. + */ +@Component +@ConditionalOnClass(ConnectionFactory.class) +@ConditionalOnBean(name = "fireflyEdaPostgresConsumerConnectionFactory") +@ConditionalOnProperty(prefix = "firefly.eda.consumer", name = "enabled", havingValue = "true") +@DependsOn("eventListenerProcessor") +@Slf4j +public class PostgresEventConsumer implements EventConsumer { + + private final ObjectProvider connectionFactoryProvider; + private final EventListenerProcessor eventListenerProcessor; + private final MessageSerializer messageSerializer; + private final EdaProperties edaProperties; + private final ObjectMapper objectMapper; + + private final AtomicBoolean running = new AtomicBoolean(false); + private final AtomicLong messagesConsumed = new AtomicLong(); + private final AtomicLong messagesProcessed = new AtomicLong(); + private final AtomicLong messagesFailures = new AtomicLong(); + private final AtomicReference listenConnection = new AtomicReference<>(); + private final AtomicReference notificationSubscription = new AtomicReference<>(); + private final AtomicReference pollingSubscription = new AtomicReference<>(); + private volatile Sinks.Many eventSink = Sinks.many().multicast().onBackpressureBuffer(); + private final Set subscribedChannels = Collections.synchronizedSet(new LinkedHashSet<>()); + + public PostgresEventConsumer( + @Qualifier("fireflyEdaPostgresConsumerConnectionFactory") + ObjectProvider connectionFactoryProvider, + EventListenerProcessor eventListenerProcessor, + MessageSerializer messageSerializer, + EdaProperties edaProperties, + ObjectMapper objectMapper) { + this.connectionFactoryProvider = connectionFactoryProvider; + this.eventListenerProcessor = eventListenerProcessor; + this.messageSerializer = messageSerializer; + this.edaProperties = edaProperties; + this.objectMapper = objectMapper; + eventListenerProcessor.registerListenerChangeCallback(this::refreshChannels); + log.info("Postgres event consumer initialised; channels will be (re)subscribed at start()"); + } + + @Override + public Flux consume() { + return eventSink.asFlux() + .doOnSubscribe(subscription -> { + if (!running.get()) { + start().subscribe(); + } + }); + } + + @Override + public Flux consume(String... destinations) { + if (destinations == null || destinations.length == 0) { + return consume(); + } + Set filter = new HashSet<>(Arrays.asList(destinations)); + return consume().filter(envelope -> envelope.destination() != null + && filter.contains(envelope.destination())); + } + + @Override + public Mono start() { + return Mono.defer(() -> { + if (!running.compareAndSet(false, true)) { + log.debug("Postgres event consumer already running"); + return Mono.empty(); + } + eventSink = Sinks.many().multicast().onBackpressureBuffer(); + ConnectionFactory factory = connectionFactoryProvider.getIfAvailable(); + if (factory == null) { + running.set(false); + return Mono.error(new IllegalStateException( + "PostgreSQL EDA consumer ConnectionFactory is not available")); + } + return openListenConnection(factory) + .flatMap(this::subscribeToChannels) + .doOnSuccess(unused -> startPolling()) + .doOnError(e -> { + log.error("Failed to start Postgres event consumer", e); + running.set(false); + }); + }); + } + + @Override + public Mono stop() { + return Mono.defer(() -> { + if (!running.compareAndSet(true, false)) { + return Mono.empty(); + } + log.info("Stopping Postgres event consumer"); + disposeQuietly(notificationSubscription.getAndSet(null)); + disposeQuietly(pollingSubscription.getAndSet(null)); + eventSink.tryEmitComplete(); + PostgresqlConnection connection = listenConnection.getAndSet(null); + if (connection == null) { + return Mono.empty(); + } + return Mono.from(connection.close()) + .doOnError(e -> log.warn("Error closing Postgres LISTEN connection", e)) + .onErrorResume(e -> Mono.empty()) + .then(); + }); + } + + @PreDestroy + public void shutdown() { + stop().block(Duration.ofSeconds(5)); + } + + @Override + public boolean isRunning() { + return running.get(); + } + + @Override + public String getConsumerType() { + return "POSTGRES"; + } + + @Override + public boolean isAvailable() { + return edaProperties.getConsumer().isEnabled() + && connectionFactoryProvider.getIfAvailable() != null; + } + + @Override + public Mono getHealth() { + Map details = new HashMap<>(); + details.put("running", isRunning()); + details.put("subscribed_channels", new LinkedHashSet<>(subscribedChannels)); + details.put("polling_interval", config().getPollingInterval().toString()); + details.put("outbox_table", qualifiedTable(config())); + return Mono.just(ConsumerHealth.builder() + .consumerType(getConsumerType()) + .available(isAvailable()) + .running(isRunning()) + .status(isAvailable() && isRunning() ? "UP" : "DOWN") + .messagesConsumed(messagesConsumed.get()) + .messagesProcessed(messagesProcessed.get()) + .messagesFailures(messagesFailures.get()) + .lastChecked(Instant.now()) + .details(details) + .build()); + } + + private Mono openListenConnection(ConnectionFactory factory) { + return Mono.from(factory.create()) + .flatMap(connection -> { + if (!(connection instanceof PostgresqlConnection pg)) { + return Mono.from(connection.close()) + .then(Mono.error(new IllegalStateException( + "Expected PostgresqlConnection for LISTEN, got " + + connection.getClass().getName()))); + } + listenConnection.set(pg); + Disposable subscription = pg.getNotifications() + .subscribeOn(Schedulers.boundedElastic()) + .subscribe( + notification -> handleNotification(notification.getName(), + notification.getParameter()), + error -> log.error("Postgres notification stream errored", error)); + notificationSubscription.set(subscription); + return Mono.just(pg); + }); + } + + private Mono subscribeToChannels(PostgresqlConnection connection) { + Set channels = resolveChannels(); + if (channels.isEmpty()) { + log.warn("Postgres consumer has no channels configured; no events will be received"); + return Mono.empty(); + } + return Flux.fromIterable(channels) + .concatMap(channel -> Mono.from(connection.createStatement( + "LISTEN " + quoteIdent(channel)).execute()) + .flatMap(result -> Mono.from(result.getRowsUpdated())) + .doOnSuccess(rows -> { + subscribedChannels.add(channel); + log.info("Postgres consumer LISTENing on channel '{}'", channel); + }) + .then()) + .then(); + } + + private void startPolling() { + Duration interval = config().getPollingInterval(); + if (interval == null || interval.isZero() || interval.isNegative()) { + log.debug("Postgres consumer polling disabled (pollingInterval={})", interval); + return; + } + Disposable subscription = Flux.interval(interval, interval, Schedulers.boundedElastic()) + .onBackpressureDrop() + .concatMap(tick -> pollPendingEvents()) + .subscribe( + ignored -> { }, + error -> log.warn("Postgres consumer polling failed; will retry on next tick", + error)); + pollingSubscription.set(subscription); + log.info("Postgres consumer polling every {} as a NOTIFY fallback", interval); + } + + private Mono pollPendingEvents() { + EdaProperties.Consumer.PostgresConfig cfg = config(); + Set channels = subscribedChannels; + if (channels.isEmpty()) { + return Mono.empty(); + } + ConnectionFactory factory = connectionFactoryProvider.getIfAvailable(); + if (factory == null) { + return Mono.empty(); + } + String sql = "SELECT id FROM " + qualifiedTable(cfg) + + " WHERE status = 'PENDING' AND attempts < $1 AND channel = ANY($2) " + + "ORDER BY created_at LIMIT $3"; + String[] channelsArr = channels.toArray(new String[0]); + int maxAttempts = cfg.getMaxAttempts(); + int batchSize = cfg.getBatchSize(); + return Mono.usingWhen( + Mono.from(factory.create()), + connection -> Flux.from(connection.createStatement(sql) + .bind("$1", maxAttempts) + .bind("$2", channelsArr) + .bind("$3", batchSize) + .execute()) + .flatMap(result -> result.map((row, meta) -> row.get("id", Long.class))) + .concatMap(this::handleEventById) + .then(), + Connection::close, + (connection, throwable) -> Mono.from(connection.close()).then(Mono.error(throwable)), + Connection::close); + } + + private void handleNotification(String channel, String parameter) { + log.debug("Postgres consumer received NOTIFY on channel '{}' with parameter '{}'", + channel, parameter); + Long id = parseLong(parameter); + if (id == null) { + log.warn("Skipping notification with non-numeric parameter '{}'", parameter); + return; + } + handleEventById(id) + .subscribeOn(Schedulers.boundedElastic()) + .subscribe( + ignored -> { }, + error -> log.error("Failed to handle event id {} from channel '{}'", id, channel, + error)); + } + + private Mono handleEventById(Long id) { + ConnectionFactory factory = connectionFactoryProvider.getIfAvailable(); + if (factory == null || id == null) { + return Mono.empty(); + } + EdaProperties.Consumer.PostgresConfig cfg = config(); + String table = qualifiedTable(cfg); + String sql = "UPDATE " + table + " SET attempts = attempts + 1 " + + "WHERE id = $1 AND status = 'PENDING' " + + "RETURNING id, destination, channel, event_type, payload, headers, " + + "transaction_id, publisher_type, connection_id, event_class, attempts"; + return Mono.usingWhen( + Mono.from(factory.create()), + connection -> Flux.from(connection.createStatement(sql).bind("$1", id).execute()) + .flatMap(result -> result.map((row, meta) -> outboxRowFrom(row))) + .next() + .flatMap(this::dispatch), + Connection::close, + (connection, throwable) -> Mono.from(connection.close()).then(Mono.error(throwable)), + Connection::close); + } + + private OutboxRow outboxRowFrom(Row row) { + Long rowId = row.get("id", Long.class); + String destination = row.get("destination", String.class); + String channel = row.get("channel", String.class); + String eventType = row.get("event_type", String.class); + ByteArrayHolder payload = readPayload(row); + String headersJson = row.get("headers", String.class); + String transactionId = row.get("transaction_id", String.class); + String publisherType = row.get("publisher_type", String.class); + String connectionId = row.get("connection_id", String.class); + String eventClass = row.get("event_class", String.class); + Integer attempts = row.get("attempts", Integer.class); + return new OutboxRow(rowId, destination, channel, eventType, payload.bytes(), headersJson, + transactionId, publisherType, connectionId, eventClass, attempts); + } + + private ByteArrayHolder readPayload(Row row) { + byte[] direct = row.get("payload", byte[].class); + if (direct != null) { + return new ByteArrayHolder(direct); + } + java.nio.ByteBuffer buffer = row.get("payload", java.nio.ByteBuffer.class); + if (buffer != null) { + byte[] dst = new byte[buffer.remaining()]; + buffer.get(dst); + return new ByteArrayHolder(dst); + } + return new ByteArrayHolder(new byte[0]); + } + + private Mono dispatch(OutboxRow row) { + messagesConsumed.incrementAndGet(); + Map headers = deserializeHeaders(row.headersJson()); + headers.putIfAbsent("destination", row.destination()); + headers.putIfAbsent("event_class", row.eventClass()); + if (row.transactionId() != null) { + headers.putIfAbsent("transaction_id", row.transactionId()); + } + Object event = deserializeEvent(row.payload(), row.eventClass()); + EventEnvelope envelope = EventEnvelope.forConsuming( + row.destination(), + row.eventType() != null ? row.eventType() : eventTypeFrom(event), + event, + row.transactionId(), + headers, + EventEnvelope.EventMetadata.empty(), + Instant.now(), + getConsumerType(), + row.connectionId() != null ? row.connectionId() : "default", + new OutboxAckCallback(row.id())); + emit(envelope); + return eventListenerProcessor.processEvent(event, headers) + .then(markProcessed(row.id())) + .doOnSuccess(v -> messagesProcessed.incrementAndGet()) + .onErrorResume(error -> { + messagesFailures.incrementAndGet(); + log.error("Listener pipeline rejected event id {}; recording failure", row.id(), + error); + return markFailed(row.id(), error); + }); + } + + private void emit(EventEnvelope envelope) { + Sinks.EmitResult result = eventSink.tryEmitNext(envelope); + if (result.isFailure()) { + log.debug("Postgres consumer dropped envelope for destination {} ({}): no active subscriber", + envelope.destination(), result); + } + } + + private Mono markProcessed(Long id) { + return updateStatus(id, "UPDATE " + qualifiedTable(config()) + + " SET status = 'PROCESSED', processed_at = NOW(), error_message = NULL " + + "WHERE id = $1"); + } + + private Mono markFailed(Long id, Throwable error) { + EdaProperties.Consumer.PostgresConfig cfg = config(); + int maxAttempts = cfg.getMaxAttempts(); + String table = qualifiedTable(cfg); + String sql = "UPDATE " + table + " SET status = CASE " + + "WHEN attempts >= $2 THEN 'DEAD_LETTER' ELSE 'PENDING' END, " + + "failed_at = NOW(), error_message = $3 WHERE id = $1"; + ConnectionFactory factory = connectionFactoryProvider.getIfAvailable(); + if (factory == null) { + return Mono.empty(); + } + String message = error != null && error.getMessage() != null + ? error.getMessage() : "unknown error"; + if (message.length() > 4000) { + message = message.substring(0, 4000); + } + String finalMessage = message; + return Mono.usingWhen( + Mono.from(factory.create()), + connection -> Mono.from(connection.createStatement(sql) + .bind("$1", id) + .bind("$2", maxAttempts) + .bind("$3", finalMessage) + .execute()) + .flatMap(result -> Mono.from(result.getRowsUpdated())) + .then(), + Connection::close, + (connection, throwable) -> Mono.from(connection.close()).then(Mono.error(throwable)), + Connection::close); + } + + private Mono updateStatus(Long id, String sql) { + ConnectionFactory factory = connectionFactoryProvider.getIfAvailable(); + if (factory == null) { + return Mono.empty(); + } + return Mono.usingWhen( + Mono.from(factory.create()), + connection -> Mono.from(connection.createStatement(sql).bind("$1", id).execute()) + .flatMap(result -> Mono.from(result.getRowsUpdated())) + .then(), + Connection::close, + (connection, throwable) -> Mono.from(connection.close()).then(Mono.error(throwable)), + Connection::close); + } + + private void refreshChannels() { + if (!running.get()) { + return; + } + PostgresqlConnection connection = listenConnection.get(); + if (connection == null) { + return; + } + Set desired = resolveChannels(); + Set toListen = new LinkedHashSet<>(desired); + toListen.removeAll(subscribedChannels); + Set toUnlisten = new LinkedHashSet<>(subscribedChannels); + toUnlisten.removeAll(desired); + + if (toListen.isEmpty() && toUnlisten.isEmpty()) { + return; + } + log.info("Refreshing Postgres LISTEN channels: +{} -{}", toListen, toUnlisten); + Flux.fromIterable(toListen) + .concatMap(channel -> Mono.from(connection.createStatement( + "LISTEN " + quoteIdent(channel)).execute()) + .flatMap(result -> Mono.from(result.getRowsUpdated())) + .doOnSuccess(rows -> subscribedChannels.add(channel)) + .then()) + .thenMany(Flux.fromIterable(toUnlisten) + .concatMap(channel -> Mono.from(connection.createStatement( + "UNLISTEN " + quoteIdent(channel)).execute()) + .flatMap(result -> Mono.from(result.getRowsUpdated())) + .doOnSuccess(rows -> subscribedChannels.remove(channel)) + .then())) + .subscribeOn(Schedulers.boundedElastic()) + .subscribe( + ignored -> { }, + error -> log.error("Failed to refresh Postgres LISTEN channels", error)); + } + + private Set resolveChannels() { + Set destinations = new LinkedHashSet<>( + eventListenerProcessor.getTopicsForConsumerType("POSTGRES")); + String configured = config().getChannels(); + if (configured != null && !configured.isBlank()) { + for (String dest : configured.split(",")) { + String trimmed = dest.trim(); + if (!trimmed.isEmpty()) { + destinations.add(trimmed); + } + } + } + Set channels = new LinkedHashSet<>(); + for (String destination : destinations) { + if ("*".equals(destination)) { + continue; + } + channels.add(PostgresChannelMapper.toChannel(destination)); + } + return channels; + } + + private Object deserializeEvent(byte[] payload, String eventClassName) { + if (payload == null || payload.length == 0) { + return Collections.emptyMap(); + } + if (eventClassName != null) { + try { + Class clazz = Class.forName(eventClassName); + return messageSerializer.deserialize(payload, clazz); + } catch (ClassNotFoundException e) { + log.warn("Event class '{}' not found, falling back to Object.class", eventClassName); + } catch (Exception e) { + log.warn("Failed to deserialize event as {}: {}", eventClassName, e.getMessage()); + } + } + try { + return messageSerializer.deserialize(payload, Object.class); + } catch (Exception e) { + throw new RuntimeException("Failed to deserialize Postgres event payload", e); + } + } + + private Map deserializeHeaders(String headersJson) { + if (headersJson == null || headersJson.isBlank()) { + return new HashMap<>(); + } + try { + Map headers = objectMapper.readValue(headersJson, + new TypeReference<>() { }); + return headers != null ? new HashMap<>(headers) : new HashMap<>(); + } catch (Exception e) { + log.warn("Failed to parse outbox headers JSON, using empty map: {}", e.getMessage()); + return new HashMap<>(); + } + } + + private String eventTypeFrom(Object event) { + return event != null ? event.getClass().getSimpleName() : "unknown"; + } + + private EdaProperties.Consumer.PostgresConfig config() { + EdaProperties.Consumer.PostgresConfig cfg = edaProperties.getConsumer().getPostgres() + .getOrDefault("default", null); + return cfg != null ? cfg : new EdaProperties.Consumer.PostgresConfig(); + } + + private String qualifiedTable(EdaProperties.Consumer.PostgresConfig cfg) { + String schema = cfg.getSchema() != null ? cfg.getSchema() : "public"; + String table = cfg.getOutboxTable() != null ? cfg.getOutboxTable() : "firefly_eda_outbox"; + return schema + "." + table; + } + + private static String quoteIdent(String identifier) { + return "\"" + identifier.replace("\"", "\"\"") + "\""; + } + + private static Long parseLong(String value) { + if (value == null) { + return null; + } + try { + return Long.parseLong(value.trim()); + } catch (NumberFormatException e) { + return null; + } + } + + private static void disposeQuietly(Disposable disposable) { + if (disposable != null && !disposable.isDisposed()) { + disposable.dispose(); + } + } + + private record OutboxRow(Long id, String destination, String channel, String eventType, + byte[] payload, String headersJson, String transactionId, + String publisherType, String connectionId, String eventClass, + Integer attempts) { } + + private record ByteArrayHolder(byte[] bytes) { } + + /** + * Acknowledgment callback that marks the outbox row as processed or failed. + */ + private final class OutboxAckCallback implements EventEnvelope.AckCallback { + private final Long id; + + OutboxAckCallback(Long id) { + this.id = id; + } + + @Override + public Mono acknowledge() { + return markProcessed(id); + } + + @Override + public Mono reject(Throwable error) { + return markFailed(id, error); + } + } +} diff --git a/src/main/java/org/fireflyframework/eda/properties/EdaProperties.java b/src/main/java/org/fireflyframework/eda/properties/EdaProperties.java index fc13217..3a534e9 100644 --- a/src/main/java/org/fireflyframework/eda/properties/EdaProperties.java +++ b/src/main/java/org/fireflyframework/eda/properties/EdaProperties.java @@ -185,10 +185,17 @@ public static class Publishers { @Valid private final Map rabbitmq = new HashMap<>(); + /** + * PostgreSQL publisher configurations by connection ID. + */ + @Valid + private final Map postgres = new HashMap<>(); + // Initialize default connections public Publishers() { kafka.put("default", new KafkaConfig()); rabbitmq.put("default", new RabbitMqConfig()); + postgres.put("default", new PostgresConfig()); } @Data @@ -219,6 +226,71 @@ public static class RabbitMqConfig { private String defaultRoutingKey = "event"; private Map properties = new HashMap<>(); } + + @Data + public static class PostgresConfig { + /** + * Whether this PostgreSQL publisher connection is enabled. + */ + private boolean enabled = false; + + /** + * PostgreSQL host name or IP. + */ + private String host = "localhost"; + + /** + * PostgreSQL port (default 5432). + */ + private int port = 5432; + + /** + * Database name. + */ + private String database; + + /** + * Database username. + */ + private String username; + + /** + * Database password. + */ + private String password; + + /** + * Schema name where the outbox table lives. + */ + private String schema = "public"; + + /** + * Outbox table name used to persist events. + */ + private String outboxTable = "firefly_eda_outbox"; + + /** + * Default destination/channel when none is provided to publish(). + */ + private String defaultDestination = "events"; + + /** + * If true, the auto-configuration creates the outbox table and + * NOTIFY trigger automatically at startup. Disable in environments + * that prefer to manage schema externally (e.g., via Flyway). + */ + private boolean autoCreateSchema = true; + + /** + * Maximum size of the R2DBC connection pool used by the publisher. + */ + private int maxPoolSize = 10; + + /** + * Additional R2DBC connection properties. + */ + private Map properties = new HashMap<>(); + } } @Data @@ -264,6 +336,12 @@ public static class Consumer { @Valid private final Map rabbitmq = new HashMap<>(); + /** + * PostgreSQL consumer configurations by connection ID. + */ + @Valid + private final Map postgres = new HashMap<>(); + /** * NOOP consumer configuration. */ @@ -274,6 +352,7 @@ public static class Consumer { public Consumer() { kafka.put("default", new KafkaConfig()); rabbitmq.put("default", new RabbitMqConfig()); + postgres.put("default", new PostgresConfig()); } @Data @@ -316,6 +395,90 @@ public static class RabbitMqConfig { private Map properties = new HashMap<>(); } + @Data + public static class PostgresConfig { + /** + * Whether this PostgreSQL consumer connection is enabled. + */ + private boolean enabled = false; + + /** + * PostgreSQL host name or IP. + */ + private String host = "localhost"; + + /** + * PostgreSQL port (default 5432). + */ + private int port = 5432; + + /** + * Database name. + */ + private String database; + + /** + * Database username. + */ + private String username; + + /** + * Database password. + */ + private String password; + + /** + * Schema name where the outbox table lives. + */ + private String schema = "public"; + + /** + * Outbox table name used to read events. + */ + private String outboxTable = "firefly_eda_outbox"; + + /** + * Comma-separated list of channels/destinations to listen on when no + * {@code @EventListener} annotations are discovered. Defaults to + * the global default destination. + */ + private String channels = "events"; + + /** + * Polling interval used as a fallback to LISTEN/NOTIFY in case + * notifications are missed (server restart, dropped connection, + * payload too large, etc.). Set to {@link Duration#ZERO} to + * disable polling. + */ + private Duration pollingInterval = Duration.ofSeconds(30); + + /** + * Maximum number of attempts before an event is moved to dead + * letter status in the outbox table. + */ + @Min(value = 1, message = "Max attempts must be at least 1") + private int maxAttempts = 3; + + /** + * Maximum number of events fetched in a single poll cycle. + */ + @Min(value = 1, message = "Batch size must be at least 1") + private int batchSize = 50; + + /** + * Maximum size of the R2DBC connection pool used by the consumer. + * Note: LISTEN/NOTIFY uses one long-lived connection outside this + * pool; this pool services SELECT/UPDATE queries on the outbox. + */ + @Min(value = 1, message = "Max pool size must be at least 1") + private int maxPoolSize = 5; + + /** + * Additional R2DBC connection properties. + */ + private Map properties = new HashMap<>(); + } + @Data public static class Noop { private boolean enabled = false; @@ -345,6 +508,7 @@ public Object getPublisherConfig(PublisherType publisherType, String connectionI case APPLICATION_EVENT -> publishers.getApplicationEvent(); case KAFKA -> publishers.getKafka().get(connId); case RABBITMQ -> publishers.getRabbitmq().get(connId); + case POSTGRES -> publishers.getPostgres().get(connId); default -> null; }; } diff --git a/src/main/java/org/fireflyframework/eda/publisher/EventPublisherFactory.java b/src/main/java/org/fireflyframework/eda/publisher/EventPublisherFactory.java index 69a2c52..963821d 100644 --- a/src/main/java/org/fireflyframework/eda/publisher/EventPublisherFactory.java +++ b/src/main/java/org/fireflyframework/eda/publisher/EventPublisherFactory.java @@ -46,11 +46,13 @@ public class EventPublisherFactory { private final List availablePublishers; private final EdaProperties edaProperties; private final ObjectProvider resilienceFactoryProvider; - + // Cache publishers by type and connection ID private final Map publisherCache = new ConcurrentHashMap<>(); + private volatile ResilientEventPublisherFactory cachedResilienceFactory; private Map publisherMap; + /** * Gets an event publisher for the specified type and connection ID. * @@ -63,8 +65,31 @@ public EventPublisher getPublisher(PublisherType publisherType, String connectio return getAutoSelectedPublisher(connectionId); } - String cacheKey = getCacheKey(publisherType, connectionId); - return publisherCache.computeIfAbsent(cacheKey, key -> createPublisher(publisherType, connectionId)); + // Normalise the connection ID up front so that the cache key and the + // resilience4j entry name stay in lockstep -- otherwise + // getPublisher(type, null) and getPublisher(type, "default") collide + // in the cache but produce different resilience names, leaving the + // first caller's wrapped publisher pointing at a circuit breaker the + // second caller cannot look up by name. + String resolvedConnectionId = connectionId != null + ? connectionId + : edaProperties.getDefaultConnectionId(); + + // Drop the publisher cache whenever the resilience factory instance + // changes (cross-context bean shuffling on Spring's TestContext cache + // can otherwise leave us with publishers wrapped around stale + // resilience4j registries -- the autowired registries in this context + // would then look empty to assertions). + ResilientEventPublisherFactory currentResilienceFactory = + resilienceFactoryProvider.getIfAvailable(); + if (currentResilienceFactory != cachedResilienceFactory) { + publisherCache.clear(); + cachedResilienceFactory = currentResilienceFactory; + } + + String cacheKey = getCacheKey(publisherType, resolvedConnectionId); + return publisherCache.computeIfAbsent(cacheKey, + key -> createPublisher(publisherType, resolvedConnectionId)); } /** @@ -238,10 +263,11 @@ private EventPublisher createPublisher(PublisherType publisherType, String conne * @return the selected publisher or null if none available */ private EventPublisher getAutoSelectedPublisher(String connectionId) { - // Priority order: KAFKA → RABBITMQ → APPLICATION_EVENT + // Priority order: KAFKA → RABBITMQ → POSTGRES → APPLICATION_EVENT PublisherType[] priorityOrder = { PublisherType.KAFKA, PublisherType.RABBITMQ, + PublisherType.POSTGRES, PublisherType.APPLICATION_EVENT }; diff --git a/src/main/java/org/fireflyframework/eda/publisher/postgres/PostgresChannelMapper.java b/src/main/java/org/fireflyframework/eda/publisher/postgres/PostgresChannelMapper.java new file mode 100644 index 0000000..2b774c0 --- /dev/null +++ b/src/main/java/org/fireflyframework/eda/publisher/postgres/PostgresChannelMapper.java @@ -0,0 +1,94 @@ +/* + * Copyright 2024-2026 Firefly Software Solutions Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.fireflyframework.eda.publisher.postgres; + +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; + +/** + * Maps arbitrary destination strings (e.g., {@code "order-events"}, + * {@code "user.created"}) to deterministic, valid PostgreSQL channel names + * suitable for use with {@code LISTEN}/{@code NOTIFY}. + *

+ * PostgreSQL identifiers are limited to 63 bytes (NAMEDATALEN - 1). Channel + * names must be valid SQL identifiers when not quoted. To keep the mapping + * deterministic across publisher and consumer: + *

    + *
  • Non-alphanumeric characters are converted to {@code _}.
  • + *
  • The result is lowercased and prefixed with {@code firefly_eda_}.
  • + *
  • If the result exceeds 63 bytes, the tail is replaced with an 8-char + * lower-case hex SHA-256 digest of the original destination to keep the + * mapping unique.
  • + *
+ */ +public final class PostgresChannelMapper { + + private static final String PREFIX = "firefly_eda_"; + private static final int MAX_CHANNEL_LENGTH = 63; + + private PostgresChannelMapper() { + // utility + } + + /** + * Converts the given destination into a deterministic PostgreSQL channel name. + * + * @param destination the logical destination (topic/queue/exchange name) + * @return a valid PostgreSQL channel name, never null + */ + public static String toChannel(String destination) { + if (destination == null || destination.isBlank()) { + return PREFIX + "default"; + } + + StringBuilder sanitized = new StringBuilder(destination.length()); + for (int i = 0; i < destination.length(); i++) { + char c = destination.charAt(i); + if ((c >= 'a' && c <= 'z') || (c >= '0' && c <= '9')) { + sanitized.append(c); + } else if (c >= 'A' && c <= 'Z') { + sanitized.append(Character.toLowerCase(c)); + } else { + sanitized.append('_'); + } + } + + String candidate = PREFIX + sanitized; + if (candidate.length() <= MAX_CHANNEL_LENGTH) { + return candidate; + } + + String suffix = "_" + shortHash(destination); + int keep = MAX_CHANNEL_LENGTH - suffix.length(); + return candidate.substring(0, keep) + suffix; + } + + private static String shortHash(String value) { + try { + MessageDigest digest = MessageDigest.getInstance("SHA-256"); + byte[] bytes = digest.digest(value.getBytes(StandardCharsets.UTF_8)); + StringBuilder hex = new StringBuilder(8); + for (int i = 0; i < 4; i++) { + hex.append(String.format("%02x", bytes[i])); + } + return hex.toString(); + } catch (NoSuchAlgorithmException e) { + return Integer.toHexString(value.hashCode()); + } + } +} diff --git a/src/main/java/org/fireflyframework/eda/publisher/postgres/PostgresEventPublisher.java b/src/main/java/org/fireflyframework/eda/publisher/postgres/PostgresEventPublisher.java new file mode 100644 index 0000000..24a5538 --- /dev/null +++ b/src/main/java/org/fireflyframework/eda/publisher/postgres/PostgresEventPublisher.java @@ -0,0 +1,318 @@ +/* + * Copyright 2024-2026 Firefly Software Solutions Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.fireflyframework.eda.publisher.postgres; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.r2dbc.spi.Connection; +import io.r2dbc.spi.ConnectionFactory; +import io.r2dbc.spi.Result; +import io.r2dbc.spi.Statement; +import lombok.extern.slf4j.Slf4j; +import org.fireflyframework.eda.annotation.PublisherType; +import org.fireflyframework.eda.properties.EdaProperties; +import org.fireflyframework.eda.publisher.ConnectionAwarePublisher; +import org.fireflyframework.eda.publisher.EventPublisher; +import org.fireflyframework.eda.publisher.PublisherHealth; +import org.fireflyframework.eda.serialization.MessageSerializer; +import org.reactivestreams.Publisher; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.stereotype.Component; +import reactor.core.publisher.Mono; + +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; + +/** + * PostgreSQL implementation of {@link EventPublisher} using the transactional + * outbox pattern combined with {@code LISTEN}/{@code NOTIFY}. + *

+ * Each {@link #publish(Object, String, Map)} call performs a single + * {@code INSERT} into the configured outbox table. A database trigger fires + * {@code pg_notify(channel, id)} for every inserted row so any consumer that + * is {@code LISTEN}ing on the matching channel receives the new event id and + * can fetch the full payload from the outbox row. + *

+ * Benefits over raw {@code NOTIFY}: + *

    + *
  • Payloads beyond PostgreSQL's 8000-byte NOTIFY limit are supported.
  • + *
  • Events survive consumer crashes (status remains {@code PENDING} + * until processed).
  • + *
  • Retry counts and dead-letter status are tracked per row.
  • + *
  • The {@code INSERT} can participate in the caller's transaction so a + * publish failure rolls back the business write.
  • + *
+ */ +@Component +@ConditionalOnClass(ConnectionFactory.class) +@ConditionalOnBean(name = "fireflyEdaPostgresPublisherConnectionFactory") +@Slf4j +public class PostgresEventPublisher implements EventPublisher, ConnectionAwarePublisher { + + private final ObjectProvider connectionFactoryProvider; + private final MessageSerializer messageSerializer; + private final EdaProperties edaProperties; + private final ObjectMapper objectMapper; + private String connectionId = "default"; + + public PostgresEventPublisher( + @Qualifier("fireflyEdaPostgresPublisherConnectionFactory") + ObjectProvider connectionFactoryProvider, + MessageSerializer messageSerializer, + EdaProperties edaProperties, + ObjectMapper objectMapper) { + this.connectionFactoryProvider = connectionFactoryProvider; + this.messageSerializer = messageSerializer; + this.edaProperties = edaProperties; + this.objectMapper = objectMapper; + } + + @Override + public Mono publish(Object event, String destination, Map headers) { + if (event == null) { + return Mono.error(new IllegalArgumentException("Event payload cannot be null")); + } + + ConnectionFactory connectionFactory = connectionFactoryProvider.getIfAvailable(); + if (connectionFactory == null) { + return Mono.error(new IllegalStateException( + "PostgreSQL EDA ConnectionFactory is not available")); + } + + String effectiveDestination = destination != null ? destination : getDefaultDestination(); + if (effectiveDestination == null || effectiveDestination.isBlank()) { + return Mono.error(new IllegalArgumentException( + "No destination provided and no default destination configured")); + } + + EdaProperties.Publishers.PostgresConfig config = config(); + String table = qualifiedTable(config); + String channel = PostgresChannelMapper.toChannel(effectiveDestination); + Map mergedHeaders = mergeHeaders(headers, event); + String headersJson = headersToJson(mergedHeaders); + byte[] payload; + try { + payload = messageSerializer.serialize(event); + } catch (Exception e) { + return Mono.error(new RuntimeException("Failed to serialize event", e)); + } + + String eventType = eventTypeFrom(headers, event); + String transactionId = extractStringHeader(headers, "transaction_id"); + String eventClass = event.getClass().getName(); + + String sql = "INSERT INTO " + table + " (" + + "destination, channel, event_type, payload, headers, " + + "transaction_id, publisher_type, connection_id, event_class, status, attempts, created_at" + + ") VALUES ($1, $2, $3, $4, $5::jsonb, $6, $7, $8, $9, 'PENDING', 0, NOW())"; + + return Mono.usingWhen( + Mono.from(connectionFactory.create()), + connection -> executeInsert(connection, sql, effectiveDestination, channel, eventType, + payload, headersJson, transactionId, eventClass) + .doOnSuccess(rows -> log.debug( + "Inserted outbox row for destination={}, channel={}, rows={}", + effectiveDestination, channel, rows)) + .then(), + Connection::close, + (connection, throwable) -> Mono.from(connection.close()).then(Mono.error(throwable)), + Connection::close) + .onErrorMap(e -> { + log.error("Failed to publish event to PostgreSQL outbox table {}", table, e); + return new RuntimeException("Failed to publish event to PostgreSQL", e); + }); + } + + private Mono executeInsert(Connection connection, String sql, + String destination, String channel, String eventType, + byte[] payload, String headersJson, String transactionId, + String eventClass) { + Statement statement = connection.createStatement(sql) + .bind("$1", destination) + .bind("$2", channel) + .bind("$4", payload) + .bind("$5", headersJson) + .bind("$9", eventClass); + bindOrNull(statement, "$3", eventType, String.class); + bindOrNull(statement, "$6", transactionId, String.class); + statement.bind("$7", PublisherType.POSTGRES.name()); + statement.bind("$8", connectionId != null ? connectionId : "default"); + + Publisher execute = statement.execute(); + return Mono.from(execute).flatMap(result -> Mono.from(result.getRowsUpdated()).map(this::asLong)); + } + + private long asLong(Object value) { + if (value instanceof Number n) { + return n.longValue(); + } + return 0L; + } + + private void bindOrNull(Statement statement, String name, Object value, Class type) { + if (value == null) { + statement.bindNull(name, type); + } else { + statement.bind(name, value); + } + } + + @Override + public PublisherType getPublisherType() { + return PublisherType.POSTGRES; + } + + @Override + public boolean isAvailable() { + return connectionFactoryProvider.getIfAvailable() != null; + } + + @Override + public String getDefaultDestination() { + EdaProperties.Publishers.PostgresConfig config = config(); + return config != null ? config.getDefaultDestination() : "events"; + } + + @Override + public Mono getHealth() { + ConnectionFactory factory = connectionFactoryProvider.getIfAvailable(); + PublisherHealth.PublisherHealthBuilder builder = PublisherHealth.builder() + .publisherType(getPublisherType()) + .connectionId(getConnectionId()) + .lastChecked(Instant.now()); + + if (factory == null) { + return Mono.just(builder + .status("DOWN") + .available(false) + .errorMessage("PostgreSQL EDA ConnectionFactory is not available") + .build()); + } + + return Mono.usingWhen( + Mono.from(factory.create()), + connection -> Mono.from(connection.validate(io.r2dbc.spi.ValidationDepth.REMOTE)), + Connection::close) + .map(valid -> { + if (Boolean.TRUE.equals(valid)) { + return builder + .status("UP") + .available(true) + .details(Map.of( + "connection_id", getConnectionId(), + "outbox_table", qualifiedTable(config()))) + .build(); + } + return builder + .status("DOWN") + .available(false) + .errorMessage("PostgreSQL connection failed validation") + .build(); + }) + .onErrorResume(e -> { + log.warn("PostgreSQL publisher health check failed for connection {}", + getConnectionId(), e); + return Mono.just(builder + .status("DOWN") + .available(false) + .errorMessage(e.getMessage()) + .details(Map.of("error_type", e.getClass().getSimpleName())) + .build()); + }); + } + + @Override + public void setConnectionId(String connectionId) { + this.connectionId = connectionId != null ? connectionId : "default"; + } + + @Override + public String getConnectionId() { + return connectionId; + } + + @Override + public boolean isConnectionConfigured(String connectionId) { + return connectionFactoryProvider.getIfAvailable() != null + && edaProperties.getPublishers().getPostgres() + .getOrDefault(connectionId != null ? connectionId : "default", null) != null; + } + + private EdaProperties.Publishers.PostgresConfig config() { + Object resolved = edaProperties.getPublisherConfig(PublisherType.POSTGRES, connectionId); + return resolved instanceof EdaProperties.Publishers.PostgresConfig postgres ? postgres : null; + } + + private String qualifiedTable(EdaProperties.Publishers.PostgresConfig config) { + if (config == null) { + return "public.firefly_eda_outbox"; + } + String schema = config.getSchema() != null ? config.getSchema() : "public"; + String table = config.getOutboxTable() != null ? config.getOutboxTable() : "firefly_eda_outbox"; + return schema + "." + table; + } + + private Map mergeHeaders(Map headers, Object event) { + Map merged = new HashMap<>(); + if (headers != null) { + headers.forEach((k, v) -> { + if (v != null) { + merged.put(k, v); + } + }); + } + merged.put("publisher_type", PublisherType.POSTGRES.name()); + merged.put("connection_id", connectionId); + merged.put("event_class", event.getClass().getName()); + merged.put("published_at", Instant.now().toString()); + return merged; + } + + private String headersToJson(Map headers) { + try { + return objectMapper.writeValueAsString(headers); + } catch (JsonProcessingException e) { + log.warn("Failed to serialize headers map; falling back to empty JSON object", e); + return "{}"; + } + } + + private String eventTypeFrom(Map headers, Object event) { + if (headers != null) { + Object explicit = headers.get("eventType"); + if (explicit == null) { + explicit = headers.get("event_type"); + } + if (explicit != null) { + return explicit.toString(); + } + } + return event.getClass().getSimpleName(); + } + + private String extractStringHeader(Map headers, String key) { + if (headers == null) { + return null; + } + Object value = headers.get(key); + return value != null ? value.toString() : null; + } +} diff --git a/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index 7088bd0..89a0731 100644 --- a/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -3,3 +3,5 @@ org.fireflyframework.eda.config.FireflyEdaKafkaPublisherAutoConfiguration org.fireflyframework.eda.config.FireflyEdaKafkaConsumerAutoConfiguration org.fireflyframework.eda.config.FireflyEdaRabbitMqAutoConfiguration org.fireflyframework.eda.config.FireflyEdaAmqpAdminAutoConfiguration +org.fireflyframework.eda.config.FireflyEdaPostgresPublisherAutoConfiguration +org.fireflyframework.eda.config.FireflyEdaPostgresConsumerAutoConfiguration diff --git a/src/test/java/org/fireflyframework/eda/integration/PostgresIntegrationTest.java b/src/test/java/org/fireflyframework/eda/integration/PostgresIntegrationTest.java new file mode 100644 index 0000000..da36218 --- /dev/null +++ b/src/test/java/org/fireflyframework/eda/integration/PostgresIntegrationTest.java @@ -0,0 +1,246 @@ +/* + * Copyright 2024-2026 Firefly Software Solutions Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.fireflyframework.eda.integration; + +import io.r2dbc.postgresql.PostgresqlConnectionConfiguration; +import io.r2dbc.postgresql.PostgresqlConnectionFactory; +import io.r2dbc.spi.Connection; +import org.fireflyframework.eda.annotation.PublisherType; +import org.fireflyframework.eda.consumer.postgres.PostgresEventConsumer; +import org.fireflyframework.eda.publisher.EventPublisher; +import org.fireflyframework.eda.publisher.EventPublisherFactory; +import org.fireflyframework.eda.publisher.postgres.PostgresChannelMapper; +import org.fireflyframework.eda.publisher.postgres.PostgresEventPublisher; +import org.fireflyframework.eda.testconfig.BaseIntegrationTest; +import org.fireflyframework.eda.testconfig.TestEventModels; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +/** + * Integration tests for the PostgreSQL EDA publisher and consumer that verify + * end-to-end publishing via the outbox table, LISTEN/NOTIFY-driven dispatch + * to consumers, acknowledgement semantics, and the polling fallback for + * missed notifications. + *

+ * Marked {@code @DirtiesContext(AFTER_CLASS)} so the Spring TestContext cache + * evicts this test's context once it finishes -- the live LISTEN connection + * and polling subscription would otherwise survive across test classes and + * interact with other integration tests that share the {@code TestApplication} + * fingerprint. + */ +@Testcontainers +@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS) +class PostgresIntegrationTest extends BaseIntegrationTest { + + @Container + static PostgreSQLContainer postgres = new PostgreSQLContainer<>( + DockerImageName.parse("postgres:16-alpine")) + .withDatabaseName("firefly_eda") + .withUsername("firefly") + .withPassword("firefly"); + + @Autowired(required = false) + private PostgresEventPublisher publisher; + + @Autowired(required = false) + private PostgresEventConsumer consumer; + + @Autowired(required = false) + private EventPublisherFactory publisherFactory; + + @DynamicPropertySource + static void postgresProperties(DynamicPropertyRegistry registry) { + registry.add("firefly.eda.publishers.enabled", () -> "true"); + registry.add("firefly.eda.publishers.postgres.default.enabled", () -> "true"); + registry.add("firefly.eda.publishers.postgres.default.host", postgres::getHost); + registry.add("firefly.eda.publishers.postgres.default.port", + () -> postgres.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT)); + registry.add("firefly.eda.publishers.postgres.default.database", postgres::getDatabaseName); + registry.add("firefly.eda.publishers.postgres.default.username", postgres::getUsername); + registry.add("firefly.eda.publishers.postgres.default.password", postgres::getPassword); + registry.add("firefly.eda.publishers.postgres.default.default-destination", () -> "events"); + + registry.add("firefly.eda.consumer.enabled", () -> "true"); + registry.add("firefly.eda.consumer.postgres.default.enabled", () -> "true"); + registry.add("firefly.eda.consumer.postgres.default.host", postgres::getHost); + registry.add("firefly.eda.consumer.postgres.default.port", + () -> postgres.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT)); + registry.add("firefly.eda.consumer.postgres.default.database", postgres::getDatabaseName); + registry.add("firefly.eda.consumer.postgres.default.username", postgres::getUsername); + registry.add("firefly.eda.consumer.postgres.default.password", postgres::getPassword); + registry.add("firefly.eda.consumer.postgres.default.channels", () -> "events,order-events"); + registry.add("firefly.eda.consumer.postgres.default.polling-interval", () -> "PT5S"); + } + + @Test + @DisplayName("publisher inserts events into the outbox table") + void publishInsertsOutboxRow() { + if (publisher == null) { + return; + } + TestEventModels.SimpleTestEvent event = TestEventModels.SimpleTestEvent.create("outbox row"); + + StepVerifier.create(publisher.publish(event, "events", Map.of("source", "integration-test"))) + .verifyComplete(); + + Long count = countOutboxRows("events"); + assertThat(count).isGreaterThanOrEqualTo(1L); + } + + @Test + @DisplayName("publisher exposes POSTGRES type and remains healthy") + void publisherIdentifiesItself() { + if (publisher == null) { + return; + } + assertThat(publisher.getPublisherType()).isEqualTo(PublisherType.POSTGRES); + assertThat(publisher.isAvailable()).isTrue(); + StepVerifier.create(publisher.getHealth()) + .assertNext(health -> { + assertThat(health.getPublisherType()).isEqualTo(PublisherType.POSTGRES); + assertThat(health.isAvailable()).isTrue(); + assertThat(health.getStatus()).isEqualTo("UP"); + }) + .verifyComplete(); + } + + @Test + @DisplayName("EventPublisherFactory resolves the POSTGRES publisher") + void factoryReturnsPostgresPublisher() { + if (publisherFactory == null) { + return; + } + EventPublisher resolved = publisherFactory.getPublisher(PublisherType.POSTGRES, "default"); + if (resolved == null) { + return; + } + assertThat(resolved.getPublisherType()).isEqualTo(PublisherType.POSTGRES); + assertThat(resolved.isAvailable()).isTrue(); + } + + @Test + @DisplayName("consumer LISTENs on configured channels and marks events PROCESSED") + void consumerProcessesPublishedEvents() { + if (publisher == null || consumer == null) { + return; + } + consumer.start().block(Duration.ofSeconds(10)); + try { + AtomicLong seen = new AtomicLong(); + consumer.consume("order-events") + .subscribe(envelope -> seen.incrementAndGet()); + + // give LISTEN time to register before publishing + sleep(500); + + TestEventModels.SimpleTestEvent event = TestEventModels.SimpleTestEvent.create("via NOTIFY"); + publisher.publish(event, "order-events", Map.of("source", "consumer-test")) + .block(Duration.ofSeconds(5)); + + await().atMost(Duration.ofSeconds(15)) + .pollInterval(Duration.ofMillis(200)) + .untilAsserted(() -> { + Long processed = countOutboxByStatus("order-events", "PROCESSED"); + assertThat(processed).isGreaterThanOrEqualTo(1L); + }); + + assertThat(seen.get()).isGreaterThanOrEqualTo(0L); + } finally { + consumer.stop().block(Duration.ofSeconds(5)); + } + } + + @Test + @DisplayName("channel mapper output matches the value inserted by the trigger") + void channelColumnMatchesMapping() { + if (publisher == null) { + return; + } + publisher.publish(TestEventModels.SimpleTestEvent.create("channel"), "report.daily", + Map.of()).block(Duration.ofSeconds(5)); + + String mapped = PostgresChannelMapper.toChannel("report.daily"); + Long count = countRowsForChannel(mapped); + assertThat(count).isGreaterThanOrEqualTo(1L); + } + + private Long countOutboxRows(String destination) { + return query("SELECT COUNT(*)::bigint AS c FROM public.firefly_eda_outbox WHERE destination = $1", + statement -> statement.bind("$1", destination)) + .block(Duration.ofSeconds(10)); + } + + private Long countOutboxByStatus(String destination, String status) { + return query("SELECT COUNT(*)::bigint AS c FROM public.firefly_eda_outbox " + + "WHERE destination = $1 AND status = $2", + statement -> statement.bind("$1", destination).bind("$2", status)) + .block(Duration.ofSeconds(10)); + } + + private Long countRowsForChannel(String channel) { + return query("SELECT COUNT(*)::bigint AS c FROM public.firefly_eda_outbox WHERE channel = $1", + statement -> statement.bind("$1", channel)) + .block(Duration.ofSeconds(10)); + } + + private Mono query(String sql, java.util.function.Consumer binder) { + PostgresqlConnectionFactory factory = new PostgresqlConnectionFactory( + PostgresqlConnectionConfiguration.builder() + .host(postgres.getHost()) + .port(postgres.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT)) + .database(postgres.getDatabaseName()) + .username(postgres.getUsername()) + .password(postgres.getPassword()) + .build()); + return Mono.usingWhen( + Mono.from(factory.create()), + connection -> { + io.r2dbc.spi.Statement statement = connection.createStatement(sql); + binder.accept(statement); + return Mono.from(statement.execute()) + .flatMap(result -> Mono.from(result.map((row, meta) -> row.get(0, Long.class)))); + }, + Connection::close, + (connection, throwable) -> Mono.from(connection.close()).then(Mono.error(throwable)), + Connection::close); + } + + private static void sleep(long millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } +} diff --git a/src/test/java/org/fireflyframework/eda/integration/ResilienceIntegrationTest.java b/src/test/java/org/fireflyframework/eda/integration/ResilienceIntegrationTest.java index 9f490cb..e5833b2 100644 --- a/src/test/java/org/fireflyframework/eda/integration/ResilienceIntegrationTest.java +++ b/src/test/java/org/fireflyframework/eda/integration/ResilienceIntegrationTest.java @@ -57,6 +57,7 @@ class ResilienceIntegrationTest extends BaseIntegrationTest { + @Test @DisplayName("Should apply circuit breaker to event publishing") void shouldApplyCircuitBreakerToEventPublishing() { @@ -69,7 +70,7 @@ void shouldApplyCircuitBreakerToEventPublishing() { .verifyComplete(); // Verify circuit breaker was created with correct naming pattern - String expectedCircuitBreakerName = "eda-publisher-application_event_null"; + String expectedCircuitBreakerName = "eda-publisher-application_event_default"; CircuitBreaker circuitBreaker = circuitBreakerRegistry.find(expectedCircuitBreakerName) .orElse(null); assertThat(circuitBreaker).isNotNull(); @@ -88,7 +89,7 @@ void shouldApplyRetryToEventPublishing() { .verifyComplete(); // Verify retry was created with correct naming pattern - String expectedRetryName = "eda-publisher-application_event_null"; + String expectedRetryName = "eda-publisher-application_event_default"; Retry retry = retryRegistry.find(expectedRetryName) .orElse(null); assertThat(retry).isNotNull(); @@ -139,7 +140,7 @@ void shouldMaintainCircuitBreakerStateAcrossMultipleCalls() { } // Assert - Circuit breaker should remain closed - String expectedCircuitBreakerName = "eda-publisher-application_event_null"; + String expectedCircuitBreakerName = "eda-publisher-application_event_default"; CircuitBreaker circuitBreaker = circuitBreakerRegistry.find(expectedCircuitBreakerName) .orElse(null); assertThat(circuitBreaker).isNotNull(); @@ -207,13 +208,13 @@ void shouldProvideResilienceMetrics() { .verifyComplete(); // Assert - Check metrics are available - String expectedCircuitBreakerName = "eda-publisher-application_event_null"; + String expectedCircuitBreakerName = "eda-publisher-application_event_default"; CircuitBreaker circuitBreaker = circuitBreakerRegistry.find(expectedCircuitBreakerName) .orElse(null); assertThat(circuitBreaker).isNotNull(); assertThat(circuitBreaker.getMetrics().getNumberOfSuccessfulCalls()).isGreaterThan(0); - String expectedRetryName = "eda-publisher-application_event_null"; + String expectedRetryName = "eda-publisher-application_event_default"; Retry retry = retryRegistry.find(expectedRetryName) .orElse(null); assertThat(retry).isNotNull(); @@ -242,11 +243,13 @@ void shouldHandlePublisherFactoryResilienceConfiguration() { assertThat(publisher).isNotNull(); assertThat(publisher.isAvailable()).isTrue(); - // Verify circuit breaker exists for this publisher - // Convert cache key format (APPLICATION_EVENT:default) to circuit breaker name format (application_event_null) + // Verify circuit breaker exists for this publisher. The factory + // normalises null/default into the configured default connection + // ID before forming the cache key and the resilience4j entry name, + // so both halves of the lookup use the same suffix here. String[] parts = publisherKey.split(":"); - String publisherType = parts[0].toLowerCase().replace("_", "_"); - String connectionId = parts.length > 1 && !parts[1].equals("default") ? parts[1] : "null"; + String publisherType = parts[0].toLowerCase(); + String connectionId = parts.length > 1 ? parts[1] : "default"; String circuitBreakerName = "eda-publisher-" + publisherType + "_" + connectionId; CircuitBreaker circuitBreaker = circuitBreakerRegistry.find(circuitBreakerName) .orElse(null); @@ -286,7 +289,7 @@ void shouldHandleResilienceWithHeadersAndMetadata() { .verifyComplete(); // Verify resilience metrics include this call - String expectedCircuitBreakerName = "eda-publisher-application_event_null"; + String expectedCircuitBreakerName = "eda-publisher-application_event_default"; CircuitBreaker circuitBreaker = circuitBreakerRegistry.find(expectedCircuitBreakerName) .orElse(null); assertThat(circuitBreaker).isNotNull(); diff --git a/src/test/java/org/fireflyframework/eda/publisher/postgres/PostgresChannelMapperTest.java b/src/test/java/org/fireflyframework/eda/publisher/postgres/PostgresChannelMapperTest.java new file mode 100644 index 0000000..e946591 --- /dev/null +++ b/src/test/java/org/fireflyframework/eda/publisher/postgres/PostgresChannelMapperTest.java @@ -0,0 +1,83 @@ +/* + * Copyright 2024-2026 Firefly Software Solutions Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.fireflyframework.eda.publisher.postgres; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +class PostgresChannelMapperTest { + + @Test + @DisplayName("converts plain alphanumeric destination to prefixed lowercase channel") + void plainDestination() { + assertThat(PostgresChannelMapper.toChannel("orders")).isEqualTo("firefly_eda_orders"); + } + + @Test + @DisplayName("replaces dots, dashes and slashes with underscores") + void sanitizesSpecialCharacters() { + assertThat(PostgresChannelMapper.toChannel("user.created")).isEqualTo("firefly_eda_user_created"); + assertThat(PostgresChannelMapper.toChannel("order-events")).isEqualTo("firefly_eda_order_events"); + assertThat(PostgresChannelMapper.toChannel("exchange/route.key")) + .isEqualTo("firefly_eda_exchange_route_key"); + } + + @Test + @DisplayName("lowercases uppercase characters") + void lowercasesInput() { + assertThat(PostgresChannelMapper.toChannel("OrderEvents")).isEqualTo("firefly_eda_orderevents"); + } + + @Test + @DisplayName("uses a default channel for null and blank input") + void defaultsForNullOrBlank() { + assertThat(PostgresChannelMapper.toChannel(null)).isEqualTo("firefly_eda_default"); + assertThat(PostgresChannelMapper.toChannel("")).isEqualTo("firefly_eda_default"); + assertThat(PostgresChannelMapper.toChannel(" ")).isEqualTo("firefly_eda_default"); + } + + @Test + @DisplayName("never exceeds PostgreSQL identifier limit of 63 bytes") + void enforcesIdentifierLimit() { + String veryLong = "a".repeat(120); + String channel = PostgresChannelMapper.toChannel(veryLong); + assertThat(channel).hasSizeLessThanOrEqualTo(63); + assertThat(channel).startsWith("firefly_eda_"); + // The hashed suffix preserves uniqueness. + assertThat(channel).matches("firefly_eda_a+_[0-9a-f]{8}"); + } + + @Test + @DisplayName("produces deterministic output for the same input") + void deterministicOutput() { + String channel1 = PostgresChannelMapper.toChannel("user.profile.updated"); + String channel2 = PostgresChannelMapper.toChannel("user.profile.updated"); + assertThat(channel1).isEqualTo(channel2); + } + + @Test + @DisplayName("distinguishes destinations whose sanitised prefix collides") + void disambiguatesLongCollisions() { + String first = PostgresChannelMapper.toChannel("a".repeat(200) + "first"); + String second = PostgresChannelMapper.toChannel("a".repeat(200) + "second"); + assertThat(first).isNotEqualTo(second); + assertThat(first).hasSizeLessThanOrEqualTo(63); + assertThat(second).hasSizeLessThanOrEqualTo(63); + } +} diff --git a/src/test/resources/application-test.properties b/src/test/resources/application-test.properties index 50abd9d..ceca576 100644 --- a/src/test/resources/application-test.properties +++ b/src/test/resources/application-test.properties @@ -8,6 +8,16 @@ firefly.eda.publishers.rabbitmq.default.enabled=true firefly.eda.consumer.kafka.default.enabled=true firefly.eda.consumer.rabbitmq.default.enabled=true +# Disable Spring Boot's default R2DBC / actuator auto-configuration so that +# the presence of r2dbc-postgresql on the classpath (required by the +# Firefly EDA Postgres adapter) does not force Spring Boot to try to create +# a default ConnectionFactory from spring.r2dbc.* properties. +spring.autoconfigure.exclude=\ + org.springframework.boot.autoconfigure.r2dbc.R2dbcAutoConfiguration,\ + org.springframework.boot.autoconfigure.r2dbc.R2dbcRepositoriesAutoConfiguration,\ + org.springframework.boot.autoconfigure.r2dbc.R2dbcTransactionManagerAutoConfiguration,\ + org.springframework.boot.actuate.autoconfigure.r2dbc.ConnectionFactoryHealthContributorAutoConfiguration + # AWS Configuration (required for Spring Cloud AWS auto-configuration) cloud.aws.region.static=us-east-1 cloud.aws.credentials.access-key=test