Request Mediator¶
-
Back to Bootstrap Overview
Return to the Bootstrap overview page with all configuration options.
Overview¶
The RequestMediator is the standard mediator for handling commands and queries. It processes requests synchronously and emits events after handler execution.
Basic Configuration¶
import di
import cqrs
from cqrs.requests import bootstrap
def commands_mapper(mapper: cqrs.RequestMap) -> None:
mapper.bind(CreateUserCommand, CreateUserCommandHandler)
def queries_mapper(mapper: cqrs.RequestMap) -> None:
mapper.bind(GetUserQuery, GetUserQueryHandler)
def events_mapper(mapper: cqrs.EventMap) -> None:
mapper.bind(
cqrs.DomainEvent[UserCreatedPayload],
UserCreatedEventHandler
)
# Basic bootstrap with di.Container
mediator = bootstrap.bootstrap(
di_container=di.Container(),
commands_mapper=commands_mapper,
queries_mapper=queries_mapper,
domain_events_mapper=events_mapper,
)
With Message Broker¶
from cqrs.message_brokers import devnull, kafka
# Using DevnullMessageBroker (for testing)
mediator = bootstrap.bootstrap(
di_container=di.Container(),
commands_mapper=commands_mapper,
domain_events_mapper=events_mapper,
message_broker=devnull.DevnullMessageBroker(),
)
# Using KafkaMessageBroker
from cqrs.adapters.kafka import KafkaProducerAdapter
kafka_producer = KafkaProducerAdapter(
bootstrap_servers=["localhost:9092"],
client_id="my-app",
)
mediator = bootstrap.bootstrap(
di_container=di.Container(),
commands_mapper=commands_mapper,
domain_events_mapper=events_mapper,
message_broker=kafka.KafkaMessageBroker(
producer=kafka_producer,
aiokafka_log_level="ERROR",
),
)
With Parallel Event Processing¶
# Enable parallel event processing
mediator = bootstrap.bootstrap(
di_container=di.Container(),
commands_mapper=commands_mapper,
domain_events_mapper=events_mapper,
max_concurrent_event_handlers=5, # Process up to 5 events concurrently
concurrent_event_handle_enable=True, # Enable parallel processing
)
With Custom Middlewares¶
from cqrs.middlewares import base
class CustomMiddleware(base.Middleware):
async def __call__(self, request: cqrs.Request, handle):
print(f"Before handling {type(request).__name__}")
result = await handle(request)
print(f"After handling {type(request).__name__}")
return result
mediator = bootstrap.bootstrap(
di_container=di.Container(),
commands_mapper=commands_mapper,
middlewares=[CustomMiddleware()],
)
With On Startup Callbacks¶
def initialize_database():
# Initialize database connections, create tables, etc.
print("Database initialized")
def setup_cache():
# Setup cache connections
print("Cache initialized")
mediator = bootstrap.bootstrap(
di_container=di.Container(),
commands_mapper=commands_mapper,
on_startup=[initialize_database, setup_cache],
)
Complete Example¶
import di
import cqrs
from cqrs.requests import bootstrap
from cqrs.message_brokers import devnull
from cqrs.middlewares import base
class CreateUserCommand(cqrs.Request):
user_id: str
email: str
class UserCreatedEvent(cqrs.DomainEvent):
user_id: str
email: str
class CreateUserCommandHandler(cqrs.RequestHandler[CreateUserCommand, None]):
def __init__(self):
self._events = []
@property
def events(self) -> list[cqrs.Event]:
return self._events
async def handle(self, request: CreateUserCommand) -> None:
# Business logic
self._events.append(
UserCreatedEvent(user_id=request.user_id, email=request.email)
)
class UserCreatedEventHandler(cqrs.EventHandler[UserCreatedEvent]):
async def handle(self, event: UserCreatedEvent) -> None:
print(f"User {event.user_id} created with email {event.email}")
def commands_mapper(mapper: cqrs.RequestMap) -> None:
mapper.bind(CreateUserCommand, CreateUserCommandHandler)
def events_mapper(mapper: cqrs.EventMap) -> None:
mapper.bind(UserCreatedEvent, UserCreatedEventHandler)
# Complete configuration
mediator = bootstrap.bootstrap(
di_container=di.Container(),
commands_mapper=commands_mapper,
domain_events_mapper=events_mapper,
message_broker=devnull.DevnullMessageBroker(),
max_concurrent_event_handlers=3,
concurrent_event_handle_enable=True,
on_startup=[lambda: print("Application started")],
)
# Use the mediator
result = await mediator.send(CreateUserCommand(user_id="1", email="user@example.com"))