#LLM Gateway Cookbook

Version: 0.40.0 | Updated: 2026-03-23 | Applies to: ranvier-core 0.40+ | Category: Cookbook


#Overview

While cookbook_llm_pipeline covers single-provider LLM agent patterns, this cookbook covers the gateway layer that sits in front of multiple LLM providers — routing requests, counting tokens, enforcing cost budgets, proxying streamed responses, and failing over automatically.

All patterns use existing Ranvier primitives (Transition, StreamingTransition, Guard, Bus) combined with ecosystem crates (async-openai, tiktoken-rs, reqwest).


#1. Provider Abstraction

Define a common interface for all LLM providers. Each provider is a Transition with the same input/output types.

use serde::{Deserialize, Serialize};

/// Provider-agnostic chat completion request accepted by every gateway transition.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LlmRequest {
    /// Model identifier; the routing examples dispatch on its prefix (e.g. "claude").
    pub model: String,
    /// Conversation messages, in the order they are sent to the provider.
    pub messages: Vec<ChatMessage>,
    /// Maximum number of output tokens requested from the provider.
    pub max_tokens: u32,
    /// Sampling temperature; the OpenAI provider forwards it, the other examples do not.
    pub temperature: f32,
}

/// A single message of the conversation carried by an `LlmRequest`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChatMessage {
    /// Speaker of the message.
    pub role: String,     // "system", "user", "assistant"
    /// Text of the message.
    pub content: String,
}

/// Provider-agnostic completion result returned by every provider transition.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LlmResponse {
    /// Generated text.
    pub content: String,
    /// Model name copied from the request that produced this response.
    pub model: String,
    /// Name of the provider that served the call ("openai" or "anthropic" in this cookbook).
    pub provider: String,
    /// Token accounting for the call.
    pub usage: TokenUsage,
}

/// Token counts for one LLM call; providers fill in 0 when the figure is unavailable.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TokenUsage {
    /// Tokens consumed by the prompt.
    pub input_tokens: u32,
    /// Tokens generated in the completion.
    pub output_tokens: u32,
    /// Input plus output tokens.
    pub total_tokens: u32,
}

#2. Provider Transitions

#OpenAI Provider

use async_openai::{Client, types::CreateChatCompletionRequestArgs};

/// Provider transition that talks to OpenAI through the `async_openai` client.
pub struct OpenAiProvider {
    /// Underlying `async_openai` client, configured with `OpenAIConfig`.
    client: Client<async_openai::config::OpenAIConfig>,
}

impl OpenAiProvider {
    /// Creates a provider backed by a client obtained from `Client::new()`.
    pub fn new() -> Self {
        let client = Client::new();
        Self { client }
    }
}

/// Sends an `LlmRequest` to the OpenAI chat completions API and maps the first
/// returned choice into an `LlmResponse`.
///
/// Fails with a `String` error when a message or the request cannot be built,
/// when the API call fails, or when the response contains no choice.
#[async_trait]
impl Transition<LlmRequest, LlmResponse> for OpenAiProvider {
    type Error = String;
    type Resources = ();

    async fn run(
        &self,
        req: LlmRequest,
        _resources: &Self::Resources,
        _bus: &mut Bus,
    ) -> Outcome<LlmResponse, Self::Error> {
        // Build every message fallibly: a message the builder rejects becomes an
        // error for the caller instead of a panic inside the gateway.
        let messages = req.messages.iter()
            .map(|m| {
                async_openai::types::ChatCompletionRequestMessageArgs::default()
                    .role(m.role.as_str())
                    .content(m.content.clone())
                    .build()
            })
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| e.to_string())?;

        let request = CreateChatCompletionRequestArgs::default()
            .model(&req.model)
            .max_tokens(req.max_tokens)
            .temperature(req.temperature)
            .messages(messages)
            .build()
            .map_err(|e| e.to_string())?;

        let response = self.client.chat().create(request).await
            .map_err(|e| e.to_string())?;

        let choice = response.choices.first()
            .ok_or("no response choice")?;

        // Usage is optional in the response; report zeros when it is absent.
        let (input_tokens, output_tokens, total_tokens) = response.usage.as_ref()
            .map(|u| (u.prompt_tokens, u.completion_tokens, u.total_tokens))
            .unwrap_or((0, 0, 0));

