#Egress Patterns Cookbook

Version: 0.40.0 | Updated: 2026-03-23 | Applies to: ranvier-core 0.40+ | Category: Cookbook


#Overview

While cookbook_http_ingress covers receiving HTTP requests, this cookbook covers sending — webhook delivery with HMAC signatures, multi-channel notifications via Branch, retry strategies, and DLQ fallback patterns.

All patterns use existing Ranvier primitives (Transition, RetryPolicy, DLQ, Branch) combined with ecosystem crates (reqwest, hmac, sha2, lettre).


#1. Webhook Delivery with HMAC Signature

A Transition that sends a signed webhook payload to a registered endpoint.

use async_trait::async_trait;
use hmac::{Hmac, Mac};
use ranvier_core::prelude::*;
use sha2::Sha256;

/// HMAC keyed over SHA-256; the MAC type used to sign webhook request bodies.
type HmacSha256 = Hmac<Sha256>;

/// Input to the webhook delivery step: where to send the webhook and what to send.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct WebhookPayload {
    /// Endpoint the payload is POSTed to.
    pub url: String,
    /// Key for the HMAC-SHA256 signature computed over the serialized `body`.
    // NOTE(review): the derives above include this field in `Debug` output and in any
    // serialized copy of the payload — confirm that is acceptable wherever payloads
    // are logged or stored.
    pub secret: String,
    /// Value sent in the `X-Webhook-Event` request header.
    pub event_type: String,
    /// JSON document that is serialized and sent as the request body.
    pub body: serde_json::Value,
}

/// Result of a webhook POST as reported by the delivery step.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DeliveryResult {
    /// HTTP status code of the response.
    pub status: u16,
    /// Endpoint the request was sent to.
    pub url: String,
    /// Milliseconds from just before the request was sent until the response status was read.
    pub duration_ms: u64,
}

/// Transition that delivers signed webhook payloads over HTTP.
///
/// Holds a single `reqwest::Client` configured once in [`WebhookDelivery::new`].
pub struct WebhookDelivery {
    client: reqwest::Client,
}

impl WebhookDelivery {
    /// Creates a delivery step whose HTTP client uses a 10-second timeout.
    ///
    /// # Panics
    ///
    /// Panics if the `reqwest` client cannot be built.
    pub fn new() -> Self {
        let client = reqwest::Client::builder()
            .timeout(std::time::Duration::from_secs(10))
            .build()
            .expect("reqwest client");
        Self { client }
    }
}

impl Default for WebhookDelivery {
    /// Equivalent to [`WebhookDelivery::new`].
    fn default() -> Self {
        Self::new()
    }
}

#[async_trait]
impl Transition<WebhookPayload, DeliveryResult> for WebhookDelivery {
    type Error = String;
    type Resources = ();

    /// POSTs `payload.body` as JSON to `payload.url`, signed with HMAC-SHA256.
    ///
    /// The request carries `X-Webhook-Signature: sha256=<hex digest>`, computed over the
    /// serialized body bytes with `payload.secret` as the key, and `X-Webhook-Event`
    /// set to `payload.event_type`.
    ///
    /// A 2xx response yields a [`DeliveryResult`] with the status, URL and elapsed
    /// milliseconds. Serialization, key and transport errors are converted to their
    /// string form and short-circuit via `?`; any other status produces a fault that
    /// names the status code and URL.
    async fn run(
        &self,
        payload: WebhookPayload,
        _resources: &Self::Resources,
        _bus: &mut Bus,
    ) -> Outcome<DeliveryResult, Self::Error> {
        // Serialize once so the signature covers exactly the bytes that are sent.
        let body_bytes = serde_json::to_vec(&payload.body)
            .map_err(|e| e.to_string())?;

        // HMAC-SHA256 signature
        let mut mac = HmacSha256::new_from_slice(payload.secret.as_bytes())
            .map_err(|e| e.to_string())?;
        mac.update(&body_bytes);
        let signature = hex::encode(mac.finalize().into_bytes());

        let start = std::time::Instant::now();

        // NOTE(review): `payload.url` is used as-is. If URLs can come from untrusted
        // registrants, presumably they should be validated before this step — confirm.
        let response = self.client
            .post(&payload.url)
            .header("Content-Type", "application/json")
            .header("X-Webhook-Signature", format!("sha256={signature}"))
            .header("X-Webhook-Event", &payload.event_type)
            .body(body_bytes)
            .send()
            .await
            .map_err(|e| e.to_string())?;

        let status_code = response.status();
        let status = status_code.as_u16();
        // `as_millis` returns u128; saturate instead of silently truncating with `as`.
        let duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);

        // Same success test the other outbound senders in this cookbook use.
        if status_code.is_success() {
            Outcome::next(DeliveryResult {
                status,
                url: payload.url,
                duration_ms,
            })
        } else {
            Outcome::fault(format!(
                "webhook delivery failed: HTTP {status} from {}",
                payload.url
            ))
        }
    }
}

#2. Retry Strategy with Status Classification

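Classify HTTP responses to determine retry behavior. 429 and 5xx are retryable; 4xx (except 429) are permanent failures. Note that ClassifyResponse can only see a non-2xx status if the upstream step passes it through as a DeliveryResult; the WebhookDelivery transition from section 1 instead faults on every non-2xx response.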

use ranvier_core::prelude::*;
use ranvier_runtime::Axon;

/// How a webhook response should be handled, as decided from its HTTP status code.
#[derive(Debug, Clone)]
pub enum DeliveryOutcome {
    /// The endpoint answered with a 2xx status; carries the delivery result.
    Delivered(DeliveryResult),
    /// The attempt failed but may be retried; carries the reason.
    Retryable(String),
    /// The attempt failed and should not be retried; carries the reason.
    PermanentFailure(String),
}

/// Transition that maps a [`DeliveryResult`] status code to a [`DeliveryOutcome`].
pub struct ClassifyResponse;

#[async_trait]
impl Transition<DeliveryResult, DeliveryOutcome> for ClassifyResponse {
    type Error = String;
    type Resources = ();

    /// Classifies `result.status`:
    ///
    /// * 2xx — `Delivered`, carrying the original result
    /// * 429 and 5xx — `Retryable`
    /// * other 4xx — `PermanentFailure` ("client error")
    /// * anything else (1xx, 3xx, out-of-range codes) — `PermanentFailure`
    ///   ("unexpected status")
    ///
    /// Always returns `Outcome::next`; the classification is carried in the value.
    async fn run(
        &self,
        result: DeliveryResult,
        _resources: &Self::Resources,
        _bus: &mut Bus,
    ) -> Outcome<DeliveryOutcome, Self::Error> {
        let classified = match result.status {
            200..=299 => DeliveryOutcome::Delivered(result),
            // Must precede the general 4xx arm: rate limiting is the one retryable 4xx.
            429 => DeliveryOutcome::Retryable(
                "rate limited — retry after backoff".into(),
            ),
            500..=599 => DeliveryOutcome::Retryable(
                format!("server error {}", result.status),
            ),
            400..=499 => DeliveryOutcome::PermanentFailure(
                format!("client error {} — not retryable", result.status),
            ),
            // Informational, redirect and out-of-range codes are not client errors;
            // keep them permanent but do not mislabel them.
            other => DeliveryOutcome::PermanentFailure(
                format!("unexpected status {other} — not retryable"),
            ),
        };
        Outcome::next(classified)
    }
}

#Pipeline with Retry Policy

// Retry policy for delivery: exponential, built from a count of 3 and a 1-second base.
let retry_policy = RetryPolicy::exponential(3, std::time::Duration::from_secs(1));

// Deliver the webhook, retry under the policy above, and attach the
// "webhook-failures" DLQ to the pipeline.
let webhook_pipeline = Axon::typed::<WebhookPayload, DeliveryResult>("webhook-deliver")
    .then(WebhookDelivery::new())
    .with_retry(retry_policy)
    .with_dlq("webhook-failures");

The with_retry method applies exponential backoff (1s, 2s, 4s) for up to 3 attempts. After all retries are exhausted, the failed payload is routed to the webhook-failures DLQ.


#3. DLQ Monitoring and Manual Replay

Failed deliveries land in the DLQ. Use Inspector to monitor them and to replay them manually.

// DLQ items are visible via Inspector REST API:
// GET /api/v1/dlq?queue=webhook-failures
//
// Manual replay:
// POST /api/v1/dlq/webhook-failures/replay/{item_id}

// Programmatic DLQ processing: decode each DLQ item's payload back into a
// WebhookPayload and send it through the delivery step again.
let replay_policy = RetryPolicy::exponential(5, std::time::Duration::from_secs(5));
let replay_pipeline = Axon::typed::<DlqItem, DeliveryResult>("dlq-replay")
    .then_fn("extract-payload", |item, _res, _bus| async move {
        // A payload that no longer deserializes short-circuits with the serde error text.
        let webhook = serde_json::from_value::<WebhookPayload>(item.payload)
            .map_err(|err| err.to_string())?;
        Outcome::next(webhook)
    })
    .then(WebhookDelivery::new())
    .with_retry(replay_policy);

#4. Multi-Channel Notification with Branch

Use Ranvier's Branch to route notifications to different channels based on user preferences or notification type.

use ranvier_core::prelude::*;
use ranvier_runtime::Axon;

/// A message to deliver over a single notification channel.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Notification {
    /// Channel the message is sent through.
    pub channel: NotificationChannel,
    /// Destination of the message.
    // NOTE(review): send_email parses this as an email address and send_sms passes it
    // as the `To` number, so presumably its format depends on `channel` — confirm.
    pub recipient: String,
    /// Short title; used as the email subject and as a prefix in Slack/SMS text.
    pub subject: String,
    /// Main message text.
    pub body: String,
}

/// Delivery channels a [`Notification`] can be routed to.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum NotificationChannel {
    /// Delivered by `send_email`.
    Email,
    /// Delivered by `send_slack`.
    Slack,
    /// Delivered by `send_sms`.
    Sms,
}

// Single-step pipeline: dispatch each notification to the sender for its channel
// and return that sender's outcome unchanged.
let notification_pipeline = Axon::typed::<Notification, String>("notify")
    .then_fn("route", |notif, _res, _bus| async move {
        let message = &notif;
        match message.channel {
            NotificationChannel::Email => send_email(message).await,
            NotificationChannel::Slack => send_slack(message).await,
            NotificationChannel::Sms => send_sms(message).await,
        }
    });

#5. Email via SMTP (lettre)

use lettre::{
    AsyncSmtpTransport, AsyncTransport, Message, Tokio1Executor,
    transport::smtp::authentication::Credentials,
};

/// Sends `notif` as an email through an SMTP relay using `lettre`.
///
/// Configuration is read from the environment on every call:
/// `SMTP_HOST` (falls back to `smtp.gmail.com`), and `SMTP_USER` / `SMTP_PASS`
/// (each falls back to the empty string when unset).
///
/// The sender is always `noreply@example.com`. `notif.recipient` must parse as an
/// address; a parse failure, a relay setup failure, a message build failure or a
/// send failure short-circuits with the error's string form.
///
/// On success returns `"email sent to <recipient>"`.
async fn send_email(notif: &Notification) -> Outcome<String, String> {
    let creds = Credentials::new(
        std::env::var("SMTP_USER").unwrap_or_default(),
        std::env::var("SMTP_PASS").unwrap_or_default(),
    );

    // Build the default lazily so no String is allocated when SMTP_HOST is set.
    let host = std::env::var("SMTP_HOST").unwrap_or_else(|_| "smtp.gmail.com".into());
    let mailer = AsyncSmtpTransport::<Tokio1Executor>::relay(&host)
        .map_err(|e| e.to_string())?
        .credentials(creds)
        .build();

    let email = Message::builder()
        .from(
            "noreply@example.com"
                .parse()
                // A failure here is a programming error, not a runtime condition.
                .expect("hard-coded sender address is valid"),
        )
        .to(notif.recipient.parse().map_err(|e: lettre::address::AddressError| e.to_string())?)
        .subject(&notif.subject)
        .body(notif.body.clone())
        .map_err(|e| e.to_string())?;

    mailer.send(email).await.map_err(|e| e.to_string())?;
    Outcome::next(format!("email sent to {}", notif.recipient))
}

#6. Slack Webhook (reqwest)

/// Posts `notif` to the Slack incoming webhook named by `SLACK_WEBHOOK_URL`.
///
/// The message text is the subject wrapped in `*…*`, a newline, then the body.
/// A missing environment variable or a transport error short-circuits with an
/// error string; a non-success HTTP status produces a fault containing that status.
/// On success returns `"slack sent to <recipient>"`.
async fn send_slack(notif: &Notification) -> Outcome<String, String> {
    let hook = std::env::var("SLACK_WEBHOOK_URL")
        .map_err(|_| "SLACK_WEBHOOK_URL not set".to_string())?;

    let message = serde_json::json!({
        "text": format!("*{}*\n{}", notif.subject, notif.body),
    });

    let status = reqwest::Client::new()
        .post(&hook)
        .json(&message)
        .send()
        .await
        .map_err(|err| err.to_string())?
        .status();

    if !status.is_success() {
        return Outcome::fault(format!("slack failed: {}", status));
    }
    Outcome::next(format!("slack sent to {}", notif.recipient))
}

#7. SMS via Twilio (reqwest)

/// Sends `notif` as an SMS through the Twilio Messages endpoint.
///
/// Reads `TWILIO_ACCOUNT_SID`, `TWILIO_AUTH_TOKEN` and `TWILIO_FROM_NUMBER` from the
/// environment, in that order; the first one that is missing short-circuits with
/// `"<NAME> not set"`. The request uses HTTP basic auth (SID / token) and a form body
/// with `To`, `From` and `Body` (`"<subject>: <body>"`).
///
/// A transport error short-circuits with its string form; a non-success status
/// produces a fault containing that status. On success returns
/// `"sms sent to <recipient>"`.
async fn send_sms(notif: &Notification) -> Outcome<String, String> {
    let account_sid = std::env::var("TWILIO_ACCOUNT_SID")
        .map_err(|_| "TWILIO_ACCOUNT_SID not set".to_string())?;
    let token = std::env::var("TWILIO_AUTH_TOKEN")
        .map_err(|_| "TWILIO_AUTH_TOKEN not set".to_string())?;
    let sender = std::env::var("TWILIO_FROM_NUMBER")
        .map_err(|_| "TWILIO_FROM_NUMBER not set".to_string())?;

    let url = format!(
        "https://api.twilio.com/2010-04-01/Accounts/{account_sid}/Messages.json"
    );

    // Own the message text so the form fields can all borrow plain `&str`s.
    let text = format!("{}: {}", notif.subject, notif.body);
    let form_fields = [
        ("To", notif.recipient.as_str()),
        ("From", sender.as_str()),
        ("Body", text.as_str()),
    ];

    let status = reqwest::Client::new()
        .post(&url)
        .basic_auth(&account_sid, Some(&token))
        .form(&form_fields)
        .send()
        .await
        .map_err(|err| err.to_string())?
        .status();

    if !status.is_success() {
        return Outcome::fault(format!("sms failed: {}", status));
    }
    Outcome::next(format!("sms sent to {}", notif.recipient))
}

#8. Fallback Channel Pattern

When the primary channel fails, automatically fall back to an alternative.

// Try the notification's own channel first; if that faults, record the failed
// channel on the bus and retry once over a fallback channel.
let resilient_notify = Axon::typed::<Notification, String>("resilient-notify")
    .then_fn("try-primary", |notif, _res, bus| async move {
        let result = match notif.channel {
            NotificationChannel::Email => send_email(&notif).await,
            NotificationChannel::Slack => send_slack(&notif).await,
            NotificationChannel::Sms => send_sms(&notif).await,
        };

        match result {
            Outcome::Next(msg) => Outcome::next(msg),
            Outcome::Fault(primary_err) => {
                // Store original channel for audit
                bus.insert(FailedChannel(notif.channel.clone()));

                // Fallback: Email -> Slack, Slack -> Email, SMS -> Email
                // NOTE(review): `recipient` is reused unchanged. send_email parses it as
                // an email address, so a Slack or SMS recipient will presumably fail
                // there unless it is also a valid address — confirm against callers.
                let fallback = Notification {
                    channel: match notif.channel {
                        NotificationChannel::Email => NotificationChannel::Slack,
                        _ => NotificationChannel::Email,
                    },
                    ..notif
                };
                let fallback_result = match fallback.channel {
                    NotificationChannel::Email => send_email(&fallback).await,
                    NotificationChannel::Slack => send_slack(&fallback).await,
                    NotificationChannel::Sms => send_sms(&fallback).await,
                };

                match fallback_result {
                    // Keep the primary cause instead of discarding it, so a double
                    // failure can be diagnosed from the fault message alone.
                    Outcome::Fault(fallback_err) => Outcome::fault(format!(
                        "primary channel failed: {primary_err}; fallback failed: {fallback_err}"
                    )),
                    other => other,
                }
            }
            other => other,
        }
    });

/// Bus marker recording which channel failed before a fallback was attempted.
#[derive(Debug, Clone)]
struct FailedChannel(NotificationChannel);

#9. Circuit Breaker for Outbound Calls

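Use a Guard with an atomic counter to implement a simple circuit breaker. The guard itself only checks the breaker state: the code that performs the outbound call must report results through record_failure and record_success on a clone of the guard. There is no timed half-open state, so once the breaker is open it rejects every input until record_success is called.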

use std::sync::atomic::{AtomicU32, AtomicBool, Ordering};
use std::sync::Arc;

/// Failure-counting circuit breaker whose state is shared between clones.
///
/// `T` only tags the value type the guard is used with; no `T` is stored.
/// `Clone` and `Debug` are implemented by hand because deriving them would
/// require `T: Clone` / `T: Debug` even though the guard holds no `T`.
pub struct CircuitBreakerGuard<T> {
    /// Failures recorded since the last success.
    failure_count: Arc<AtomicU32>,
    /// `true` while the breaker is open.
    is_open: Arc<AtomicBool>,
    /// Failure count at which the breaker opens.
    threshold: u32,
    _marker: std::marker::PhantomData<T>,
}

impl<T> Clone for CircuitBreakerGuard<T> {
    /// Returns a handle that shares the same counter and open flag.
    fn clone(&self) -> Self {
        Self {
            failure_count: Arc::clone(&self.failure_count),
            is_open: Arc::clone(&self.is_open),
            threshold: self.threshold,
            _marker: std::marker::PhantomData,
        }
    }
}

impl<T> std::fmt::Debug for CircuitBreakerGuard<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CircuitBreakerGuard")
            .field("failure_count", &self.failure_count)
            .field("is_open", &self.is_open)
            .field("threshold", &self.threshold)
            .field("_marker", &self._marker)
            .finish()
    }
}

impl<T> CircuitBreakerGuard<T> {
    /// Creates a closed breaker that opens once `threshold` failures have been
    /// recorded without an intervening success.
    pub fn new(threshold: u32) -> Self {
        Self {
            failure_count: Arc::new(AtomicU32::new(0)),
            is_open: Arc::new(AtomicBool::new(false)),
            threshold,
            _marker: std::marker::PhantomData,
        }
    }

    /// Records one failure and opens the breaker when the count reaches the threshold.
    pub fn record_failure(&self) {
        // `fetch_add` returns the previous value, so add one to get the new count.
        let count = self.failure_count.fetch_add(1, Ordering::Relaxed) + 1;
        if count >= self.threshold {
            self.is_open.store(true, Ordering::Relaxed);
        }
    }

    /// Resets the failure count to zero and closes the breaker.
    pub fn record_success(&self) {
        self.failure_count.store(0, Ordering::Relaxed);
        self.is_open.store(false, Ordering::Relaxed);
    }
}

#[async_trait]
impl<T: Send + Sync + 'static> Transition<T, T> for CircuitBreakerGuard<T> {
    type Error = String;
    type Resources = ();

    /// Passes `input` through unchanged while the breaker is closed; faults with a
    /// "503 Service Unavailable" message while it is open.
    ///
    /// This step only reads the open flag; it never records failures or successes.
    async fn run(
        &self,
        input: T,
        _resources: &Self::Resources,
        _bus: &mut Bus,
    ) -> Outcome<T, Self::Error> {
        let breaker_closed = !self.is_open.load(Ordering::Relaxed);
        if breaker_closed {
            return Outcome::next(input);
        }
        Outcome::fault("503 Service Unavailable: circuit breaker open".into())
    }
}

#10. Audit Trail for Outbound Calls

Log every outbound call to the audit hash chain for compliance.

use ranvier_audit::prelude::*;

// Webhook delivery bracketed by two audit records: one before the attempt and one
// after the delivery step has produced a result.
let audited_webhook = Axon::typed::<WebhookPayload, DeliveryResult>("audited-webhook")
    .then_fn("audit-start", |payload, _res, bus| async move {
        let trail = bus.require::<AuditTrail>();
        let attempt = AuditEntry {
            action: "webhook.attempt".into(),
            actor: "system".into(),
            resource: format!("url={}, event={}", payload.url, payload.event_type),
            timestamp: chrono::Utc::now(),
        };
        trail.record(attempt).await;
        // Hand the untouched payload on to the delivery step.
        Outcome::next(payload)
    })
    .then(WebhookDelivery::new())
    // NOTE(review): presumably this step is skipped when delivery faults, in which
    // case failed attempts get no completion record — confirm against Axon semantics.
    .then_fn("audit-complete", |result, _res, bus| async move {
        let trail = bus.require::<AuditTrail>();
        let delivered = AuditEntry {
            action: "webhook.delivered".into(),
            actor: "system".into(),
            resource: format!("url={}, status={}", result.url, result.status),
            timestamp: chrono::Utc::now(),
        };
        trail.record(delivered).await;
        Outcome::next(result)
    });

#See Also

  • HttpIngress Patterns Cookbook — inbound HTTP patterns (counterpart)
  • Guard Patterns Cookbook — Guard composition for circuit breaker
  • Saga Compensation Cookbook — compensating failed deliveries
  • Multi-Tenant Isolation Cookbook — tenant-scoped webhook delivery