# 외부 서비스 발신 패턴 Cookbook
버전: 0.40.0 | 업데이트: 2026-03-23 | 적용 대상: ranvier-core 0.40+ | 카테고리: 쿡북
# Overview
cookbook_http_ingress가 HTTP 요청 수신을 다룬다면, 이 쿡북은 발신을 다룬다 —
HMAC 서명 웹훅 전송, Branch를 활용한 멀티채널 알림, 재시도 전략, DLQ 폴백 패턴.
모든 패턴은 기존 Ranvier 프리미티브(Transition, RetryPolicy, DLQ, Branch)와 생태계 crate(reqwest, hmac, sha2, lettre)를 조합하여 구현한다.
# 1. HMAC 서명 웹훅 전송
등록된 엔드포인트로 서명된 웹훅 페이로드를 전송하는 Transition.
use async_trait::async_trait;
use hmac::{Hmac, Mac};
use ranvier_core::prelude::*;
use sha2::Sha256;
/// HMAC instantiated with SHA-256; used to sign webhook request bodies.
type HmacSha256 = Hmac<Sha256>;
/// Input for one signed webhook delivery.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct WebhookPayload {
    /// Endpoint the request is POSTed to.
    pub url: String,
    /// Shared secret used as the HMAC-SHA256 key for the signature header.
    ///
    /// NOTE(review): this type derives `Debug` and `Serialize`, so the secret
    /// is included in debug output and serialized forms — confirm that is acceptable.
    pub secret: String,
    /// Event name sent in the `X-Webhook-Event` header.
    pub event_type: String,
    /// JSON document that is serialized, signed, and sent as the request body.
    pub body: serde_json::Value,
}
/// Summary of one webhook delivery attempt that produced an HTTP response.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DeliveryResult {
    /// HTTP status code of the response.
    pub status: u16,
    /// Endpoint the payload was sent to.
    pub url: String,
    /// Elapsed time, in milliseconds, from just before the request was sent
    /// until the response was received.
    pub duration_ms: u64,
}
/// Transition that delivers HMAC-signed webhooks over HTTP.
pub struct WebhookDelivery {
    /// HTTP client used for every delivery made through this transition.
    client: reqwest::Client,
}
impl WebhookDelivery {
    /// Creates a delivery transition whose HTTP client uses a 10-second timeout.
    ///
    /// # Panics
    ///
    /// Panics with the message `reqwest client` if the HTTP client cannot be built.
    pub fn new() -> Self {
        let client = reqwest::Client::builder()
            .timeout(std::time::Duration::from_secs(10))
            .build()
            .expect("reqwest client");
        Self { client }
    }
}

/// `Default` is equivalent to [`WebhookDelivery::new`], so the type can be used
/// wherever a `Default` bound is required.
impl Default for WebhookDelivery {
    fn default() -> Self {
        Self::new()
    }
}
#[async_trait]
impl Transition<WebhookPayload, DeliveryResult> for WebhookDelivery {
    type Error = String;
    type Resources = ();

    /// Signs the JSON body with HMAC-SHA256 and POSTs it to `payload.url`.
    ///
    /// The request carries `Content-Type: application/json`,
    /// `X-Webhook-Signature: sha256=<hex digest>` and `X-Webhook-Event`.
    ///
    /// Yields `Outcome::next` with a [`DeliveryResult`] for a 2xx response and
    /// `Outcome::fault` with a message naming the URL and status otherwise.
    /// Serialization, key-setup and transport errors are propagated as their
    /// string representation.
    async fn run(
        &self,
        payload: WebhookPayload,
        _resources: &Self::Resources,
        _bus: &mut Bus,
    ) -> Outcome<DeliveryResult, Self::Error> {
        let body_bytes = serde_json::to_vec(&payload.body).map_err(|e| e.to_string())?;

        // HMAC-SHA256 signature, computed over the exact bytes that are sent.
        let mut mac = HmacSha256::new_from_slice(payload.secret.as_bytes())
            .map_err(|e| e.to_string())?;
        mac.update(&body_bytes);
        let signature = hex::encode(mac.finalize().into_bytes());

        let start = std::time::Instant::now();
        let response = self
            .client
            .post(&payload.url)
            .header("Content-Type", "application/json")
            .header("X-Webhook-Signature", format!("sha256={signature}"))
            .header("X-Webhook-Event", &payload.event_type)
            .body(body_bytes)
            .send()
            .await
            .map_err(|e| e.to_string())?;

        let status = response.status().as_u16();
        // `as_millis` returns u128; saturate rather than silently truncate.
        let duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);

