Saga Storage¶
Storage persists saga state and execution history, enabling recovery of interrupted sagas and ensuring eventual consistency.
Overview¶
Two storage implementations are provided:
| Storage Type | Use Case | Persistence |
|---|---|---|
| MemorySagaStorage | Testing, development | In-memory (not persistent) |
| SqlAlchemySagaStorage | Production | Database (persistent) |
Storage Interface¶
from __future__ import annotations

import abc
import contextlib
import uuid

# SagaStatus, SagaLogEntry and SagaStorageRun are provided by the library.
# Method bodies are omitted; this is a summary of the interface.
class ISagaStorage(abc.ABC):
    async def create_saga(self, saga_id, name, context) -> None: ...
    async def update_context(self, saga_id, context, current_version: int | None = None) -> None: ...
    async def update_status(self, saga_id, status) -> None: ...
    async def log_step(self, saga_id, step_name, action, status, details=None) -> None: ...
    async def load_saga_state(self, saga_id, *, read_for_update: bool = False) -> tuple[SagaStatus, dict, int]: ...
    async def get_step_history(self, saga_id) -> list[SagaLogEntry]: ...
    async def get_sagas_for_recovery(self, limit, max_recovery_attempts=5, stale_after_seconds=None, saga_name=None) -> list[uuid.UUID]: ...
    async def increment_recovery_attempts(self, saga_id, new_status: SagaStatus | None = None) -> None: ...
    async def set_recovery_attempts(self, saga_id, attempts: int) -> None: ...

    # Optional: checkpoint commits (reduces storage load and deadlock risk)
    def create_run(self) -> contextlib.AbstractAsyncContextManager[SagaStorageRun]:
        """Yield a SagaStorageRun for one saga execution; raises NotImplementedError if not supported."""
        raise NotImplementedError("This storage does not support create_run()")
- get_sagas_for_recovery: Returns saga IDs that need recovery (RUNNING, COMPENSATING) with recovery_attempts < max_recovery_attempts, optionally filtered by staleness and by saga name. When saga_name is None (default), returns all saga types; when set, only sagas with that name. Used by recovery jobs.
- increment_recovery_attempts: Called automatically by recover_saga() on recovery failure; increments recovery_attempts and optionally updates status (e.g. to FAILED).
- set_recovery_attempts: Sets the recovery attempt counter to an explicit value. Use it to reset the counter after successfully recovering a step (e.g. set to 0) or to set it to the maximum so the saga is excluded from further recovery (e.g. mark as permanently failed without changing status).
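The selection rules described above can be illustrated with a small in-memory stand-in. This is a minimal sketch, not the library's implementation: `SagaRecord` and `ToyRecoveryStorage` are hypothetical names, statuses are plain strings, and "stale" is assumed to mean "not updated within the last `stale_after_seconds` seconds".

```python
import asyncio
import time
import uuid
from dataclasses import dataclass, field


@dataclass
class SagaRecord:
    """Hypothetical in-memory row used only for this sketch."""
    name: str
    status: str
    recovery_attempts: int = 0
    updated_at: float = field(default_factory=time.time)


class ToyRecoveryStorage:
    """Illustrative stand-in; not the library's MemorySagaStorage."""

    def __init__(self) -> None:
        self.rows: dict[uuid.UUID, SagaRecord] = {}

    async def get_sagas_for_recovery(
        self, limit, max_recovery_attempts=5, stale_after_seconds=None, saga_name=None
    ) -> list[uuid.UUID]:
        now = time.time()
        ids = []
        for saga_id, row in self.rows.items():
            if row.status not in ("RUNNING", "COMPENSATING"):
                continue  # only interrupted sagas qualify
            if row.recovery_attempts >= max_recovery_attempts:
                continue  # attempt budget exhausted
            if stale_after_seconds is not None and now - row.updated_at < stale_after_seconds:
                continue  # assumption: recently updated rows may still be executing
            if saga_name is not None and row.name != saga_name:
                continue  # filter by saga type when a name is given
            ids.append(saga_id)
        return ids[:limit]

    async def increment_recovery_attempts(self, saga_id, new_status=None) -> None:
        row = self.rows[saga_id]
        row.recovery_attempts += 1
        if new_status is not None:
            row.status = new_status

    async def set_recovery_attempts(self, saga_id, attempts: int) -> None:
        self.rows[saga_id].recovery_attempts = attempts


async def demo() -> tuple[int, int, int]:
    storage = ToyRecoveryStorage()
    stuck, done, exhausted = uuid.uuid4(), uuid.uuid4(), uuid.uuid4()
    storage.rows[stuck] = SagaRecord("OrderSaga", "RUNNING")
    storage.rows[done] = SagaRecord("OrderSaga", "COMPLETED")
    storage.rows[exhausted] = SagaRecord("OrderSaga", "COMPENSATING", recovery_attempts=5)

    first = await storage.get_sagas_for_recovery(limit=10)
    # Simulate one failed recovery, then exclude the saga from further recovery.
    await storage.increment_recovery_attempts(stuck)
    await storage.set_recovery_attempts(stuck, 5)
    second = await storage.get_sagas_for_recovery(limit=10)
    return len(first), storage.rows[stuck].recovery_attempts, len(second)
```

Running `asyncio.run(demo())` selects only the RUNNING saga on the first pass; once its counter is set to the maximum, the second pass returns nothing even though its status is unchanged.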
Checkpoint commits and SagaStorageRun¶
When a storage implements create_run(), the saga can run in a single session with checkpoint commits: it commits only at key points (after creating the saga and setting RUNNING, after each completed step, after each compensated step, at completion or failure) instead of after every storage call. This reduces the number of commits, shortens lock hold time, and lowers deadlock risk (e.g. with SQLAlchemy).
- SagaStorageRun: Protocol for a scoped “session” for a single saga. It exposes the same mutation/read methods as the storage (create_saga, update_context, update_status, log_step, load_saga_state, get_step_history) but does not commit automatically; the caller must call commit() at the desired checkpoints. rollback() aborts the run without persisting changes.
- create_run(): Returns an async context manager that yields a SagaStorageRun. If a storage does not support it (e.g. a custom implementation), it may raise NotImplementedError; in that case SagaTransaction falls back to the previous behaviour (no single session, no checkpoint commits; each call may commit immediately depending on the implementation).
- load_saga_state(..., read_for_update=True): When loading state for recovery or exclusive update, the implementation can lock the row (e.g. SELECT ... FOR UPDATE in SQLAlchemy). Together with checkpoint commits, this shortens lock duration and reduces deadlock risk.
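To make the checkpoint idea concrete, here is a minimal sketch of a run that stages writes and persists them only on commit(). `ToyRun` and `ToyStorage` are hypothetical stand-ins written for this example; they implement only `log_step`, `commit` and `rollback`, and the real classes may behave differently in detail.

```python
import asyncio
import contextlib


class ToyRun:
    """Hypothetical stand-in for a SagaStorageRun: writes are staged until commit()."""

    def __init__(self, committed: list) -> None:
        self._committed = committed
        self._pending: list = []

    async def log_step(self, saga_id, step_name, action, status, details=None) -> None:
        self._pending.append(f"{step_name}:{action}:{status}")

    async def commit(self) -> None:
        # Checkpoint: everything staged so far becomes durable.
        self._committed.extend(self._pending)
        self._pending.clear()

    async def rollback(self) -> None:
        # Drop everything staged since the last checkpoint.
        self._pending.clear()


class ToyStorage:
    def __init__(self) -> None:
        self.committed: list = []

    @contextlib.asynccontextmanager
    async def create_run(self):
        run = ToyRun(self.committed)
        try:
            yield run
        except Exception:
            await run.rollback()  # an exception inside the run discards uncommitted work
            raise


async def execute(storage: ToyStorage, steps, fail_on=None) -> None:
    async with storage.create_run() as run:
        for step in steps:
            await run.log_step("saga-1", step, "act", "STARTED")
            if step == fail_on:
                raise RuntimeError(f"{step} crashed")
            await run.log_step("saga-1", step, "act", "COMPLETED")
            await run.commit()  # one commit per completed step


async def demo() -> list:
    storage = ToyStorage()
    with contextlib.suppress(RuntimeError):
        await execute(storage, ["reserve", "charge"], fail_on="charge")
    return storage.committed
```

In this sketch the first step survives because it reached a checkpoint, while the half-finished second step is rolled back, which is the property recovery relies on.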
Memory Storage¶
In-memory implementation for testing and development.
import cqrs
from cqrs.saga import bootstrap
from cqrs.saga.storage.memory import MemorySagaStorage

storage = MemorySagaStorage()

# Register saga in SagaMap
def saga_mapper(mapper: cqrs.SagaMap) -> None:
    mapper.bind(OrderContext, OrderSaga)

# Create saga mediator using bootstrap
mediator = bootstrap.bootstrap(
    di_container=di_container,
    sagas_mapper=saga_mapper,
    saga_storage=storage,
)

# Access storage data
status, context_data, version = await storage.load_saga_state(saga_id)
history = await storage.get_step_history(saga_id)
Features:
- ✅ Fast and lightweight
- ✅ No database setup required
- ✅ Implements create_run(): yields _MemorySagaStorageRun; commit() and rollback() are no-ops, but the protocol is aligned with SQLAlchemy for tests.
- ❌ Not persistent (data lost on restart)
SQLAlchemy Storage¶
Database-backed implementation for production. It uses a session factory. When the saga uses create_run(), all operations for one saga run go through a single AsyncSession and are committed only at checkpoints (after create + RUNNING, after each step, after each compensation step, at completion/failure), which reduces commits and deadlock risk.
Database Schema¶
saga_executions:
- id (UUID) - Primary key
- status (VARCHAR) - PENDING, RUNNING, COMPENSATING, COMPLETED, FAILED
- context (JSON)
- version (INTEGER) - Optimistic locking version (default: 1)
- recovery_attempts (INTEGER) - Number of failed recovery attempts (default: 0); used by get_sagas_for_recovery, increment_recovery_attempts, and set_recovery_attempts
- created_at, updated_at (TIMESTAMP)
saga_logs:
- id (BIGSERIAL) - Primary key
- saga_id (UUID) - Foreign key
- step_name (VARCHAR)
- action (VARCHAR) - "act" or "compensate"
- status (VARCHAR) - STARTED, COMPLETED, FAILED
- details (TEXT)
- created_at (TIMESTAMP)
Indexes: saga_id, created_at
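For orientation, the columns listed above can be approximated with the standard-library sqlite3 module. This is only a sketch: the library creates its tables through SQLAlchemy (Base.metadata), the PostgreSQL types UUID, JSON and BIGSERIAL are replaced by SQLite equivalents, and placing both indexes on saga_logs is an assumption, as are the index names.

```python
import sqlite3

SCHEMA = """
CREATE TABLE saga_executions (
    id TEXT PRIMARY KEY,                    -- UUID stored as text in SQLite
    status TEXT,                            -- PENDING, RUNNING, COMPENSATING, COMPLETED, FAILED
    context TEXT,                           -- JSON serialized as text
    version INTEGER DEFAULT 1,              -- optimistic locking version
    recovery_attempts INTEGER DEFAULT 0,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE saga_logs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,   -- stands in for BIGSERIAL
    saga_id TEXT REFERENCES saga_executions(id),
    step_name TEXT,
    action TEXT,                            -- "act" or "compensate"
    status TEXT,                            -- STARTED, COMPLETED, FAILED
    details TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Assumed placement: both indexes on saga_logs.
CREATE INDEX ix_saga_logs_saga_id ON saga_logs (saga_id);
CREATE INDEX ix_saga_logs_created_at ON saga_logs (created_at);
"""


def create_schema() -> sqlite3.Connection:
    """Create the approximate schema in an in-memory SQLite database."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    return conn


conn = create_schema()
conn.execute(
    "INSERT INTO saga_executions (id, status, context) VALUES (?, ?, ?)",
    ("a1", "RUNNING", "{}"),
)
# The defaults from the column list apply to a freshly inserted row.
defaults = conn.execute(
    "SELECT version, recovery_attempts FROM saga_executions WHERE id = ?", ("a1",)
).fetchone()
```

A new row starts with version 1 and recovery_attempts 0, matching the defaults given in the column list.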
Usage¶
The storage requires an async_sessionmaker to create short-lived sessions for each operation.
import uuid

import cqrs
from cqrs.saga import bootstrap
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from cqrs.saga.storage.sqlalchemy import SqlAlchemySagaStorage, Base

# Setup Engine with connection pool
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    pool_size=20,
    max_overflow=10,
)

# Create session factory (factory, NOT session instance)
session_factory = async_sessionmaker(engine, expire_on_commit=False)

# Initialize tables (run once)
async with engine.begin() as conn:
    await conn.run_sync(Base.metadata.create_all)

# Initialize storage with session FACTORY
storage = SqlAlchemySagaStorage(session_factory)

# Register saga in SagaMap
def saga_mapper(mapper: cqrs.SagaMap) -> None:
    mapper.bind(OrderContext, OrderSaga)

# Create saga mediator using bootstrap
mediator = bootstrap.bootstrap(
    di_container=di_container,
    sagas_mapper=saga_mapper,
    saga_storage=storage,
)

# Execute saga
context = OrderContext(...)
saga_id = uuid.uuid4()

# With SqlAlchemySagaStorage, commits occur at checkpoints (after each step, etc.)
async for step_result in mediator.stream(context, saga_id=saga_id):
    print(f"Step: {step_result.step_type.__name__}")
Transaction Management¶
SqlAlchemySagaStorage implements create_run(): it yields a _SqlAlchemySagaStorageRun backed by a single AsyncSession per saga run. All mutations go through that session and are committed only when the saga calls run.commit() at checkpoints (after creating the saga and setting RUNNING, after each completed step, after each compensated step, at completion or failure). On exception within the run context, the run's rollback() is invoked.
This design ensures:
- Fewer commits: One commit per checkpoint instead of per storage call.
- Shorter lock time: With load_saga_state(..., read_for_update=True), the row is locked only for the duration of the run; checkpoint commits shorten that duration and reduce deadlock risk.
- Crash safety: Completed checkpoints are persisted; recovery can resume from the last checkpoint.
- Backward compatibility: Custom storages that do not implement create_run() continue to work; the saga falls back to calling storage methods directly (no single session).
Concurrency Control¶
The storage implementation provides two mechanisms to handle concurrency in distributed environments:
1. Optimistic Locking (Versioning)¶
To prevent "lost updates" when multiple steps might update the context simultaneously (though sagas typically execute sequentially), the version column is used.
- update_context accepts an optional current_version.
- If provided, the storage checks if version == current_version.
- If matched, it updates the context and increments the version (version + 1).
- If not matched, it raises SagaConcurrencyError, indicating the state was modified by another process.
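The version check above is commonly expressed as a conditional UPDATE whose affected-row count reveals a conflict. The following sketch shows that pattern with the standard-library sqlite3 module; `update_context_checked` is a hypothetical helper, the `SagaConcurrencyError` class here is a local stand-in for the library's exception, and the library's actual SQL may differ.

```python
import json
import sqlite3


class SagaConcurrencyError(Exception):
    """Local stand-in for the library's exception of the same name."""


def update_context_checked(conn, saga_id, context, current_version):
    """Update the context only if the stored version still equals current_version."""
    cur = conn.execute(
        "UPDATE saga_executions SET context = ?, version = version + 1 "
        "WHERE id = ? AND version = ?",
        (json.dumps(context), saga_id, current_version),
    )
    if cur.rowcount == 0:
        # No row matched: the saga is missing or another writer bumped the version.
        raise SagaConcurrencyError(f"saga {saga_id} is not at version {current_version}")


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE saga_executions (id TEXT PRIMARY KEY, context TEXT, version INTEGER DEFAULT 1)"
)
conn.execute("INSERT INTO saga_executions (id, context) VALUES ('s1', '{}')")

# First writer read version 1 and succeeds; the stored version becomes 2.
update_context_checked(conn, "s1", {"paid": True}, current_version=1)

# Second writer still holds the stale version 1 and is rejected.
try:
    update_context_checked(conn, "s1", {"paid": False}, current_version=1)
    conflict = False
except SagaConcurrencyError:
    conflict = True

version = conn.execute("SELECT version FROM saga_executions WHERE id = 's1'").fetchone()[0]
stored = json.loads(conn.execute("SELECT context FROM saga_executions WHERE id = 's1'").fetchone()[0])
```

Folding the check into the WHERE clause makes the compare-and-increment a single statement, so no separate read is needed between checking and writing.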
2. Row Locking (Recovery Safety and Deadlock Mitigation)¶
When recovering or exclusively updating a saga (e.g., after a crash), it is critical that only one worker picks up the saga to avoid duplicate execution.
- load_saga_state(..., read_for_update=True) uses SELECT ... FOR UPDATE (in SQL databases), acquiring a row-level lock on the saga execution record.
- Other workers attempting to lock the same saga will wait or fail, ensuring exclusive access.
- When the storage supports create_run(), the saga holds the session (and thus the lock) only between checkpoints; commits are done at key points, which shortens lock duration and reduces deadlock risk.
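The "wait" behaviour can be pictured with an in-process analogy. The sketch below uses one asyncio.Lock per saga ID in place of a database row lock; `ToyRowLocks` and `recover` are hypothetical names, and a real deployment relies on the database to enforce the lock across processes rather than on anything like this.

```python
import asyncio


class ToyRowLocks:
    """In-process analogy for row-level locks, keyed by saga ID."""

    def __init__(self) -> None:
        self._locks: dict = {}

    def lock_for(self, saga_id: str) -> asyncio.Lock:
        # One lock per saga, created on first use.
        return self._locks.setdefault(saga_id, asyncio.Lock())


async def recover(locks: ToyRowLocks, saga_id: str, worker: str, log: list) -> None:
    # Entering the block plays the role of SELECT ... FOR UPDATE.
    async with locks.lock_for(saga_id):
        log.append(f"{worker} start")
        await asyncio.sleep(0.01)  # recovery work done while holding the lock
        log.append(f"{worker} end")
    # Leaving the block corresponds to the commit that releases the row lock.


async def demo() -> list:
    locks, log = ToyRowLocks(), []
    # Two workers pick up the same saga at the same time.
    await asyncio.gather(
        recover(locks, "s1", "A", log),
        recover(locks, "s1", "B", log),
    )
    return log
```

Worker B only starts after worker A has finished, so the two recoveries never interleave; the shorter the locked section, the less time other workers spend waiting.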
Choosing Storage¶
Memory Storage:
- ✅ Unit tests, development
- ❌ Not persistent
SQLAlchemy Storage:
- ✅ Production, multi-process
- ✅ Recovery after restarts
- ✅ Audit trail
- ✅ Robust transaction management
Best Practices¶
- Use persistent storage in production: Memory storage loses data on restart.
- Configure the connection pool: Set appropriate pool_size and max_overflow for your load.
- Create indexes: Index saga_id and created_at for better performance.
- Monitor storage size: Archive old saga logs periodically.