        Outcome::next(LlmResponse {
            content: choice.message.content.clone().unwrap_or_default(),
            model: req.model,
            provider: "openai".into(),
            usage: TokenUsage {
                input_tokens,
                output_tokens,
                total_tokens,
            },
        })
    }
}

#Anthropic Provider (reqwest)

/// Provider transition that calls the Anthropic Messages API directly over HTTP.
pub struct AnthropicProvider {
    /// HTTP client used for the API call.
    client: reqwest::Client,
    /// Value sent in the `x-api-key` header.
    api_key: String,
}

impl AnthropicProvider {
    /// Creates a provider with a fresh `reqwest::Client` that authenticates with `api_key`.
    pub fn new(api_key: String) -> Self {
        let client = reqwest::Client::new();
        Self { client, api_key }
    }
}

/// Sends an `LlmRequest` to the Anthropic Messages API and maps the first content
/// block of the reply into an `LlmResponse`.
///
/// Fails with a `String` error when the HTTP call fails, when the API answers with
/// a non-success status, or when the body is not valid JSON.
#[async_trait]
impl Transition<LlmRequest, LlmResponse> for AnthropicProvider {
    type Error = String;
    type Resources = ();

    async fn run(
        &self,
        req: LlmRequest,
        _resources: &Self::Resources,
        _bus: &mut Bus,
    ) -> Outcome<LlmResponse, Self::Error> {
        // The system prompt travels in a dedicated top-level field, so it is
        // removed from the message list and sent separately.
        let messages: Vec<_> = req.messages.iter()
            .filter(|m| m.role != "system")
            .map(|m| serde_json::json!({"role": m.role, "content": m.content}))
            .collect();

        let system = req.messages.iter()
            .find(|m| m.role == "system")
            .map(|m| m.content.clone())
            .unwrap_or_default();

        let body = serde_json::json!({
            "model": req.model,
            "max_tokens": req.max_tokens,
            "system": system,
            "messages": messages,
        });

        let response = self.client
            .post("https://api.anthropic.com/v1/messages")
            .header("x-api-key", &self.api_key)
            .header("anthropic-version", "2023-06-01")
            .json(&body)
            .send()
            .await
            .map_err(|e| e.to_string())?;

        // An error reply (bad key, rate limit, overload) is still JSON. Without this
        // check it would come back as a successful, empty completion and callers
        // such as a failover step would never see the failure.
        let status = response.status();
        if !status.is_success() {
            let detail = response.text().await.unwrap_or_default();
            return Outcome::fault(format!(
                "anthropic request failed with status {}: {}",
                status, detail
            ));
        }

        let json: serde_json::Value = response.json().await
            .map_err(|e| e.to_string())?;

        let content = json["content"][0]["text"].as_str()
            .unwrap_or("").to_string();

        // Read each usage figure once; missing fields count as zero.
        let input_tokens = json["usage"]["input_tokens"].as_u64().unwrap_or(0);
        let output_tokens = json["usage"]["output_tokens"].as_u64().unwrap_or(0);

        Outcome::next(LlmResponse {
            content,
            model: req.model,
            provider: "anthropic".into(),
            usage: TokenUsage {
                input_tokens: input_tokens as u32,
                output_tokens: output_tokens as u32,
                total_tokens: (input_tokens + output_tokens) as u32,
            },
        })
    }
}

#3. Provider Routing

Route requests to different providers based on model name or explicit selection.

use ranvier_core::prelude::*;
use ranvier_runtime::Axon;

/// Which provider a request should be sent to.
#[derive(Debug, Clone)]
pub enum ProviderSelection {
    /// Always use the OpenAI provider.
    OpenAi,
    /// Always use the Anthropic provider.
    Anthropic,
    /// Let the router decide from the request's model name.
    Auto,  // select by model name prefix
}

/// Bus type: controls which provider to use.
///
/// The routing step reads it from the Bus and falls back to
/// `ProviderSelection::Auto` when it is absent.
#[derive(Debug, Clone)]
pub struct ProviderConfig(pub ProviderSelection);

