# LLM Gateway Cookbook
Version: 0.40.0 | Updated: 2026-03-23 | Applies to: ranvier-core 0.40+ | Category: Cookbook
# Overview
While cookbook_llm_pipeline covers single-provider LLM agent patterns, this cookbook
covers the gateway layer that sits in front of multiple LLM providers — routing
requests, counting tokens, enforcing cost budgets, proxying streaming responses,
and failing over automatically between providers.
All patterns use existing Ranvier primitives (Transition, StreamingTransition, Guard, Bus) combined with ecosystem crates (async-openai, tiktoken-rs, reqwest).
# 1. Provider Abstraction
Define a common interface for all LLM providers. Each provider is a Transition with the same input/output types.
use serde::{Deserialize, Serialize};
/// Provider-agnostic chat completion request, shared by every provider `Transition`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LlmRequest {
    /// Target model identifier, e.g. "gpt-4o" or "claude-sonnet-4-5-20250929".
    pub model: String,
    /// Conversation history in chronological order.
    pub messages: Vec<ChatMessage>,
    /// Upper bound on generated (output) tokens.
    pub max_tokens: u32,
    /// Sampling temperature.
    pub temperature: f32,
}
/// One turn of a conversation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChatMessage {
    pub role: String, // "system", "user", "assistant"
    pub content: String,
}
/// Normalized completion returned by any provider.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LlmResponse {
    /// Assistant message text.
    pub content: String,
    /// Model that actually served the request.
    pub model: String,
    /// Which backend answered: "openai" or "anthropic".
    pub provider: String,
    /// Token accounting for cost tracking.
    pub usage: TokenUsage,
}
/// Token counts reported by the provider.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TokenUsage {
    /// Tokens consumed by the prompt.
    pub input_tokens: u32,
    /// Tokens generated in the completion.
    pub output_tokens: u32,
    /// input_tokens + output_tokens.
    pub total_tokens: u32,
}
# 2. Provider Transitions
# OpenAI Provider
use async_openai::{Client, types::CreateChatCompletionRequestArgs};
/// OpenAI-backed provider (async-openai's default client configuration reads
/// the `OPENAI_API_KEY` environment variable).
pub struct OpenAiProvider {
    client: Client<async_openai::config::OpenAIConfig>,
}

impl OpenAiProvider {
    /// Build a provider with the default client configuration.
    pub fn new() -> Self {
        Self { client: Client::new() }
    }
}

// `Default` mirrors `new()` so the type composes with derive-based builders
// and satisfies clippy's `new_without_default` lint.
impl Default for OpenAiProvider {
    fn default() -> Self {
        Self::new()
    }
}
#[async_trait]
impl Transition<LlmRequest, LlmResponse> for OpenAiProvider {
    type Error = String;
    type Resources = ();

    /// Forwards the request to the OpenAI chat-completions API and normalizes
    /// the reply into the gateway's `LlmResponse` shape.
    async fn run(
        &self,
        req: LlmRequest,
        _resources: &Self::Resources,
        _bus: &mut Bus,
    ) -> Outcome<LlmResponse, Self::Error> {
        // Build provider-specific messages; propagate builder errors instead of
        // panicking — the original `.unwrap()` would crash the gateway on a
        // malformed role string.
        let messages = req
            .messages
            .iter()
            .map(|m| {
                async_openai::types::ChatCompletionRequestMessageArgs::default()
                    .role(m.role.as_str())
                    .content(m.content.clone())
                    .build()
                    .map_err(|e| e.to_string())
            })
            .collect::<Result<Vec<_>, String>>()?;
        let request = CreateChatCompletionRequestArgs::default()
            .model(&req.model)
            .max_tokens(req.max_tokens)
            .temperature(req.temperature)
            .messages(messages)
            .build()
            .map_err(|e| e.to_string())?;
        let response = self.client.chat().create(request).await
            .map_err(|e| e.to_string())?;
        let choice = response.choices.first()
            .ok_or("no response choice")?;
        // `usage` may be absent from the response; default all counts to 0.
        let usage = response.usage.as_ref();
        Outcome::next(LlmResponse {
            content: choice.message.content.clone().unwrap_or_default(),
            model: req.model,
            provider: "openai".into(),
            usage: TokenUsage {
                input_tokens: usage.map(|u| u.prompt_tokens).unwrap_or(0),
                output_tokens: usage.map(|u| u.completion_tokens).unwrap_or(0),
                total_tokens: usage.map(|u| u.total_tokens).unwrap_or(0),
            },
        })
    }
}
# Anthropic Provider (reqwest)
/// Anthropic provider implemented directly over reqwest (no SDK dependency).
pub struct AnthropicProvider {
    client: reqwest::Client,
    // API key sent via the `x-api-key` header.
    api_key: String,
}

