Complete Examples
-
Back to Event Handling Overview
Return to the Event Handling overview page with all topics.
Overview
Here's a complete example demonstrating event handling:
import asyncio
import di
import cqrs
from cqrs.requests import bootstrap
# Domain event
class UserJoined(cqrs.DomainEvent, frozen=True):
    """Domain event carrying the user and meeting ids of a join."""

    user_id: str
    meeting_id: str
# Command handler
class JoinMeetingCommand(cqrs.Request):
    """Request asking for a user to join a meeting."""

    user_id: str
    meeting_id: str
class JoinMeetingCommandHandler(cqrs.RequestHandler[JoinMeetingCommand, None]):
    """Handle ``JoinMeetingCommand`` and collect a ``UserJoined`` event."""

    def __init__(self):
        # Events appended by handle(); exposed through the `events` property.
        self._events: list[cqrs.Event] = []

    @property
    def events(self) -> list[cqrs.Event]:
        """Return the list of events collected by this handler instance."""
        return self._events

    async def handle(self, request: JoinMeetingCommand) -> None:
        """Print the join notice, then append a ``UserJoined`` event."""
        # Business logic
        print(f"User {request.user_id} joined meeting {request.meeting_id}")

        # Emit domain event
        joined = UserJoined(
            user_id=request.user_id,
            meeting_id=request.meeting_id,
        )
        self._events.append(joined)
# Event handler
class UserJoinedEventHandler(cqrs.EventHandler[UserJoined]):
    """Event handler bound to ``UserJoined`` events."""

    async def handle(self, event: UserJoined) -> None:
        """Print a processing notice for the received event."""
        # Side effects (send notification, update read model, etc.) go here.
        notice = f"Processing event: User {event.user_id} joined meeting {event.meeting_id}"
        print(notice)
# Mappers
def commands_mapper(mapper: cqrs.RequestMap) -> None:
    """Bind ``JoinMeetingCommand`` to its handler on *mapper*."""
    mapper.bind(JoinMeetingCommand, JoinMeetingCommandHandler)
def domain_events_mapper(mapper: cqrs.EventMap) -> None:
    """Bind the ``UserJoined`` event to its handler on *mapper*."""
    mapper.bind(UserJoined, UserJoinedEventHandler)
# Bootstrap with parallel processing
mediator = bootstrap.bootstrap(
    di_container=di.Container(),
    commands_mapper=commands_mapper,
    domain_events_mapper=domain_events_mapper,
    max_concurrent_event_handlers=3,
    concurrent_event_handle_enable=True,
)


async def main() -> None:
    """Send a single ``JoinMeetingCommand`` through the mediator."""
    # Execute command
    await mediator.send(JoinMeetingCommand(user_id="123", meeting_id="456"))


# `await` is a SyntaxError at module top level in a regular script, so the
# send lives in a coroutine that asyncio.run() drives to completion.
if __name__ == "__main__":
    asyncio.run(main())
# Flow:
# 1. Command handler executes
# 2. UserJoined event is collected
# 3. EventDispatcher processes event (finds UserJoinedEventHandler)
# 4. UserJoinedEventHandler.handle() executes
# 5. Response is returned