// Routing step: picks a provider from the optional `ProviderConfig` on the Bus
// (defaulting to `Auto`) and forwards the request to it.
// The second type argument of `Axon::typed` is the error type (`String`), matching
// the providers' `type Error = String` and the full server example.
let gateway_pipeline = Axon::typed::<LlmRequest, String>("llm-gateway")
    .then_fn("route-provider", |req, _res, bus| async move {
        let config = bus.get_cloned::<ProviderConfig>()
            .map(|c| c.0)
            .unwrap_or(ProviderSelection::Auto);

        // Resolve the selection in one exhaustive match. In `Auto` mode only the
        // "claude" prefix selects Anthropic; every other model goes to OpenAI.
        let use_anthropic = match config {
            ProviderSelection::Anthropic => true,
            ProviderSelection::OpenAi => false,
            ProviderSelection::Auto => req.model.starts_with("claude"),
        };

        if use_anthropic {
            let key = std::env::var("ANTHROPIC_API_KEY")
                .map_err(|_| "ANTHROPIC_API_KEY not set".to_string())?;
            AnthropicProvider::new(key).run(req, &(), bus).await
        } else {
            OpenAiProvider::new().run(req, &(), bus).await
        }
    });

#4. Token Count Guard

Pre-validate that the request's input tokens plus its requested max_tokens do not exceed the model's context window.

/// Guard transition that rejects requests whose token footprint exceeds a limit.
pub struct TokenCountGuard<T> {
    /// Upper bound on input tokens plus requested output tokens.
    max_tokens: u32,
    /// Ties the guard to the payload type `T` without storing a value of it.
    _marker: std::marker::PhantomData<T>,
}

impl<T> TokenCountGuard<T> {
    /// Creates a guard whose combined input + output token ceiling is `max_tokens`.
    pub fn with_limit(max_tokens: u32) -> Self {
        let _marker = std::marker::PhantomData::<T>;
        Self { max_tokens, _marker }
    }
}

/// Counts the tokens of every message, publishes the count on the Bus as
/// `InputTokenCount`, and faults when input plus requested output exceeds the limit.
#[async_trait]
impl Transition<LlmRequest, LlmRequest> for TokenCountGuard<LlmRequest> {
    type Error = String;
    type Resources = ();

    async fn run(
        &self,
        req: LlmRequest,
        _resources: &Self::Resources,
        bus: &mut Bus,
    ) -> Outcome<LlmRequest, Self::Error> {
        // tiktoken-rs for accurate token counting; unknown models fall back to cl100k_base.
        let bpe = tiktoken_rs::get_bpe_from_model(&req.model)
            .unwrap_or_else(|_| tiktoken_rs::cl100k_base().unwrap());

        let total_tokens: usize = req.messages.iter()
            .map(|m| bpe.encode_with_special_tokens(&m.content).len())
            .sum();

        // Saturate rather than truncate if the count does not fit in u32.
        let input_tokens = u32::try_from(total_tokens).unwrap_or(u32::MAX);

        // Store token count in Bus for cost tracking
        bus.insert(InputTokenCount(input_tokens));

        // Add in u64: `max_tokens` comes from the caller, and a u32 sum could
        // overflow (panic in debug, wrap below the limit in release).
        let requested = u64::from(input_tokens) + u64::from(req.max_tokens);

        if requested > u64::from(self.max_tokens) {
            Outcome::fault(format!(
                "413 Token limit exceeded: {} input + {} max_output > {} limit",
                total_tokens, req.max_tokens, self.max_tokens
            ))
        } else {
            Outcome::next(req)
        }
    }
}

/// Bus type: input token count published by `TokenCountGuard` and read by
/// `CostBudgetGuard` for cost estimation.
#[derive(Debug, Clone)]
pub struct InputTokenCount(pub u32);

#5. Cost Budget Guard

Track cumulative estimated cost per tenant and enforce a daily budget (the same ledger pattern extends to monthly budgets).

use std::sync::Arc;
use dashmap::DashMap;

/// Guard transition that enforces a per-tenant daily spending limit.
#[derive(Debug, Clone)]
pub struct CostBudgetGuard<T> {
    /// tenant_id -> (total_cost_cents, last_reset_date)
    ledger: Arc<DashMap<String, (u64, chrono::NaiveDate)>>,
    /// Maximum estimated spend per tenant per day, in cents.
    daily_limit_cents: u64,
    /// Ties the guard to the payload type `T` without storing a value of it.
    _marker: std::marker::PhantomData<T>,
}

