Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ dev = [
web = "pyfly.web.auto_configuration:WebAutoConfiguration"
cache = "pyfly.cache.auto_configuration:CacheAutoConfiguration"
messaging = "pyfly.messaging.auto_configuration:MessagingAutoConfiguration"
eda = "pyfly.eda.auto_configuration:EdaAutoConfiguration"
eda-health = "pyfly.eda.auto_configuration:EdaHealthAutoConfiguration"
client = "pyfly.client.auto_configuration:ClientAutoConfiguration"
document = "pyfly.data.document.auto_configuration:DocumentAutoConfiguration"
relational = "pyfly.data.relational.auto_configuration:RelationalAutoConfiguration"
Expand Down
46 changes: 46 additions & 0 deletions src/pyfly/data/relational/health.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Copyright 2026 Firefly Software Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""``HealthIndicator`` for SQLAlchemy async engines.

Pings the database with ``SELECT 1`` and reports the dialect on the
``details`` payload so the actuator response makes it obvious what is
being checked.
"""

from __future__ import annotations

from typing import Any

from pyfly.actuator.health import HealthStatus


class SqlAlchemyHealthIndicator:
"""Database health probe — ``UP`` iff ``SELECT 1`` succeeds."""

def __init__(self, engine: Any) -> None:
self._engine = engine

async def health(self) -> HealthStatus:
from sqlalchemy import text

try:
async with self._engine.connect() as conn:
await conn.execute(text("SELECT 1"))
except Exception as exc:
return HealthStatus(
status="DOWN",
details={"error": type(exc).__name__, "message": str(exc)[:200]},
)
dialect = getattr(getattr(self._engine, "dialect", None), "name", "unknown")
return HealthStatus(status="UP", details={"database": dialect})
12 changes: 11 additions & 1 deletion src/pyfly/eda/adapters/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,17 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""EDA adapters — concrete implementations of event-driven ports."""
"""EDA adapters — concrete implementations of event-driven ports.

Concrete brokers are imported lazily so the adapter package stays
importable even when the optional broker libraries (aiokafka, redis,
asyncpg) are not installed. Reach the implementations through their
fully-qualified module path::

from pyfly.eda.adapters.kafka import KafkaEventBus
from pyfly.eda.adapters.postgres import PostgresEventBus
from pyfly.eda.adapters.redis import RedisStreamsEventBus
"""

from pyfly.eda.adapters.memory import InMemoryEventBus

Expand Down
168 changes: 168 additions & 0 deletions src/pyfly/eda/adapters/kafka.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
# Copyright 2026 Firefly Software Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Kafka-backed ``EventPublisher`` — wraps aiokafka.

``destination`` maps to a Kafka topic. Each subscriber registers an
``event_type`` pattern (``fnmatch`` style) against a fixed list of topics
the bus is configured to consume from; on every record the bus
deserialises the envelope and dispatches to every handler whose
pattern matches ``envelope.event_type``.

The adapter requires aiokafka to be installed (``pip install pyfly[kafka]``
or ``pip install pyfly[eda]``).
"""

from __future__ import annotations

import asyncio
import fnmatch
import logging
from typing import Any

from pyfly.eda.ports.outbound import EventHandler
from pyfly.eda.serializers import EventSerializer, JsonEventSerializer

logger = logging.getLogger(__name__)


class KafkaEventBus:
"""``EventPublisher`` backed by Apache Kafka.

Parameters
----------
bootstrap_servers:
Comma-separated ``host:port`` list for the producer and consumer.
topics:
Topics the consumer subscribes to. Subscribers register
``event_type`` patterns; the bus deserialises the envelope and
dispatches to any matching handler. Defaults to ``["pyfly.events"]``.
group:
Kafka consumer group. ``None`` means an isolated consumer (each
bus instance reads every record). Set to a stable string when
you want at-most-once delivery across replicas.
serializer:
``EventSerializer`` used to encode and decode envelopes.
Defaults to ``JsonEventSerializer``.
"""

def __init__(
self,
*,
bootstrap_servers: str = "localhost:9092",
topics: list[str] | None = None,
group: str | None = None,
serializer: EventSerializer | None = None,
) -> None:
self._bootstrap_servers = bootstrap_servers
self._topics = list(topics) if topics else ["pyfly.events"]
self._group = group
self._serializer: EventSerializer = serializer or JsonEventSerializer()
self._handlers: list[tuple[str, EventHandler]] = []
self._producer: Any = None
self._consumer: Any = None
self._consume_task: asyncio.Task[None] | None = None
self._started = False

def subscribe(self, event_type_pattern: str, handler: EventHandler) -> None:
self._handlers.append((event_type_pattern, handler))

async def publish(
self,
destination: str,
event_type: str,
payload: dict[str, Any],
headers: dict[str, str] | None = None,
) -> None:
if not self._started:
await self.start()
from pyfly.eda.types import EventEnvelope

envelope = EventEnvelope(
event_type=event_type,
payload=payload,
destination=destination,
headers=headers or {},
)
record_headers = [(k, v.encode()) for k, v in envelope.headers.items()]
await self._producer.send_and_wait(
destination,
value=self._serializer.serialize(envelope),
headers=record_headers or None,
)

async def start(self) -> None:
if self._started:
return
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer # type: ignore[import-untyped]

self._producer = AIOKafkaProducer(bootstrap_servers=self._bootstrap_servers)
await self._producer.start()

# Always attach the consumer — pyfly's ApplicationContext auto-
# starts adapter beans before application code calls subscribe(),
# so we cannot gate the consumer on handlers being present yet.
# _consume_loop iterates _handlers per-message; an empty list
# means messages are received-and-dropped (with auto-commit) but
# that's expected behaviour when no subscribers exist.
if self._topics:
self._consumer = AIOKafkaConsumer(
*self._topics,
bootstrap_servers=self._bootstrap_servers,
group_id=self._group,
enable_auto_commit=True,
auto_offset_reset="earliest",
)
await self._consumer.start()
self._consume_task = asyncio.create_task(self._consume_loop())

self._started = True

async def stop(self) -> None:
self._started = False
if self._consume_task is not None:
self._consume_task.cancel()
try:
await self._consume_task
except asyncio.CancelledError:
pass
self._consume_task = None
if self._consumer is not None:
await self._consumer.stop()
self._consumer = None
if self._producer is not None:
await self._producer.stop()
self._producer = None

async def _consume_loop(self) -> None:
try:
async for record in self._consumer:
try:
envelope = self._serializer.deserialize(record.value)
except Exception:
logger.exception(
"Failed to deserialize record from topic=%s offset=%s",
record.topic, record.offset,
)
continue
for pattern, handler in self._handlers:
if fnmatch.fnmatch(envelope.event_type, pattern):
try:
await handler(envelope)
except Exception:
logger.exception(
"Handler for pattern=%s raised on event_type=%s",
pattern, envelope.event_type,
)
except asyncio.CancelledError:
pass
Loading
Loading