# Saga Examples

- Back to Saga Overview: return to the Saga Pattern overview page with all topics.
## Basic Saga Example
import asyncio
import dataclasses
import uuid

import cqrs
import di
from cqrs.response import Response
from cqrs.saga import bootstrap
from cqrs.saga.models import SagaContext
from cqrs.saga.saga import Saga
from cqrs.saga.step import SagaStepHandler, SagaStepResult
from cqrs.saga.storage.memory import MemorySagaStorage
@dataclasses.dataclass
class OrderContext(SagaContext):
    """Shared state handed to every step of the order saga."""

    order_id: str
    items: list[str]
    total_amount: float
    # Written by ReserveInventoryStep.act; stays None until a reservation exists.
    inventory_reservation_id: str | None = None
    # Not assigned by any step shown here — presumably set by the payment step.
    payment_id: str | None = None
class ReserveInventoryStep(SagaStepHandler[OrderContext, Response]):
    """Saga step that reserves the order's items and can release them again."""

    def __init__(self, inventory_service):
        # The service is supplied from outside; its concrete type is not
        # shown in this example.
        self._inventory_service = inventory_service

    async def act(
        self, context: OrderContext
    ) -> SagaStepResult[OrderContext, Response]:
        """Reserve the items and remember the reservation id on the context."""
        reservation_id = await self._inventory_service.reserve_items(
            context.order_id, context.items
        )
        # Stored so that compensate() knows which reservation to release.
        context.inventory_reservation_id = reservation_id
        return self._generate_step_result(Response())

    async def compensate(self, context: OrderContext) -> None:
        """Release the reservation made by act(), if there is one."""
        if context.inventory_reservation_id:
            await self._inventory_service.release_items(
                context.inventory_reservation_id
            )
class ProcessPaymentStep(SagaStepHandler[OrderContext, Response]):
    """Saga step that charges the order total and refunds it on rollback.

    Illustrative only: ``payment_service`` stands in for your own payment
    client, and ``charge`` / ``refund`` are placeholder method names.
    """

    def __init__(self, payment_service):
        self._payment_service = payment_service

    async def act(
        self, context: OrderContext
    ) -> SagaStepResult[OrderContext, Response]:
        """Charge the order and remember the payment id on the context."""
        payment_id = await self._payment_service.charge(
            context.order_id, context.total_amount
        )
        # Stored so that compensate() knows which payment to refund.
        context.payment_id = payment_id
        return self._generate_step_result(Response())

    async def compensate(self, context: OrderContext) -> None:
        """Refund the payment made by act(), if there is one."""
        if context.payment_id:
            await self._payment_service.refund(context.payment_id)


# Define saga class with steps
class OrderSaga(Saga[OrderContext]):
    """Order saga composed of the inventory and payment steps."""

    steps = [ReserveInventoryStep, ProcessPaymentStep]
# Set up the DI container that is passed to bootstrap below
di_container = di.Container()
# ... register services here (e.g. the inventory_service that
# ReserveInventoryStep receives in its constructor) ...
# Create saga storage (MemorySagaStorage from cqrs.saga.storage.memory)
storage = MemorySagaStorage()
# Register saga in SagaMap
def saga_mapper(mapper: cqrs.SagaMap) -> None:
    """Bind OrderContext to OrderSaga on the given saga map."""
    mapper.bind(OrderContext, OrderSaga)
# Create saga mediator using bootstrap
mediator = bootstrap.bootstrap(
    di_container=di_container,
    sagas_mapper=saga_mapper,
    saga_storage=storage,
)


# Execute saga
async def run_order_saga() -> None:
    """Stream one order saga and print the name of each completed step."""
    context = OrderContext(order_id="123", items=["item_1"], total_amount=100.0)
    saga_id = uuid.uuid4()
    # `async for` is only valid inside a coroutine, hence this wrapper.
    async for step_result in mediator.stream(context, saga_id=saga_id):
        print(f"Step completed: {step_result.step_type.__name__}")


asyncio.run(run_order_saga())
Complete example: examples/saga.py
## Recovery Example
from cqrs.saga.recovery import recover_saga
async def recover_order_saga(saga_id: uuid.UUID) -> None:
    """Resume the interrupted saga identified by ``saga_id``.

    Prints whether the saga was recovered or ended in compensation.
    """
    # Get saga instance (or keep reference to saga class)
    saga = OrderSaga()
    try:
        await recover_saga(
            saga=saga,
            saga_id=saga_id,
            context_builder=OrderContext,
            container=di_container,  # Same container used in bootstrap
            storage=storage,
        )
    except RuntimeError:
        # Expected if saga was in COMPENSATING/FAILED state
        print("Compensation completed")
    else:
        print("Saga recovered successfully!")


