Skip to content

Latest commit

 

History

History
1110 lines (799 loc) · 33.5 KB

File metadata and controls

1110 lines (799 loc) · 33.5 KB

Testing Guide

This guide covers the testing utilities that PyFly provides out of the box: a base test case class, a test container factory, and event assertion helpers. Together, these tools make it straightforward to write unit tests, integration tests, and event-driven tests for PyFly applications.


Table of Contents

  1. Introduction
  2. PyFlyTestCase
  3. create_test_container()
  4. Event Assertions
  5. mock_bean()
  6. Test Slices
  7. PyFlyTestClient
  8. Testing Patterns
  9. Complete Example

Introduction

PyFly adopts a "testing is a first-class citizen" philosophy. The framework provides dedicated testing modules so you never have to build boilerplate setup from scratch.

from pyfly.testing import (
    PyFlyTestCase,
    create_test_container,
    assert_event_published,
    assert_no_events_published,
    mock_bean,
    WebTest, DataTest, ServiceTest,
    get_test_slice,
    PyFlyTestClient,
    TestResponse,
)

Source: src/pyfly/testing/__init__.py

Testing Pyramid

PyFly encourages the standard testing pyramid, where the number of tests decreases as the scope and cost of each test increases:

          /\
         /  \       End-to-end tests (few)
        /    \      - Full application stack with HTTP client
       /------\
      /        \    Integration tests (some)
     /          \   - Real components, in-memory adapters
    /------------\
   /              \ Unit tests (many)
  /                \ - Mocked dependencies, fast execution
 /------------------\
Level Dependencies Speed PyFly Tools
Unit Mocked Fast create_test_container(), unittest.mock
Integration In-memory adapters Medium PyFlyTestCase, InMemoryEventBus
E2E Full stack Slow create_app() + httpx.AsyncClient

PyFlyTestCase

PyFlyTestCase is a base class for integration tests that need PyFly framework infrastructure. It pre-configures an ApplicationContext and an InMemoryEventBus so your tests can focus on behavior rather than setup.

from pyfly.testing import PyFlyTestCase


class TestOrderWorkflow(PyFlyTestCase):

    async def test_full_order_lifecycle(self):
        await self.setup()

        # self.context is a fully initialized ApplicationContext
        # self.event_bus is an InMemoryEventBus ready for subscriptions

        # ... test logic ...

        await self.teardown()

Setup and Teardown

Both methods are async and must be awaited. Call setup() at the beginning of each test and teardown() at the end to ensure clean state between tests.

Method What It Does
setup() 1. Creates an ApplicationContext with an empty Config({}).
2. Creates a fresh InMemoryEventBus instance.
3. Calls await context.start() to initialize the context.
teardown() 1. Calls await context.stop() to clean up resources.

The internal implementation:

class PyFlyTestCase:
    context: ApplicationContext
    event_bus: InMemoryEventBus

    async def setup(self) -> None:
        self.context = ApplicationContext(Config({}))
        self.event_bus = InMemoryEventBus()
        await self.context.start()

    async def teardown(self) -> None:
        await self.context.stop()

Available Fixtures

After calling setup(), the following attributes are available:

Attribute Type Description
self.context ApplicationContext Pre-configured application context with empty config
self.event_bus InMemoryEventBus In-memory event bus for publishing and subscribing

The InMemoryEventBus supports wildcard pattern subscriptions (e.g., "order.*" matches "order.created" and "order.shipped"), making it easy to capture events in tests.

Source: src/pyfly/testing/fixtures.py


create_test_container()

create_test_container() creates a pre-configured DI Container for testing. It accepts an optional overrides dictionary that maps interface types to test implementations, enabling you to substitute real services with fakes or mocks.

Basic Usage

from pyfly.testing import create_test_container

# Create a container with no overrides
container = create_test_container()

Injecting Mocks and Fakes

The overrides parameter maps interface types to test implementations. Each override is registered as a SINGLETON and bound to the interface.

from pyfly.testing import create_test_container


class FakeOrderRepository:
    """In-memory order repository for testing."""

    def __init__(self):
        self.orders: dict[str, dict] = {}

    async def save(self, order: dict) -> None:
        self.orders[order["id"]] = order

    async def find_by_id(self, order_id: str) -> dict | None:
        return self.orders.get(order_id)


# Create container with the fake repository
container = create_test_container(overrides={
    OrderRepository: FakeOrderRepository,
})

create_test_container() Parameters:

