Skip to content

Complete Examples

  • Back to Event Handling Overview

    Return to the Event Handling overview page with all topics.

    Back to Overview


Overview

Here's a complete example demonstrating event handling:

import asyncio
import di
import cqrs
from cqrs.requests import bootstrap

# Domain event: records that a user joined a meeting (declared with frozen=True).
class UserJoined(cqrs.DomainEvent, frozen=True):
    user_id: str  # identifier of the user who joined
    meeting_id: str  # identifier of the meeting that was joined

# Command: request for a user to join a meeting (handled by the class below).
class JoinMeetingCommand(cqrs.Request):
    user_id: str  # identifier of the user who wants to join
    meeting_id: str  # identifier of the meeting to join

class JoinMeetingCommandHandler(cqrs.RequestHandler[JoinMeetingCommand, None]):
    """Handle JoinMeetingCommand and record a UserJoined event for each call."""

    def __init__(self):
        # Events produced by handle(); exposed through the `events` property.
        self._events: list[cqrs.Event] = []

    @property
    def events(self) -> list[cqrs.Event]:
        """Return the events collected so far by this handler instance."""
        return self._events

    async def handle(self, request: JoinMeetingCommand) -> None:
        """Run the (demo) business logic, then queue a UserJoined event."""
        # Business logic
        print(f"User {request.user_id} joined meeting {request.meeting_id}")

        # Emit domain event
        joined = UserJoined(
            user_id=request.user_id,
            meeting_id=request.meeting_id,
        )
        self._events.append(joined)

# Event handler
class UserJoinedEventHandler(cqrs.EventHandler[UserJoined]):
    """React to UserJoined events emitted by the command handler."""

    async def handle(self, event: UserJoined) -> None:
        """Print a line describing the event being processed."""
        message = f"Processing event: User {event.user_id} joined meeting {event.meeting_id}"
        print(message)
        # Side effects: send notification, update read model, etc.

# Mappers
def commands_mapper(mapper: cqrs.RequestMap) -> None:
    """Bind JoinMeetingCommand to its handler class on ``mapper``."""
    command_type, handler_type = JoinMeetingCommand, JoinMeetingCommandHandler
    mapper.bind(command_type, handler_type)

def domain_events_mapper(mapper: cqrs.EventMap) -> None:
    """Bind the UserJoined event to its handler class on ``mapper``."""
    event_type, handler_type = UserJoined, UserJoinedEventHandler
    mapper.bind(event_type, handler_type)

# Bootstrap with parallel processing.
# Builds the mediator from a DI container plus the two mapper functions
# defined above (commands_mapper, domain_events_mapper).
# NOTE(review): the last two keyword arguments presumably enable concurrent
# event handling capped at 3 handlers at a time — confirm against the
# bootstrap() reference.
mediator = bootstrap.bootstrap(
    di_container=di.Container(),
    commands_mapper=commands_mapper,
    domain_events_mapper=domain_events_mapper,
    max_concurrent_event_handlers=3,
    concurrent_event_handle_enable=True,
)

# Execute command
async def main() -> None:
    """Send a JoinMeetingCommand through the mediator and wait for completion.

    ``await`` is only valid inside a coroutine, so the call lives in
    ``main()`` and is driven by ``asyncio.run`` below rather than being
    awaited at module top level.
    """
    await mediator.send(JoinMeetingCommand(user_id="123", meeting_id="456"))


if __name__ == "__main__":
    asyncio.run(main())

# Flow:
# 1. Command handler executes
# 2. UserJoined event is collected
# 3. EventProcessor.emit_events runs (EventEmitter finds UserJoinedEventHandler)
# 4. UserJoinedEventHandler.handle() executes
# 5. Response is returned

Event handler chain (follow-up events)

Event handlers can produce follow-up events via their events property. These follow-up events are processed in the same pipeline (in breadth-first (BFS) order in sequential mode, or under the same semaphore in parallel mode), which enables multi-level chains (e.g. EventL1 → EventL2 → EventL3).

import typing
import cqrs
from cqrs.events.event import IEvent

# Level 1: emitted by command handler; handled by HandlerL1.
class EventL1(cqrs.DomainEvent, frozen=True):
    seed: str  # value copied unchanged into the follow-up EventL2

# Level 2: emitted by HandlerL1; handled by HandlerL2.
class EventL2(cqrs.DomainEvent, frozen=True):
    seed: str  # value copied unchanged into the follow-up EventL3

# Level 3: emitted by HandlerL2 (terminal); handled by HandlerL3.
class EventL3(cqrs.DomainEvent, frozen=True):
    seed: str  # value carried over from the EventL2 that triggered it

class HandlerL1(cqrs.EventHandler[EventL1]):
    """First link of the chain: turns each EventL1 into a follow-up EventL2."""

    def __init__(self) -> None:
        # Follow-up events queued by handle(); exposed through `events`.
        self._follow_ups: list[IEvent] = []

    @property
    def events(self) -> typing.Sequence[IEvent]:
        """Return a tuple snapshot of the follow-up events queued so far."""
        return (*self._follow_ups,)

    async def handle(self, event: EventL1) -> None:
        """Queue an EventL2 that carries the same seed as ``event``."""
        # Side effects...
        next_event = EventL2(seed=event.seed)
        self._follow_ups.append(next_event)

class HandlerL2(cqrs.EventHandler[EventL2]):
    """Second link of the chain: turns each EventL2 into a follow-up EventL3."""

    def __init__(self) -> None:
        # Follow-up events queued by handle(); exposed through `events`.
        self._follow_ups: list[IEvent] = []

    @property
    def events(self) -> typing.Sequence[IEvent]:
        """Return a tuple snapshot of the follow-up events queued so far."""
        return (*self._follow_ups,)

    async def handle(self, event: EventL2) -> None:
        """Queue an EventL3 that carries the same seed as ``event``."""
        # Side effects...
        next_event = EventL3(seed=event.seed)
        self._follow_ups.append(next_event)

class HandlerL3(cqrs.EventHandler[EventL3]):
    """Last link of the chain: handles EventL3 and emits nothing further."""

    async def handle(self, event: EventL3) -> None:
        """Do nothing; terminal handler with no follow-ups (default events = ())."""
        return None

# Event mapper for the chain: one binding per level.
def domain_events_mapper(mapper: cqrs.EventMap) -> None:
    """Bind each chain event to its handler class, in level order."""
    chain = (
        (EventL1, HandlerL1),
        (EventL2, HandlerL2),
        (EventL3, HandlerL3),
    )
    for event_type, handler_type in chain:
        mapper.bind(event_type, handler_type)

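When you emit EventL1(seed="x"), the processor runs the whole chain: HandlerL1 handles EventL1 and emits EventL2 → HandlerL2 handles EventL2 and emits EventL3 → HandlerL3 handles EventL3 and the chain ends. All of this happens within the same emit_events() call (sequential BFS, or parallel with FIRST_COMPLETED).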