# Recover interrupted saga (`await` is only valid inside a coroutine)
asyncio.run(recover_order_saga(uuid.UUID("550e8400-e29b-41d4-a716-446655440000")))
Complete example: examples/saga_recovery.py
## FastAPI SSE Integration
import fastapi
import json
import uuid
from cqrs.saga import bootstrap
def mediator_factory() -> cqrs.SagaMediator:
    """Create saga mediator using bootstrap.

    Builds the mediator from the module-level ``di_container``,
    ``saga_mapper`` and ``storage``; used as a FastAPI dependency.
    """
    return bootstrap.bootstrap(
        di_container=di_container,
        sagas_mapper=saga_mapper,
        saga_storage=storage,
    )
@app.post("/process-order")
async def process_order(
    request: ProcessOrderRequest,
    mediator: cqrs.SagaMediator = fastapi.Depends(mediator_factory),
):
    """Run the order saga and stream its progress as Server-Sent Events.

    Emits a ``start`` event, one ``step_progress`` event per saga step
    result, and a final ``complete`` event.
    """

    def sse_event(payload: dict) -> str:
        # One SSE frame: a "data:" line terminated by a blank line.
        return f"data: {json.dumps(payload)}\n\n"

    async def generate_sse():
        saga_id = uuid.uuid4()
        # Placeholder: build the context from `request` in real code.
        context = OrderContext(...)
        yield sse_event({'type': 'start', 'saga_id': str(saga_id)})
        async for step_result in mediator.stream(context, saga_id=saga_id):
            yield sse_event(
                {'type': 'step_progress', 'step': step_result.step_type.__name__}
            )
        yield sse_event({'type': 'complete'})

    return fastapi.responses.StreamingResponse(
        generate_sse(), media_type="text/event-stream"
    )
Complete example: examples/saga_fastapi_sse.py
## SQLAlchemy Storage Example
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from cqrs.saga.storage.sqlalchemy import SqlAlchemySagaStorage, Base
from cqrs.saga import bootstrap
# Setup
# NOTE: the connection URL below is a placeholder — substitute your own.
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
# Session factory bound to the engine; create_storage() calls it to open a session.
SessionLocal = async_sessionmaker(engine, expire_on_commit=False)
async def create_storage() -> SqlAlchemySagaStorage:
    """Return a saga storage backed by a freshly opened session.

    The caller is responsible for committing and closing that session
    (reachable as ``storage.session``).
    """
    return SqlAlchemySagaStorage(SessionLocal())
# Register saga in SagaMap
def saga_mapper(mapper: cqrs.SagaMap) -> None:
    """Bind OrderContext to OrderSaga on the given saga map."""
    mapper.bind(OrderContext, OrderSaga)


# Usage
async def run_order_saga_with_sqlalchemy() -> None:
    """Run one order saga against SQLAlchemy-backed storage.

    Commits the storage session as the saga progresses and always closes
    it, even when the saga raises.
    """
    storage = await create_storage()
    try:
        # Create saga mediator using bootstrap
        mediator = bootstrap.bootstrap(
            di_container=di_container,
            sagas_mapper=saga_mapper,
            saga_storage=storage,
        )
        # Execute saga
        context = OrderContext(...)
        saga_id = uuid.uuid4()
        async for step_result in mediator.stream(context, saga_id=saga_id):
            print(f"Step: {step_result.step_type.__name__}")
            # Persist progress after each step.
            await storage.session.commit()
        # Final commit for anything written after the last step was yielded.
        await storage.session.commit()
    finally:
        # The session was opened in create_storage(); release it here.
        await storage.session.close()


