Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 74 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
[![Java](https://img.shields.io/badge/Java-21%2B-orange.svg)](https://openjdk.org)
[![Spring Boot](https://img.shields.io/badge/Spring%20Boot-3.x-green.svg)](https://spring.io/projects/spring-boot)

> Unified event-driven architecture library with Kafka, RabbitMQ, and Spring Application Events support.
> Unified event-driven architecture library with Kafka, RabbitMQ, PostgreSQL (LISTEN/NOTIFY + outbox), and Spring Application Events support.

---

Expand All @@ -23,15 +23,15 @@

## Overview

Firefly Framework EDA provides a standardized messaging abstraction for event-driven architectures, supporting multiple broker implementations through a unified publisher/consumer API. It enables reactive event publishing and consumption with built-in support for Apache Kafka, RabbitMQ, and Spring Application Events as transport mechanisms.
Firefly Framework EDA provides a standardized messaging abstraction for event-driven architectures, supporting multiple broker implementations through a unified publisher/consumer API. It enables reactive event publishing and consumption with built-in support for Apache Kafka, RabbitMQ, PostgreSQL (via `LISTEN`/`NOTIFY` backed by a transactional outbox table), and Spring Application Events as transport mechanisms.

The library features annotation-driven event publishing (`@EventPublisher`, `@PublishResult`), declarative event listeners (`@EventListener`), and a comprehensive set of event filtering, serialization, and error handling capabilities. It includes support for JSON, Avro, and Protobuf message serialization formats.

The resilient publisher wrapper provides circuit breaker integration, while the dead letter queue handler ensures no events are lost during processing failures. Metrics collection and health indicators provide full observability into the messaging infrastructure.

## Features

- Multi-broker support: Apache Kafka, RabbitMQ, Spring Application Events
- Multi-broker support: Apache Kafka, RabbitMQ, PostgreSQL (`LISTEN`/`NOTIFY` + outbox), Spring Application Events
- Annotation-driven publishing: `@EventPublisher`, `@PublishResult`
- Declarative event listening: `@EventListener` with SpEL-based filtering
- Event envelope pattern with metadata propagation
Expand All @@ -41,16 +41,17 @@ The resilient publisher wrapper provides circuit breaker integration, while the
- Resilient publisher with circuit breaker support
- Dynamic event listener registration at runtime
- AMQP admin auto-configuration for RabbitMQ exchanges and queues
- Transactional outbox table for PostgreSQL with auto-created schema and trigger
- Custom error handling strategies with metrics and notification handlers
- Health indicators and metrics for Actuator integration
- Spring Boot auto-configuration for Kafka and RabbitMQ
- Spring Boot auto-configuration for Kafka, RabbitMQ, and PostgreSQL

## Requirements

- Java 21+
- Spring Boot 3.x
- Maven 3.9+
- Apache Kafka or RabbitMQ (depending on chosen broker)
- Apache Kafka, RabbitMQ, or PostgreSQL 11+ (depending on chosen transport)

## Installation

Expand Down Expand Up @@ -95,16 +96,76 @@ public class OrderEventHandler {
```yaml
firefly:
eda:
broker: kafka # kafka, rabbitmq, spring
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: my-service
rabbitmq:
host: localhost
port: 5672
enabled: true
default-publisher-type: AUTO # AUTO chooses KAFKA → RABBITMQ → POSTGRES → APPLICATION_EVENT
publishers:
enabled: true
kafka:
default:
enabled: true
bootstrap-servers: localhost:9092
rabbitmq:
default:
enabled: true
host: localhost
port: 5672
postgres:
default:
enabled: true
host: localhost
port: 5432
database: app
username: app
password: secret
schema: public
outbox-table: firefly_eda_outbox
default-destination: events
auto-create-schema: true # provision outbox table + NOTIFY trigger at startup
consumer:
enabled: true
group-id: my-service
kafka:
default:
enabled: true
bootstrap-servers: localhost:9092
rabbitmq:
default:
enabled: true
host: localhost
port: 5672
queues: events-queue
postgres:
default:
enabled: true
host: localhost
port: 5432
database: app
username: app
password: secret
channels: events,order-events # destinations to LISTEN on
polling-interval: 30s # NOTIFY-loss fallback poll cadence
max-attempts: 3 # outbox row moves to DEAD_LETTER after N failures
```

### PostgreSQL transport at a glance

- Each `publish()` performs a single `INSERT` into `firefly_eda_outbox`. A
database trigger fires `pg_notify(channel, id)` for every inserted row.
- The consumer holds a dedicated R2DBC connection that runs `LISTEN <channel>`
for every subscribed destination. Notifications carry only the outbox row
id so payloads can be arbitrarily large.
- On dispatch, the listener pipeline either marks the row `PROCESSED` or
increments `attempts`; once `attempts` reaches `max-attempts`, the row
moves to `DEAD_LETTER` status.
- A periodic poll (`polling-interval`) catches rows that slipped past the
live channel (e.g., consumer offline at insert time, payload too large,
connection reset). Set it to `0s` to disable polling.
- Channel names are derived deterministically from destinations via the
built-in mapper: non-alphanumeric characters become `_`, the result is
lower-cased, prefixed with `firefly_eda_`, and truncated to fit
PostgreSQL's 63-byte identifier limit with a stable hash suffix when
needed.

## Documentation

Additional documentation is available in the [docs/](docs/) directory:
Expand Down
93 changes: 93 additions & 0 deletions docs/CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,54 @@ firefly:
| `default-routing-key` | string | `"event"` | Default routing key |
| `properties` | map | `{}` | Additional RabbitMQ connection properties |

### PostgreSQL Publisher

The PostgreSQL publisher uses an outbox table together with `pg_notify` to deliver events. Configuration lives under `firefly.eda.publishers.postgres.<connection-id>` -- never under `spring.r2dbc.*`.

```yaml
firefly:
eda:
publishers:
postgres:
default: # Connection ID
enabled: true
host: "localhost"
port: 5432
database: "app"
username: "app"
password: "secret"
schema: "public"
outbox-table: "firefly_eda_outbox"
default-destination: "events"
auto-create-schema: true
max-pool-size: 10
properties:
statement_timeout: "30000"
```

| Property | Type | Default | Description |
|----------|------|---------|-------------|
| `enabled` | boolean | `false` | Whether this PostgreSQL publisher connection is enabled |
| `host` | string | `"localhost"` | PostgreSQL host |
| `port` | int | `5432` | PostgreSQL port |
| `database` | string | `null` | Database name |
| `username` | string | `null` | Database username |
| `password` | string | `null` | Database password |
| `schema` | string | `"public"` | Schema containing the outbox table |
| `outbox-table` | string | `"firefly_eda_outbox"` | Name of the outbox table |
| `default-destination` | string | `"events"` | Default destination when none is provided to `publish()` |
| `auto-create-schema` | boolean | `true` | Provision the outbox table, index, NOTIFY function and trigger at startup |
| `max-pool-size` | int | `10` | Maximum R2DBC connection pool size used for outbox inserts |
| `properties` | map | `{}` | Additional R2DBC PostgreSQL startup options (e.g., statement timeout) |

When `auto-create-schema: true`, the publisher provisions:

- `firefly_eda_outbox` table with `id`, `destination`, `channel`, `payload (BYTEA)`, `headers (JSONB)`, `status`, `attempts`, `error_message`, and timestamp columns
- An index on `(status, created_at)` filtered to `status = 'PENDING'`
- An index on `(channel, status)`
- A `firefly_eda_notify_event()` trigger function that calls `pg_notify(NEW.channel, NEW.id::text)`
- An `AFTER INSERT` trigger on the outbox table that runs the function

## Consumer Configuration

Consumers are configured under `firefly.eda.consumer`.
Expand Down Expand Up @@ -289,6 +337,51 @@ firefly:

**Important**: RabbitMQ queues must be pre-declared and configured here. The `@EventListener` destinations are used for filtering messages after they are consumed from these queues, not for determining which queues to subscribe to.

### PostgreSQL Consumer

The PostgreSQL consumer holds one long-lived R2DBC connection to receive `NOTIFY` messages and creates short-lived connections to drain the outbox table. Configuration lives under `firefly.eda.consumer.postgres.<connection-id>`. Channels are derived from `@EventListener` annotations with `consumerType=POSTGRES` or `consumerType=AUTO`, and additionally from the `channels` property.

```yaml
firefly:
eda:
consumer:
postgres:
default:
enabled: true
host: "localhost"
port: 5432
database: "app"
username: "app"
password: "secret"
schema: "public"
outbox-table: "firefly_eda_outbox"
channels: "events,order-events" # comma-separated destinations to LISTEN on
polling-interval: 30s # NOTIFY-loss fallback poll cadence; set to 0s to disable
max-attempts: 3 # rows beyond this attempt count are marked DEAD_LETTER
batch-size: 50 # max rows fetched per poll cycle
max-pool-size: 5
properties: {}
```

| Property | Type | Default | Description |
|----------|------|---------|-------------|
| `enabled` | boolean | `false` | Whether this PostgreSQL consumer connection is enabled |
| `host` | string | `"localhost"` | PostgreSQL host |
| `port` | int | `5432` | PostgreSQL port |
| `database` | string | `null` | Database name |
| `username` | string | `null` | Database username |
| `password` | string | `null` | Database password |
| `schema` | string | `"public"` | Schema containing the outbox table |
| `outbox-table` | string | `"firefly_eda_outbox"` | Outbox table the consumer reads from |
| `channels` | string | `"events"` | Comma-separated destinations to LISTEN on (combined with any from `@EventListener`) |
| `polling-interval` | duration | `30s` | Fallback poll cadence for rows missed by NOTIFY; `0s` disables polling |
| `max-attempts` | int | `3` | Failure attempts before a row transitions to `DEAD_LETTER` |
| `batch-size` | int | `50` | Maximum rows fetched per poll cycle |
| `max-pool-size` | int | `5` | Connection pool size for outbox queries; LISTEN uses one connection on top of this budget |
| `properties` | map | `{}` | Additional R2DBC PostgreSQL startup options |

**How acknowledgement works**: On successful dispatch, the row is updated to `status='PROCESSED', processed_at=NOW()`. On failure, `attempts` is incremented and `error_message` is recorded; once `attempts >= max-attempts`, the row moves to `status='DEAD_LETTER'`. Use the outbox table directly to inspect, reset, or requeue events.

### Application Event Consumer

```yaml
Expand Down
86 changes: 71 additions & 15 deletions docs/PUBLISHER_TYPES.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ The `PublisherType` enum defines the following supported messaging platforms:

### AUTO
**Description**: Automatically select the best available publisher
**Selection Priority**: KAFKA → RABBITMQ → APPLICATION_EVENT → NOOP
**Selection Priority**: KAFKA → RABBITMQ → POSTGRES → APPLICATION_EVENT → NOOP
**Use Case**: Let the system choose the optimal publisher based on availability and configuration

```java
Expand Down Expand Up @@ -60,6 +60,53 @@ firefly:
default-exchange: events
```

### POSTGRES
**Description**: PostgreSQL `LISTEN`/`NOTIFY` with a transactional outbox table
**Features**: Persistent storage, payload sizes beyond `NOTIFY`'s 8 kB limit, retry counts, dead-letter status, polling fallback for missed notifications
**Best For**: Services that already use PostgreSQL and want reliable event publishing without an additional broker; outbox-style transactional event publishing
**Persistence**: ✅ Yes (outbox table rows)
**Ordering**: ✅ Yes (per destination -- rows are ordered by `created_at` / `id`)
**Cloud Service**: ❌ No (self-hosted; works with managed PostgreSQL services)

```yaml
firefly:
eda:
publishers:
postgres:
default:
enabled: true
host: localhost
port: 5432
database: app
username: app
password: secret
schema: public
outbox-table: firefly_eda_outbox
default-destination: events
auto-create-schema: true # provision outbox table + NOTIFY trigger at startup
consumer:
postgres:
default:
enabled: true
host: localhost
port: 5432
database: app
username: app
password: secret
channels: events,order-events # destinations to LISTEN on
polling-interval: 30s
max-attempts: 3
```

**How it works**: Each `publish()` performs a single `INSERT` into the
configured outbox table. A trigger fires `pg_notify(channel, id)` for the
new row, so any consumer that has issued `LISTEN` on the matching channel
receives the row id and fetches the full payload from the table. The
consumer marks rows as `PROCESSED` after successful dispatch, increments
`attempts` on failure, and transitions to `DEAD_LETTER` once `max-attempts`
is reached. A periodic poll catches rows that slipped past the live
channel (consumer offline, payload too large, etc.).

### APPLICATION_EVENT
**Description**: Spring Application Event Bus (in-memory)
**Features**: Synchronous processing, JVM-local, simple integration
Expand Down Expand Up @@ -99,23 +146,25 @@ class ServiceTest {
When using `PublisherType.AUTO`, the system selects publishers in this priority order:

1. **KAFKA** - If Kafka is configured and available
2. **RABBITMQ** - If RabbitMQ is configured and available
3. **APPLICATION_EVENT** - If Spring context is available (always true)
4. **NOOP** - If explicitly enabled for testing
2. **RABBITMQ** - If RabbitMQ is configured and available
3. **POSTGRES** - If a PostgreSQL EDA connection is configured and available
4. **APPLICATION_EVENT** - If Spring context is available (always true)
5. **NOOP** - If explicitly enabled for testing

## Feature Comparison Matrix

| Feature | KAFKA | RABBITMQ | APPLICATION_EVENT | NOOP |
|---------|-------|----------|------------------|------|
| **Throughput** | Very High | High | High | N/A |
| **Persistence** | ✅ | ✅ | ❌ | ❌ |
| **Ordering** | ✅ | ❌ | ❌ | ❌ |
| **Partitioning** | ✅ | ❌ | ❌ | ❌ |
| **Complex Routing** | ❌ | ✅ | ❌ | ❌ |
| **Guaranteed Delivery** | ✅ | ✅ | ❌ | ❌ |
| **Multi-Instance** | ✅ | ✅ | ❌ | ❌ |
| **Cloud Native** | ✅ | ✅ | ❌ | ❌ |
| **Setup Complexity** | Medium | Medium | Low | None |
| Feature | KAFKA | RABBITMQ | POSTGRES | APPLICATION_EVENT | NOOP |
|---------|-------|----------|----------|------------------|------|
| **Throughput** | Very High | High | Medium | High | N/A |
| **Persistence** | ✅ | ✅ | ✅ | ❌ | ❌ |
| **Ordering** | ✅ | ❌ | ✅ | ❌ | ❌ |
| **Partitioning** | ✅ | ❌ | ❌ | ❌ | ❌ |
| **Complex Routing** | ❌ | ✅ | ❌ | ❌ | ❌ |
| **Guaranteed Delivery** | ✅ | ✅ | ✅ | ❌ | ❌ |
| **Multi-Instance** | ✅ | ✅ | ✅ | ❌ | ❌ |
| **Cloud Native** | ✅ | ✅ | ✅ | ❌ | ❌ |
| **Setup Complexity** | Medium | Medium | Low | Low | None |
| **Requires extra broker** | Yes | Yes | No (reuses DB) | No | No |

## Configuration Examples

Expand Down Expand Up @@ -182,6 +231,13 @@ public class EventService {
- Fan-out patterns are common
- Priority queues are required

**Use POSTGRES when:**
- The service already uses PostgreSQL and you want event publishing without operating a separate broker
- You need transactional outbox semantics (event publish ties to a database transaction)
- Moderate throughput is sufficient (the table acts as the queue)
- You want persistent, auditable events with built-in retry / dead-letter status
- You can tolerate a small NOTIFY-delivery delay covered by the polling fallback

**Use APPLICATION_EVENT when:**
- Simple internal communication is needed
- Single-instance deployment
Expand Down
21 changes: 20 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>org.fireflyframework</groupId>
<artifactId>fireflyframework-parent</artifactId>
<version>26.04.01</version>
<version>26.05.05</version>
</parent>

<artifactId>fireflyframework-eda</artifactId>
Expand Down Expand Up @@ -103,6 +103,20 @@
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<!-- PostgreSQL via R2DBC (LISTEN/NOTIFY + outbox transport) -->
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-spi</artifactId>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-pool</artifactId>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>r2dbc-postgresql</artifactId>
</dependency>


<!-- Circuit Breaker for Resilience -->
<dependency>
Expand Down Expand Up @@ -175,6 +189,11 @@
<artifactId>rabbitmq</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
<scope>test</scope>
</dependency>

<!-- Spring Boot Test Auto-configuration -->
<dependency>
Expand Down
Loading
Loading