        if (200..300).contains(&status) {
            Outcome::next(DeliveryResult {
                status,
                url: payload.url,
                duration_ms,
            })
        } else {
            Outcome::fault(format!(
                "웹훅 전송 실패: {} → HTTP {status}",
                payload.url
            ))
        }
    }
}
# 2. 상태 코드 분류 재시도 전략
HTTP 응답을 분류하여 재시도 여부를 결정한다. 429와 5xx는 재시도 가능, 4xx(429 제외)는 영구 실패.
use ranvier_core::prelude::*;
use ranvier_runtime::Axon;
/// Classification of a delivery attempt, used to decide whether to retry.
#[derive(Debug, Clone)]
pub enum DeliveryOutcome {
    /// The endpoint answered with a 2xx status.
    Delivered(DeliveryResult),
    /// The failure may succeed on a later attempt; carries a description.
    Retryable(String),
    /// The failure will not be retried; carries a description.
    PermanentFailure(String),
}
/// Transition that maps a [`DeliveryResult`] status code to a [`DeliveryOutcome`].
pub struct ClassifyResponse;

#[async_trait]
impl Transition<DeliveryResult, DeliveryOutcome> for ClassifyResponse {
    type Error = String;
    type Resources = ();

    /// Classifies `result.status`:
    ///
    /// * 200–299 → `Delivered`
    /// * 429 and 500–599 → `Retryable`
    /// * any other code → `PermanentFailure`
    ///
    /// Always yields `Outcome::next`; this transition never faults.
    async fn run(
        &self,
        result: DeliveryResult,
        _resources: &Self::Resources,
        _bus: &mut Bus,
    ) -> Outcome<DeliveryOutcome, Self::Error> {
        let code = result.status;
        let classified = match code {
            200..=299 => DeliveryOutcome::Delivered(result),
            429 => DeliveryOutcome::Retryable("레이트 리밋 — 백오프 후 재시도".into()),
            500..=599 => DeliveryOutcome::Retryable(format!("서버 에러 {}", code)),
            _ => DeliveryOutcome::PermanentFailure(format!(
                "클라이언트 에러 {} — 재시도 불가",
                code
            )),
        };
        Outcome::next(classified)
    }
}
# 재시도 정책 적용 파이프라인
// Exponential retry policy built from a count of 3 and a 1-second base duration.
let backoff = RetryPolicy::exponential(3, std::time::Duration::from_secs(1));
// Delivery pipeline: send the webhook, retry per `backoff`, then hand off to
// the "webhook-failures" dead-letter queue.
let webhook_pipeline = Axon::typed::<WebhookPayload, DeliveryResult>("webhook-deliver")
    .then(WebhookDelivery::new())
    .with_retry(backoff)
    .with_dlq("webhook-failures");
with_retry는 지수 백오프(1초, 2초, 4초)로 최대 3회 재시도한다.
모든 재시도 소진 후 실패한 페이로드는 webhook-failures DLQ로 라우팅된다.
# 3. DLQ 모니터링 및 수동 재전송
실패한 전송은 DLQ에 저장된다. Inspector로 모니터링하고 수동 재전송할 수 있다.
// DLQ items can be listed through the Inspector REST API:
// GET /api/v1/dlq?queue=webhook-failures
//
// Manual replay:
// POST /api/v1/dlq/webhook-failures/replay/{item_id}

