#Saga 보상 Cookbook

버전: 0.36.0 | 업데이트: 2026-03-20 | 적용 대상: ranvier-runtime 0.36+ | 카테고리: 쿡북


#개요

Ranvier의 Saga 패턴(then_compensated)은 다단계 파이프라인이 중간에 실패했을 때 자동 롤백을 지원합니다. 각 전진 단계는 부수 효과를 되돌리는 보상 단계와 쌍을 이룹니다. 이 Cookbook에서는 Saga 구성, 보상 순서, 실패 처리의 실전 패턴을 소개합니다.


#1. 기본 Saga 파이프라인

then_compensated()로 부수 효과가 있는 각 단계에 보상을 쌍으로 연결합니다. 어떤 단계가 Outcome::Fault를 반환하면, 완료된 보상이 LIFO 순서로 실행됩니다.

use ranvier_runtime::Axon;

let order_saga = Axon::simple::<String>("order-saga")
    .with_saga_policy(SagaPolicy::Enabled)
    .then(ValidateOrder)
    .then_compensated(ReserveInventory, ReleaseInventory)
    .then_compensated(AuthorizePayment, RefundPayment)
    .then_compensated(ArrangeShipping, CancelShipment)
    .then(CompleteOrder);

ArrangeShipping이 실패하면:

  1. RefundPayment 실행 (결제 취소)
  2. ReleaseInventory 실행 (예약 해제)

ValidateOrder는 보상이 없음 -- 읽기 전용 단계는 보상 불필요. CompleteOrder는 보상이 없음 -- 최종 확인 단계.


#2. 보상 순서: LIFO 스택

보상은 등록의 역순(후입선출)으로 실행됩니다. 따라서 의존적인 작업이 선행 조건보다 먼저 되돌려집니다.

graph LR
    V["Validate"] --> R["Reserve"] --> Ch["Charge"] --> Sh["Ship"] --> Co["Complete"]
    Sh -.->|"1"| CS["CancelShip"]
    Ch -.->|"2"| RF["Refund"]
    R -.->|"3"| RL["Release"]

Ship이 실패하면:

  1. CancelShip -- 시작된 배송 취소
  2. Refund -- 청구 반환 (청구가 존재해야 함)
  3. Release -- 재고 해제 (예약이 존재해야 함)

#3. 보상 Transition 패턴

보상은 전진 단계의 원래 입력을 받아 부수 효과를 되돌립니다. 멱등성이 있어야 합니다 -- 두 번 호출해도 안전해야 합니다.

use async_trait::async_trait;
use ranvier_core::prelude::*;

#[derive(Clone)]
struct ReserveInventory;

#[async_trait]
impl Transition<OrderItems, OrderItems> for ReserveInventory {
    type Error = String;
    type Resources = AppResources;

    async fn run(
        &self,
        input: OrderItems,
        resources: &Self::Resources,
        bus: &mut Bus,
    ) -> Outcome<OrderItems, Self::Error> {
        let pool = &resources.db_pool;
        for item in &input.items {
            sqlx::query("UPDATE products SET stock = stock - $1 WHERE id = $2")
                .bind(item.quantity as i32)
                .bind(&item.product_id)
                .execute(pool)
                .await
                .map_err(|e| format!("재고 예약 실패: {}", e))?;
        }
        bus.insert(ReservationIds(input.items.iter().map(|i| i.product_id.clone()).collect()));
        Outcome::next(input)
    }
}

#[derive(Clone)]
struct ReleaseInventory;

#[async_trait]
impl Transition<OrderItems, OrderItems> for ReleaseInventory {
    type Error = String;
    type Resources = AppResources;

    async fn run(
        &self,
        input: OrderItems,
        resources: &Self::Resources,
        _bus: &mut Bus,
    ) -> Outcome<OrderItems, Self::Error> {
        let pool = &resources.db_pool;
        for item in &input.items {
            // 멱등: 재고를 다시 추가하는 것은 두 번 호출해도 안전
            sqlx::query("UPDATE products SET stock = stock + $1 WHERE id = $2")
                .bind(item.quantity as i32)
                .bind(&item.product_id)
                .execute(pool)
                .await
                .map_err(|e| format!("재고 해제 실패: {}", e))?;
        }
        Outcome::next(input)
    }
}

#4. 보상 실패 처리

보상 자체가 실패하면 Saga 런타임이 실패를 기록하고 나머지 보상을 계속 실행합니다. 보상 실패로 롤백이 중단되지 않습니다.

Ship 실패 -> CancelShip 실패 -> Refund 실행 -> Release 실행
                   |
                   v
         tracing::error!("보상 CancelShip 실패: ...")
         Saga는 Refund와 Release를 계속 실행

설계 원칙: 보상은 불리한 조건에서도 성공하도록 작성해야 합니다. 일시적 오류에 대해 보상 내에서 재시도 로직을 사용하세요:

#[derive(Clone)]
struct RefundPayment;

#[async_trait]
impl Transition<PaymentAuth, PaymentAuth> for RefundPayment {
    type Error = String;
    type Resources = AppResources;