Parameter Type Default Description
overrides dict[type, type] | None None Interface-to-implementation mappings

How overrides work internally:

For each (interface, impl) pair in the overrides dictionary:

  1. The implementation is registered with container.register(impl, scope=Scope.SINGLETON).
  2. If interface != impl, a binding is created with container.bind(interface, impl).

This means you can resolve() by the interface type and receive the test implementation:

repo = container.resolve(OrderRepository)
# Returns a FakeOrderRepository instance (singleton)

The full implementation:

def create_test_container(
    overrides: dict[type, type] | None = None,
) -> Container:
    container = Container()
    if overrides:
        for interface, impl in overrides.items():
            container.register(impl, scope=Scope.SINGLETON)
            if interface != impl:
                container.bind(interface, impl)
    return container

Source: src/pyfly/testing/containers.py

Resolving Services in Tests

Once the container is configured with overrides, register your service classes and use resolve() to get instances with their dependencies injected:

from pyfly.testing import create_test_container
from pyfly.container import Scope

container = create_test_container(overrides={
    OrderRepository: FakeOrderRepository,
})

# Also register the service that depends on the repository
container.register(OrderService, scope=Scope.SINGLETON)

# Resolve -- OrderService gets FakeOrderRepository injected via constructor
service = container.resolve(OrderService)

# The resolved service uses the fake repository
result = await service.create_order({"id": "ord-1", "customer": "Alice"})

Event Assertions

PyFly provides two assertion helpers for verifying event-driven behavior. They work with lists of EventEnvelope objects, which is the standard event wrapper in PyFly's EDA module.

An EventEnvelope contains:

Field Type Description
event_type str Event type identifier
payload dict[str, Any] Event data
destination str Target destination/topic
event_id str Auto-generated UUID
timestamp datetime Auto-generated UTC timestamp
headers dict[str, str] Optional metadata headers

assert_event_published()

Asserts that an event of a given type was published. Optionally verifies that the event payload contains specific key-value pairs.

from pyfly.testing import assert_event_published
from pyfly.eda.types import EventEnvelope

events = [
    EventEnvelope(
        event_type="order.created",
        payload={"order_id": "ord-123", "customer_id": "cust-42", "total": 99.99},
        destination="orders",
    ),
]

# Assert just the event type
event = assert_event_published(events, "order.created")

# Assert event type AND payload contents
event = assert_event_published(
    events,
    "order.created",
    payload_contains={"order_id": "ord-123", "customer_id": "cust-42"},
)

Parameters:

Parameter Type Default Description
events list[EventEnvelope] required List of captured event envelopes
event_type str required Expected event type string
payload_contains dict[str, Any] | None None Key-value pairs the payload must contain

Returns: The matching EventEnvelope instance.

Raises AssertionError when:

  • No event with the given type is found. The error message lists all published event types for debugging:
    Expected event 'order.created' to be published. Published events: ['user.updated']
    
  • A matching event is found but its payload is missing an expected key:
    Expected key 'order_id' in event payload
    
  • A matching event is found but a payload value does not match:
    Expected payload['order_id'] == 'ord-123', got 'ord-999'
    

How matching works internally:

  1. Filters events for those where event.event_type == event_type.
  2. Takes the first match (if there are multiple events of the same type).
  3. If payload_contains is provided, iterates over each key-value pair and asserts presence and equality against event.payload.

assert_no_events_published()

Asserts that no events were published at all. Use this to verify that a failed operation did not produce side effects.

from pyfly.testing import assert_no_events_published

events: list[EventEnvelope] = []

# Passes -- no events in the list
assert_no_events_published(events)

If events are present, the assertion fails with a descriptive message:

events.append(EventEnvelope(
    event_type="order.created",
    payload={"order_id": "ord-123"},
    destination="orders",
))
assert_no_events_published(events)
# AssertionError: Expected no events to be published. Got: ['order.created']

Parameters:

Parameter Type Description
events list[EventEnvelope] List of captured event envelopes

Source: src/pyfly/testing/assertions.py


mock_bean()

mock_bean() is a Python descriptor that provides fresh AsyncMock instances for each test. It replaces the manual boilerplate of creating AsyncMock(spec=...) fixtures.

from pyfly.testing import mock_bean, PyFlyTestCase


class TestOrderService(PyFlyTestCase):
    order_repo = mock_bean(OrderRepository)
    event_publisher = mock_bean(EventPublisher)

    async def test_create_order(self):
        self.order_repo.save.return_value = Order(id="1")

        service = OrderService(self.order_repo, self.event_publisher)
        result = await service.create_order({"customer": "Alice"})

        self.order_repo.save.assert_called_once()

