Skip to content

Examples

  • Back to Transactional Outbox Overview

    Return to the Transactional Outbox overview page with all topics.

    Back to Overview


Overview

Here's a complete example showing the outbox pattern:

import asyncio
import di
import cqrs
from cqrs.requests import bootstrap
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

# Register events
class UserJoinedPayload(cqrs.BaseModel, frozen=True):
    user_id: str
    meeting_id: str

cqrs.OutboxedEventMap.register(
    "user_joined",
    cqrs.NotificationEvent[UserJoinedPayload],
)

# Command handler
class JoinMeetingCommand(cqrs.Request):
    user_id: str
    meeting_id: str

class JoinMeetingCommandHandler(cqrs.RequestHandler[JoinMeetingCommand, None]):
    def __init__(self, outbox: cqrs.OutboxedEventRepository):
        self.outbox = outbox

    @property
    def events(self) -> list[cqrs.Event]:
        return []

    async def handle(self, request: JoinMeetingCommand) -> None:
        # Business logic
        print(f"User {request.user_id} joined meeting {request.meeting_id}")

        # Save event to outbox
        self.outbox.add(
            cqrs.NotificationEvent[UserJoinedPayload](
                event_name="user_joined",
                topic="user_events",
                payload=UserJoinedPayload(
                    user_id=request.user_id,
                    meeting_id=request.meeting_id,
                ),
            )
        )

        # Commit transaction
        await self.outbox.commit()

# Setup DI
def setup_di():
    container = di.Container()
    session_factory = async_sessionmaker(
        create_async_engine("mysql+asyncmy://user:pass@localhost/db")
    )

    container.bind(
        di.bind_by_type(
            di.Dependent(
                lambda: cqrs.SqlAlchemyOutboxedEventRepository(
                    session=session_factory(),
                ),
                scope="request",
            ),
            cqrs.OutboxedEventRepository,
        )
    )
    return container

# Bootstrap
mediator = bootstrap.bootstrap(
    di_container=setup_di(),
    commands_mapper=lambda m: m.bind(JoinMeetingCommand, JoinMeetingCommandHandler),
)

# Use mediator
await mediator.send(JoinMeetingCommand(user_id="123", meeting_id="456"))