impl AnthropicProvider {
    /// Create a provider with an explicit API key
    /// (e.g. read from `ANTHROPIC_API_KEY` by the caller).
    pub fn new(api_key: String) -> Self {
        Self {
            client: reqwest::Client::new(),
            api_key,
        }
    }
}
#[async_trait]
impl Transition<LlmRequest, LlmResponse> for AnthropicProvider {
    type Error = String;
    type Resources = ();

    /// Calls the Anthropic Messages API and normalizes the reply.
    ///
    /// Anthropic takes the system prompt as a top-level `system` field rather
    /// than as a chat message, so system-role messages are split out here.
    async fn run(
        &self,
        req: LlmRequest,
        _resources: &Self::Resources,
        _bus: &mut Bus,
    ) -> Outcome<LlmResponse, Self::Error> {
        let messages: Vec<_> = req.messages.iter()
            .filter(|m| m.role != "system")
            .map(|m| serde_json::json!({"role": m.role, "content": m.content}))
            .collect();
        // Only the first system message is used; later ones are dropped.
        let system = req.messages.iter()
            .find(|m| m.role == "system")
            .map(|m| m.content.clone())
            .unwrap_or_default();
        let body = serde_json::json!({
            "model": req.model,
            "max_tokens": req.max_tokens,
            "system": system,
            "messages": messages,
        });
        let response = self.client
            .post("https://api.anthropic.com/v1/messages")
            .header("x-api-key", &self.api_key)
            .header("anthropic-version", "2023-06-01")
            .json(&body)
            .send()
            .await
            .map_err(|e| e.to_string())?;
        // Fail loudly on HTTP errors. Without this check an error payload
        // (401, 429, 529, ...) would decode to empty content and zero usage
        // and be returned to the caller as a "successful" response.
        let status = response.status();
        if !status.is_success() {
            let detail = response.text().await.unwrap_or_default();
            return Outcome::fault(format!("anthropic API error {status}: {detail}"));
        }
        let json: serde_json::Value = response.json().await
            .map_err(|e| e.to_string())?;
        let content = json["content"][0]["text"].as_str()
            .unwrap_or("").to_string();
        // Read each usage field once; Anthropic reports no combined total.
        let input_tokens = json["usage"]["input_tokens"].as_u64().unwrap_or(0);
        let output_tokens = json["usage"]["output_tokens"].as_u64().unwrap_or(0);
        Outcome::next(LlmResponse {
            content,
            model: req.model,
            provider: "anthropic".into(),
            usage: TokenUsage {
                input_tokens: input_tokens as u32,
                output_tokens: output_tokens as u32,
                total_tokens: (input_tokens + output_tokens) as u32,
            },
        })
    }
}
# 3. Provider Routing
Route requests to different providers based on model name or explicit selection.
use ranvier_core::prelude::*;
use ranvier_runtime::Axon;
/// Which backend should handle a request.
#[derive(Debug, Clone)]
pub enum ProviderSelection {
    OpenAi,
    Anthropic,
    /// Resolve by model-name prefix ("claude" -> Anthropic, otherwise OpenAI).
    Auto, // select by model name prefix
}

/// Bus type: controls which provider to use.
#[derive(Debug, Clone)]
pub struct ProviderConfig(pub ProviderSelection);
// Gateway pipeline: pick a provider per request, honoring an explicit
// `ProviderConfig` on the Bus and falling back to prefix-based auto-routing.
let gateway_pipeline = Axon::typed::<LlmRequest, LlmResponse>("llm-gateway")
    .then_fn("route-provider", |req, _res, bus| async move {
        let selection = bus.get_cloned::<ProviderConfig>()
            .map(|c| c.0)
            .unwrap_or(ProviderSelection::Auto);
        // Resolve `Auto` to a concrete provider up front so the dispatch below
        // is total — no identity match arms and no `_ => unreachable!()`
        // catch-all that could start lying after a refactor. Auto rule matches
        // the original: "claude*" -> Anthropic, everything else (incl. "gpt*",
        // "o*") -> OpenAI.
        let use_anthropic = match selection {
            ProviderSelection::OpenAi => false,
            ProviderSelection::Anthropic => true,
            ProviderSelection::Auto => req.model.starts_with("claude"),
        };
        if use_anthropic {
            let key = std::env::var("ANTHROPIC_API_KEY")
                .map_err(|_| "ANTHROPIC_API_KEY not set".to_string())?;
            AnthropicProvider::new(key).run(req, &(), bus).await
        } else {
            OpenAiProvider::new().run(req, &(), bus).await
        }
    });
