#Multi-Step Pipeline Cookbook

Version: 0.43.0 | Updated: 2026-03-28 | Applies to: ranvier-runtime 0.36+ | Category: Cookbook


#Overview

The multi-step pipeline pattern chains transformations where each stage produces a different output type that becomes the next stage's input. Ranvier's Axon builder enforces this typed state progression at compile time: if stage A outputs Foo and stage B expects Bar, the code won't compile.


#1. Pattern Anatomy

Input[A] → Transform[A→B] → Transform[B→C] → Transform[C→D] → Output[D]

Key characteristics:

  • Typed progression: Each transition has distinct Input/Output types
  • Compile-time safety: Type mismatches caught by the compiler
  • Composability: Stages are independently testable transitions
  • Streaming support: then_stream() enables SSE/chunked output
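The typed hand-off can be illustrated without the framework. Below is a minimal, framework-free sketch; the types and stage functions are illustrative assumptions, while ranvier's Axon builder enforces the same property through the signatures of then().

```rust
// Framework-free sketch of a typed A -> B -> C progression.
// These types and functions are illustrative, not ranvier API.
struct RawLine(String);

struct ParsedLine {
    fields: Vec<String>,
}

struct Summary {
    field_count: usize,
}

// Stage 1: RawLine -> ParsedLine
fn parse(input: RawLine) -> ParsedLine {
    ParsedLine {
        fields: input.0.split(',').map(str::to_string).collect(),
    }
}

// Stage 2: ParsedLine -> Summary
fn summarize(input: ParsedLine) -> Summary {
    Summary {
        field_count: input.fields.len(),
    }
}

fn main() {
    // Each stage's output type is exactly the next stage's input type;
    // swapping the two calls would fail to compile.
    let out = summarize(parse(RawLine("a,b,c".into())));
    assert_eq!(out.field_count, 3);
}
```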

#2. Applied Domain: AI Agent

Classify user intent → select tool → execute → format response.

// Types: Query → ClassifiedQuery → ToolSelection → ToolResult → Response
let pipeline = Axon::typed::<Query, String>("agent-pipeline")
    .then(classify_intent)    // Query → ClassifiedQuery
    .then(select_tool)        // ClassifiedQuery → ToolSelection
    .then(execute_tool)       // ToolSelection → ToolResult
    .then(format_response);   // ToolResult → Response

#Why typed progression?

The compiler ensures classify_intent produces ClassifiedQuery, which is exactly what select_tool expects. You cannot accidentally skip classification or pass raw queries to the tool executor.
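To make the hand-off concrete, here are hypothetical shapes for the first two types, consistent with the fields used later in this cookbook; the actual example crate may define them differently.

```rust
// Hypothetical type definitions for the agent pipeline's first hand-off.
// Field names are assumptions for illustration, not ranvier API.
#[derive(Debug, Clone)]
struct Query {
    text: String,
}

#[derive(Debug, Clone)]
struct ClassifiedQuery {
    text: String,
    intent: String,
    confidence: f64,
}

fn main() {
    let q = Query { text: "will it rain tomorrow?".into() };
    // classify_intent consumes a Query and must produce a ClassifiedQuery;
    // handing `q` directly to a stage that expects ClassifiedQuery is a type error.
    let c = ClassifiedQuery { text: q.text, intent: "weather".into(), confidence: 0.92 };
    assert_eq!(c.intent, "weather");
}
```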


#3. Applied Domain: Data Processing ETL

Extract → transform → load pipeline with typed stages.

// Types: RawRecord → ParsedRecord → EnrichedRecord → StorageResult
let etl = Axon::typed::<RawRecord, String>("etl-pipeline")
    .then(parse_csv_row)       // RawRecord → ParsedRecord
    .then(enrich_with_lookup)  // ParsedRecord → EnrichedRecord
    .then(validate_schema)     // EnrichedRecord → EnrichedRecord (filter)
    .then(write_to_database);  // EnrichedRecord → StorageResult
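A plain-Rust sketch of what a parse stage like parse_csv_row might do. The real transition would use ranvier's Outcome; this self-contained version uses Result, and the record shape and error messages are assumptions.

```rust
// Illustrative CSV-row parse, framework-free. The real parse_csv_row is a
// ranvier #[transition]; field names and error text here are assumptions.
#[derive(Debug)]
struct RawRecord(String);

#[derive(Debug, PartialEq)]
struct ParsedRecord {
    id: u64,
    name: String,
}

fn parse_csv_row(raw: RawRecord) -> Result<ParsedRecord, String> {
    let mut cols = raw.0.split(',');
    let id = cols
        .next()
        .ok_or("missing id column")?
        .trim()
        .parse::<u64>()
        .map_err(|e| format!("bad id: {e}"))?;
    let name = cols.next().ok_or("missing name column")?.trim().to_string();
    Ok(ParsedRecord { id, name })
}

fn main() {
    let rec = parse_csv_row(RawRecord("42, ada".into())).unwrap();
    assert_eq!(rec, ParsedRecord { id: 42, name: "ada".into() });
    // A malformed row becomes an error, which a pipeline would surface as a Fault.
    assert!(parse_csv_row(RawRecord("not-a-number, x".into())).is_err());
}
```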

#4. Applied Domain: IoT Data Processing

Sensor reading → normalization → anomaly detection → alerting.

// Types: SensorReading → NormalizedReading → AnalysisResult → AlertDecision
let iot_pipeline = Axon::typed::<SensorReading, String>("iot-analysis")
    .then(normalize_reading)     // SensorReading → NormalizedReading
    .then(detect_anomalies)      // NormalizedReading → AnalysisResult
    .then(decide_alert);         // AnalysisResult → AlertDecision
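A framework-free sketch of the first stage: map a raw sensor value into [0, 1] given the sensor's known range. The types, field names, and range are assumptions for illustration.

```rust
// Illustrative normalize stage. The real normalize_reading is a ranvier
// #[transition]; this standalone version shows only the transformation.
#[derive(Debug)]
struct SensorReading {
    celsius: f64,
}

#[derive(Debug)]
struct NormalizedReading {
    value: f64, // in [0, 1]
}

fn normalize_reading(r: SensorReading) -> NormalizedReading {
    // Assumed operating range for the sensor; real pipelines would look
    // this up per device.
    const MIN: f64 = -40.0;
    const MAX: f64 = 85.0;
    let value = ((r.celsius - MIN) / (MAX - MIN)).clamp(0.0, 1.0);
    NormalizedReading { value }
}

fn main() {
    let n = normalize_reading(SensorReading { celsius: 22.5 });
    assert!((n.value - 0.5).abs() < 1e-9);
}
```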

#5. Streaming Pipelines

For real-time output (SSE, chunked responses), use then_stream():

use ranvier_macros::streaming_transition;

#[streaming_transition]
async fn generate_tokens(
    input: ClassifiedQuery,
) -> Result<impl Stream<Item = TextChunk> + Send, String> {
    let stream = async_stream::stream! {
        for token in llm_inference(input).await {
            yield TextChunk { text: token };
        }
    };
    Ok(stream)
}

let streaming = Axon::typed::<Query, String>("chat-stream")
    .then(classify_intent)
    .then_stream(generate_tokens)
    .map_items(|chunk| sanitize(chunk));

#Stream composition

  • then_stream(): Transition that produces a stream
  • map_items(): Transform each streamed item without changing the stream structure
  • The result integrates with post_sse_typed() for HTTP SSE endpoints
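map_items behaves like map over the stream's items: each item is transformed, the overall sequence shape is unchanged. An iterator analogy, where TextChunk and the sanitize rule are illustrative assumptions:

```rust
// Iterator analogy for map_items: per-item transformation that preserves
// the sequence structure. ranvier applies the same idea to an async stream.
#[derive(Debug, PartialEq)]
struct TextChunk {
    text: String,
}

fn sanitize(chunk: TextChunk) -> TextChunk {
    // Illustrative rule: strip a stray control character from each chunk.
    TextChunk { text: chunk.text.replace('\u{7}', "") }
}

fn main() {
    let chunks = vec![
        TextChunk { text: "hel\u{7}lo".into() },
        TextChunk { text: " world".into() },
    ];
    let cleaned: Vec<TextChunk> = chunks.into_iter().map(sanitize).collect();
    assert_eq!(cleaned[0].text, "hello");
    assert_eq!(cleaned.len(), 2); // same number of items, each transformed
}
```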

#6. Error Handling in Pipelines

Each stage can independently fault, short-circuiting the remaining stages:

#[transition]
async fn classify_intent(query: Query) -> Outcome<ClassifiedQuery, String> {
    if query.text.is_empty() {
        return Outcome::Fault("Empty query cannot be classified".into());
    }

    let intent = classifier.classify(&query.text).await;
    if intent.confidence < 0.3 {
        return Outcome::Fault(format!(
            "Low confidence ({:.0}%) โ€” cannot route",
            intent.confidence * 100.0
        ));
    }

    Outcome::Next(ClassifiedQuery {
        text: query.text,
        intent: intent.label,
        confidence: intent.confidence,
    })
}
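The short-circuit behavior mirrors Rust's ? operator on Result: the first failing stage ends the run and later stages never execute. A framework-free sketch, with illustrative stage names:

```rust
// Short-circuit sketch: chaining fallible stages with `?`, analogous to a
// pipeline stopping at the first Outcome::Fault.
fn classify(text: &str) -> Result<&str, String> {
    if text.is_empty() {
        return Err("Empty query cannot be classified".into());
    }
    Ok("weather")
}

fn select_tool(intent: &str) -> Result<&str, String> {
    Ok(match intent {
        "weather" => "weather_api",
        _ => return Err(format!("no tool for intent {intent}")),
    })
}

fn run(text: &str) -> Result<&str, String> {
    let intent = classify(text)?; // a failure here skips the rest
    let tool = select_tool(intent)?;
    Ok(tool)
}

fn main() {
    assert_eq!(run("will it rain?"), Ok("weather_api"));
    assert!(run("").is_err()); // select_tool never runs for the empty query
}
```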

#7. Cross-cutting Data via Bus

Use the Bus to pass metadata between stages without changing type signatures:

#[transition]
async fn classify_intent(
    query: Query,
    bus: &mut Bus,
) -> Outcome<ClassifiedQuery, String> {
    let confidence = 0.92;
    bus.insert(confidence);  // Available to downstream stages
    Outcome::Next(ClassifiedQuery { /* ... */ })
}

#[transition]
async fn format_response(
    result: ToolResult,
    bus: &mut Bus,
) -> Outcome<Response, String> {
    let confidence = bus.get_cloned::<f64>().unwrap_or(0.0);
    Outcome::Next(Response {
        answer: result.output,
        confidence,
    })
}
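Conceptually, a bus like this is a type-keyed map: insert() stores one value per type, and get_cloned::<T>() retrieves it by type. A minimal sketch of that idea using std::any; this illustrates the concept, not ranvier's actual implementation:

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;

// One value per type, keyed by TypeId -- conceptually what a typed bus's
// insert()/get_cloned() provide.
#[derive(Default)]
struct MiniBus {
    slots: HashMap<TypeId, Box<dyn Any>>,
}

impl MiniBus {
    fn insert<T: Any>(&mut self, value: T) {
        self.slots.insert(TypeId::of::<T>(), Box::new(value));
    }

    fn get_cloned<T: Any + Clone>(&self) -> Option<T> {
        self.slots
            .get(&TypeId::of::<T>())
            .and_then(|b| b.downcast_ref::<T>())
            .cloned()
    }
}

fn main() {
    let mut bus = MiniBus::default();
    bus.insert(0.92_f64); // upstream stage records confidence
    let confidence = bus.get_cloned::<f64>().unwrap_or(0.0); // downstream reads it
    assert_eq!(confidence, 0.92);
    assert!(bus.get_cloned::<u32>().is_none()); // no u32 was ever inserted
}
```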

#Quick Reference

Start Template:  ranvier new my-app --template pipeline
Example:         cargo run -p llm-agent-pipeline
Pattern Tag:     pipeline
Need                         Use
Linear transformation chain  Axon::typed::<Input, Error>().then(...)
Streaming output             .then_stream(streaming_transition)
Per-item transformation      .map_items(fn)
Cross-cutting metadata       bus.insert() / bus.get_cloned()

#See Also

  • llm-agent-pipeline example - Pure Axon pipeline demo
  • sensor-decision-loop example - IoT pipeline demo
  • cookbook_streaming_patterns.md - Advanced streaming patterns
  • cookbook_llm_pipeline.md - LLM-specific pipeline patterns