How It Works

mock_bean() returns a MockBeanDescriptor — a Python descriptor that lazily creates an AsyncMock(spec=bean_type) per test instance. Each test method gets a fresh mock because the mock is attached to the instance, not the class.

Parameters:

Parameter Type Description
bean_type type The type to mock (used as the spec argument to AsyncMock)

Returns: A descriptor that provides an AsyncMock instance when accessed on an instance.

Key behaviors:

  • Each test instance gets its own mock (no state leakage between tests)
  • The mock has the spec of the given type, so typos in method names are caught
  • The bean_type property on the descriptor is available for introspection
# Access the underlying type
TestOrderService.order_repo.bean_type  # OrderRepository

Source: src/pyfly/testing/mock.py


Test Slices

Test slices are class decorators that mark test classes for focused testing of specific application layers. They mirror Spring Boot's @WebMvcTest, @DataJpaTest, and @SpringBootTest annotations.

from pyfly.testing import WebTest, DataTest, ServiceTest, get_test_slice

@WebTest

Marks a test class as a web-layer test. Use this for testing controllers and filters in isolation, with the service layer mocked.

from pyfly.testing import WebTest, mock_bean


@WebTest
class TestUserController:
    user_service = mock_bean(UserService)

    async def test_get_users_returns_200(self):
        self.user_service.list_users.return_value = [{"id": "1", "name": "Alice"}]
        # ... test controller with mocked service

@DataTest

Marks a test class as a data-layer test. Use this for testing repositories with in-memory backends.

from pyfly.testing import DataTest


@DataTest
class TestUserRepository:
    async def test_save_and_find(self):
        # ... test repository operations

@ServiceTest

Marks a test class as a service-layer test. Use this for testing services with mocked repositories.

from pyfly.testing import ServiceTest, mock_bean


@ServiceTest
class TestOrderService:
    order_repo = mock_bean(OrderRepository)

    async def test_place_order(self):
        # ... test service logic

get_test_slice()

Inspect which slice a test class belongs to:

from pyfly.testing import get_test_slice, WebTest, DataTest

@WebTest
class MyWebTest: ...

@DataTest
class MyDataTest: ...

class PlainTest: ...

get_test_slice(MyWebTest)    # "web"
get_test_slice(MyDataTest)   # "data"
get_test_slice(PlainTest)    # None
Decorator Slice Value Purpose
@WebTest "web" Controllers and filters
@DataTest "data" Repositories and queries
@ServiceTest "service" Services and business logic

Source: src/pyfly/testing/slices.py


PyFlyTestClient

PyFlyTestClient is a test HTTP client that wraps Starlette's TestClient with fluent assertion methods. It makes controller tests more readable by chaining assertions directly on the response.

from starlette.applications import Starlette
from starlette.routing import Route
from starlette.responses import JSONResponse
from pyfly.testing import PyFlyTestClient


def get_users(request):
    return JSONResponse([{"id": "1", "name": "Alice"}])

app = Starlette(routes=[Route("/api/users", get_users)])
client = PyFlyTestClient(app)

# Fluent assertion chaining
client.get("/api/users") \
    .assert_status(200) \
    .assert_json_path("$[0].name", value="Alice") \
    .assert_header("content-type", value="application/json")

Creating a Client

from pyfly.testing import PyFlyTestClient

# Pass any ASGI application
client = PyFlyTestClient(app)

The client wraps Starlette's TestClient with raise_server_exceptions=False, so server errors return as HTTP 500 responses instead of raising exceptions in test code.

HTTP Methods

Method Description
client.get(url, **kwargs) Send GET request
client.post(url, **kwargs) Send POST request
client.put(url, **kwargs) Send PUT request
client.delete(url, **kwargs) Send DELETE request
client.patch(url, **kwargs) Send PATCH request

All methods return a TestResponse instance. Pass any keyword arguments supported by Starlette's TestClient (e.g., json=, headers=, cookies=).

TestResponse

TestResponse wraps the HTTP response with fluent assertion methods. All assert methods return self for chaining.

Properties:

Property Type Description
status_code int HTTP status code
headers dict[str, str] Response headers (lowercase keys)
body bytes Raw response body

Methods:

Method Description
json() Parse and return body as JSON (cached)

Fluent Assertion Methods

assert_status(expected)