/// Estimates the cost of a request, charges it to the tenant's daily ledger entry,
/// and faults when the charge would push the tenant over the daily limit.
///
/// The tenant comes from `TenantContext` on the Bus ("default" when absent); the
/// input token count comes from `InputTokenCount` on the Bus (0 when absent).
#[async_trait]
impl Transition<LlmRequest, LlmRequest> for CostBudgetGuard<LlmRequest> {
    type Error = String;
    type Resources = ();

    async fn run(
        &self,
        req: LlmRequest,
        _resources: &Self::Resources,
        bus: &mut Bus,
    ) -> Outcome<LlmRequest, Self::Error> {
        let tenant = bus.get_cloned::<super::TenantContext>()
            .map(|t| t.tenant_id)
            .unwrap_or_else(|_| "default".into());

        let today = chrono::Utc::now().date_naive();

        let mut entry = self.ledger.entry(tenant.clone())
            .or_insert((0, today));

        // Reset on new day
        if entry.1 < today {
            entry.0 = 0;
            entry.1 = today;
        }

        // Estimate cost from input token count
        let input_tokens = bus.get_cloned::<InputTokenCount>()
            .map(|t| t.0)
            .unwrap_or(0);

        // Rough cost: $0.01 per 1K tokens = 1 cent per 1K tokens.
        // Round up: truncating division would price every request under 1000 tokens
        // at 0 cents, so small requests would never consume the budget.
        let billable_tokens = u64::from(input_tokens) + u64::from(req.max_tokens);
        let estimated_cost = (billable_tokens + 999) / 1000;

        let projected = entry.0.saturating_add(estimated_cost);

        if projected > self.daily_limit_cents {
            Outcome::fault(format!(
                "429 Budget exceeded: tenant {} has used {} cents today (limit: {})",
                tenant, entry.0, self.daily_limit_cents
            ))
        } else {
            entry.0 = projected;
            Outcome::next(req)
        }
    }
}

#6. SSE Streaming Proxy

Use StreamingTransition to proxy LLM streaming responses as SSE events.

use ranvier_core::streaming::StreamingTransition;
use futures_util::StreamExt;

/// Stateless streaming transition that proxies an OpenAI chat completion stream.
pub struct StreamingLlmProxy;

/// Streams a chat completion from OpenAI, forwarding each text delta through
/// `sender` and returning the concatenated text as the final `LlmResponse`.
///
/// Fails with a `String` error when `OPENAI_API_KEY` is unset, when the HTTP call
/// or the byte stream fails, or when the API answers with a non-success status.
/// Token usage in the result is always zero: the stream is not asked for usage.
#[async_trait]
impl StreamingTransition for StreamingLlmProxy {
    type Input = LlmRequest;
    type Item = String;       // SSE text chunks
    type Error = String;
    type Result = LlmResponse;

    async fn run_streaming(
        &self,
        req: Self::Input,
        _resources: &(),
        _bus: &mut Bus,
        sender: tokio::sync::mpsc::Sender<Self::Item>,
    ) -> Outcome<Self::Result, Self::Error> {
        // Use reqwest to stream from the provider
        let client = reqwest::Client::new();
        let api_key = std::env::var("OPENAI_API_KEY")
            .map_err(|_| "OPENAI_API_KEY not set".to_string())?;

        let body = serde_json::json!({
            "model": req.model,
            "messages": req.messages.iter().map(|m| {
                serde_json::json!({"role": m.role, "content": m.content})
            }).collect::<Vec<_>>(),
            "max_tokens": req.max_tokens,
            "stream": true,
        });

        let response = client
            .post("https://api.openai.com/v1/chat/completions")
            .bearer_auth(&api_key)
            .json(&body)
            .send()
            .await
            .map_err(|e| e.to_string())?;

        // An error reply is a plain JSON body, not an SSE stream; without this check
        // it would be parsed as "no events" and reported as an empty success.
        let status = response.status();
        if !status.is_success() {
            let detail = response.text().await.unwrap_or_default();
            return Outcome::fault(format!(
                "openai stream request failed with status {}: {}",
                status, detail
            ));
        }

        let mut full_content = String::new();
        // Bytes received but not yet terminated by '\n'. Network chunks do not align
        // with SSE lines (or UTF-8 boundaries), so only complete lines are decoded.
        let mut pending: Vec<u8> = Vec::new();
        let mut stream = response.bytes_stream();

        'upstream: while let Some(chunk) = stream.next().await {
            let bytes = chunk.map_err(|e| e.to_string())?;
            pending.extend_from_slice(&bytes);

            while let Some(newline_at) = pending.iter().position(|&b| b == b'\n') {
                let raw_line: Vec<u8> = pending.drain(..=newline_at).collect();
                let line = String::from_utf8_lossy(&raw_line);
                let line = line.trim_end_matches(|c| c == '\r' || c == '\n');

                match classify_sse_line(line) {
                    SseLine::Done => {
                        // End-of-stream marker: stop reading the upstream entirely.
                        pending.clear();
                        break 'upstream;
                    }
                    SseLine::Delta(content) => {
                        full_content.push_str(&content);
                        // Best effort: a dropped receiver does not abort the proxy.
                        let _ = sender.send(content).await;
                    }
                    SseLine::Ignored => {}
                }
            }
        }

