Skip to content
Python CQRS

Python CQRS

Event-Driven Architecture Framework for Distributed Systems


Core Features

  • Bootstrap

    Quick project setup and configuration with automatic DI container setup.

    Read More

  • Request Handlers

    Handle commands and queries with full type safety and async support.

    Read More

  • Saga Pattern

    Orchestrated Saga for distributed transactions with automatic compensation.

    Read More

  • Event Handling

    Process domain events with parallel processing and runtime execution.

    Read More

  • Transaction Outbox

    Guaranteed event delivery with at-least-once semantics.

    Read More

  • Chain of Responsibility

    Sequential request processing with flexible handler chaining.

    Read More

  • Streaming

    Incremental processing with real-time progress updates via SSE.

    Read More

  • Integrations

    FastAPI, FastStream, Kafka, and Protobuf integrations out of the box.

    Read More

  • Mermaid Diagrams

    Visualize architecture patterns and flows with interactive Mermaid diagrams.

    Read More


What is it?

Python CQRS is a framework for implementing the CQRS (Command Query Responsibility Segregation) pattern in Python applications. It helps separate read and write operations, improving scalability, performance, and code maintainability.

Key Highlights:

  • Performance — Separation of commands and queries, parallel event processing
  • Reliability — Transaction Outbox for guaranteed event delivery
  • Type Safety — Full Pydantic v2 support with runtime validation
  • Ready Integrations — FastAPI, FastStream, Kafka out of the box
  • Simple Setup — Bootstrap for quick configuration

Installation

Install Python CQRS using pip or uv:

Using pip:

pip install python-cqrs

Using uv:

uv pip install python-cqrs

Requirements

Python 3.8+ • Pydantic v2


Quick Start

import di
import cqrs
from cqrs.requests import bootstrap

# Define command, response and handler
class CreateUserCommand(cqrs.Request):
    email: str
    name: str

class CreateUserResponse(cqrs.Response):
    user_id: str

class CreateUserHandler(cqrs.RequestHandler[CreateUserCommand, CreateUserResponse]):
    async def handle(self, request: CreateUserCommand) -> CreateUserResponse:
        # Your business logic here
        user_id = f"user_{request.email}"
        return CreateUserResponse(user_id=user_id)

# Bootstrap and use
mediator = bootstrap.bootstrap(
    di_container=di.Container(),
    commands_mapper=lambda m: m.bind(CreateUserCommand, CreateUserHandler),
)

result = await mediator.send(CreateUserCommand(email="user@example.com", name="John"))

See Bootstrap for detailed setup instructions.