Skip to content

Streaming Mediator

  • Back to Bootstrap Overview

    Return to the Bootstrap overview page with all configuration options.

    Back to Overview


Overview

The StreamingRequestMediator processes requests incrementally, yielding results as they become available. Perfect for batch processing, file uploads, and real-time progress updates.

Basic Configuration

from cqrs.requests import bootstrap

def commands_mapper(mapper: cqrs.RequestMap) -> None:
    mapper.bind(ProcessFilesCommand, ProcessFilesCommandHandler)

def events_mapper(mapper: cqrs.EventMap) -> None:
    mapper.bind(FileProcessedEvent, FileProcessedEventHandler)

mediator = bootstrap.bootstrap_streaming(
    di_container=di.Container(),
    commands_mapper=commands_mapper,
    domain_events_mapper=events_mapper,
)

With Parallel Event Processing

# Streaming mediator defaults to parallel event processing
mediator = bootstrap.bootstrap_streaming(
    di_container=di.Container(),
    commands_mapper=commands_mapper,
    domain_events_mapper=events_mapper,
    max_concurrent_event_handlers=10,  # Default: 10
    concurrent_event_handle_enable=True,  # Default: True for streaming
)

With Message Broker

from cqrs.message_brokers import kafka
from cqrs.adapters.kafka import KafkaProducerAdapter

kafka_producer = KafkaProducerAdapter(
    bootstrap_servers=["localhost:9092"],
)

mediator = bootstrap.bootstrap_streaming(
    di_container=di.Container(),
    commands_mapper=commands_mapper,
    domain_events_mapper=events_mapper,
    message_broker=kafka.KafkaMessageBroker(producer=kafka_producer),
)

Complete Example

import typing
import asyncio
import di
import cqrs
from cqrs.requests import bootstrap
from cqrs.requests.request_handler import StreamingRequestHandler
from cqrs.message_brokers import devnull

class ProcessFilesCommand(cqrs.Request):
    file_ids: list[str]

class FileProcessedResult(cqrs.Response):
    file_id: str
    status: str

class FileProcessedEvent(cqrs.DomainEvent):
    file_id: str
    status: str

class ProcessFilesCommandHandler(
    StreamingRequestHandler[ProcessFilesCommand, FileProcessedResult]
):
    def __init__(self):
        self._events = []

    @property
    def events(self) -> list[cqrs.Event]:
        return self._events.copy()

    def clear_events(self) -> None:
        self._events.clear()

    async def handle(
        self, request: ProcessFilesCommand
    ) -> typing.AsyncIterator[FileProcessedResult]:
        for file_id in request.file_ids:
            # Simulate processing
            await asyncio.sleep(0.1)
            result = FileProcessedResult(file_id=file_id, status="completed")
            self._events.append(
                FileProcessedEvent(file_id=file_id, status="completed")
            )
            yield result

class FileProcessedEventHandler(cqrs.EventHandler[FileProcessedEvent]):
    async def handle(self, event: FileProcessedEvent) -> None:
        print(f"File {event.file_id} processed")

def commands_mapper(mapper: cqrs.RequestMap) -> None:
    mapper.bind(ProcessFilesCommand, ProcessFilesCommandHandler)

def events_mapper(mapper: cqrs.EventMap) -> None:
    mapper.bind(FileProcessedEvent, FileProcessedEventHandler)

mediator = bootstrap.bootstrap_streaming(
    di_container=di.Container(),
    commands_mapper=commands_mapper,
    domain_events_mapper=events_mapper,
    message_broker=devnull.DevnullMessageBroker(),
    max_concurrent_event_handlers=5,
    concurrent_event_handle_enable=True,
)

# Stream results
async for result in mediator.stream(
    ProcessFilesCommand(file_ids=["1", "2", "3"])
):
    if result:
        print(f"Processed: {result.file_id} - {result.status}")