asyncio.run(run_order_saga_with_sqlalchemy())
## Compensation Retry Configuration
Compensation retry is configured at the saga transaction level. When you run a saga through SagaMediator, retry settings can be supplied when the saga transaction is created; the recommended approach, however, is to configure retry settings in your saga class or to rely on the defaults.
For advanced retry configuration, you can work with the transaction directly:
# Note: Compensation retry is configured at the SagaTransaction level
# When using mediator.stream(), default retry settings are used
# For custom retry configuration, you may need to access the transaction directly
## Background Recovery Job
import asyncio
from cqrs.saga.recovery import recover_saga
async def recovery_job():
    """Periodically recover every incomplete saga; never returns.

    ``find_incomplete_sagas`` is not defined in this example — supply a
    coroutine that returns the ids of sagas needing recovery.
    """
    import logging

    logger = logging.getLogger(__name__)
    saga = OrderSaga()  # Get saga instance
    while True:
        incomplete_sagas = await find_incomplete_sagas()
        for saga_id in incomplete_sagas:
            try:
                await recover_saga(
                    saga=saga,
                    saga_id=saga_id,
                    context_builder=OrderContext,
                    container=di_container,
                    storage=storage,
                )
            except RuntimeError:
                # Compensation completed; record it instead of swallowing
                # the error silently.
                logger.info("Compensation completed for saga %s", saga_id)
        await asyncio.sleep(60)  # Scan every minute
## Fallback Pattern Example
import dataclasses
import uuid
import di
from di import dependent
import cqrs
from cqrs.saga import bootstrap
from cqrs.saga.fallback import Fallback
from cqrs.adapters.circuit_breaker import AioBreakerAdapter
from cqrs.saga.saga import Saga
from cqrs.saga.step import SagaStepHandler, SagaStepResult
from cqrs.saga.storage.memory import MemorySagaStorage
from cqrs.saga.models import SagaContext
from cqrs.response import Response
@dataclasses.dataclass
class OrderContext(SagaContext):
    """Saga state for the fallback example."""

    order_id: str
    # Written by FallbackStep.act; None until a reservation id exists.
    reservation_id: str | None = None
class ReserveInventoryResponse(Response):
    """Response returned by the inventory reservation steps."""

    reservation_id: str
    # Which step produced the reservation; FallbackStep sets it to "fallback".
    source: str
class PrimaryStep(SagaStepHandler[OrderContext, ReserveInventoryResponse]):
    """Primary reservation step; always fails here to demonstrate the fallback."""

    async def act(
        self, context: OrderContext
    ) -> SagaStepResult[OrderContext, ReserveInventoryResponse]:
        # Primary step that may fail
        raise RuntimeError("Service unavailable")

    async def compensate(self, context: OrderContext) -> None:
        # act() never completes in this example, so there is nothing to undo.
        # Defined for parity with the other step handlers, which pair act()
        # with compensate().
        return None
class FallbackStep(SagaStepHandler[OrderContext, ReserveInventoryResponse]):
    """Fallback reservation step that issues a locally generated reservation id."""

    async def act(
        self, context: OrderContext
    ) -> SagaStepResult[OrderContext, ReserveInventoryResponse]:
        # Fallback step executes when primary fails
        reservation_id = f"fallback_reservation_{context.order_id}"
        context.reservation_id = reservation_id
        return self._generate_step_result(
            ReserveInventoryResponse(reservation_id=reservation_id, source="fallback")
        )

    async def compensate(self, context: OrderContext) -> None:
        # act() only writes to the context, so undoing it means clearing the id.
        context.reservation_id = None
class OrderSaga(Saga[OrderContext]):
    """Saga whose only step is PrimaryStep wrapped with FallbackStep as its fallback."""

    steps = [
        Fallback(
            step=PrimaryStep,
            fallback=FallbackStep,
            # Circuit breaker settings are passed straight to AioBreakerAdapter.
            circuit_breaker=AioBreakerAdapter(
                fail_max=2,
                timeout_duration=60,
            ),
        ),
    ]
# Setup
di_container = di.Container()
storage = MemorySagaStorage()


def saga_mapper(mapper: cqrs.SagaMap) -> None:
    """Bind OrderContext to OrderSaga on the given saga map."""
    mapper.bind(OrderContext, OrderSaga)


mediator = bootstrap.bootstrap(
    di_container=di_container,
    sagas_mapper=saga_mapper,
    saga_storage=storage,
)
# Execute
async def run_fallback_saga() -> None:
    """Stream the fallback saga once and print the name of each step result."""
    context = OrderContext(order_id="123")
    saga_id = uuid.uuid4()
    # `async for` is only valid inside a coroutine, hence this wrapper.
    async for step_result in mediator.stream(context, saga_id=saga_id):
        print(f"Step: {step_result.step_type.__name__}")


asyncio.run(run_fallback_saga())
Complete example: examples/saga_fallback.py