# Multi-Step Pipeline Cookbook
Version: 0.43.0 | Updated: 2026-03-28 | Applies to: ranvier-runtime 0.36+ | Category: Cookbook
## Overview
The multi-step pipeline pattern chains transformations where each stage produces
a different output type that becomes the next stage's input. Ranvier's Axon builder
enforces this typed state progression at compile time: if stage A outputs Foo
and stage B expects Bar, the code won't compile.
## 1. Pattern Anatomy

    Input[A] → Transform[A→B] → Transform[B→C] → Transform[C→D] → Output[D]

Key characteristics:
- Typed progression: Each transition has distinct Input/Output types
- Compile-time safety: Type mismatches caught by the compiler
- Composability: Stages are independently testable transitions
- Streaming support: `then_stream()` enables SSE/chunked output
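
As a rough illustration of how a builder can carry the input and output types through each step, here is a minimal sketch in plain Rust. `Pipeline`, `start`, `then`, `execute`, and `build_example` are hypothetical names invented for this sketch; they are not Ranvier's `Axon` API, which may be implemented quite differently.

```rust
/// A pipeline from `I` to `O`, stored as one boxed function.
/// Hypothetical stand-in for illustration; this is not Ranvier's `Axon` type.
pub struct Pipeline<I, O> {
    run: Box<dyn Fn(I) -> O>,
}

impl<I: 'static> Pipeline<I, I> {
    /// Start with the identity pipeline: Input[A] -> Output[A].
    pub fn start() -> Self {
        Pipeline { run: Box::new(|input: I| input) }
    }
}

impl<I: 'static, O: 'static> Pipeline<I, O> {
    /// Append a stage O -> N. The result is a `Pipeline<I, N>`,
    /// so the next `then` call must accept an `N`.
    pub fn then<N, F>(self, stage: F) -> Pipeline<I, N>
    where
        N: 'static,
        F: Fn(O) -> N + 'static,
    {
        let prev = self.run;
        Pipeline { run: Box::new(move |input: I| stage(prev(input))) }
    }

    /// Run every stage in order on one input.
    pub fn execute(&self, input: I) -> O {
        (self.run)(input)
    }
}

/// Example chain: &str -> String -> usize -> bool.
pub fn build_example() -> Pipeline<&'static str, bool> {
    Pipeline::<&'static str, &'static str>::start()
        .then(|raw: &'static str| raw.trim().to_string()) // A -> B
        .then(|text: String| text.len())                  // B -> C
        .then(|len: usize| len > 3)                       // C -> D
}
```

Calling `build_example().execute("  hello ")` runs all three stages in order. Swapping two `then` calls would change the expected input type of a stage and the chain would no longer type-check.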
## 2. Applied Domain: AI Agent
Classify user intent → select tool → execute → format response.

    // Types: Query → ClassifiedQuery → ToolSelection → ToolResult → Response
    let pipeline = Axon::typed::<Query, String>("agent-pipeline")
        .then(classify_intent)   // Query → ClassifiedQuery
        .then(select_tool)       // ClassifiedQuery → ToolSelection
        .then(execute_tool)      // ToolSelection → ToolResult
        .then(format_response);  // ToolResult → Response

### Why typed progression?
The compiler ensures classify_intent produces ClassifiedQuery, which is
exactly what select_tool expects. You cannot accidentally skip classification
or pass raw queries to the tool executor.
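
The guarantee described above can be seen with ordinary functions, independent of any framework. The sketch below uses simplified, hypothetical versions of `Query`, `ClassifiedQuery`, and `ToolSelection` (the real field layouts are not given here), a placeholder keyword rule instead of a real classifier, and an invented helper named `route`.

```rust
// Hypothetical, simplified versions of the stage types named above.
#[derive(Debug, Clone, PartialEq)]
pub struct Query {
    pub text: String,
}

#[derive(Debug, Clone, PartialEq)]
pub struct ClassifiedQuery {
    pub text: String,
    pub intent: String,
}

#[derive(Debug, Clone, PartialEq)]
pub struct ToolSelection {
    pub tool: String,
    pub argument: String,
}

/// Query -> ClassifiedQuery. The keyword rule is a placeholder, not a real classifier.
pub fn classify_intent(query: Query) -> ClassifiedQuery {
    let intent = if query.text.contains("weather") { "weather" } else { "search" };
    ClassifiedQuery { text: query.text, intent: intent.to_string() }
}

/// ClassifiedQuery -> ToolSelection. Only accepts a query that has been classified.
pub fn select_tool(classified: ClassifiedQuery) -> ToolSelection {
    let tool = match classified.intent.as_str() {
        "weather" => "weather_api",
        _ => "web_search",
    };
    ToolSelection { tool: tool.to_string(), argument: classified.text }
}

/// Chains the two stages; the output of one is the input of the next.
pub fn route(query: Query) -> ToolSelection {
    select_tool(classify_intent(query))

    // Skipping classification is rejected by the compiler:
    //     select_tool(query)
    // error[E0308]: mismatched types (expected `ClassifiedQuery`, found `Query`)
}
```

Because `select_tool` takes a `ClassifiedQuery` by value, the only way to obtain its argument in this sketch is to go through `classify_intent` first.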
## 3. Applied Domain: Data Processing ETL
Extract → transform → load pipeline with typed stages.

    // Types: RawRecord → ParsedRecord → EnrichedRecord → StorageResult
    let etl = Axon::typed::<RawRecord, String>("etl-pipeline")
        .then(parse_csv_row)       // RawRecord → ParsedRecord
        .then(enrich_with_lookup)  // ParsedRecord → EnrichedRecord
        .then(validate_schema)     // EnrichedRecord → EnrichedRecord (filter)
        .then(write_to_database);  // EnrichedRecord → StorageResult

## 4. Applied Domain: IoT Data Processing
Sensor reading → normalization → anomaly detection → alerting.

    // Types: SensorReading → NormalizedReading → AnalysisResult → AlertDecision
    let iot_pipeline = Axon::typed::<SensorReading, String>("iot-analysis")
        .then(normalize_reading)  // SensorReading → NormalizedReading
        .then(detect_anomalies)   // NormalizedReading → AnalysisResult
        .then(decide_alert);      // AnalysisResult → AlertDecision

## 5. Streaming Pipelines
For real-time output (SSE, chunked responses), use `then_stream()`:

    use futures::Stream; // assumed source of the `Stream` trait; adjust to your setup
    use ranvier_macros::streaming_transition;

    #[streaming_transition]
    async fn generate_tokens(
        input: ClassifiedQuery,
    ) -> Result<impl Stream<Item = TextChunk> + Send, String> {
        // Wrap the token loop in a stream so chunks are emitted as they arrive.
        let stream = async_stream::stream! {
            for token in llm_inference(input).await {
                yield TextChunk { text: token };
            }
        };
        Ok(stream)
    }

    let streaming = Axon::typed::<Query, String>("chat-stream")
        .then(classify_intent)
        .then_stream(generate_tokens)
        .map_items(|chunk| sanitize(chunk));

### Stream composition
- `then_stream()`: Transition that produces a stream
- `map_items()`: Transform each streamed item without changing the stream structure
- The result integrates with `post_sse_typed()` for HTTP SSE endpoints
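
To make "transform each item without changing the stream structure" concrete, the sketch below uses a synchronous `Iterator` as a stand-in for an async stream (for real streams, the `futures` crate offers `StreamExt::map` as the analogous adapter). The free functions `generate_tokens`, `map_items`, and `sanitize`, and the `TextChunk` layout, are assumptions made for this sketch and do not reproduce Ranvier's implementation.

```rust
/// Hypothetical chunk type with a single text field.
#[derive(Debug, Clone, PartialEq)]
pub struct TextChunk {
    pub text: String,
}

/// Placeholder sanitizer: drops control characters from one chunk.
pub fn sanitize(chunk: TextChunk) -> TextChunk {
    TextChunk {
        text: chunk.text.chars().filter(|c| !c.is_control()).collect(),
    }
}

/// Synchronous stand-in for a token stream: yields one chunk per token.
pub fn generate_tokens(tokens: Vec<String>) -> impl Iterator<Item = TextChunk> {
    tokens.into_iter().map(|text| TextChunk { text })
}

/// Per-item transform: same number and order of items, each one rewritten.
/// Items are transformed lazily, one at a time, as the consumer pulls them.
pub fn map_items<S, F>(stream: S, f: F) -> impl Iterator<Item = TextChunk>
where
    S: Iterator<Item = TextChunk>,
    F: FnMut(TextChunk) -> TextChunk,
{
    stream.map(f)
}
```

A call such as `map_items(generate_tokens(tokens), sanitize)` yields exactly as many chunks as the source produced; only the contents of each chunk differ.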
## 6. Error Handling in Pipelines
Each stage can independently fault, short-circuiting the remaining stages:

    #[transition]
    async fn classify_intent(query: Query) -> Outcome<ClassifiedQuery, String> {
        if query.text.is_empty() {
            return Outcome::Fault("Empty query cannot be classified".into());
        }
        let intent = classifier.classify(&query.text).await;
        if intent.confidence < 0.3 {
            return Outcome::Fault(format!(
                "Low confidence ({:.0}%): cannot route",
                intent.confidence * 100.0
            ));
        }
        Outcome::Next(ClassifiedQuery {
            text: query.text,
            intent: intent.label,
            confidence: intent.confidence,
        })
    }

## 7. Cross-cutting Data via Bus
Use the Bus to pass metadata between stages without changing type signatures:

    #[transition]
    async fn classify_intent(
        query: Query,
        bus: &mut Bus,
    ) -> Outcome<ClassifiedQuery, String> {
        let confidence = 0.92;
        bus.insert(confidence); // Available to downstream stages
        Outcome::Next(ClassifiedQuery { /* ... */ })
    }

    #[transition]
    async fn format_response(
        result: ToolResult,
        bus: &mut Bus,
    ) -> Outcome<Response, String> {
        let confidence = bus.get_cloned::<f64>().unwrap_or(0.0);
        Outcome::Next(Response {
            answer: result.output,
            confidence,
        })
    }

## Quick Reference
- Start Template: `ranvier new my-app --template pipeline`
- Example: `cargo run -p llm-agent-pipeline`
- Pattern Tag: `pipeline`

| Need | Use |
|---|---|
| Linear transformation chain | Axon::typed::<Input, Error>().then(...) |
| Streaming output | .then_stream(streaming_transition) |
| Per-item transformation | .map_items(fn) |
| Cross-cutting metadata | bus.insert() / bus.get_cloned() |
## See Also
- `llm-agent-pipeline` example: Pure Axon pipeline demo
- `sensor-decision-loop` example: IoT pipeline demo
- `cookbook_streaming_patterns.md`: Advanced streaming patterns
- `cookbook_llm_pipeline.md`: LLM-specific pipeline patterns