Assert the response status code:

client.get("/api/users").assert_status(200)
client.post("/api/orders", json={}).assert_status(422)

assert_json_path(path, *, value=..., exists=True)

Assert a JSON path exists (or not) and optionally matches a value. Uses JSONPath syntax via jsonpath_ng:

response = client.get("/api/users/1")

# Assert path exists
response.assert_json_path("$.name")

# Assert path has specific value
response.assert_json_path("$.name", value="Alice")

# Assert path does NOT exist
response.assert_json_path("$.deleted_at", exists=False)
Parameter Type Default Description
path str required JSONPath expression
value Any ... (unset) Expected value (only checked if provided)
exists bool True Whether the path should exist

assert_header(name, *, value=None, exists=True)

Assert a response header exists and optionally matches a value:

response.assert_header("content-type", value="application/json")
response.assert_header("x-custom", exists=False)

assert_body_contains(text)

Assert the response body contains a text substring:

client.get("/health").assert_body_contains("UP")

Complete Test Example

import pytest
from starlette.applications import Starlette
from starlette.routing import Route
from starlette.responses import JSONResponse

from pyfly.testing import PyFlyTestClient


def list_items(request):
    return JSONResponse(
        [{"id": "1", "name": "Widget", "price": 9.99}],
        headers={"X-Total-Count": "1"},
    )

def create_item(request):
    return JSONResponse({"id": "2", "name": "Gadget"}, status_code=201)


app = Starlette(routes=[
    Route("/api/items", list_items, methods=["GET"]),
    Route("/api/items", create_item, methods=["POST"]),
])


class TestItemsAPI:
    @pytest.fixture(autouse=True)
    def setup_client(self):
        self.client = PyFlyTestClient(app)

    def test_list_items(self):
        self.client.get("/api/items") \
            .assert_status(200) \
            .assert_json_path("$[0].name", value="Widget") \
            .assert_json_path("$[0].price", value=9.99) \
            .assert_header("x-total-count", value="1")

    def test_create_item(self):
        self.client.post("/api/items", json={"name": "Gadget"}) \
            .assert_status(201) \
            .assert_json_path("$.name", value="Gadget")

Source: src/pyfly/testing/client.py


Testing Patterns

Unit Testing Services

Unit tests mock all external dependencies and test a single class in isolation. Use unittest.mock.AsyncMock for async dependencies.

import pytest
from unittest.mock import AsyncMock
from pyfly.kernel.exceptions import ResourceNotFoundException


class TestOrderService:
    """Pure unit tests -- all dependencies are mocked."""

    @pytest.fixture
    def mock_repo(self):
        repo = AsyncMock()
        repo.find_by_id = AsyncMock(return_value=None)
        repo.save = AsyncMock()
        return repo

    @pytest.fixture
    def service(self, mock_repo):
        return OrderService(order_repository=mock_repo)

    async def test_create_order_saves_to_repository(self, service, mock_repo):
        result = await service.create_order({
            "customer_id": "cust-42",
            "items": [{"product": "Widget", "qty": 1}],
        })

        mock_repo.save.assert_called_once()
        assert result["status"] == "created"

    async def test_create_order_returns_order_id(self, service):
        result = await service.create_order({
            "customer_id": "cust-42",
            "items": [{"product": "Widget", "qty": 1}],
        })
        assert "order_id" in result

    async def test_get_order_raises_not_found(self, service, mock_repo):
        mock_repo.find_by_id.return_value = None

        with pytest.raises(ResourceNotFoundException):
            await service.get_order("nonexistent-id")

Integration Testing with In-Memory Adapters

Integration tests use PyFly's in-memory adapters (such as InMemoryEventBus) and fake repositories to test multiple components working together.

import pytest
from pyfly.testing import PyFlyTestCase, assert_event_published
from pyfly.eda.types import EventEnvelope


