#외부 서비스 발신 패턴 Cookbook

버전: 0.40.0 | 업데이트: 2026-03-23 | 적용 대상: ranvier-core 0.40+ | 카테고리: 쿡북


#Overview

cookbook_http_ingress가 HTTP 요청 수신을 다룬다면, 이 쿡북은 발신을 다룬다 — HMAC 서명 웹훅 전송, Branch를 활용한 멀티채널 알림, 재시도 전략, DLQ 폴백 패턴.

모든 패턴은 기존 Ranvier 프리미티브(Transition, RetryPolicy, DLQ, Branch)와 생태계 crate(reqwest, hmac, sha2, lettre)를 조합하여 구현한다.


#1. HMAC 서명 웹훅 전송

등록된 엔드포인트로 서명된 웹훅 페이로드를 전송하는 Transition.

use async_trait::async_trait;
use hmac::{Hmac, Mac};
use ranvier_core::prelude::*;
use sha2::Sha256;

type HmacSha256 = Hmac<Sha256>;

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct WebhookPayload {
    pub url: String,
    pub secret: String,
    pub event_type: String,
    pub body: serde_json::Value,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DeliveryResult {
    pub status: u16,
    pub url: String,
    pub duration_ms: u64,
}

pub struct WebhookDelivery {
    client: reqwest::Client,
}

impl WebhookDelivery {
    pub fn new() -> Self {
        Self {
            client: reqwest::Client::builder()
                .timeout(std::time::Duration::from_secs(10))
                .build()
                .expect("reqwest client"),
        }
    }
}

#[async_trait]
impl Transition<WebhookPayload, DeliveryResult> for WebhookDelivery {
    type Error = String;
    type Resources = ();

    async fn run(
        &self,
        payload: WebhookPayload,
        _resources: &Self::Resources,
        _bus: &mut Bus,
    ) -> Outcome<DeliveryResult, Self::Error> {
        let body_bytes = serde_json::to_vec(&payload.body)
            .map_err(|e| e.to_string())?;

        // HMAC-SHA256 서명
        let mut mac = HmacSha256::new_from_slice(payload.secret.as_bytes())
            .map_err(|e| e.to_string())?;
        mac.update(&body_bytes);
        let signature = hex::encode(mac.finalize().into_bytes());

        let start = std::time::Instant::now();

        let response = self.client
            .post(&payload.url)
            .header("Content-Type", "application/json")
            .header("X-Webhook-Signature", format!("sha256={signature}"))
            .header("X-Webhook-Event", &payload.event_type)
            .body(body_bytes)
            .send()
            .await
            .map_err(|e| e.to_string())?;

        let status = response.status().as_u16();
        let duration_ms = start.elapsed().as_millis() as u64;

        if status >= 200 && status < 300 {
            Outcome::next(DeliveryResult {
                status,
                url: payload.url,
                duration_ms,
            })
        } else {
            Outcome::fault(format!(
                "웹훅 전송 실패: {} → HTTP {status}",
                payload.url
            ))
        }
    }
}

#2. 상태 코드 분류 재시도 전략

HTTP 응답을 분류하여 재시도 여부를 결정한다. 429와 5xx는 재시도 가능, 4xx(429 제외)는 영구 실패.

use ranvier_core::prelude::*;
use ranvier_runtime::Axon;

#[derive(Debug, Clone)]
pub enum DeliveryOutcome {
    Delivered(DeliveryResult),
    Retryable(String),
    PermanentFailure(String),
}

pub struct ClassifyResponse;

#[async_trait]
impl Transition<DeliveryResult, DeliveryOutcome> for ClassifyResponse {
    type Error = String;
    type Resources = ();

    async fn run(
        &self,
        result: DeliveryResult,
        _resources: &Self::Resources,
        _bus: &mut Bus,
    ) -> Outcome<DeliveryOutcome, Self::Error> {
        match result.status {
            200..=299 => Outcome::next(DeliveryOutcome::Delivered(result)),
            429 => Outcome::next(DeliveryOutcome::Retryable(
                "레이트 리밋 — 백오프 후 재시도".into(),
            )),
            500..=599 => Outcome::next(DeliveryOutcome::Retryable(
                format!("서버 에러 {}", result.status),
            )),
            _ => Outcome::next(DeliveryOutcome::PermanentFailure(
                format!("클라이언트 에러 {} — 재시도 불가", result.status),
            )),
        }
    }
}

#재시도 정책 적용 파이프라인

let webhook_pipeline = Axon::typed::<WebhookPayload, DeliveryResult>("webhook-deliver")
    .then(WebhookDelivery::new())
    .with_retry(RetryPolicy::exponential(3, std::time::Duration::from_secs(1)))
    .with_dlq("webhook-failures");

with_retry는 지수 백오프(1초, 2초, 4초)로 최대 3회 재시도한다. 모든 재시도 소진 후 실패한 페이로드는 webhook-failures DLQ로 라우팅된다.


#3. DLQ 모니터링 및 수동 재전송

실패한 전송은 DLQ에 저장된다. Inspector로 모니터링하고 수동 재전송할 수 있다.

// DLQ 항목은 Inspector REST API로 조회 가능:
// GET /api/v1/dlq?queue=webhook-failures
//
// 수동 재전송:
// POST /api/v1/dlq/webhook-failures/replay/{item_id}

// 프로그래밍 방식 DLQ 처리
let replay_pipeline = Axon::typed::<DlqItem, DeliveryResult>("dlq-replay")
    .then_fn("extract-payload", |item, _res, _bus| async move {
        let payload: WebhookPayload = serde_json::from_value(item.payload)
            .map_err(|e| e.to_string())?;
        Outcome::next(payload)
    })
    .then(WebhookDelivery::new())
    .with_retry(RetryPolicy::exponential(5, std::time::Duration::from_secs(5)));

#4. Branch를 활용한 멀티채널 알림

Ranvier의 Branch를 사용하여 사용자 선호도나 알림 유형에 따라 다른 채널로 알림을 라우팅한다.

use ranvier_core::prelude::*;
use ranvier_runtime::Axon;

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Notification {
    pub channel: NotificationChannel,
    pub recipient: String,
    pub subject: String,
    pub body: String,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum NotificationChannel {
    Email,
    Slack,
    Sms,
}

let notification_pipeline = Axon::typed::<Notification, String>("notify")
    .then_fn("route", |notif, _res, _bus| async move {
        match notif.channel {
            NotificationChannel::Email => {
                send_email(&notif).await
            }
            NotificationChannel::Slack => {
                send_slack(&notif).await
            }
            NotificationChannel::Sms => {
                send_sms(&notif).await
            }
        }
    });

#5. SMTP 이메일 전송 (lettre)

use lettre::{
    AsyncSmtpTransport, AsyncTransport, Message, Tokio1Executor,
    transport::smtp::authentication::Credentials,
};

async fn send_email(notif: &Notification) -> Outcome<String, String> {
    let creds = Credentials::new(
        std::env::var("SMTP_USER").unwrap_or_default(),
        std::env::var("SMTP_PASS").unwrap_or_default(),
    );

    let mailer = AsyncSmtpTransport::<Tokio1Executor>::relay(
        &std::env::var("SMTP_HOST").unwrap_or("smtp.gmail.com".into()),
    )
    .map_err(|e| e.to_string())?
    .credentials(creds)
    .build();

    let email = Message::builder()
        .from("noreply@example.com".parse().unwrap())
        .to(notif.recipient.parse().map_err(|e: lettre::address::AddressError| e.to_string())?)
        .subject(&notif.subject)
        .body(notif.body.clone())
        .map_err(|e| e.to_string())?;

    mailer.send(email).await.map_err(|e| e.to_string())?;
    Outcome::next(format!("{} 에게 이메일 전송 완료", notif.recipient))
}

#6. Slack 웹훅 (reqwest)

async fn send_slack(notif: &Notification) -> Outcome<String, String> {
    let webhook_url = std::env::var("SLACK_WEBHOOK_URL")
        .map_err(|_| "SLACK_WEBHOOK_URL 미설정".to_string())?;

    let client = reqwest::Client::new();
    let response = client
        .post(&webhook_url)
        .json(&serde_json::json!({
            "text": format!("*{}*\n{}", notif.subject, notif.body),
        }))
        .send()
        .await
        .map_err(|e| e.to_string())?;

    if response.status().is_success() {
        Outcome::next(format!("{} 에게 Slack 전송 완료", notif.recipient))
    } else {
        Outcome::fault(format!("Slack 전송 실패: {}", response.status()))
    }
}

#7. Twilio SMS (reqwest)

async fn send_sms(notif: &Notification) -> Outcome<String, String> {
    let account_sid = std::env::var("TWILIO_ACCOUNT_SID")
        .map_err(|_| "TWILIO_ACCOUNT_SID 미설정".to_string())?;
    let auth_token = std::env::var("TWILIO_AUTH_TOKEN")
        .map_err(|_| "TWILIO_AUTH_TOKEN 미설정".to_string())?;
    let from_number = std::env::var("TWILIO_FROM_NUMBER")
        .map_err(|_| "TWILIO_FROM_NUMBER 미설정".to_string())?;

    let client = reqwest::Client::new();
    let url = format!(
        "https://api.twilio.com/2010-04-01/Accounts/{account_sid}/Messages.json"
    );

    let response = client
        .post(&url)
        .basic_auth(&account_sid, Some(&auth_token))
        .form(&[
            ("To", notif.recipient.as_str()),
            ("From", from_number.as_str()),
            ("Body", &format!("{}: {}", notif.subject, notif.body)),
        ])
        .send()
        .await
        .map_err(|e| e.to_string())?;

    if response.status().is_success() {
        Outcome::next(format!("{} 에게 SMS 전송 완료", notif.recipient))
    } else {
        Outcome::fault(format!("SMS 전송 실패: {}", response.status()))
    }
}

#8. 폴백 채널 패턴

1차 채널 실패 시 자동으로 대체 채널로 전환한다.

let resilient_notify = Axon::typed::<Notification, String>("resilient-notify")
    .then_fn("try-primary", |notif, _res, bus| async move {
        let result = match notif.channel {
            NotificationChannel::Email => send_email(&notif).await,
            NotificationChannel::Slack => send_slack(&notif).await,
            NotificationChannel::Sms => send_sms(&notif).await,
        };

        match result {
            Outcome::Next(msg) => Outcome::next(msg),
            Outcome::Fault(_) => {
                // 감사를 위해 원래 채널 저장
                bus.insert(FailedChannel(notif.channel.clone()));

                // 폴백: Email -> Slack, Slack -> Email, SMS -> Email
                let fallback = Notification {
                    channel: match notif.channel {
                        NotificationChannel::Email => NotificationChannel::Slack,
                        _ => NotificationChannel::Email,
                    },
                    ..notif
                };
                match fallback.channel {
                    NotificationChannel::Email => send_email(&fallback).await,
                    NotificationChannel::Slack => send_slack(&fallback).await,
                    NotificationChannel::Sms => send_sms(&fallback).await,
                }
            }
            other => other,
        }
    });

#[derive(Debug, Clone)]
struct FailedChannel(NotificationChannel);

#9. 발신 호출 서킷 브레이커

Atomic 카운터를 사용한 Guard로 간단한 서킷 브레이커를 구현한다.

use std::sync::atomic::{AtomicU32, AtomicBool, Ordering};
use std::sync::Arc;

#[derive(Debug, Clone)]
pub struct CircuitBreakerGuard<T> {
    failure_count: Arc<AtomicU32>,
    is_open: Arc<AtomicBool>,
    threshold: u32,
    _marker: std::marker::PhantomData<T>,
}

impl<T> CircuitBreakerGuard<T> {
    pub fn new(threshold: u32) -> Self {
        Self {
            failure_count: Arc::new(AtomicU32::new(0)),
            is_open: Arc::new(AtomicBool::new(false)),
            threshold,
            _marker: std::marker::PhantomData,
        }
    }

    pub fn record_failure(&self) {
        let count = self.failure_count.fetch_add(1, Ordering::Relaxed) + 1;
        if count >= self.threshold {
            self.is_open.store(true, Ordering::Relaxed);
        }
    }

    pub fn record_success(&self) {
        self.failure_count.store(0, Ordering::Relaxed);
        self.is_open.store(false, Ordering::Relaxed);
    }
}

#[async_trait]
impl<T: Send + Sync + 'static> Transition<T, T> for CircuitBreakerGuard<T> {
    type Error = String;
    type Resources = ();

    async fn run(
        &self,
        input: T,
        _resources: &Self::Resources,
        _bus: &mut Bus,
    ) -> Outcome<T, Self::Error> {
        if self.is_open.load(Ordering::Relaxed) {
            Outcome::fault("503 Service Unavailable: 서킷 브레이커 열림".into())
        } else {
            Outcome::next(input)
        }
    }
}

#10. 발신 호출 감사 추적

규제 준수를 위해 모든 발신 호출을 감사 해시 체인에 기록한다.

use ranvier_audit::prelude::*;

let audited_webhook = Axon::typed::<WebhookPayload, DeliveryResult>("audited-webhook")
    .then_fn("audit-start", |payload, _res, bus| async move {
        let audit = bus.require::<AuditTrail>();
        audit.record(AuditEntry {
            action: "webhook.attempt".into(),
            actor: "system".into(),
            resource: format!("url={}, event={}", payload.url, payload.event_type),
            timestamp: chrono::Utc::now(),
        }).await;
        Outcome::next(payload)
    })
    .then(WebhookDelivery::new())
    .then_fn("audit-complete", |result, _res, bus| async move {
        let audit = bus.require::<AuditTrail>();
        audit.record(AuditEntry {
            action: "webhook.delivered".into(),
            actor: "system".into(),
            resource: format!("url={}, status={}", result.url, result.status),
            timestamp: chrono::Utc::now(),
        }).await;
        Outcome::next(result)
    });

#See Also

  • HttpIngress Patterns Cookbook — 수신 HTTP 패턴 (상호 보완)
  • Guard Patterns Cookbook — 서킷 브레이커 Guard 조합
  • Saga Compensation Cookbook — 실패한 전송 보상
  • Multi-Tenant Isolation Cookbook — 테넌트 스코프 웹훅 전송