#Saga Compensation Cookbook

Version: 0.36.0 | Updated: 2026-03-20 | Applies to: ranvier-runtime 0.36+ | Category: Cookbook


#Overview

Ranvier's saga pattern (then_compensated) enables automatic rollback when a multi-step pipeline fails partway through. Each forward step pairs with a compensation step that undoes its side effects. This cookbook covers practical saga composition, compensation ordering, and failure handling patterns.


#1. Basic Saga Pipeline

Pair each side-effecting step with a compensation using then_compensated(). If any step returns Outcome::Fault, the compensations of the steps that already completed run in LIFO order.

use ranvier_runtime::Axon;
// SagaPolicy must also be in scope (import path omitted here).

let order_saga = Axon::simple::<String>("order-saga")
    .with_saga_policy(SagaPolicy::Enabled)
    .then(ValidateOrder)
    .then_compensated(ReserveInventory, ReleaseInventory)
    .then_compensated(AuthorizePayment, RefundPayment)
    .then_compensated(ArrangeShipping, CancelShipment)
    .then(CompleteOrder);

If ArrangeShipping fails:

  1. RefundPayment runs (undo payment)
  2. ReleaseInventory runs (undo reservation)

ValidateOrder has no compensation -- read-only steps need none. CompleteOrder has no compensation -- it is the terminal confirmation step.


#2. Compensation Order: LIFO Stack

Compensations execute in reverse registration order (Last In, First Out). This ensures dependent operations are undone before their prerequisites.

graph LR
    V["Validate"] --> R["Reserve"] --> Ch["Charge"] --> Sh["Ship"] --> Co["Complete"]
    Sh -.->|"1"| CS["CancelShip"]
    Ch -.->|"2"| RF["Refund"]
    R -.->|"3"| RL["Release"]

If Complete faults after Ship has succeeded:

  1. CancelShip -- cancel the shipment that was started
  2. Refund -- return the charge (depends on the charge existing)
  3. Release -- free the inventory (depends on the reservation existing)
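
The LIFO unwinding can be pictured as a plain stack. The following is a minimal sketch in std-only Rust, not Ranvier's implementation: `Step`, `run_saga`, and `order_steps` are hypothetical names, and the simulation assumes (as section 1 describes) that only steps which completed have their compensations run.

```rust
/// Hypothetical stand-in for a pipeline step; not a Ranvier type.
struct Step {
    name: &'static str,
    /// Name of the paired compensation, if the step has one.
    compensation: Option<&'static str>,
    /// Whether the forward action succeeds in this simulation.
    succeeds: bool,
}

/// Runs the steps in order. On the first failure, returns the failing step's
/// name and the compensations of the completed steps in execution (LIFO) order.
fn run_saga(steps: &[Step]) -> Result<(), (&'static str, Vec<&'static str>)> {
    let mut stack: Vec<&'static str> = Vec::new();
    for step in steps {
        if !step.succeeds {
            // Pop until empty: the last registered compensation runs first.
            let mut order = Vec::new();
            while let Some(compensation) = stack.pop() {
                order.push(compensation);
            }
            return Err((step.name, order));
        }
        // Register the compensation only after the forward step succeeded.
        if let Some(compensation) = step.compensation {
            stack.push(compensation);
        }
    }
    Ok(())
}

/// Builds the five-step order pipeline; the step named `failing` will fault.
fn order_steps(failing: &str) -> Vec<Step> {
    vec![
        ("Validate", None),
        ("Reserve", Some("Release")),
        ("Charge", Some("Refund")),
        ("Ship", Some("CancelShip")),
        ("Complete", None),
    ]
    .into_iter()
    .map(|(name, compensation)| Step {
        name,
        compensation,
        succeeds: name != failing,
    })
    .collect()
}
```

Calling `run_saga(&order_steps("Complete"))` in this sketch yields the compensations in the order CancelShip, Refund, Release. Steps without a compensation never touch the stack.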

#3. Compensation Transition Pattern

A compensation receives the original input of its forward step and reverses the side effect. Compensations must be idempotent -- calling one twice must be safe.

use async_trait::async_trait;
use ranvier_core::prelude::*;

#[derive(Clone)]
struct ReserveInventory;

#[async_trait]
impl Transition<OrderItems, OrderItems> for ReserveInventory {
    type Error = String;
    type Resources = AppResources;

    async fn run(
        &self,
        input: OrderItems,
        resources: &Self::Resources,
        bus: &mut Bus,
    ) -> Outcome<OrderItems, Self::Error> {
        let pool = &resources.db_pool;
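        for item in &input.items {
            // Decrement stock; on a database error, fault so the saga rolls back.
            let result = sqlx::query("UPDATE products SET stock = stock - $1 WHERE id = $2")
                .bind(item.quantity as i32)
                .bind(&item.product_id)
                .execute(pool)
                .await;
            if let Err(e) = result {
                return Outcome::fault(format!("inventory reservation failed: {}", e));
            }
        }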
        bus.insert(ReservationIds(input.items.iter().map(|i| i.product_id.clone()).collect()));
        Outcome::next(input)
    }
}

#[derive(Clone)]
struct ReleaseInventory;

#[async_trait]
impl Transition<OrderItems, OrderItems> for ReleaseInventory {
    type Error = String;
    type Resources = AppResources;

    async fn run(
        &self,
        input: OrderItems,
        resources: &Self::Resources,
        _bus: &mut Bus,
    ) -> Outcome<OrderItems, Self::Error> {
        let pool = &resources.db_pool;
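        for item in &input.items {
            // Caution: a bare increment is NOT idempotent; a second call restores the
            // stock twice. Guard it (for example with a per-order reservation record)
            // before relying on retries or replays.
            let result = sqlx::query("UPDATE products SET stock = stock + $1 WHERE id = $2")
                .bind(item.quantity as i32)
                .bind(&item.product_id)
                .execute(pool)
                .await;
            if let Err(e) = result {
                return Outcome::fault(format!("inventory release failed: {}", e));
            }
        }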
        Outcome::next(input)
    }
}
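
Idempotency usually needs a record of what has already been undone, because repeating a plain `stock + n` update adds the quantity again. Below is a minimal sketch of that idea with an in-memory stand-in for the products table. `Inventory`, `reserve`, and `release` are hypothetical names for illustration; a real implementation would keep the reservation record in the database and update it in the same transaction as the stock.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory inventory; a stand-in for the products table.
#[derive(Default)]
struct Inventory {
    /// Product id -> units in stock.
    stock: HashMap<String, i64>,
    /// Reservation id -> (product id, reserved quantity) for held stock.
    active: HashMap<String, (String, i64)>,
}

impl Inventory {
    /// Forward step: take `qty` units and remember the reservation.
    fn reserve(&mut self, reservation_id: &str, product: &str, qty: i64) -> Result<(), String> {
        let stock = self
            .stock
            .get_mut(product)
            .ok_or_else(|| format!("unknown product {}", product))?;
        if *stock < qty {
            return Err(format!("insufficient stock for {}", product));
        }
        *stock -= qty;
        self.active
            .insert(reservation_id.to_string(), (product.to_string(), qty));
        Ok(())
    }

    /// Compensation: restore the stock only if the reservation is still active.
    /// Returns true if stock was restored, false if there was nothing to undo.
    fn release(&mut self, reservation_id: &str) -> bool {
        match self.active.remove(reservation_id) {
            Some((product, qty)) => {
                *self.stock.entry(product).or_insert(0) += qty;
                true
            }
            // Already released (or never reserved): a repeat call changes nothing.
            None => false,
        }
    }
}
```

Because `release` removes the reservation record as it restores the stock, a second call for the same reservation finds no record and leaves the stock untouched.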

#4. Compensation Failure Handling

If a compensation itself fails, the saga runtime logs the failure and continues running the remaining compensations. A compensation failure does not halt the rollback.

Complete faults -> CancelShip FAILS -> Refund runs -> Release runs
                        |
                        v
              tracing::error!("compensation CancelShip failed: ...")
              Saga continues with Refund and Release
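
The continue-on-failure behavior can be sketched as a loop that records each failure and keeps popping. This is a std-only illustration, not the runtime's code: `Compensation`, `RollbackReport`, `roll_back`, and the two helper functions are hypothetical, and the report stands in for the error log described above.

```rust
/// Hypothetical compensation: a name plus an undo function that may fail.
struct Compensation {
    name: &'static str,
    undo: fn() -> Result<(), String>,
}

/// What happened during a rollback.
#[derive(Debug, Default)]
struct RollbackReport {
    /// Every compensation that was attempted, in execution order.
    attempted: Vec<&'static str>,
    /// Compensations that returned an error, with the error message.
    failed: Vec<(&'static str, String)>,
}

/// Pops compensations in LIFO order. A failure is recorded and the loop
/// moves on, so one broken compensation cannot block the others.
fn roll_back(mut stack: Vec<Compensation>) -> RollbackReport {
    let mut report = RollbackReport::default();
    while let Some(compensation) = stack.pop() {
        report.attempted.push(compensation.name);
        if let Err(e) = (compensation.undo)() {
            report.failed.push((compensation.name, e));
        }
    }
    report
}

/// Undo function that always succeeds.
fn undo_ok() -> Result<(), String> {
    Ok(())
}

/// Undo function that simulates an unavailable downstream service.
fn undo_unavailable() -> Result<(), String> {
    Err("carrier API unavailable".to_string())
}
```

Returning the failures to the caller, rather than only logging them, is one way to feed an alert or a manual-repair queue; whether the runtime exposes them this way is not covered here.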

Design rule: Write compensations to succeed even under adverse conditions. Use retry logic inside compensations for transient failures:

#[derive(Clone)]
struct RefundPayment;

#[async_trait]
impl Transition<PaymentRequest, PaymentRequest> for RefundPayment {
    type Error = String;
    type Resources = AppResources;

    // Takes PaymentRequest, the original input of the forward step AuthorizePayment.
    async fn run(
        &self,
        input: PaymentRequest,
        resources: &Self::Resources,
        bus: &mut Bus,
    ) -> Outcome<PaymentRequest, Self::Error> {
        let payment_id = bus.get_cloned::<PaymentId>()
            .map(|p| p.0)
            .unwrap_or_default();

        // One initial attempt, then up to 3 retries for transient failures
        let mut attempts = 0;
        loop {
            match resources.payment_client.refund(&payment_id).await {
                Ok(_) => break,
                Err(e) if attempts < 3 => {
                    attempts += 1;
                    tracing::warn!("refund attempt {} failed: {}", attempts, e);
                    tokio::time::sleep(std::time::Duration::from_millis(500)).await;
                }
                Err(e) => {
                    tracing::error!("refund failed after 3 retries: {}", e);
                    return Outcome::fault(format!("refund failed: {}", e));
                }
            }
        }
        Outcome::next(input)
    }
}
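
The retry loop above waits a fixed 500 ms between attempts. A common variation doubles the delay after each failure. The helper below is a synchronous, std-only sketch of that variation; `retry_with_backoff` is a hypothetical name, and an async compensation would use `tokio::time::sleep` instead of `std::thread::sleep`.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Calls `op` until it succeeds or `max_attempts` calls have been made.
/// `op` receives the 1-based attempt number. The delay doubles after each
/// failure. `op` is always called at least once; on exhaustion the last
/// error is returned.
fn retry_with_backoff<T, E>(
    max_attempts: u32,
    initial_delay: Duration,
    mut op: impl FnMut(u32) -> Result<T, E>,
) -> Result<T, E> {
    let mut delay = initial_delay;
    let mut attempt = 1;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            // Out of attempts: surface the last error to the caller.
            Err(e) if attempt >= max_attempts => return Err(e),
            Err(_) => {
                sleep(delay);
                delay *= 2;
                attempt += 1;
            }
        }
    }
}
```

Keeping the attempt count and the delay as parameters makes the bound explicit at the call site, which helps keep the total rollback time predictable.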

#5. Read-Only Steps and Partial Sagas

Not every step needs a compensation. Read-only or validation steps can use plain then() alongside compensated steps.

let pipeline = Axon::simple::<String>("mixed-pipeline")
    .with_saga_policy(SagaPolicy::Enabled)
    .then(ValidateInput)                               // no compensation: read-only
    .then(EnrichData)                                  // no compensation: read-only
    .then_compensated(PersistRecord, DeleteRecord)     // has side effect
    .then_compensated(SendNotification, RevokeNotification) // has side effect
    .then(FormatResponse);                             // no compensation: formatting

If SendNotification faults:

  1. DeleteRecord runs (the only compensation registered by a completed step)
  2. ValidateInput and EnrichData have no compensations, so nothing runs for them; FormatResponse never executed.

#6. Bus State in Compensations

Compensations receive &mut Bus, which retains all data inserted by previous forward steps. This lets compensations read any context they need for cleanup.

// Forward: writes PaymentId to Bus
#[async_trait]
impl Transition<PaymentRequest, PaymentAuth> for AuthorizePayment {
    // ...
    async fn run(&self, input: PaymentRequest, res: &Self::Resources, bus: &mut Bus)
        -> Outcome<PaymentAuth, Self::Error>
    {
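        // Convert a client error into a fault (assumes `type Error = String`,
        // as in the earlier examples).
        let auth = match res.payment_client.authorize(&input).await {
            Ok(auth) => auth,
            Err(e) => return Outcome::fault(format!("authorization failed: {}", e)),
        };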
        bus.insert(PaymentId(auth.id.clone()));  // saved for compensation
        Outcome::next(auth)
    }
}

// Compensation: reads PaymentId from Bus
#[async_trait]
impl Transition<PaymentRequest, PaymentRequest> for RefundPayment {
    // ...
    async fn run(&self, input: PaymentRequest, res: &Self::Resources, bus: &mut Bus)
        -> Outcome<PaymentRequest, Self::Error>
    {
        let payment_id = bus.require::<PaymentId>();  // guaranteed by forward step
        if let Err(e) = res.payment_client.refund(&payment_id.0).await {
            return Outcome::fault(format!("refund failed: {}", e));
        }
        Outcome::next(input)
    }
}
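
The hand-off between a forward step and its compensation relies on the Bus being keyed by type. The sketch below imitates that idea with the standard library only. `MiniBus` and its methods are hypothetical and do not reflect Ranvier's actual Bus API; the point is only that a value inserted under its type can be found again later by naming the same type.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;

/// Hypothetical, minimal type-keyed store; not Ranvier's Bus.
#[derive(Default)]
struct MiniBus {
    slots: HashMap<TypeId, Box<dyn Any>>,
}

impl MiniBus {
    /// Stores `value` under its own type, replacing any previous value.
    fn insert<T: Any>(&mut self, value: T) {
        self.slots.insert(TypeId::of::<T>(), Box::new(value));
    }

    /// Looks up the value stored for type `T`, if any.
    fn get<T: Any>(&self) -> Option<&T> {
        self.slots
            .get(&TypeId::of::<T>())
            .and_then(|slot| slot.downcast_ref::<T>())
    }
}

/// Newtype wrapper so the payment id has its own slot on the bus.
#[derive(Debug, PartialEq)]
struct PaymentId(String);

/// Forward step: records the id that the compensation will need.
fn authorize(bus: &mut MiniBus) {
    bus.insert(PaymentId("pay-123".to_string()));
}

/// Compensation: reads the id back and fails clearly if it is missing.
fn refund(bus: &MiniBus) -> Result<String, String> {
    let id = bus
        .get::<PaymentId>()
        .ok_or_else(|| "no PaymentId on the bus".to_string())?;
    Ok(format!("refunded {}", id.0))
}
```

Wrapping the id in a newtype such as `PaymentId` matters in a type-keyed store: two plain `String` values would compete for the same slot.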

#7. Saga with Persistence

For production sagas, combine with PostgresPersistenceStore to survive process crashes. The persistence store checkpoints each step's completion, allowing recovery to resume from the last successful step.

use ranvier_runtime::{Axon, PersistenceConfig};
// SagaPolicy and PostgresPersistenceStore must also be in scope (import paths omitted here).

let saga = Axon::simple::<String>("durable-order")
    .with_saga_policy(SagaPolicy::Enabled)
    .with_persistence(PersistenceConfig {
        store: Box::new(PostgresPersistenceStore::new(pool.clone())),
        checkpoint_every_step: true,
    })
    .then(ValidateOrder)
    .then_compensated(ReserveInventory, ReleaseInventory)
    .then_compensated(AuthorizePayment, RefundPayment)
    .then_compensated(ArrangeShipping, CancelShipment)
    .then(CompleteOrder);

On crash recovery, the runtime replays from the last persisted checkpoint and executes compensations for any steps that completed after the checkpoint but before the crash.


#8. Testing Sagas

Use ranvier-test to verify compensation logic:

use ranvier_test::prelude::*;

#[tokio::test]
async fn test_saga_compensation_on_shipping_failure() {
    let mut test_bus = TestBus::new();
    test_bus.insert(DbPool(test_pool().await));

    // FailingShipment stands in for ArrangeShipping and always faults
    let saga = Axon::simple::<String>("test-saga")
        .with_saga_policy(SagaPolicy::Enabled)
        .then_compensated(ReserveInventory, ReleaseInventory)
        .then_compensated(AuthorizePayment, RefundPayment)
        .then_compensated(FailingShipment, CancelShipment);

    let result = TestAxon::run(&saga, (), &(), &mut test_bus).await;
    assert_outcome_err!(&result);

    // Verify compensations ran: inventory should be released
    let stock = get_product_stock(&test_bus, "prod-1").await;
    assert_eq!(stock, 100); // back to original
}

#See Also

  • Bus Access Patterns Cookbook -- Bus usage in saga contexts
  • Persistence Ops Runbook -- durable saga configuration
  • Guard Patterns Cookbook -- Guard composition patterns