class TestOrderEventIntegration(PyFlyTestCase):
    """Tests that verify event publishing and subscription."""

    async def test_order_created_event_is_published_and_received(self):
        await self.setup()

        # Set up an event capture list
        captured_events: list[EventEnvelope] = []

        async def capture(envelope: EventEnvelope) -> None:
            captured_events.append(envelope)

        # Subscribe to order events using wildcard pattern
        self.event_bus.subscribe("order.*", capture)

        # Publish an event (simulating what the service would do)
        await self.event_bus.publish(
            destination="orders",
            event_type="order.created",
            payload={"order_id": "ord-123", "total": 59.99},
        )

        # Verify the event was published correctly
        event = assert_event_published(
            captured_events,
            "order.created",
            payload_contains={"order_id": "ord-123"},
        )
        assert event.payload["total"] == 59.99

        await self.teardown()

    async def test_wildcard_pattern_receives_multiple_event_types(self):
        await self.setup()

        captured: list[EventEnvelope] = []

        async def capture(envelope: EventEnvelope) -> None:
            captured.append(envelope)

        self.event_bus.subscribe("order.*", capture)

        await self.event_bus.publish("orders", "order.created", {"id": "1"})
        await self.event_bus.publish("orders", "order.shipped", {"id": "1"})
        await self.event_bus.publish("users", "user.created", {"id": "u1"})

        # Only order.* events should be captured
        assert len(captured) == 2
        assert_event_published(captured, "order.created")
        assert_event_published(captured, "order.shipped")

        await self.teardown()

Testing Controllers

Test controllers by creating a Starlette test client with create_app() and httpx.AsyncClient:

import pytest
from httpx import AsyncClient, ASGITransport
from pyfly.web.adapters.starlette import create_app
from pyfly.testing import create_test_container
from pyfly.container import Scope


class TestOrderController:
    """HTTP-level tests for the order controller."""

    @pytest.fixture
    async def client(self):
        # Set up the DI container with test overrides
        container = create_test_container(overrides={
            OrderRepository: FakeOrderRepository,
        })
        container.register(OrderService, scope=Scope.SINGLETON)
        container.register(OrderController, scope=Scope.SINGLETON)

        app = create_app(title="Test", version="0.0.1")
        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://test") as c:
            yield c

    async def test_create_order_returns_201(self, client):
        response = await client.post("/api/orders", json={
            "customer_id": "cust-42",
            "items": [{"product_id": "SKU-001", "quantity": 2}],
        })
        assert response.status_code == 201
        data = response.json()
        assert data["status"] == "created"

    async def test_create_order_invalid_body_returns_422(self, client):
        response = await client.post("/api/orders", json={})
        assert response.status_code == 422
        error = response.json()["error"]
        assert error["code"] == "VALIDATION_ERROR"

    async def test_get_order_not_found_returns_404(self, client):
        response = await client.get("/api/orders/nonexistent")
        assert response.status_code == 404

Testing Event Handlers

Test event handlers by directly invoking them with constructed EventEnvelope objects:

import pytest
from unittest.mock import AsyncMock
from pyfly.eda.types import EventEnvelope


class TestOrderCreatedHandler:
    """Tests for the order.created event handler."""

    @pytest.fixture
    def notification_service(self):
        return AsyncMock()

    @pytest.fixture
    def handler(self, notification_service):
        return OrderCreatedHandler(
            notification_service=notification_service,
        )

    async def test_sends_confirmation_notification(self, handler, notification_service):
        envelope = EventEnvelope(
            event_type="order.created",
            payload={"order_id": "ord-123", "customer_id": "cust-42"},
            destination="orders",
        )

        await handler.handle(envelope)

        notification_service.send_confirmation.assert_called_once_with(
            customer_id="cust-42",
            order_id="ord-123",
        )

    async def test_handles_missing_customer_id_gracefully(self, handler):
        envelope = EventEnvelope(
            event_type="order.created",
            payload={"order_id": "ord-123"},  # no customer_id
            destination="orders",
        )

        # Should not raise -- handler should handle missing data gracefully
        await handler.handle(envelope)

Complete Example

The following example puts everything together: a service under test with a fake repository, DI container setup, and event assertions.

"""tests/test_order_service.py"""

import pytest
from unittest.mock import AsyncMock

from pyfly.testing import (
    PyFlyTestCase,
    create_test_container,
    assert_event_published,
    assert_no_events_published,
)
from pyfly.container import Scope
from pyfly.eda.types import EventEnvelope
from pyfly.kernel.exceptions import ResourceNotFoundException, ValidationException


# =========================================================================
# Test Doubles
# =========================================================================

class OrderRepository:
    """Interface for the order repository (used for type binding)."""
    pass


class FakeOrderRepository:
    """In-memory order repository for testing."""

    def __init__(self):
        self.orders: dict[str, dict] = {}

    async def save(self, order: dict) -> None:
        self.orders[order["id"]] = order

    async def find_by_id(self, order_id: str) -> dict | None:
        return self.orders.get(order_id)

    async def find_all(self) -> list[dict]:
        return list(self.orders.values())