# 4. Token Count Guard
Pre-validate that the request doesn't exceed the model's context window.
/// Guard that rejects requests whose prompt plus requested output tokens would
/// exceed a model context-window limit.
pub struct TokenCountGuard<T> {
    // Total budget: input tokens + max_tokens of the request.
    max_tokens: u32,
    // Ties the guard to a pipeline payload type without storing one.
    _marker: std::marker::PhantomData<T>,
}

impl<T> TokenCountGuard<T> {
    /// Create a guard with the given total-token limit.
    pub fn with_limit(max_tokens: u32) -> Self {
        Self { max_tokens, _marker: std::marker::PhantomData }
    }
}
#[async_trait]
impl Transition<LlmRequest, LlmRequest> for TokenCountGuard<LlmRequest> {
    type Error = String;
    type Resources = ();

    /// Counts prompt tokens with tiktoken, publishes the count on the Bus for
    /// downstream cost tracking, and faults when prompt + requested output
    /// exceeds the configured limit.
    async fn run(
        &self,
        req: LlmRequest,
        _resources: &Self::Resources,
        bus: &mut Bus,
    ) -> Outcome<LlmRequest, Self::Error> {
        // tiktoken-rs for accurate token counting; unknown models fall back to
        // the cl100k_base encoding as an approximation.
        let bpe = tiktoken_rs::get_bpe_from_model(&req.model)
            .unwrap_or_else(|_| {
                tiktoken_rs::cl100k_base().expect("bundled cl100k_base encoding must load")
            });
        let total_tokens: usize = req.messages.iter()
            .map(|m| bpe.encode_with_special_tokens(&m.content).len())
            .sum();
        // Store token count in Bus for cost tracking.
        bus.insert(InputTokenCount(total_tokens as u32));
        // Compare in u64: the original `total_tokens as u32 + req.max_tokens`
        // could wrap on pathological inputs and silently pass the check.
        if total_tokens as u64 + req.max_tokens as u64 > self.max_tokens as u64 {
            Outcome::fault(format!(
                "413 Token limit exceeded: {} input + {} max_output > {} limit",
                total_tokens, req.max_tokens, self.max_tokens
            ))
        } else {
            Outcome::next(req)
        }
    }
}
/// Bus entry: input token count computed by `TokenCountGuard`, consumed by
/// `CostBudgetGuard` for cost estimation.
#[derive(Debug, Clone)]
pub struct InputTokenCount(pub u32);
# 5. Cost Budget Guard
Track cumulative cost per tenant and enforce daily/monthly budgets.
use std::sync::Arc;
use dashmap::DashMap;
/// Guard enforcing a per-tenant daily cost budget. The ledger is shared via
/// `Arc<DashMap>`, so clones of the guard observe the same running totals.
#[derive(Debug, Clone)]
pub struct CostBudgetGuard<T> {
    /// tenant_id -> (total_cost_cents, last_reset_date)
    ledger: Arc<DashMap<String, (u64, chrono::NaiveDate)>>,
    // Budget per tenant per UTC day, in cents.
    daily_limit_cents: u64,
    _marker: std::marker::PhantomData<T>,
}
#[async_trait]
impl Transition<LlmRequest, LlmRequest> for CostBudgetGuard<LlmRequest> {
    type Error = String;
    type Resources = ();

    /// Charges an estimated cost against the tenant's daily ledger and faults
    /// with a 429-style error once the daily limit is reached.
    async fn run(
        &self,
        req: LlmRequest,
        _resources: &Self::Resources,
        bus: &mut Bus,
    ) -> Outcome<LlmRequest, Self::Error> {
        let tenant = bus.get_cloned::<super::TenantContext>()
            .map(|t| t.tenant_id)
            .unwrap_or_else(|_| "default".into());
        let today = chrono::Utc::now().date_naive();
        // The DashMap entry guard is held only across synchronous code below
        // (no .await), so it never blocks the async executor.
        let mut entry = self.ledger.entry(tenant.clone())
            .or_insert((0, today));
        // Reset on new day
        if entry.1 < today {
            entry.0 = 0;
            entry.1 = today;
        }
        // Estimate cost from the input token count published by TokenCountGuard.
        let input_tokens = bus.get_cloned::<InputTokenCount>()
            .map(|t| t.0)
            .unwrap_or(0);
        // Rough cost: $0.01 per 1K tokens = 1 cent per 1K tokens.
        // Ceiling division: the original `/ 1000` truncated requests under
        // 1000 tokens to a cost of 0, letting them bypass the budget entirely.
        let estimated_cost = (input_tokens as u64 + req.max_tokens as u64).div_ceil(1000);
        if entry.0 + estimated_cost > self.daily_limit_cents {
            Outcome::fault(format!(
                "429 Budget exceeded: tenant {} has used {} cents today (limit: {})",
                tenant, entry.0, self.daily_limit_cents
            ))
        } else {
            entry.0 += estimated_cost;
            Outcome::next(req)
        }
    }
}
# 6. SSE Streaming Proxy
Use StreamingTransition to proxy LLM streaming responses as SSE events.
use ranvier_core::streaming::StreamingTransition;
use futures_util::StreamExt;
/// Proxies an OpenAI streaming completion, forwarding each content delta as an
/// SSE text chunk and returning the assembled response when the stream ends.
pub struct StreamingLlmProxy;

