Core Features¶
-
Bootstrap
Quick project setup and configuration with automatic DI container setup.
-
Request Handlers
Handle commands and queries with full type safety and async support.
-
Saga Pattern
Orchestrated Saga for distributed transactions with automatic compensation.
-
Event Handling
Process domain events with parallel processing and runtime execution.
-
Transaction Outbox
Guaranteed event delivery with at-least-once semantics.
-
Chain of Responsibility
Sequential request processing with flexible handler chaining.
-
Streaming
Incremental processing with real-time progress updates via SSE.
-
Integrations
FastAPI, FastStream, Kafka, and Protobuf integrations out of the box.
-
Mermaid Diagrams
Visualize architecture patterns and flows with interactive Mermaid diagrams.
What is it?¶
Python CQRS is a framework for implementing the CQRS (Command Query Responsibility Segregation) pattern in Python applications. It helps separate read and write operations, improving scalability, performance, and code maintainability.
Key Highlights:
- Performance — Separation of commands and queries, parallel event processing
- Reliability — Transaction Outbox for guaranteed event delivery
- Type Safety — Full Pydantic v2 support with runtime validation
- Ready Integrations — FastAPI, FastStream, Kafka out of the box
- Simple Setup — Bootstrap for quick configuration
Installation¶
Install Python CQRS using pip or uv:
Using pip:
Using uv:
Requirements
Python 3.8+ • Pydantic v2
Quick Start¶
import di
import cqrs
from cqrs.requests import bootstrap
# Define command, response and handler
class CreateUserCommand(cqrs.Request):
email: str
name: str
class CreateUserResponse(cqrs.Response):
user_id: str
class CreateUserHandler(cqrs.RequestHandler[CreateUserCommand, CreateUserResponse]):
async def handle(self, request: CreateUserCommand) -> CreateUserResponse:
# Your business logic here
user_id = f"user_{request.email}"
return CreateUserResponse(user_id=user_id)
# Bootstrap and use
mediator = bootstrap.bootstrap(
di_container=di.Container(),
commands_mapper=lambda m: m.bind(CreateUserCommand, CreateUserHandler),
)
result = await mediator.send(CreateUserCommand(email="user@example.com", name="John"))
See Bootstrap for detailed setup instructions.