#LLM Pipeline Cookbook

Version: 0.36.0 | Updated: 2026-03-20 | Applies to: ranvier-runtime 0.36+ | Category: Cookbook


#Overview

Ranvier models LLM workflows as typed pipelines where each stage -- intent classification, tool execution, response synthesis -- is a Transition with explicit input/output types. This cookbook covers patterns for building LLM agent pipelines with parallel tool calls, PII filtering, and retry/timeout resilience.


#1. Basic LLM Pipeline Structure

A typical LLM agent pipeline has four stages:

graph LR
    CI["Classify Intent"] --> ET["Execute Tools"] --> SR["Synthesize Response"] --> FO["Format Output"]

Each stage is a Transition, composed into an Axon pipeline:

use ranvier_runtime::Axon;

fn chat_pipeline() -> Axon<ChatRequest, ChatResponse, LlmError, AppResources> {
    Axon::typed::<ChatRequest, LlmError>("llm-chat")
        .then(ClassifyIntent)      // ChatRequest -> Intent
        .then(ExecuteTools)        // Intent -> ToolResults
        .then(SynthesizeResponse)  // ToolResults -> RawResponse
        .then(FormatOutput)        // RawResponse -> ChatResponse
}

#2. LlmTransition for Provider Calls

LlmTransition wraps LLM provider API calls as Transitions. It handles serialization, retry, and error mapping.

use ranvier_runtime::llm::{LlmTransition, LlmConfig};

let classify = LlmTransition::new(LlmConfig {
    model: "gpt-4o".into(),
    temperature: 0.0,
    max_tokens: 100,
    system_prompt: "Classify the user intent as one of: question, action, chitchat".into(),
});

let pipeline = Axon::typed::<ChatRequest, LlmError>("classify")
    .then(classify);

#3. Parallel Tool Execution

Use Axon::parallel() when multiple tools can run concurrently. Each branch is an independent Transition, and results are collected into a single output.

use ranvier_runtime::Axon;

#[derive(Clone)]
struct SearchWeb;
#[derive(Clone)]
struct QueryDatabase;
#[derive(Clone)]
struct FetchWeather;
#[derive(Clone)]
struct CalculateMath;

fn tool_executor() -> Axon<Intent, ToolResults, LlmError, AppResources> {
    Axon::typed::<Intent, LlmError>("tool-router")
        .then_fn("route-tools", |intent: Intent, bus: &mut Bus| {
            // Store selected tools in Bus for parallel execution
            bus.insert(SelectedTools(intent.required_tools.clone()));
            Outcome::next(intent)
        })
        .then(Axon::parallel("parallel-tools", vec![
            Box::new(SearchWeb),
            Box::new(QueryDatabase),
            Box::new(FetchWeather),
            Box::new(CalculateMath),
        ]))
        .then_fn("aggregate", |results: Vec<ToolOutput>, bus: &mut Bus| {
            let tool_results = ToolResults {
                outputs: results,
                execution_time: bus.get_cloned::<ExecutionTimer>()
                    .map(|t| t.elapsed())
                    .unwrap_or_default(),
            };
            Outcome::next(tool_results)
        })
}

#4. PII Filtering with Sensitive&lt;T&gt;

Use ranvier_compliance::Sensitive<T> to redact personally identifiable information from logs while keeping the full value available in the pipeline.

use ranvier_compliance::Sensitive;

#[derive(Debug, Clone, serde::Serialize)]
struct UserMessage {
    text: Sensitive<String>,
    user_id: Sensitive<String>,
    session_id: String, // not sensitive
}

// In release mode, Debug/Display output is redacted:
// UserMessage { text: [REDACTED], user_id: [REDACTED], session_id: "sess-123" }

// Inside a Transition, access the actual value:
let actual_text: &str = &message.text;  // Deref gives access to inner value
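
The mechanics behind this can be sketched with a plain newtype: hold the value, implement Deref for transparent access, and override Debug to print a redaction marker. This is an illustrative stand-in, not the actual `ranvier_compliance::Sensitive` implementation (which also gates redaction on build mode and integrates with serde):

```rust
use std::fmt;
use std::ops::Deref;

/// Illustrative stand-in for ranvier_compliance::Sensitive<T>:
/// the inner value stays reachable, but Debug output is redacted.
struct Sensitive<T>(T);

impl<T> Deref for Sensitive<T> {
    type Target = T;
    fn deref(&self) -> &T {
        &self.0
    }
}

impl<T> fmt::Debug for Sensitive<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Never touch the inner value when formatting for logs
        f.write_str("[REDACTED]")
    }
}
```

Because access goes through Deref, pipeline code reads the value naturally (`message.text.len()`, `&*message.text`) while every `{:?}` log line stays clean.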

#13-Category PII Detection

Ranvier's compliance module detects 13 PII categories:

| Category | Examples |
| --- | --- |
| Email | user@example.com |
| Phone | +1-555-123-4567 |
| SSN | 123-45-6789 |
| Credit Card | 4111-1111-1111-1111 |
| IP Address | 192.168.1.1 |
| Date of Birth | 1990-01-15 |
| Passport | AB1234567 |
| Driver License | D12345678 |
| Bank Account | 1234567890 |
| Address | 123 Main St, City, ST 12345 |
| Name | (context-dependent) |
| Medical ID | (pattern-based) |
| Tax ID | 12-3456789 |

use ranvier_compliance::pii::{PiiDetector, PiiCategory};

let detector = PiiDetector::new();
let findings = detector.scan("Contact me at user@example.com or 555-0123");
// findings: [PiiCategory::Email, PiiCategory::Phone]

// Auto-redact in pipeline
let redacted = detector.redact("Call 555-0123 for details");
// "Call [PHONE] for details"

#5. Timeout and Retry for LLM Calls

LLM API calls tend to be slow and unreliable. Use then_with_timeout and then_with_retry for resilience.

use ranvier_runtime::Axon;
use std::time::Duration;

let pipeline = Axon::typed::<ChatRequest, LlmError>("resilient-chat")
    .then(ClassifyIntent)
    .then_with_timeout(
        ExecuteTools,
        Duration::from_secs(30),  // tool execution timeout
    )
    .then_with_retry(
        SynthesizeResponse,
        3,                         // max 3 attempts
        Duration::from_secs(2),    // delay between retries
    );

#Timeout Behavior

When a Transition exceeds the timeout:

  1. The task is cancelled via tokio::time::timeout
  2. A TimelineEvent::NodeTimeout is recorded in the Axon's timeline
  3. The Outcome becomes Fault with a timeout message
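
The shape of this behavior can be illustrated without the runtime. The sketch below uses a std thread and a channel deadline where the real runtime uses `tokio::time::timeout`; `run_with_timeout` is a hypothetical name for illustration, and unlike tokio's version it abandons rather than cancels the worker:

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Illustrative timeout wrapper (stand-in for the runtime's
/// then_with_timeout): run `op` on a worker thread and turn a
/// missed deadline into an error, mirroring the Fault outcome.
fn run_with_timeout<T: Send + 'static>(
    deadline: Duration,
    op: impl FnOnce() -> T + Send + 'static,
) -> Result<T, &'static str> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // Ignore send errors: the receiver is gone if the deadline passed
        let _ = tx.send(op());
    });
    rx.recv_timeout(deadline).map_err(|_| "timeout")
}
```

A fast operation returns its value; one that outlives the deadline surfaces as an error the caller can map to `Outcome::fault`.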

#Retry Behavior

When a Transition returns Fault and retries remain:

  1. The runtime waits the configured delay
  2. Re-executes the same Transition with the same input
  3. Records each attempt in the timeline

Retry is safe only for operations that can be repeated without harmful side effects. LLM completion calls generally qualify: resending the same prompt may produce a different but equally valid response. Tool calls that mutate external state should not be retried blindly.
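
The retry loop itself reduces to a small pattern: same input, fixed delay, stop on first success or when attempts are exhausted. A self-contained synchronous sketch (the runtime's actual implementation is async and also records timeline events; `retry_with_delay` is an illustrative name, not a ranvier API):

```rust
use std::thread::sleep;
use std::time::Duration;

/// Illustrative retry loop mirroring then_with_retry semantics:
/// re-run the operation with the same input, sleeping `delay`
/// between attempts, up to `max_attempts` total attempts.
fn retry_with_delay<T, E>(
    max_attempts: u32,
    delay: Duration,
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut attempt = 1;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            // Out of attempts: surface the last error as the Fault
            Err(err) if attempt >= max_attempts => return Err(err),
            Err(_) => {
                sleep(delay);
                attempt += 1;
            }
        }
    }
}
```

Production retry loops usually add exponential backoff and jitter on top of this; the fixed delay here matches the `then_with_retry(_, attempts, delay)` signature shown above.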


#6. Conversation History via Bus

Store conversation history in the Bus for multi-turn interactions:

#[derive(Debug, Clone)]
struct ConversationHistory(Vec<Message>);

#[derive(Debug, Clone)]
struct Message {
    role: String,  // "user", "assistant", "system"
    content: String,
}

#[async_trait]
impl Transition<ChatRequest, Intent> for ClassifyIntent {
    type Error = LlmError;
    type Resources = AppResources;

    async fn run(
        &self,
        input: ChatRequest,
        resources: &Self::Resources,
        bus: &mut Bus,
    ) -> Outcome<Intent, Self::Error> {
        // Read conversation history from Bus
        let history = bus.get_cloned::<ConversationHistory>()
            .unwrap_or(ConversationHistory(vec![]));

        // Build prompt with history context
        let mut messages = history.0.clone();
        messages.push(Message {
            role: "user".into(),
            content: input.message.clone(),
        });

        let response = resources.llm.classify(&messages).await?;

        // Update history in Bus
        let mut updated = history;
        updated.0.push(Message { role: "user".into(), content: input.message });
        updated.0.push(Message { role: "assistant".into(), content: response.raw.clone() });
        bus.insert(updated);

        Outcome::next(response.intent)
    }
}

#7. Tool Call Routing Pattern

Route to different tools based on the LLM classification result:

#[async_trait]
impl Transition<Intent, ToolResults> for ToolRouter {
    type Error = LlmError;
    type Resources = AppResources;

    async fn run(
        &self,
        input: Intent,
        resources: &Self::Resources,
        bus: &mut Bus,
    ) -> Outcome<ToolResults, Self::Error> {
        let mut results = Vec::new();

        for tool_name in &input.required_tools {
            let result = match tool_name.as_str() {
                "search" => search_web(&input.query, resources).await,
                "database" => query_database(&input.query, resources).await,
                "weather" => fetch_weather(&input.location, resources).await,
                "calculator" => calculate(&input.expression).await,
                unknown => {
                    tracing::warn!("unknown tool: {}", unknown);
                    ToolOutput::error(format!("unknown tool: {}", unknown))
                }
            };
            results.push(result);
        }

        bus.insert(ToolCallCount(results.len()));
        Outcome::next(ToolResults {
            outputs: results,
            execution_time: std::time::Duration::default(),
        })
    }
}

#8. Full Pipeline Example

Here is a production LLM agent that combines all the patterns above:

use ranvier_http::prelude::*;
use ranvier_guard::prelude::*;
use ranvier_runtime::Axon;
use std::time::Duration;

fn agent_pipeline() -> Axon<ChatRequest, ChatResponse, LlmError, AppResources> {
    Axon::typed::<ChatRequest, LlmError>("llm-agent")
        // Stage 1: Input validation + PII scan
        .then_fn("validate", |req: ChatRequest, bus: &mut Bus| {
            if req.message.is_empty() {
                return Outcome::fault(LlmError::InvalidInput("empty message".into()));
            }
            let pii = PiiDetector::new().scan(&req.message);
            if !pii.is_empty() {
                bus.insert(PiiFindings(pii));
            }
            Outcome::next(req)
        })
        // Stage 2: Intent classification (with retry)
        .then_with_retry(ClassifyIntent, 2, Duration::from_secs(1))
        // Stage 3: Tool execution (with timeout)
        .then_with_timeout(ToolRouter, Duration::from_secs(30))
        // Stage 4: Response synthesis
        .then_with_retry(SynthesizeResponse, 2, Duration::from_secs(1))
        // Stage 5: Format and redact
        .then_fn("format", |raw: RawResponse, bus: &mut Bus| {
            let pii = bus.get_cloned::<PiiFindings>();
            let response = if pii.is_some() {
                PiiDetector::new().redact(&raw.text)
            } else {
                raw.text
            };
            Outcome::next(ChatResponse { message: response })
        })
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Construct shared resources however your AppResources is built
    let app_resources = AppResources::default();

    Ranvier::http::<AppResources>()
        .bind("0.0.0.0:8080")
        .guard(RequestIdGuard::new())
        .guard(AccessLogGuard::new())
        .guard(RateLimitGuard::new(60, 60_000))
        .post_typed("/api/chat", agent_pipeline())
        .run(app_resources)
        .await
}

#See Also

  • Guard Patterns Cookbook -- Guard composition patterns
  • Bus Access Patterns Cookbook -- Bus usage patterns
  • Saga Compensation Cookbook -- Error recovery in pipelines