        // The upstream may end without a trailing newline; handle the last line too.
        if !pending.is_empty() {
            let line = String::from_utf8_lossy(&pending);
            if let SseLine::Delta(content) = classify_sse_line(line.trim_end_matches('\r')) {
                full_content.push_str(&content);
                let _ = sender.send(content).await;
            }
        }

        Outcome::next(LlmResponse {
            content: full_content,
            model: req.model,
            provider: "openai".into(),
            usage: TokenUsage { input_tokens: 0, output_tokens: 0, total_tokens: 0 },
        })
    }
}

/// What one line of the provider's SSE stream means to the proxy.
enum SseLine {
    /// The `data: [DONE]` end-of-stream marker.
    Done,
    /// A `data:` event carrying a text delta.
    Delta(String),
    /// Anything else: blank lines, other fields, events without text content.
    Ignored,
}

/// Classifies a single, complete SSE line (without its line terminator).
fn classify_sse_line(line: &str) -> SseLine {
    let data = match line.strip_prefix("data: ") {
        Some(data) => data,
        None => return SseLine::Ignored,
    };
    if data == "[DONE]" {
        return SseLine::Done;
    }
    serde_json::from_str::<serde_json::Value>(data)
        .ok()
        .and_then(|json| {
            json["choices"][0]["delta"]["content"]
                .as_str()
                .map(str::to_owned)
        })
        .map_or(SseLine::Ignored, SseLine::Delta)
}

Register as an SSE endpoint:

use ranvier_http::prelude::*;

// Expose the streaming pipeline as an SSE endpoint at POST /api/chat/stream.
// NOTE(review): `streaming_gateway_pipeline()` is not defined in this cookbook;
// presumably it builds a pipeline around `StreamingLlmProxy` — confirm.
Ranvier::http()
    .post_sse_typed("/api/chat/stream", streaming_gateway_pipeline())
    .run(())
    .await?;

#7. Automatic Failover

If the primary provider fails, transparently retry with a fallback provider.

// Failover step: call the provider implied by the model name; if it faults, retry
// once on the other provider with a remapped model.
// The second type argument of `Axon::typed` is the error type (`String`), matching
// the providers' `type Error = String` and the full server example.
let failover_pipeline = Axon::typed::<LlmRequest, String>("llm-failover")
    .then_fn("try-primary-then-fallback", |req, _res, bus| async move {
        let primary_is_anthropic = req.model.starts_with("claude");

        // Try primary (based on model name). A missing API key counts as a primary
        // failure rather than being sent upstream as an empty credential.
        let primary_result = if primary_is_anthropic {
            match std::env::var("ANTHROPIC_API_KEY") {
                Ok(key) => AnthropicProvider::new(key).run(req.clone(), &(), bus).await,
                Err(_) => Outcome::fault("ANTHROPIC_API_KEY not set".to_string()),
            }
        } else {
            OpenAiProvider::new().run(req.clone(), &(), bus).await
        };

        match primary_result {
            Outcome::Next(resp) => Outcome::next(resp),
            Outcome::Fault(err) => {
                tracing::warn!(error = %err, "primary provider failed, trying fallback");

                // Fallback: remap model for the other provider
                if primary_is_anthropic {
                    let fallback_req = LlmRequest { model: "gpt-4o".into(), ..req };
                    OpenAiProvider::new().run(fallback_req, &(), bus).await
                } else {
                    let fallback_req = LlmRequest {
                        model: "claude-sonnet-4-5-20250929".into(),
                        ..req
                    };
                    match std::env::var("ANTHROPIC_API_KEY") {
                        Ok(key) => AnthropicProvider::new(key).run(fallback_req, &(), bus).await,
                        Err(_) => Outcome::fault(format!(
                            "primary provider failed ({}); fallback unavailable: ANTHROPIC_API_KEY not set",
                            err
                        )),
                    }
                }
            }
            // Any other outcome is passed through untouched.
            other => other,
        }
    });

