Skip to content

Complete Examples

  • Back to Event Handling Overview

    Return to the Event Handling overview page with all topics.

    Back to Overview


Overview

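Here's a complete example demonstrating event handling: a command handler emits a `UserJoined` domain event, which is then dispatched to its registered event handler.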

import asyncio
import di
import cqrs
from cqrs.requests import bootstrap

# Domain event: records that a user joined a meeting.
# Emitted by JoinMeetingCommandHandler and bound to UserJoinedEventHandler
# in domain_events_mapper.
# NOTE(review): frozen=True is forwarded to cqrs.DomainEvent's class
# machinery -- presumably it makes instances immutable; confirm in the
# library docs.
class UserJoined(cqrs.DomainEvent, frozen=True):
    user_id: str  # identifier of the user who joined
    meeting_id: str  # identifier of the meeting that was joined

# Command: asks for a user to be joined to a meeting.
# Handled by JoinMeetingCommandHandler (bound in commands_mapper).
class JoinMeetingCommand(cqrs.Request):
    user_id: str  # identifier of the user who wants to join
    meeting_id: str  # identifier of the meeting to join

class JoinMeetingCommandHandler(cqrs.RequestHandler[JoinMeetingCommand, None]):
    """Handle JoinMeetingCommand and record a UserJoined event per call."""

    def __init__(self):
        # Events appended by handle(); exposed through the `events` property.
        self._events: list[cqrs.Event] = []

    @property
    def events(self) -> list[cqrs.Event]:
        """Return the events collected so far by this handler instance."""
        return self._events

    async def handle(self, request: JoinMeetingCommand) -> None:
        """Print the join message, then queue a matching UserJoined event."""
        # Business logic
        message = f"User {request.user_id} joined meeting {request.meeting_id}"
        print(message)

        # Emit domain event
        joined = UserJoined(
            user_id=request.user_id,
            meeting_id=request.meeting_id,
        )
        self._events.append(joined)

# Event handler
class UserJoinedEventHandler(cqrs.EventHandler[UserJoined]):
    """React to a UserJoined event by printing a processing message."""

    async def handle(self, event: UserJoined) -> None:
        """Process one UserJoined event."""
        message = f"Processing event: User {event.user_id} joined meeting {event.meeting_id}"
        print(message)
        # Side effects: send notification, update read model, etc.

# Mappers
def commands_mapper(mapper: cqrs.RequestMap) -> None:
    """Bind JoinMeetingCommand to JoinMeetingCommandHandler on *mapper*."""
    request_type = JoinMeetingCommand
    handler_type = JoinMeetingCommandHandler
    mapper.bind(request_type, handler_type)

def domain_events_mapper(mapper: cqrs.EventMap) -> None:
    """Bind the UserJoined event to UserJoinedEventHandler on *mapper*."""
    event_type = UserJoined
    handler_type = UserJoinedEventHandler
    mapper.bind(event_type, handler_type)

# Bootstrap with parallel processing
mediator = bootstrap.bootstrap(
    di_container=di.Container(),
    commands_mapper=commands_mapper,
    domain_events_mapper=domain_events_mapper,
    max_concurrent_event_handlers=3,
    concurrent_event_handle_enable=True,
)


async def main() -> None:
    """Send a single JoinMeetingCommand through the mediator."""
    # Execute command
    await mediator.send(JoinMeetingCommand(user_id="123", meeting_id="456"))


# `await` is only valid inside a coroutine, so the example is driven through
# asyncio.run() instead of a bare top-level await.
if __name__ == "__main__":
    asyncio.run(main())

# Flow:
# 1. Command handler executes
# 2. UserJoined event is collected
# 3. EventDispatcher processes event (finds UserJoinedEventHandler)
# 4. UserJoinedEventHandler.handle() executes
# 5. Response is returned (None here, since the handler's response type is None)