    async fn run(
        &self,
        input: PaymentAuth,
        resources: &Self::Resources,
        bus: &mut Bus,
    ) -> Outcome<PaymentAuth, Self::Error> {
        let payment_id = bus.get_cloned::<PaymentId>()
            .map(|p| p.0)
            .unwrap_or_default();

        // 일시적 오류를 위해 최대 3회 재시도
        let mut attempts = 0;
        loop {
            match resources.payment_client.refund(&payment_id).await {
                Ok(_) => break,
                Err(e) if attempts < 3 => {
                    attempts += 1;
                    tracing::warn!("환불 시도 {}/{} 실패: {}", attempts, 3, e);
                    tokio::time::sleep(std::time::Duration::from_millis(500)).await;
                }
                Err(e) => {
                    tracing::error!("3회 시도 후 환불 실패: {}", e);
                    return Outcome::fault(format!("환불 실패: {}", e));
                }
            }
        }
        Outcome::next(input)
    }
}

#5. 읽기 전용 단계와 부분 Saga

모든 단계에 보상이 필요한 것은 아닙니다. 읽기 전용이나 검증 단계는 보상 없이 일반 then()을 사용하면 됩니다.

let pipeline = Axon::simple::<String>("mixed-pipeline")
    .with_saga_policy(SagaPolicy::Enabled)
    .then(ValidateInput)                               // 보상 없음: 읽기 전용
    .then(EnrichData)                                  // 보상 없음: 읽기 전용
    .then_compensated(PersistRecord, DeleteRecord)     // 부수 효과 있음
    .then_compensated(SendNotification, RevokeNotification) // 부수 효과 있음
    .then(FormatResponse);                             // 보상 없음: 포매팅

SendNotification이 실패하면:

  1. DeleteRecord 실행 (유일하게 완료된 보상)
  2. ValidateInput, EnrichData, FormatResponse는 보상이 없음 -- 건너뜀.

#6. 보상에서의 Bus 상태

보상은 &mut Bus를 받으며, 이전 전진 단계에서 삽입한 모든 데이터가 유지됩니다. 따라서 보상 단계에서 정리에 필요한 컨텍스트를 읽을 수 있습니다.

// 전진: PaymentId를 Bus에 쓰기
#[async_trait]
impl Transition<PaymentRequest, PaymentAuth> for AuthorizePayment {
    // ...
    async fn run(&self, input: PaymentRequest, res: &Self::Resources, bus: &mut Bus)
        -> Outcome<PaymentAuth, Self::Error>
    {
        let auth = res.payment_client.authorize(&input).await?;
        bus.insert(PaymentId(auth.id.clone()));  // 보상을 위해 저장
        Outcome::next(auth)
    }
}

// 보상: Bus에서 PaymentId 읽기
#[async_trait]
impl Transition<PaymentRequest, PaymentRequest> for RefundPayment {
    // ...
    async fn run(&self, input: PaymentRequest, res: &Self::Resources, bus: &mut Bus)
        -> Outcome<PaymentRequest, Self::Error>
    {
        let payment_id = bus.require::<PaymentId>();  // 전진 단계에서 보장
        res.payment_client.refund(&payment_id.0).await?;
        Outcome::next(input)
    }
}

#7. 영속성과 함께하는 Saga

프로덕션 Saga에서는 PostgresPersistenceStore와 함께 사용하여 프로세스 크래시를 견딜 수 있습니다. 영속 저장소는 각 단계의 완료를 체크포인트하여 복구 시 마지막 성공 단계부터 재개할 수 있습니다.

use ranvier_runtime::{Axon, PersistenceConfig};

let saga = Axon::simple::<String>("durable-order")
    .with_saga_policy(SagaPolicy::Enabled)
    .with_persistence(PersistenceConfig {
        store: Box::new(PostgresPersistenceStore::new(pool.clone())),
        checkpoint_every_step: true,
    })
    .then(ValidateOrder)
    .then_compensated(ReserveInventory, ReleaseInventory)
    .then_compensated(AuthorizePayment, RefundPayment)
    .then_compensated(ArrangeShipping, CancelShipment)
    .then(CompleteOrder);

크래시 복구 시 런타임은 마지막 체크포인트부터 재생하며, 체크포인트 이후 완료된 단계에 대해 보상을 실행합니다.


#8. Saga 테스트

ranvier-test를 사용하여 보상 로직을 검증합니다:

use ranvier_test::prelude::*;

#[tokio::test]
async fn test_saga_compensation_on_shipping_failure() {
    let mut test_bus = TestBus::new();
    test_bus.insert(DbPool(test_pool().await));

    // ArrangeShipping이 실패하도록 모킹
    let saga = Axon::simple::<String>("test-saga")
        .with_saga_policy(SagaPolicy::Enabled)
        .then_compensated(ReserveInventory, ReleaseInventory)
        .then_compensated(AuthorizePayment, RefundPayment)
        .then_compensated(FailingShipment, CancelShipment);

    let result = TestAxon::run(&saga, (), &(), &mut test_bus).await;
    assert_outcome_err!(&result);

    // 보상 실행 검증: 재고가 해제되어야 함
    let stock = get_product_stock(&test_bus, "prod-1").await;
    assert_eq!(stock, 100); // 원래 값으로 복원
}

#관련 문서

  • Bus 접근 패턴 Cookbook -- Saga 컨텍스트에서의 Bus 사용
  • 영속성 운영 런북 -- 내구성 Saga 설정
  • Guard 패턴 Cookbook -- Guard 구성 패턴