#8. Cost Tracking and Audit

Record every successful LLM call to the audit hash chain for cost attribution.

use ranvier_audit::prelude::*;

// Audited pipeline: token guard, then an OpenAI call whose successful result is
// written to the `AuditTrail` found on the Bus (if any).
// The second type argument of `Axon::typed` is the error type (`String`), matching
// `TokenCountGuard`'s `type Error = String` and the full server example.
let audited_gateway = Axon::typed::<LlmRequest, String>("audited-llm")
    .then(TokenCountGuard::with_limit(128_000))
    .then_fn("call-and-audit", |req, _res, bus| async move {
        let result = OpenAiProvider::new().run(req, &(), bus).await;

        // Only successful calls are recorded; the provider outcome is returned
        // unchanged whether or not an audit trail is present.
        if let Outcome::Next(ref resp) = result {
            if let Ok(audit) = bus.get_cloned::<AuditTrail>() {
                let tenant = bus.get_cloned::<super::TenantContext>()
                    .map(|t| t.tenant_id)
                    .unwrap_or_else(|_| "unknown".into());

                audit.record(AuditEntry {
                    action: "llm.call".into(),
                    actor: tenant,
                    resource: format!(
                        "provider={}, model={}, tokens={}",
                        resp.provider, resp.model, resp.usage.total_tokens
                    ),
                    timestamp: chrono::Utc::now(),
                }).await;
            }
        }

        result
    });

#9. Full Gateway Server Example

use ranvier_http::prelude::*;
use ranvier_guard::prelude::*;

/// Starts the gateway HTTP server on 0.0.0.0:8080 with the guard stack and both
/// chat endpoints registered.
///
/// Returns an error when `GATEWAY_API_KEY` is not set or when the server fails.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    Ranvier::http()
        .bind("0.0.0.0:8080")
        // Cross-cutting guards: request id, access log, CORS, bearer auth, rate limit.
        .guard(RequestIdGuard::new())
        .guard(AccessLogGuard::new())
        .guard(CorsGuard::<()>::permissive())
        // The only accepted bearer token is the value of GATEWAY_API_KEY.
        .guard(AuthGuard::bearer(vec![
            std::env::var("GATEWAY_API_KEY")?,
        ]))
        .guard(RateLimitGuard::new(60, 60_000))  // 60 req/min
        // Synchronous completion
        .post_typed("/api/chat", gateway_pipeline())
        // SSE streaming
        // NOTE(review): `streaming_gateway_pipeline()` is not defined in this cookbook.
        .post_sse_typed("/api/chat/stream", streaming_gateway_pipeline())
        .run(())
        .await?;

    Ok(())
}

fn gateway_pipeline() -> Axon<LlmRequest, LlmResponse, String, ()> {
    Axon::typed::<LlmRequest, String>("llm-gateway")
        .then(TokenCountGuard::with_limit(128_000))
        .then_fn("route", |req, _res, bus| async move {
            // Auto-route by model name
            if req.model.starts_with("claude") {
                let key = std::env::var("ANTHROPIC_API_KEY").unwrap_or_default();
                AnthropicProvider::new(key).run(req, &(), bus).await
            } else {
                OpenAiProvider::new().run(req, &(), bus).await
            }
        })
}

#See Also

  • LLM Pipeline Cookbook — single-provider agent pipeline patterns
  • Streaming Patterns Cookbook — StreamingTransition details
  • Guard Patterns Cookbook — Guard composition for token/cost limits
  • Multi-Tenant Isolation Cookbook — per-tenant cost budgets