Skip to content

FastAPI Integration

  • Back to Stream Handling Overview

    Return to the Stream Handling overview page with all topics.

    Back to Overview


Overview

StreamingRequestMediator is designed for use with Server-Sent Events (SSE) in FastAPI applications:

import fastapi
import json
from cqrs.requests import bootstrap

def streaming_mediator_factory() -> cqrs.StreamingRequestMediator:
    return bootstrap.bootstrap_streaming(
        di_container=container,
        commands_mapper=commands_mapper,
        domain_events_mapper=domain_events_mapper,
        message_broker=broker,
        max_concurrent_event_handlers=3,
        concurrent_event_handle_enable=True,
    )

@app.post("/process-files")
async def process_files_stream(
    command: ProcessFilesCommand,
    mediator: cqrs.StreamingRequestMediator = fastapi.Depends(
        streaming_mediator_factory
    ),
) -> fastapi.responses.StreamingResponse:
    async def generate_sse():
        yield f"data: {json.dumps({'type': 'start', 'message': 'Processing...'})}\n\n"

        async for result in mediator.stream(command):
            if result is None:
                continue

            sse_data = {
                "type": "progress",
                "data": result.model_dump(),
            }
            yield f"data: {json.dumps(sse_data)}\n\n"

        yield f"data: {json.dumps({'type': 'complete'})}\n\n"

    return fastapi.responses.StreamingResponse(
        generate_sse(),
        media_type="text/event-stream",
    )

Here's a complete example demonstrating streaming handlers with parallel event processing:

import asyncio
import typing
from datetime import datetime
import di
import cqrs
from cqrs.requests import bootstrap
from cqrs.message_brokers import devnull

# Domain models
class ProcessOrdersCommand(cqrs.Request):
    order_ids: list[str]

class OrderProcessedResult(cqrs.Response):
    order_id: str
    status: str
    processed_at: datetime
    items_count: int

# Domain events
class OrderProcessedEvent(cqrs.DomainEvent, frozen=True):
    order_id: str
    customer_id: str
    total_amount: float

class OrderAnalyticsEvent(cqrs.DomainEvent, frozen=True):
    order_id: str
    category: str

# Streaming handler
class ProcessOrdersCommandHandler(
    cqrs.StreamingRequestHandler[ProcessOrdersCommand, OrderProcessedResult]
):
    def __init__(self):
        self._events: list[cqrs.Event] = []

    @property
    def events(self) -> list[cqrs.Event]:
        return self._events.copy()

    def clear_events(self) -> None:
        self._events.clear()

    async def handle(
        self, request: ProcessOrdersCommand
    ) -> typing.AsyncIterator[OrderProcessedResult]:
        for order_id in request.order_ids:
            # Simulate processing
            await asyncio.sleep(0.1)

            # Create result
            result = OrderProcessedResult(
                order_id=order_id,
                status="processed",
                processed_at=datetime.now(),
                items_count=3,
            )

            # Emit multiple events
            self._events.append(
                OrderProcessedEvent(
                    order_id=order_id,
                    customer_id=f"customer_{order_id}",
                    total_amount=100.0,
                )
            )
            self._events.append(
                OrderAnalyticsEvent(
                    order_id=order_id,
                    category="electronics",
                )
            )

            yield result

# Event handlers
class OrderProcessedEventHandler(cqrs.EventHandler[OrderProcessedEvent]):
    async def handle(self, event: OrderProcessedEvent) -> None:
        print(f"Order {event.order_id} processed for customer {event.customer_id}")

class OrderAnalyticsEventHandler(cqrs.EventHandler[OrderAnalyticsEvent]):
    async def handle(self, event: OrderAnalyticsEvent) -> None:
        print(f"Analytics updated for order {event.order_id} in category {event.category}")

# Mappers
def commands_mapper(mapper: cqrs.RequestMap) -> None:
    mapper.bind(ProcessOrdersCommand, ProcessOrdersCommandHandler)

def domain_events_mapper(mapper: cqrs.EventMap) -> None:
    mapper.bind(OrderProcessedEvent, OrderProcessedEventHandler)
    mapper.bind(OrderAnalyticsEvent, OrderAnalyticsEventHandler)

# Bootstrap
async def main():
    mediator = bootstrap.bootstrap_streaming(
        di_container=di.Container(),
        commands_mapper=commands_mapper,
        domain_events_mapper=domain_events_mapper,
        message_broker=devnull.DevnullMessageBroker(),
        max_concurrent_event_handlers=3,
        concurrent_event_handle_enable=True,
    )

    command = ProcessOrdersCommand(order_ids=["order1", "order2", "order3"])

    async for result in mediator.stream(command):
        print(f"Processed: {result.order_id}")

if __name__ == "__main__":
    asyncio.run(main())