// Programmatic DLQ handling: decode the stored payload and deliver it again.
let replay_backoff = RetryPolicy::exponential(5, std::time::Duration::from_secs(5));
let replay_pipeline = Axon::typed::<DlqItem, DeliveryResult>("dlq-replay")
    .then_fn("extract-payload", |item, _res, _bus| async move {
        // The DLQ item's payload is expected to deserialize into a WebhookPayload;
        // a decode error is propagated as its string representation.
        let decoded = serde_json::from_value::<WebhookPayload>(item.payload);
        let payload = decoded.map_err(|e| e.to_string())?;
        Outcome::next(payload)
    })
    .then(WebhookDelivery::new())
    .with_retry(replay_backoff);
# 4. Branch를 활용한 멀티채널 알림
Ranvier의 Branch를 사용하여 사용자 선호도나 알림 유형에 따라
다른 채널로 알림을 라우팅한다.
use ranvier_core::prelude::*;
use ranvier_runtime::Axon;
/// A message to deliver to one recipient over one channel.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Notification {
    /// Channel the notification should be sent through.
    pub channel: NotificationChannel,
    /// Recipient identifier; the senders use it as an e-mail address (Email)
    /// or as the SMS `To` value (Sms).
    pub recipient: String,
    /// Subject line or title of the message.
    pub subject: String,
    /// Message text.
    pub body: String,
}
/// Delivery channels a [`Notification`] can be routed to.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum NotificationChannel {
    /// Handled by `send_email`.
    Email,
    /// Handled by `send_slack`.
    Slack,
    /// Handled by `send_sms`.
    Sms,
}
let notification_pipeline = Axon::typed::<Notification, String>("notify")
.then_fn("route", |notif, _res, _bus| async move {
match notif.channel {
NotificationChannel::Email => {
send_email(¬if).await
}
NotificationChannel::Slack => {
send_slack(¬if).await
}
NotificationChannel::Sms => {
send_sms(¬if).await
}
}
});#5. SMTP 이메일 전송 (lettre)
use lettre::{
AsyncSmtpTransport, AsyncTransport, Message, Tokio1Executor,
transport::smtp::authentication::Credentials,
};
async fn send_email(notif: &Notification) -> Outcome<String, String> {
let creds = Credentials::new(
std::env::var("SMTP_USER").unwrap_or_default(),
std::env::var("SMTP_PASS").unwrap_or_default(),
);
let mailer = AsyncSmtpTransport::<Tokio1Executor>::relay(
&std::env::var("SMTP_HOST").unwrap_or("smtp.gmail.com".into()),
)
.map_err(|e| e.to_string())?
.credentials(creds)
.build();
let email = Message::builder()
.from("noreply@example.com".parse().unwrap())
.to(notif.recipient.parse().map_err(|e: lettre::address::AddressError| e.to_string())?)
.subject(¬if.subject)
.body(notif.body.clone())
.map_err(|e| e.to_string())?;
mailer.send(email).await.map_err(|e| e.to_string())?;
Outcome::next(format!("{} 에게 이메일 전송 완료", notif.recipient))
}#6. Slack 웹훅 (reqwest)
/// Posts `notif` to the Slack incoming webhook named by `SLACK_WEBHOOK_URL`.
///
/// The message text is the subject wrapped in `*…*` followed by the body on
/// the next line. Faults when the environment variable is unset, the request
/// fails or times out, or the response status is not a success code.
async fn send_slack(notif: &Notification) -> Outcome<String, String> {
    let webhook_url = std::env::var("SLACK_WEBHOOK_URL")
        .map_err(|_| "SLACK_WEBHOOK_URL 미설정".to_string())?;

    let client = reqwest::Client::new();
    let response = client
        .post(&webhook_url)
        // `Client::new()` sets no timeout; bound the call so an unresponsive
        // endpoint cannot stall the pipeline (same 10 s as WebhookDelivery).
        .timeout(std::time::Duration::from_secs(10))
        .json(&serde_json::json!({
            "text": format!("*{}*\n{}", notif.subject, notif.body),
        }))
        .send()
        .await
        .map_err(|e| e.to_string())?;

    if response.status().is_success() {
        Outcome::next(format!("{} 에게 Slack 전송 완료", notif.recipient))
    } else {
        Outcome::fault(format!("Slack 전송 실패: {}", response.status()))
    }
}
# 7. Twilio SMS (reqwest)
/// Sends `notif` as an SMS through the Twilio Messages endpoint.
///
/// Requires `TWILIO_ACCOUNT_SID`, `TWILIO_AUTH_TOKEN` and `TWILIO_FROM_NUMBER`
/// in the environment; a missing variable faults with a message naming it.
/// The SMS text is `"<subject>: <body>"`. Also faults when the request fails
/// or times out, or the response status is not a success code.
async fn send_sms(notif: &Notification) -> Outcome<String, String> {
    let account_sid = std::env::var("TWILIO_ACCOUNT_SID")
        .map_err(|_| "TWILIO_ACCOUNT_SID 미설정".to_string())?;
    let auth_token = std::env::var("TWILIO_AUTH_TOKEN")
        .map_err(|_| "TWILIO_AUTH_TOKEN 미설정".to_string())?;
    let from_number = std::env::var("TWILIO_FROM_NUMBER")
        .map_err(|_| "TWILIO_FROM_NUMBER 미설정".to_string())?;

    let client = reqwest::Client::new();
    let url = format!(
        "https://api.twilio.com/2010-04-01/Accounts/{account_sid}/Messages.json"
    );
    let text = format!("{}: {}", notif.subject, notif.body);

    let response = client
        .post(&url)
        // `Client::new()` sets no timeout; bound the call so an unresponsive
        // endpoint cannot stall the pipeline (same 10 s as WebhookDelivery).
        .timeout(std::time::Duration::from_secs(10))
        .basic_auth(&account_sid, Some(&auth_token))
        .form(&[
            ("To", notif.recipient.as_str()),
            ("From", from_number.as_str()),
            ("Body", text.as_str()),
        ])
        .send()
        .await
        .map_err(|e| e.to_string())?;

    if response.status().is_success() {
        Outcome::next(format!("{} 에게 SMS 전송 완료", notif.recipient))
    } else {
        Outcome::fault(format!("SMS 전송 실패: {}", response.status()))
    }
}
# 8. 폴백 채널 패턴
1차 채널 실패 시 자동으로 대체 채널로 전환한다.
let resilient_notify = Axon::typed::<Notification, String>("resilient-notify")
.then_fn("try-primary", |notif, _res, bus| async move {
let result = match notif.channel {
NotificationChannel::Email => send_email(¬if).await,
NotificationChannel::Slack => send_slack(¬if).await,
NotificationChannel::Sms => send_sms(¬if).await,
};
match result {
Outcome::Next(msg) => Outcome::next(msg),
Outcome::Fault(_) => {
// 감사를 위해 원래 채널 저장
bus.insert(FailedChannel(notif.channel.clone()));
// 폴백: Email -> Slack, Slack -> Email, SMS -> Email
let fallback = Notification {
channel: match notif.channel {
NotificationChannel::Email => NotificationChannel::Slack,
_ => NotificationChannel::Email,
},
..notif
};
match fallback.channel {
NotificationChannel::Email => send_email(&fallback).await,
NotificationChannel::Slack => send_slack(&fallback).await,
NotificationChannel::Sms => send_sms(&fallback).await,
}
}
other => other,
}
});
/// Bus marker holding the channel whose delivery attempt faulted.
#[derive(Debug, Clone)]
struct FailedChannel(NotificationChannel);
# 9. 발신 호출 서킷 브레이커
Atomic 카운터를 사용한 Guard로 간단한 서킷 브레이커를 구현한다.
use std::sync::atomic::{AtomicU32, AtomicBool, Ordering};
use std::sync::Arc;
/// Failure-counting circuit breaker whose state is shared between clones.
///
/// The breaker opens once the number of recorded failures reaches `threshold`
/// and stays open until [`record_success`](Self::record_success) is called;
/// nothing in this type closes it on a timer.
#[derive(Debug, Clone)]
pub struct CircuitBreakerGuard<T> {
    /// Failures recorded since the last success.
    failure_count: Arc<AtomicU32>,
    /// `true` while the breaker is open.
    is_open: Arc<AtomicBool>,
    /// Failure count at which the breaker opens.
    threshold: u32,
    /// Ties the guard to the payload type `T` without storing a value of it.
    _marker: std::marker::PhantomData<T>,
}