#[async_trait]
impl StreamingTransition for StreamingLlmProxy {
    type Input = LlmRequest;
    type Item = String; // SSE text chunks
    type Error = String;
    type Result = LlmResponse;

    async fn run_streaming(
        &self,
        req: Self::Input,
        _resources: &(),
        _bus: &mut Bus,
        sender: tokio::sync::mpsc::Sender<Self::Item>,
    ) -> Outcome<Self::Result, Self::Error> {
        // Use reqwest to stream from the provider.
        let client = reqwest::Client::new();
        let api_key = std::env::var("OPENAI_API_KEY")
            .map_err(|_| "OPENAI_API_KEY not set".to_string())?;
        let body = serde_json::json!({
            "model": req.model,
            "messages": req.messages.iter().map(|m| {
                serde_json::json!({"role": m.role, "content": m.content})
            }).collect::<Vec<_>>(),
            "max_tokens": req.max_tokens,
            "stream": true,
        });
        let response = client
            .post("https://api.openai.com/v1/chat/completions")
            .bearer_auth(&api_key)
            .json(&body)
            .send()
            .await
            .map_err(|e| e.to_string())?;
        let mut full_content = String::new();
        // SSE events are not aligned to network chunks: a `data:` line can be
        // split across two chunks (the original per-chunk `text.lines()` parse
        // corrupted such lines). Accumulate bytes and only parse lines that
        // are '\n'-terminated; the unfinished tail carries over in `buffer`.
        let mut buffer = String::new();
        let mut stream = response.bytes_stream();
        'stream: while let Some(chunk) = stream.next().await {
            let bytes = chunk.map_err(|e| e.to_string())?;
            buffer.push_str(&String::from_utf8_lossy(&bytes));
            while let Some(newline) = buffer.find('\n') {
                let line = buffer[..newline].trim_end_matches('\r').to_string();
                buffer.drain(..=newline);
                if let Some(data) = line.strip_prefix("data: ") {
                    // `[DONE]` ends the whole stream — the original bare
                    // `break` only exited the inner line loop.
                    if data == "[DONE]" { break 'stream; }
                    if let Ok(json) = serde_json::from_str::<serde_json::Value>(data) {
                        if let Some(content) = json["choices"][0]["delta"]["content"].as_str() {
                            full_content.push_str(content);
                            // Receiver may have disconnected; dropping deltas is fine.
                            let _ = sender.send(content.to_string()).await;
                        }
                    }
                }
            }
        }
        // NOTE(review): streamed responses carry no usage data here, so counts
        // are zero — confirm whether `stream_options` should request usage.
        Outcome::next(LlmResponse {
            content: full_content,
            model: req.model,
            provider: "openai".into(),
            usage: TokenUsage { input_tokens: 0, output_tokens: 0, total_tokens: 0 },
        })
    }
}
Register as an SSE endpoint:
use ranvier_http::prelude::*;
// Mount the streaming pipeline as a Server-Sent Events endpoint.
Ranvier::http()
    .post_sse_typed("/api/chat/stream", streaming_gateway_pipeline())
    .run(())
    .await?;
