#Egress Patterns Cookbook
Version: 0.40.0 | Updated: 2026-03-23 | Applies to: ranvier-core 0.40+ | Category: Cookbook
#Overview
While cookbook_http_ingress covers receiving HTTP requests, this cookbook covers
sending — webhook delivery with HMAC signatures, multi-channel notifications
via Branch, retry strategies, and DLQ fallback patterns.
All patterns use existing Ranvier primitives (Transition, RetryPolicy, DLQ, Branch) combined with ecosystem crates (reqwest, hmac, sha2, lettre).
#1. Webhook Delivery with HMAC Signature
A Transition that sends a signed webhook payload to a registered endpoint.
use async_trait::async_trait;
use hmac::{Hmac, Mac};
use ranvier_core::prelude::*;
use sha2::Sha256;
type HmacSha256 = Hmac<Sha256>;
/// Input to the webhook delivery step: destination, signing secret, event name and JSON body.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct WebhookPayload {
    /// Endpoint the payload is POSTed to.
    pub url: String,
    /// Key for the HMAC-SHA256 signature computed over the serialized body.
    // NOTE(review): this field is part of the derived `Debug` and `Serialize`
    // output, so the secret travels wherever the payload is logged or stored —
    // confirm that is acceptable for your deployment.
    pub secret: String,
    /// Event name, sent in the `X-Webhook-Event` header.
    pub event_type: String,
    /// JSON document whose serialized bytes are signed and sent as the request body.
    pub body: serde_json::Value,
}
/// Result of one webhook delivery attempt.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DeliveryResult {
    /// HTTP status code returned by the endpoint.
    pub status: u16,
    /// Endpoint the payload was sent to.
    pub url: String,
    /// Milliseconds elapsed from just before the request was sent until the response arrived.
    pub duration_ms: u64,
}
/// Transition that delivers signed webhooks over HTTP.
pub struct WebhookDelivery {
    /// HTTP client built once in `new` and used for every delivery.
    client: reqwest::Client,
}
impl WebhookDelivery {
    /// Builds a delivery step whose HTTP client uses a 10-second timeout.
    ///
    /// # Panics
    /// Panics if the `reqwest` client cannot be constructed.
    pub fn new() -> Self {
        let client = reqwest::Client::builder()
            .timeout(std::time::Duration::from_secs(10))
            .build()
            .expect("reqwest client");
        Self { client }
    }
}

/// `Default` is equivalent to [`WebhookDelivery::new`], so the type works with
/// `..Default::default()` and generic `T: Default` call sites.
impl Default for WebhookDelivery {
    fn default() -> Self {
        Self::new()
    }
}
// Delivery step: serializes `payload.body`, signs the bytes with HMAC-SHA256
// keyed by `payload.secret`, and POSTs them to `payload.url`.
// A 2xx response yields a `DeliveryResult`; any other status, or any
// serialization / key / transport error, becomes a `String` fault.
#[async_trait]
impl Transition<WebhookPayload, DeliveryResult> for WebhookDelivery {
    type Error = String;
    type Resources = ();

    async fn run(
        &self,
        payload: WebhookPayload,
        _resources: &Self::Resources,
        _bus: &mut Bus,
    ) -> Outcome<DeliveryResult, Self::Error> {
        // The bytes that are signed are exactly the bytes that are sent.
        let body_bytes = serde_json::to_vec(&payload.body).map_err(|e| e.to_string())?;

        // HMAC-SHA256 signature, hex-encoded.
        let signature = {
            let mut mac = HmacSha256::new_from_slice(payload.secret.as_bytes())
                .map_err(|e| e.to_string())?;
            mac.update(&body_bytes);
            hex::encode(mac.finalize().into_bytes())
        };

        // Timing starts after signing, so it covers only the HTTP round trip.
        let started = std::time::Instant::now();
        let request = self
            .client
            .post(&payload.url)
            .header("Content-Type", "application/json")
            .header("X-Webhook-Signature", format!("sha256={signature}"))
            .header("X-Webhook-Event", &payload.event_type)
            .body(body_bytes);
        let response = request.send().await.map_err(|e| e.to_string())?;

        let status = response.status().as_u16();
        let duration_ms = started.elapsed().as_millis() as u64;

        // Anything outside 2xx is reported as a fault.
        if !(200..300).contains(&status) {
            return Outcome::fault(format!(
                "webhook delivery failed: HTTP {status} from {}",
                payload.url
            ));
        }

        Outcome::next(DeliveryResult {
            status,
            url: payload.url,
            duration_ms,
        })
    }
}
#2. Retry Strategy with Status Classification
Classify HTTP responses to determine retry behavior. 429 and 5xx are retryable; 4xx (except 429) are permanent failures.
use ranvier_core::prelude::*;
use ranvier_runtime::Axon;
/// Classification of a delivery attempt, used to decide whether to retry.
#[derive(Debug, Clone)]
pub enum DeliveryOutcome {
    /// The endpoint accepted the webhook; carries the original result.
    Delivered(DeliveryResult),
    /// The attempt failed but may succeed later; carries a human-readable reason.
    Retryable(String),
    /// The attempt failed and should not be retried; carries a human-readable reason.
    PermanentFailure(String),
}
/// Stateless transition that maps a `DeliveryResult` to a `DeliveryOutcome` by HTTP status.
pub struct ClassifyResponse;
// Classifies a delivery result by HTTP status:
//   2xx        -> Delivered
//   429, 5xx   -> Retryable
//   other 4xx  -> PermanentFailure ("client error")
//   all others -> PermanentFailure ("unexpected status"), e.g. 1xx / 3xx,
//                 which are not client errors and are labelled as such.
// This transition never faults; every status maps to `Outcome::next`.
#[async_trait]
impl Transition<DeliveryResult, DeliveryOutcome> for ClassifyResponse {
    type Error = String;
    type Resources = ();

    async fn run(
        &self,
        result: DeliveryResult,
        _resources: &Self::Resources,
        _bus: &mut Bus,
    ) -> Outcome<DeliveryOutcome, Self::Error> {
        // Copy the status out first so `result` can be moved into `Delivered`.
        let status = result.status;
        let classified = match status {
            200..=299 => DeliveryOutcome::Delivered(result),
            // 429 must be matched before the general 4xx arm.
            429 => DeliveryOutcome::Retryable("rate limited — retry after backoff".into()),
            500..=599 => DeliveryOutcome::Retryable(format!("server error {status}")),
            400..=499 => DeliveryOutcome::PermanentFailure(format!(
                "client error {status} — not retryable"
            )),
            _ => DeliveryOutcome::PermanentFailure(format!(
                "unexpected status {status} — not retryable"
            )),
        };
        Outcome::next(classified)
    }
}
#Pipeline with Retry Policy
// Retry policy built with 3 and a 1-second duration; failures go to the
// "webhook-failures" DLQ.
let retry_policy = RetryPolicy::exponential(3, std::time::Duration::from_secs(1));
let webhook_pipeline = Axon::typed::<WebhookPayload, DeliveryResult>("webhook-deliver")
    .then(WebhookDelivery::new())
    .with_retry(retry_policy)
    .with_dlq("webhook-failures");
The with_retry method applies exponential backoff (1s, 2s, 4s) for up to 3 attempts.
After all retries are exhausted, the failed payload is routed to the webhook-failures DLQ.
#3. DLQ Monitoring and Manual Replay
Failed deliveries land in the DLQ. Use Inspector to monitor and manually replay.
// DLQ items are visible via Inspector REST API:
// GET /api/v1/dlq?queue=webhook-failures
//
// Manual replay:
// POST /api/v1/dlq/webhook-failures/replay/{item_id}
// Programmatic DLQ processing
// Replay: decode the stored DLQ payload back into a `WebhookPayload`, then
// run the normal delivery step under a retry policy built with 5 and a
// 5-second duration.
let replay_retry = RetryPolicy::exponential(5, std::time::Duration::from_secs(5));
let replay_pipeline = Axon::typed::<DlqItem, DeliveryResult>("dlq-replay")
    .then_fn("extract-payload", |item, _res, _bus| async move {
        // A payload that no longer deserializes becomes a string fault.
        let payload = serde_json::from_value::<WebhookPayload>(item.payload)
            .map_err(|e| e.to_string())?;
        Outcome::next(payload)
    })
    .then(WebhookDelivery::new())
    .with_retry(replay_retry);
#4. Multi-Channel Notification with Branch
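Use Ranvier's Branch to route notifications to different channels based on
user preferences or notification type. The example below makes the routing decision with a `match` on `notif.channel` inside a single `then_fn` step.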
use ranvier_core::prelude::*;
use ranvier_runtime::Axon;
/// A single outbound notification and the channel it should be sent through.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Notification {
    /// Channel that selects which sender handles this notification.
    pub channel: NotificationChannel,
    /// Destination; the email sender parses it as an address and the SMS sender
    /// uses it as the `To` number.
    pub recipient: String,
    /// Short title of the notification.
    pub subject: String,
    /// Main message text.
    pub body: String,
}
/// Delivery channels a notification can be routed to.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum NotificationChannel {
    /// Handled by `send_email`.
    Email,
    /// Handled by `send_slack`.
    Slack,
    /// Handled by `send_sms`.
    Sms,
}
let notification_pipeline = Axon::typed::<Notification, String>("notify")
.then_fn("route", |notif, _res, _bus| async move {
match notif.channel {
NotificationChannel::Email => {
send_email(¬if).await
}
NotificationChannel::Slack => {
send_slack(¬if).await
}
NotificationChannel::Sms => {
send_sms(¬if).await
}
}
});#5. Email via SMTP (lettre)
use lettre::{
AsyncSmtpTransport, AsyncTransport, Message, Tokio1Executor,
transport::smtp::authentication::Credentials,
};
async fn send_email(notif: &Notification) -> Outcome<String, String> {
let creds = Credentials::new(
std::env::var("SMTP_USER").unwrap_or_default(),
std::env::var("SMTP_PASS").unwrap_or_default(),
);
let mailer = AsyncSmtpTransport::<Tokio1Executor>::relay(
&std::env::var("SMTP_HOST").unwrap_or("smtp.gmail.com".into()),
)
.map_err(|e| e.to_string())?
.credentials(creds)
.build();
let email = Message::builder()
.from("noreply@example.com".parse().unwrap())
.to(notif.recipient.parse().map_err(|e: lettre::address::AddressError| e.to_string())?)
.subject(¬if.subject)
.body(notif.body.clone())
.map_err(|e| e.to_string())?;
mailer.send(email).await.map_err(|e| e.to_string())?;
Outcome::next(format!("email sent to {}", notif.recipient))
}#6. Slack Webhook (reqwest)
/// Posts `notif` to the Slack incoming webhook named by `SLACK_WEBHOOK_URL`.
///
/// The message text is the subject wrapped in `*…*`, a newline, then the body.
/// Returns `"slack sent to <recipient>"` on a success status. A missing
/// environment variable, a transport error or a non-success status is a
/// string fault.
async fn send_slack(notif: &Notification) -> Outcome<String, String> {
    let webhook_url = std::env::var("SLACK_WEBHOOK_URL")
        .map_err(|_| "SLACK_WEBHOOK_URL not set".to_string())?;

    let message = serde_json::json!({
        "text": format!("*{}*\n{}", notif.subject, notif.body),
    });

    let response = reqwest::Client::new()
        .post(&webhook_url)
        .json(&message)
        .send()
        .await
        .map_err(|e| e.to_string())?;

    let status = response.status();
    if !status.is_success() {
        return Outcome::fault(format!("slack failed: {}", status));
    }
    Outcome::next(format!("slack sent to {}", notif.recipient))
}
#7. SMS via Twilio (reqwest)
/// Sends `notif` as an SMS through the Twilio Messages endpoint.
///
/// Requires `TWILIO_ACCOUNT_SID`, `TWILIO_AUTH_TOKEN` and `TWILIO_FROM_NUMBER`
/// in the environment; a missing variable is a string fault naming it. The
/// message body is `"<subject>: <body>"`. Returns `"sms sent to <recipient>"`
/// on a success status, otherwise a string fault.
async fn send_sms(notif: &Notification) -> Outcome<String, String> {
    let account_sid = std::env::var("TWILIO_ACCOUNT_SID")
        .map_err(|_| "TWILIO_ACCOUNT_SID not set".to_string())?;
    let auth_token = std::env::var("TWILIO_AUTH_TOKEN")
        .map_err(|_| "TWILIO_AUTH_TOKEN not set".to_string())?;
    let from_number = std::env::var("TWILIO_FROM_NUMBER")
        .map_err(|_| "TWILIO_FROM_NUMBER not set".to_string())?;

    let url = format!(
        "https://api.twilio.com/2010-04-01/Accounts/{account_sid}/Messages.json"
    );

    // Form fields are borrowed string pairs; the text is built first so it
    // outlives the request.
    let text = format!("{}: {}", notif.subject, notif.body);
    let form = [
        ("To", notif.recipient.as_str()),
        ("From", from_number.as_str()),
        ("Body", text.as_str()),
    ];

    let response = reqwest::Client::new()
        .post(&url)
        .basic_auth(&account_sid, Some(&auth_token))
        .form(&form)
        .send()
        .await
        .map_err(|e| e.to_string())?;

    let status = response.status();
    if !status.is_success() {
        return Outcome::fault(format!("sms failed: {}", status));
    }
    Outcome::next(format!("sms sent to {}", notif.recipient))
}
#8. Fallback Channel Pattern
When the primary channel fails, automatically fall back to an alternative.
let resilient_notify = Axon::typed::<Notification, String>("resilient-notify")
.then_fn("try-primary", |notif, _res, bus| async move {
let result = match notif.channel {
NotificationChannel::Email => send_email(¬if).await,
NotificationChannel::Slack => send_slack(¬if).await,
NotificationChannel::Sms => send_sms(¬if).await,
};
match result {
Outcome::Next(msg) => Outcome::next(msg),
Outcome::Fault(_) => {
// Store original channel for audit
bus.insert(FailedChannel(notif.channel.clone()));
// Fallback: Email -> Slack, Slack -> Email, SMS -> Email
let fallback = Notification {
channel: match notif.channel {
NotificationChannel::Email => NotificationChannel::Slack,
_ => NotificationChannel::Email,
},
..notif
};
match fallback.channel {
NotificationChannel::Email => send_email(&fallback).await,
NotificationChannel::Slack => send_slack(&fallback).await,
NotificationChannel::Sms => send_sms(&fallback).await,
}
}
other => other,
}
});
/// Bus entry recording which channel failed on the primary attempt.
#[derive(Debug, Clone)]
struct FailedChannel(NotificationChannel);
#9. Circuit Breaker for Outbound Calls
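Use a Guard with an atomic failure counter and an open/closed flag to implement a simple circuit breaker. Callers report results through `record_failure` / `record_success`; once the failure count reaches `threshold` the breaker opens, and `run` faults until `record_success` resets it.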
use std::sync::atomic::{AtomicU32, AtomicBool, Ordering};
use std::sync::Arc;
/// Simple circuit breaker: a failure counter plus an open/closed flag.
///
/// Both pieces of state live behind `Arc`, so every clone observes and updates
/// the same breaker. `T` is only a type-level marker for the value that flows
/// through the guard; no `T` is ever stored.
///
/// `Clone` and `Debug` are implemented by hand rather than derived because the
/// derives would add `T: Clone` / `T: Debug` bounds that the struct does not
/// need.
pub struct CircuitBreakerGuard<T> {
    /// Failures recorded since the last success.
    failure_count: Arc<AtomicU32>,
    /// `true` while the breaker is open.
    is_open: Arc<AtomicBool>,
    /// Failure count at which the breaker opens.
    threshold: u32,
    _marker: std::marker::PhantomData<T>,
}

impl<T> Clone for CircuitBreakerGuard<T> {
    /// Returns a handle that shares this breaker's counter and flag.
    fn clone(&self) -> Self {
        Self {
            failure_count: Arc::clone(&self.failure_count),
            is_open: Arc::clone(&self.is_open),
            threshold: self.threshold,
            _marker: std::marker::PhantomData,
        }
    }
}

impl<T> std::fmt::Debug for CircuitBreakerGuard<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CircuitBreakerGuard")
            .field("failure_count", &self.failure_count)
            .field("is_open", &self.is_open)
            .field("threshold", &self.threshold)
            .field("_marker", &self._marker)
            .finish()
    }
}

impl<T> CircuitBreakerGuard<T> {
    /// Creates a closed breaker that opens once `threshold` failures are recorded.
    pub fn new(threshold: u32) -> Self {
        Self {
            failure_count: Arc::new(AtomicU32::new(0)),
            is_open: Arc::new(AtomicBool::new(false)),
            threshold,
            _marker: std::marker::PhantomData,
        }
    }

    /// Records one failure and opens the breaker when the count reaches the threshold.
    pub fn record_failure(&self) {
        // `fetch_add` returns the previous value. The `+ 1` is done with
        // `saturating_add` so a counter already at `u32::MAX` cannot overflow
        // (which would panic in debug builds).
        let previous = self.failure_count.fetch_add(1, Ordering::Relaxed);
        if previous.saturating_add(1) >= self.threshold {
            self.is_open.store(true, Ordering::Relaxed);
        }
    }

    /// Records a success: resets the failure count and closes the breaker.
    pub fn record_success(&self) {
        self.failure_count.store(0, Ordering::Relaxed);
        self.is_open.store(false, Ordering::Relaxed);
    }
}
// Pass-through step: forwards the input unchanged while the breaker is
// closed, and faults with a 503-style message while it is open.
#[async_trait]
impl<T> Transition<T, T> for CircuitBreakerGuard<T>
where
    T: Send + Sync + 'static,
{
    type Error = String;
    type Resources = ();

    async fn run(
        &self,
        input: T,
        _resources: &Self::Resources,
        _bus: &mut Bus,
    ) -> Outcome<T, Self::Error> {
        let breaker_open = self.is_open.load(Ordering::Relaxed);
        if !breaker_open {
            return Outcome::next(input);
        }
        Outcome::fault("503 Service Unavailable: circuit breaker open".into())
    }
}
#10. Audit Trail for Outbound Calls
Log every outbound call to the audit hash chain for compliance.
use ranvier_audit::prelude::*;
// Wraps webhook delivery with two audit records: one before the attempt
// ("webhook.attempt") and one after the delivery step has produced a result
// ("webhook.delivered"). Both steps forward their input unchanged.
let audited_webhook = Axon::typed::<WebhookPayload, DeliveryResult>("audited-webhook")
    .then_fn("audit-start", |payload, _res, bus| async move {
        let audit = bus.require::<AuditTrail>();
        let resource = format!("url={}, event={}", payload.url, payload.event_type);
        let entry = AuditEntry {
            action: "webhook.attempt".into(),
            actor: "system".into(),
            resource,
            timestamp: chrono::Utc::now(),
        };
        audit.record(entry).await;
        Outcome::next(payload)
    })
    .then(WebhookDelivery::new())
    .then_fn("audit-complete", |result, _res, bus| async move {
        let audit = bus.require::<AuditTrail>();
        let resource = format!("url={}, status={}", result.url, result.status);
        let entry = AuditEntry {
            action: "webhook.delivered".into(),
            actor: "system".into(),
            resource,
            timestamp: chrono::Utc::now(),
        };
        audit.record(entry).await;
        Outcome::next(result)
    });
#See Also
- HttpIngress Patterns Cookbook — inbound HTTP patterns (counterpart)
- Guard Patterns Cookbook — Guard composition for circuit breaker
- Saga Compensation Cookbook — compensating failed deliveries
- Multi-Tenant Isolation Cookbook — tenant-scoped webhook delivery