impl<T> CircuitBreakerGuard<T> {
    /// Creates a closed breaker with a zero failure count.
    pub fn new(threshold: u32) -> Self {
        let failure_count = Arc::new(AtomicU32::new(0));
        let is_open = Arc::new(AtomicBool::new(false));
        Self {
            failure_count,
            is_open,
            threshold,
            _marker: std::marker::PhantomData,
        }
    }

    /// Counts one failure and opens the breaker when the new total reaches `threshold`.
    pub fn record_failure(&self) {
        let failures = self.failure_count.fetch_add(1, Ordering::Relaxed) + 1;
        let tripped = failures >= self.threshold;
        if tripped {
            self.is_open.store(true, Ordering::Relaxed);
        }
    }

    /// Resets the failure count to zero and closes the breaker.
    pub fn record_success(&self) {
        self.failure_count.store(0, Ordering::Relaxed);
        self.is_open.store(false, Ordering::Relaxed);
    }
}
#[async_trait]
impl<T: Send + Sync + 'static> Transition<T, T> for CircuitBreakerGuard<T> {
    type Error = String;
    type Resources = ();

    /// Passes `input` through unchanged while the breaker is closed and faults
    /// with a 503-style message while it is open.
    async fn run(
        &self,
        input: T,
        _resources: &Self::Resources,
        _bus: &mut Bus,
    ) -> Outcome<T, Self::Error> {
        // Guard clause: reject immediately when the breaker is open.
        let open = self.is_open.load(Ordering::Relaxed);
        if open {
            return Outcome::fault("503 Service Unavailable: 서킷 브레이커 열림".into());
        }
        Outcome::next(input)
    }
}
# 10. 발신 호출 감사 추적
규제 준수를 위해 모든 발신 호출을 감사 해시 체인에 기록한다.
use ranvier_audit::prelude::*;
// Webhook delivery wrapped in two audit steps: one entry before the attempt
// and one after a delivery that reached the "audit-complete" step.
let audited_webhook = Axon::typed::<WebhookPayload, DeliveryResult>("audited-webhook")
    .then_fn("audit-start", |payload, _res, bus| async move {
        let audit = bus.require::<AuditTrail>();
        // Record the attempt, then forward the payload unchanged.
        let entry = AuditEntry {
            action: "webhook.attempt".into(),
            actor: "system".into(),
            resource: format!("url={}, event={}", payload.url, payload.event_type),
            timestamp: chrono::Utc::now(),
        };
        audit.record(entry).await;
        Outcome::next(payload)
    })
    .then(WebhookDelivery::new())
    .then_fn("audit-complete", |result, _res, bus| async move {
        let audit = bus.require::<AuditTrail>();
        // Record the delivery result, then forward it unchanged.
        let entry = AuditEntry {
            action: "webhook.delivered".into(),
            actor: "system".into(),
            resource: format!("url={}, status={}", result.url, result.status),
            timestamp: chrono::Utc::now(),
        };
        audit.record(entry).await;
        Outcome::next(result)
    });
# See Also
- HttpIngress Patterns Cookbook — 수신 HTTP 패턴 (상호 보완)
- Guard Patterns Cookbook — 서킷 브레이커 Guard 조합
- Saga Compensation Cookbook — 실패한 전송 보상
- Multi-Tenant Isolation Cookbook — 테넌트 스코프 웹훅 전송