# 7. Automatic Failover
If the primary provider fails, transparently retry with a fallback provider.
// Failover pipeline: try the model's natural provider first; on fault, remap
// to an equivalent model on the other provider and retry once.
let failover_pipeline = Axon::typed::<LlmRequest, LlmResponse>("llm-failover")
    .then_fn("try-primary-then-fallback", |req, _res, bus| async move {
        // Primary provider is inferred from the model-name prefix.
        let primary_is_anthropic = req.model.starts_with("claude");
        let primary_result = if primary_is_anthropic {
            let key = std::env::var("ANTHROPIC_API_KEY").unwrap_or_default();
            AnthropicProvider::new(key).run(req.clone(), &(), bus).await
        } else {
            OpenAiProvider::new().run(req.clone(), &(), bus).await
        };
        match primary_result {
            Outcome::Fault(err) => {
                tracing::warn!(error = %err, "primary provider failed, trying fallback");
                // Decide the fallback provider once, alongside the model remap.
                // (The original re-derived the provider from the remapped model
                // string with a second prefix check — duplicated routing logic
                // that could drift out of sync with the remap.)
                if primary_is_anthropic {
                    let fallback_req = LlmRequest { model: "gpt-4o".into(), ..req };
                    OpenAiProvider::new().run(fallback_req, &(), bus).await
                } else {
                    let fallback_req = LlmRequest {
                        model: "claude-sonnet-4-5-20250929".into(),
                        ..req
                    };
                    let key = std::env::var("ANTHROPIC_API_KEY").unwrap_or_default();
                    AnthropicProvider::new(key).run(fallback_req, &(), bus).await
                }
            }
            // Next and any other outcome variants pass through unchanged.
            other => other,
        }
    });
# 8. Cost Tracking and Audit
Record every LLM call to the audit hash chain for cost attribution.
use ranvier_audit::prelude::*;
// Gateway wrapper that records every successful LLM call to the audit hash
// chain with tenant, provider, model, and total token count.
let audited_gateway = Axon::typed::<LlmRequest, LlmResponse>("audited-llm")
    .then(TokenCountGuard::with_limit(128_000))
    .then_fn("call-and-audit", |req, _res, bus| async move {
        let result = OpenAiProvider::new().run(req, &(), bus).await;
        // Audit only successful calls; when no AuditTrail is on the Bus the
        // call proceeds un-audited (best effort).
        // NOTE(review): faulted calls are never recorded — confirm whether
        // failed attempts (which may still incur provider cost) should be.
        if let Outcome::Next(ref resp) = result {
            if let Ok(audit) = bus.get_cloned::<AuditTrail>() {
                let tenant = bus.get_cloned::<super::TenantContext>()
                    .map(|t| t.tenant_id)
                    .unwrap_or_else(|_| "unknown".into());
                audit.record(AuditEntry {
                    action: "llm.call".into(),
                    actor: tenant,
                    resource: format!(
                        "provider={}, model={}, tokens={}",
                        resp.provider, resp.model, resp.usage.total_tokens
                    ),
                    timestamp: chrono::Utc::now(),
                }).await;
            }
        }
        result
    });
# 9. Full Gateway Server Example
use ranvier_http::prelude::*;
use ranvier_guard::prelude::*;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Cross-cutting guards: request-id, access log, CORS, bearer auth, rate
    // limit — applied to every route registered below.
    Ranvier::http()
        .bind("0.0.0.0:8080")
        .guard(RequestIdGuard::new())
        .guard(AccessLogGuard::new())
        .guard(CorsGuard::<()>::permissive())
        // `?` fails fast at startup if GATEWAY_API_KEY is unset.
        .guard(AuthGuard::bearer(vec![
            std::env::var("GATEWAY_API_KEY")?,
        ]))
        .guard(RateLimitGuard::new(60, 60_000)) // 60 req/min
        // Synchronous completion
        .post_typed("/api/chat", gateway_pipeline())
        // SSE streaming
        .post_sse_typed("/api/chat/stream", streaming_gateway_pipeline())
        .run(())
        .await?;
    Ok(())
}
/// Builds the synchronous (non-streaming) gateway pipeline: token-limit guard,
/// then prefix-based provider routing.
fn gateway_pipeline() -> Axon<LlmRequest, LlmResponse, String, ()> {
    // Output type must be `LlmResponse` to match the declared return type —
    // the original snippet wrote `Axon::typed::<LlmRequest, String>`, which
    // contradicts both the signature and every other example in this cookbook.
    Axon::typed::<LlmRequest, LlmResponse>("llm-gateway")
        .then(TokenCountGuard::with_limit(128_000))
        .then_fn("route", |req, _res, bus| async move {
            // Auto-route by model name
            if req.model.starts_with("claude") {
                let key = std::env::var("ANTHROPIC_API_KEY").unwrap_or_default();
                AnthropicProvider::new(key).run(req, &(), bus).await
            } else {
                OpenAiProvider::new().run(req, &(), bus).await
            }
        })
}
# See Also
- LLM Pipeline Cookbook — single-provider agent pipeline patterns
- Streaming Patterns Cookbook — StreamingTransition details
- Guard Patterns Cookbook — Guard composition for token/cost limits
- Multi-Tenant Isolation Cookbook — per-tenant cost budgets