# LLM Pipeline Cookbook
Version: 0.36.0 | Updated: 2026-03-20 | Applies to: ranvier-runtime 0.36+ | Category: Cookbook
# Overview
Ranvier models LLM workflows as typed pipelines where each stage -- intent classification, tool execution, response synthesis -- is a Transition with explicit input/output types. This cookbook covers patterns for building LLM agent pipelines with parallel tool calls, PII filtering, and retry/timeout resilience.
# 1. Basic LLM Pipeline Structure
A typical LLM agent pipeline has four stages:
```mermaid
graph LR
    CI["Classify Intent"] --> ET["Execute Tools"] --> SR["Synthesize Response"] --> FO["Format Output"]
```

Each stage is a Transition, composed into an Axon pipeline:
```rust
use ranvier_runtime::Axon;

fn chat_pipeline() -> Axon<ChatRequest, ChatResponse, LlmError, AppResources> {
    Axon::typed::<ChatRequest, LlmError>("llm-chat")
        .then(ClassifyIntent)       // ChatRequest -> Intent
        .then(ExecuteTools)         // Intent -> ToolResults
        .then(SynthesizeResponse)   // ToolResults -> RawResponse
        .then(FormatOutput)         // RawResponse -> ChatResponse
}
```

# 2. LlmTransition for Provider Calls
LlmTransition wraps LLM provider API calls as Transitions. It handles
serialization, retry, and error mapping.
```rust
use ranvier_runtime::llm::{LlmTransition, LlmConfig};

let classify = LlmTransition::new(LlmConfig {
    model: "gpt-4o".into(),
    temperature: 0.0,
    max_tokens: 100,
    system_prompt: "Classify the user intent as one of: question, action, chitchat".into(),
});

let pipeline = Axon::typed::<ChatRequest, LlmError>("classify")
    .then(classify);
```

# 3. Parallel Tool Execution
Use Axon::parallel() when multiple tools can run concurrently. Each
branch is an independent Transition, and results are collected into a single output.
```rust
use ranvier_runtime::Axon;

#[derive(Clone)]
struct SearchWeb;
#[derive(Clone)]
struct QueryDatabase;
#[derive(Clone)]
struct FetchWeather;
#[derive(Clone)]
struct CalculateMath;

fn tool_executor() -> Axon<Intent, ToolResults, LlmError, AppResources> {
    Axon::typed::<Intent, LlmError>("tool-router")
        .then_fn("route-tools", |intent: Intent, bus: &mut Bus| {
            // Store selected tools in the Bus for parallel execution
            bus.insert(SelectedTools(intent.required_tools.clone()));
            Outcome::next(intent)
        })
        .then(Axon::parallel("parallel-tools", vec![
            Box::new(SearchWeb),
            Box::new(QueryDatabase),
            Box::new(FetchWeather),
            Box::new(CalculateMath),
        ]))
        .then_fn("aggregate", |results: Vec<ToolOutput>, bus: &mut Bus| {
            let tool_results = ToolResults {
                outputs: results,
                execution_time: bus.get_cloned::<ExecutionTimer>()
                    .map(|t| t.elapsed())
                    .unwrap_or_default(),
            };
            Outcome::next(tool_results)
        })
}
```

# 4. PII Filtering with Sensitive<T>
Use ranvier_compliance::Sensitive<T> to redact personally identifiable
information from logs while keeping the full value available in the pipeline.
```rust
use ranvier_compliance::Sensitive;

#[derive(Debug, Clone, serde::Serialize)]
struct UserMessage {
    text: Sensitive<String>,
    user_id: Sensitive<String>,
    session_id: String, // not sensitive
}

// In release mode, Debug/Display output is redacted:
// UserMessage { text: [REDACTED], user_id: [REDACTED], session_id: "sess-123" }

// Inside a Transition, access the actual value:
let actual_text: &str = &message.text; // Deref gives access to the inner value
```

# 13-Category PII Detection
Ranvier's compliance module detects 13 PII categories:
| Category | Examples |
|---|---|
| Email | user@example.com |
| Phone | +1-555-123-4567 |
| SSN | 123-45-6789 |
| Credit Card | 4111-1111-1111-1111 |
| IP Address | 192.168.1.1 |
| Date of Birth | 1990-01-15 |
| Passport | AB1234567 |
| Driver License | D12345678 |
| Bank Account | 1234567890 |
| Address | 123 Main St, City, ST 12345 |
| Name | (context-dependent) |
| Medical ID | (pattern-based) |
| Tax ID | 12-3456789 |
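Detection is pattern-based. As a rough, dependency-free illustration of what a single category's redaction involves (this is a toy sketch, not Ranvier's implementation; a real detector uses per-category regexes plus validation such as Luhn checks for card numbers):

```rust
/// Toy redactor: replaces any run of digits, dashes, and plus signs
/// containing at least seven digits with "[PHONE]". A real detector
/// uses per-category regexes and validation, not this heuristic.
fn redact_phones(input: &str) -> String {
    let mut out = String::new();
    let mut run = String::new();
    // Chain a sentinel so the final run is flushed.
    for ch in input.chars().chain(std::iter::once('\0')) {
        if ch.is_ascii_digit() || ch == '-' || ch == '+' {
            run.push(ch);
        } else {
            let digits = run.chars().filter(|c| c.is_ascii_digit()).count();
            if digits >= 7 {
                out.push_str("[PHONE]");
            } else {
                out.push_str(&run);
            }
            run.clear();
            if ch != '\0' {
                out.push(ch);
            }
        }
    }
    out
}

fn main() {
    assert_eq!(redact_phones("Call 555-0123 for details"), "Call [PHONE] for details");
    assert_eq!(redact_phones("no digits here"), "no digits here");
}
```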
```rust
use ranvier_compliance::pii::{PiiDetector, PiiCategory};

let detector = PiiDetector::new();
let findings = detector.scan("Contact me at user@example.com or 555-0123");
// findings: [PiiCategory::Email, PiiCategory::Phone]

// Auto-redact in pipeline
let redacted = detector.redact("Call 555-0123 for details");
// "Call [PHONE] for details"
```

# 5. Timeout and Retry for LLM Calls
LLM API calls tend to be slow and unreliable. Use then_with_timeout and
then_with_retry for resilience.
```rust
use ranvier_runtime::Axon;
use std::time::Duration;

let pipeline = Axon::typed::<ChatRequest, LlmError>("resilient-chat")
    .then(ClassifyIntent)
    .then_with_timeout(
        ExecuteTools,
        Duration::from_secs(30), // tool execution timeout
    )
    .then_with_retry(
        SynthesizeResponse,
        3,                       // max 3 attempts
        Duration::from_secs(2),  // delay between retries
    );
```

# Timeout Behavior
When a Transition exceeds the timeout:

- The task is cancelled via tokio::time::timeout
- A TimelineEvent::NodeTimeout is recorded in the Axon's timeline
- The Outcome becomes Fault with a timeout message
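The cancel-on-deadline semantics can be illustrated with the standard library alone. This sketch is independent of Ranvier: it uses mpsc::recv_timeout in place of tokio::time::timeout, and unlike tokio's cooperative cancellation, the background thread here keeps running after the deadline expires.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Runs `work` on a background thread and waits up to `deadline` for its
/// result; returns None on timeout (analogous to a Fault outcome).
fn run_with_timeout<T: Send + 'static>(
    deadline: Duration,
    work: impl FnOnce() -> T + Send + 'static,
) -> Option<T> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // A send error just means the receiver gave up waiting.
        let _ = tx.send(work());
    });
    rx.recv_timeout(deadline).ok()
}

fn main() {
    // Fast work completes within the deadline.
    assert_eq!(run_with_timeout(Duration::from_millis(500), || 42), Some(42));

    // Slow work misses the deadline and is reported as a timeout.
    let slow = run_with_timeout(Duration::from_millis(50), || {
        thread::sleep(Duration::from_millis(500));
        42
    });
    assert_eq!(slow, None);
}
```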
# Retry Behavior
When a Transition returns Fault and retries remain:
- The runtime waits the configured delay
- Re-executes the same Transition with the same input
- Records each attempt in the timeline
Retry is safe only for idempotent operations. LLM API calls are generally safe to retry -- the same prompt may produce different but equally valid responses.
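The retry loop itself is straightforward. A minimal std-only sketch of the re-execute-with-delay behavior (not Ranvier's implementation, which also records each attempt in the timeline):

```rust
use std::thread;
use std::time::Duration;

/// Retries `op` up to `max_attempts` times, sleeping `delay` between
/// attempts; returns the first Ok, or the last Err if every attempt fails.
/// Panics if `max_attempts` is zero.
fn retry<T, E>(
    max_attempts: u32,
    delay: Duration,
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut last_err = None;
    for attempt in 1..=max_attempts {
        match op() {
            Ok(value) => return Ok(value),
            Err(e) => {
                last_err = Some(e);
                if attempt < max_attempts {
                    thread::sleep(delay); // wait before re-executing
                }
            }
        }
    }
    Err(last_err.expect("max_attempts must be at least 1"))
}

fn main() {
    // Succeeds on the third attempt.
    let mut calls = 0;
    let result = retry(3, Duration::from_millis(1), || {
        calls += 1;
        if calls < 3 { Err("transient") } else { Ok(calls) }
    });
    assert_eq!(result, Ok(3));
}
```

Because the same input is replayed on every attempt, this is exactly why the surrounding text restricts retry to idempotent operations.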
# 6. Conversation History via Bus
Store conversation history in the Bus for multi-turn interactions:
```rust
#[derive(Debug, Clone)]
struct ConversationHistory(Vec<Message>);

#[derive(Debug, Clone)]
struct Message {
    role: String, // "user", "assistant", "system"
    content: String,
}

#[async_trait]
impl Transition<ChatRequest, Intent> for ClassifyIntent {
    type Error = LlmError;
    type Resources = AppResources;

    async fn run(
        &self,
        input: ChatRequest,
        resources: &Self::Resources,
        bus: &mut Bus,
    ) -> Outcome<Intent, Self::Error> {
        // Read conversation history from the Bus
        let history = bus.get_cloned::<ConversationHistory>()
            .unwrap_or(ConversationHistory(vec![]));

        // Build the prompt with history context
        let mut messages = history.0.clone();
        messages.push(Message {
            role: "user".into(),
            content: input.message.clone(),
        });

        let response = resources.llm.classify(&messages).await?;

        // Update the history in the Bus
        let mut updated = history;
        updated.0.push(Message { role: "user".into(), content: input.message });
        updated.0.push(Message { role: "assistant".into(), content: response.raw.clone() });
        bus.insert(updated);

        Outcome::next(response.intent)
    }
}
```

# 7. Tool Call Routing Pattern
Route to different tools based on the LLM classification result:
```rust
#[async_trait]
impl Transition<Intent, ToolResults> for ToolRouter {
    type Error = LlmError;
    type Resources = AppResources;

    async fn run(
        &self,
        input: Intent,
        resources: &Self::Resources,
        bus: &mut Bus,
    ) -> Outcome<ToolResults, Self::Error> {
        let mut results = Vec::new();
        for tool_name in &input.required_tools {
            let result = match tool_name.as_str() {
                "search" => search_web(&input.query, resources).await,
                "database" => query_database(&input.query, resources).await,
                "weather" => fetch_weather(&input.location, resources).await,
                "calculator" => calculate(&input.expression).await,
                unknown => {
                    tracing::warn!("unknown tool: {}", unknown);
                    ToolOutput::error(format!("unknown tool: {}", unknown))
                }
            };
            results.push(result);
        }
        bus.insert(ToolCallCount(results.len()));
        Outcome::next(ToolResults { outputs: results })
    }
}
```

# 8. Full Pipeline Example
Here is a production LLM agent that combines all the patterns above:
```rust
use ranvier_http::prelude::*;
use ranvier_guard::prelude::*;
use ranvier_runtime::Axon;
use std::time::Duration;

fn agent_pipeline() -> Axon<ChatRequest, ChatResponse, LlmError, AppResources> {
    Axon::typed::<ChatRequest, LlmError>("llm-agent")
        // Stage 1: Input validation + PII scan
        .then_fn("validate", |req: ChatRequest, bus: &mut Bus| {
            if req.message.is_empty() {
                return Outcome::fault(LlmError::InvalidInput("empty message".into()));
            }
            let pii = PiiDetector::new().scan(&req.message);
            if !pii.is_empty() {
                bus.insert(PiiFindings(pii));
            }
            Outcome::next(req)
        })
        // Stage 2: Intent classification (with retry)
        .then_with_retry(ClassifyIntent, 2, Duration::from_secs(1))
        // Stage 3: Tool execution (with timeout)
        .then_with_timeout(ToolRouter, Duration::from_secs(30))
        // Stage 4: Response synthesis
        .then_with_retry(SynthesizeResponse, 2, Duration::from_secs(1))
        // Stage 5: Format, redacting if PII was found during validation
        .then_fn("format", |raw: RawResponse, bus: &mut Bus| {
            let pii = bus.get_cloned::<PiiFindings>();
            let response = if pii.is_some() {
                PiiDetector::new().redact(&raw.text)
            } else {
                raw.text
            };
            Outcome::next(ChatResponse { message: response })
        })
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    Ranvier::http::<AppResources>()
        .bind("0.0.0.0:8080")
        .guard(RequestIdGuard::new())
        .guard(AccessLogGuard::new())
        .guard(RateLimitGuard::new(60, 60_000))
        .post_typed("/api/chat", agent_pipeline())
        .run(app_resources)
        .await
}
```

# See Also
- Guard Patterns Cookbook -- Guard composition patterns
- Bus Access Patterns Cookbook -- Bus usage patterns
- Saga Compensation Cookbook -- error recovery in pipelines