# =========================================================================
# Service Under Test
# =========================================================================

class OrderService:
    def __init__(self, order_repository: OrderRepository) -> None:
        self._repo = order_repository

    async def create_order(self, data: dict) -> dict:
        if "customer_id" not in data:
            raise ValidationException("customer_id is required")
        order = {
            "id": "ord-001",
            "customer_id": data["customer_id"],
            "status": "created",
        }
        await self._repo.save(order)
        return order

    async def get_order(self, order_id: str) -> dict:
        order = await self._repo.find_by_id(order_id)
        if order is None:
            raise ResourceNotFoundException(
                f"Order {order_id} not found",
                code="ORDER_NOT_FOUND",
            )
        return order


# =========================================================================
# Unit Tests -- fast, isolated, mocked dependencies
# =========================================================================

class TestOrderServiceUnit:

    @pytest.fixture
    def mock_repo(self):
        return AsyncMock(spec=FakeOrderRepository)

    @pytest.fixture
    def service(self, mock_repo):
        return OrderService(order_repository=mock_repo)

    async def test_create_order_success(self, service, mock_repo):
        result = await service.create_order({"customer_id": "cust-42"})

        assert result["status"] == "created"
        assert result["customer_id"] == "cust-42"
        mock_repo.save.assert_called_once()

    async def test_create_order_missing_customer_raises(self, service):
        with pytest.raises(ValidationException, match="customer_id is required"):
            await service.create_order({})

    async def test_get_order_not_found_raises(self, service, mock_repo):
        mock_repo.find_by_id.return_value = None

        with pytest.raises(ResourceNotFoundException):
            await service.get_order("nonexistent")


# =========================================================================
# Integration Tests -- real components, in-memory adapters
# =========================================================================

class TestOrderServiceIntegration:

    @pytest.fixture
    def container(self):
        container = create_test_container(overrides={
            OrderRepository: FakeOrderRepository,
        })
        container.register(OrderService, scope=Scope.SINGLETON)
        return container

    @pytest.fixture
    def service(self, container) -> OrderService:
        return container.resolve(OrderService)

    @pytest.fixture
    def repo(self, container) -> FakeOrderRepository:
        return container.resolve(OrderRepository)

    async def test_create_and_retrieve_order(self, service, repo):
        # Create an order
        created = await service.create_order({"customer_id": "cust-42"})
        assert created["status"] == "created"

        # Retrieve it from the repository
        fetched = await service.get_order(created["id"])
        assert fetched["customer_id"] == "cust-42"

        # Verify it was persisted in the fake repository
        assert created["id"] in repo.orders

    async def test_repository_is_shared_singleton(self, container):
        repo1 = container.resolve(OrderRepository)
        repo2 = container.resolve(OrderRepository)
        assert repo1 is repo2  # Same singleton instance


# =========================================================================
# Event Tests -- verify event publishing behavior
# =========================================================================

class TestOrderEvents(PyFlyTestCase):

    async def test_order_created_event(self):
        await self.setup()

        captured: list[EventEnvelope] = []

        async def capture(envelope: EventEnvelope) -> None:
            captured.append(envelope)

        self.event_bus.subscribe("order.*", capture)

        # Simulate publishing an order created event
        await self.event_bus.publish(
            destination="orders",
            event_type="order.created",
            payload={
                "order_id": "ord-001",
                "customer_id": "cust-42",
                "total": 149.97,
            },
        )

        # Assert the event with payload matching
        event = assert_event_published(
            captured,
            "order.created",
            payload_contains={
                "order_id": "ord-001",
                "customer_id": "cust-42",
            },
        )
        assert event.payload["total"] == 149.97
        assert event.destination == "orders"

        await self.teardown()

    async def test_no_events_when_validation_fails(self):
        await self.setup()

        captured: list[EventEnvelope] = []

        async def capture(envelope: EventEnvelope) -> None:
            captured.append(envelope)

        self.event_bus.subscribe("order.*", capture)

        # Validation failure should not produce events
        # (no publish call was made)
        assert_no_events_published(captured)

        await self.teardown()

Run the tests:

# Run all tests with verbose output
pytest tests/test_order_service.py -v

# Run only unit tests
pytest tests/test_order_service.py::TestOrderServiceUnit -v

# Run only integration tests
pytest tests/test_order_service.py::TestOrderServiceIntegration -v

# Run with async support (if using pytest-asyncio)
pytest tests/test_order_service.py -v --asyncio-mode=auto