Saga Recovery & Eventual Consistency¶
Recovery ensures eventual consistency by resuming interrupted sagas from persistent storage, guaranteeing all sagas eventually reach a terminal state (COMPLETED or FAILED). Recovery attempts are tracked per saga so you can limit retries and exclude persistently failing sagas.
Overview¶
Sagas can be interrupted due to server crashes, network timeouts, or system overload. Recovery solves this by:
- Persisting saga state after each step
- Periodically scanning for incomplete sagas (via `get_sagas_for_recovery`)
- Resuming execution from the last completed step
- Completing compensation if saga was in compensating state
- Tracking recovery attempts — on recovery failure, the storage increments `recovery_attempts` automatically so sagas can be retried or excluded when the limit is reached
Eventual Consistency¶
The saga pattern ensures eventual consistency through:
- Persistent State — Saved after each step
- Recovery Mechanism — Interrupted sagas can be resumed
- Recovery Attempts — Each saga has a `recovery_attempts` counter; it is incremented automatically when recovery fails, so you can limit retries and exclude sagas that exceed `max_recovery_attempts`
- Compensation Guarantee — Failed sagas are always compensated
- Terminal States — All sagas eventually reach COMPLETED or FAILED
Recovery Process¶
```python
from cqrs.saga.recovery import recover_saga

# Recover interrupted saga
await recover_saga(
    saga=saga,
    saga_id=saga_id,
    context_builder=OrderContext,  # or lambda d: OrderContext(**d)
)
```
Recovery steps:
- Load saga status and context from storage
- Reconstruct context object from persisted data
- Resume execution from last completed step or complete compensation
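The second step relies on the `context_builder` argument shown above: it turns the persisted context dictionary back into a typed context object. You can pass the context class itself when its constructor accepts the persisted fields as keyword arguments, or any callable that adapts the raw dict. A minimal sketch (the helper name is illustrative):

```python
def build_order_context(data: dict):
    # Equivalent to the inline `lambda d: OrderContext(**d)` shown above
    return OrderContext(**data)

await recover_saga(
    saga=saga,
    saga_id=saga_id,
    context_builder=build_order_context,
)
```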
Concurrency Safety (Row Locking)¶
In a distributed environment with multiple replicas, multiple recovery workers might attempt to recover the same incomplete saga simultaneously.
To prevent race conditions, `recover_saga` uses Row Locking:
- It calls `storage.load_saga_state(saga_id, read_for_update=True)`.
- For SQL databases, this executes `SELECT ... FOR UPDATE`.
- This acquires a database-level lock on the saga row.
- If another worker tries to recover the same saga, it will block until the first worker completes or releases the lock.
This ensures that only one worker can actively recover and execute a specific saga at any given time.
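For illustration, a sketch of the locking load inside `recover_saga` (the unpacked names are illustrative; the scenarios below only use the first element, the status):

```python
# The saga row is loaded with read_for_update=True, so on SQL backends the
# load maps to SELECT ... FOR UPDATE and the row stays locked while this
# worker resumes or compensates the saga.
status, context_data, step = await storage.load_saga_state(
    saga_id,
    read_for_update=True,
)
```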
Recovery Scenarios¶
Interrupted Forward Execution¶
Status: RUNNING
Recovery: Skips completed steps, resumes from last completed step
```python
status, _, _ = await storage.load_saga_state(saga_id)  # RUNNING

await recover_saga(saga, saga_id, OrderContext)
# Skips completed steps, continues execution
```
Interrupted Compensation¶
Status: COMPENSATING
Recovery: Completes compensation in reverse order
```python
status, _, _ = await storage.load_saga_state(saga_id)  # COMPENSATING

try:
    await recover_saga(saga, saga_id, OrderContext)
except RuntimeError:
    pass  # Expected - compensation completed
```
Already Completed¶
Status: COMPLETED
Recovery: No action needed
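In practice `get_sagas_for_recovery()` only returns sagas in RUNNING or COMPENSATING status, so completed sagas are never picked up. A recovery job that receives IDs from elsewhere can skip terminal sagas explicitly; a sketch, assuming the corresponding `SagaStatus` members:

```python
status, _, _ = await storage.load_saga_state(saga_id)  # COMPLETED
if status in (SagaStatus.COMPLETED, SagaStatus.FAILED):
    pass  # Terminal state - nothing to recover
```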
Recovery Attempts¶
Each saga in storage has a recovery_attempts counter. It is used to:
- Limit retries — Sagas that fail recovery repeatedly can be excluded from future recovery runs
- Avoid infinite loops — Persistently failing sagas (e.g. due to bad data) stop being picked up once `max_recovery_attempts` is reached
Automatic increment: When recover_saga() fails (exception during resume), the storage's increment_recovery_attempts(saga_id, new_status=SagaStatus.FAILED) is called automatically. Callers do not need to call increment_recovery_attempts themselves.
Explicit set: Use storage.set_recovery_attempts(saga_id, attempts) to set the counter to a specific value: e.g. 0 after successfully recovering one of the steps, or the maximum value so the saga is excluded from further recovery without changing its status.
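A sketch of both uses, with the `storage` object from the examples above:

```python
# Reset the counter after successfully recovering part of the saga
await storage.set_recovery_attempts(saga_id, 0)

# Or park the saga: setting the counter to the configured maximum excludes it
# from get_sagas_for_recovery(max_recovery_attempts=5) without changing status
await storage.set_recovery_attempts(saga_id, 5)
```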
Getting sagas for recovery: Use storage.get_sagas_for_recovery() instead of a custom query:
```python
# All saga types (default)
ids = await storage.get_sagas_for_recovery(
    limit=50,
    max_recovery_attempts=5,   # Only sagas with recovery_attempts < 5
    stale_after_seconds=120,   # Only sagas not updated in last 2 minutes (avoids picking active sagas)
)

# Only sagas of a specific type (e.g. one recovery job per saga name)
ids = await storage.get_sagas_for_recovery(
    limit=50,
    max_recovery_attempts=5,
    saga_name="OrderSaga",
)
```
| Parameter | Description |
|---|---|
| `limit` | Maximum number of saga IDs to return |
| `max_recovery_attempts` | Only include sagas with `recovery_attempts` strictly less than this value (default: 5) |
| `stale_after_seconds` | If set, only include sagas whose `updated_at` is older than (now − this value). Use to avoid picking sagas currently being executed. `None` = no filter |
| `saga_name` | If set, only include sagas with this name (e.g. handler/type name). `None` (default) = return all saga types |
Returns saga IDs in status RUNNING or COMPENSATING, ordered by updated_at ascending (oldest first).
Strict Backward Recovery¶
Once a saga enters COMPENSATING or FAILED status, forward execution is permanently disabled. Only compensation can proceed.
This prevents "zombie states" where compensation actions conflict with new execution attempts.
Implementing Recovery¶
Background Recovery Job¶
Use storage.get_sagas_for_recovery() to get saga IDs that need recovery. On recovery failure, recover_saga() calls increment_recovery_attempts internally — no extra code needed. You can pass saga_name to run separate recovery jobs per saga type.
```python
import asyncio
import logging

from cqrs.saga.recovery import recover_saga

logger = logging.getLogger(__name__)


async def recovery_job(storage, saga, context_builder, container, saga_name=None):
    while True:
        ids = await storage.get_sagas_for_recovery(
            limit=50,
            max_recovery_attempts=5,
            stale_after_seconds=120,  # Avoid sagas currently being executed
            saga_name=saga_name,      # None = all types; or e.g. "OrderSaga" for one type
        )
        for saga_id in ids:
            try:
                await recover_saga(
                    saga=saga,
                    saga_id=saga_id,
                    context_builder=context_builder,
                    container=container,
                    storage=storage,
                )
            except RuntimeError:
                pass  # Expected when compensation completed (forward execution not allowed)
            except Exception as e:
                logger.error(f"Recovery failed for {saga_id}: {e}")
                # recovery_attempts already incremented by recover_saga
        await asyncio.sleep(60)  # Scan every minute
```
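To run this loop as a standalone background task, start it at application startup (a sketch; `OrderSaga`, `OrderContext`, and `container` are the same assumed objects used above, and a running event loop is required):

```python
# Launch the recovery loop alongside the application
recovery_task = asyncio.create_task(
    recovery_job(storage, OrderSaga(), OrderContext, container)
)
```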
Using with Scheduler¶
When a scheduler drives the scans, the job should perform a single scan per invocation (drop the `while True` loop and the `sleep` from `recovery_job`), and the coroutine function must be passed directly so APScheduler awaits it:

```python
# APScheduler
from apscheduler.schedulers.asyncio import AsyncIOScheduler

scheduler = AsyncIOScheduler()
scheduler.add_job(
    recovery_job,  # Pass the coroutine function itself, not a lambda, so it is awaited
    'interval',
    minutes=5,
    args=[storage, OrderSaga(), OrderContext, container],
)
scheduler.start()
```
Best Practices¶
- Run recovery periodically — Background job using `get_sagas_for_recovery()` to scan for incomplete sagas
- Use `max_recovery_attempts` — Exclude sagas that fail recovery too many times (e.g. 5) to avoid infinite retries
- Use `stale_after_seconds` — Avoid picking sagas that are currently being executed by another worker
- Use `saga_name` for per-type recovery — When running separate recovery jobs per saga type, pass `saga_name` so each job only processes its own sagas
- Handle failures — Log errors and send alerts; `increment_recovery_attempts` is called automatically by `recover_saga`
- Monitor metrics — Track recovery rate, duration, failures, and sagas exceeding max attempts
- Use persistent storage — Memory storage loses data on restart