#Multi-Step Pipeline Cookbook
버전: 0.43.0 | 업데이트: 2026-03-28 | 적용 대상: ranvier-runtime 0.36+ | 카테고리: 쿡북
#개요
Multi-step pipeline 패턴은 각 단계가 서로 다른 출력 타입을 생성하고, 그것이 다음 단계의 입력이 되는 변환 체인입니다. Ranvier의 Axon 빌더는 이러한 타입 상태 진행을 컴파일 타임에 강제합니다 — 만약 단계 A가 Foo를 출력하고 단계 B가 Bar를 기대한다면, 코드는 컴파일되지 않습니다.
#1. 패턴 구조
Input[A] → Transform[A→B] → Transform[B→C] → Transform[C→D] → Output[D]주요 특징:
- 타입 진행: 각 transition은 명확히 구분되는 Input/Output 타입을 가짐
- 컴파일 타임 안전성: 타입 불일치를 컴파일러가 포착
- 조합 가능성: 각 단계는 독립적으로 테스트 가능한 transition
- 스트리밍 지원:
then_stream()을 통해 SSE/청크 출력 가능
#2. 적용 도메인: AI Agent
사용자 의도 분류 → 도구 선택 → 실행 → 응답 포맷팅.
// Types: Query → ClassifiedQuery → ToolResult → Response
let pipeline = Axon::typed::<Query, String>("agent-pipeline")
.then(classify_intent) // Query → ClassifiedQuery
.then(select_tool) // ClassifiedQuery → ToolSelection
.then(execute_tool) // ToolSelection → ToolResult
.then(format_response); // ToolResult → Response#왜 타입 진행인가?
컴파일러는 classify_intent가 ClassifiedQuery를 생성하는지 확인하며, 이것이 정확히 select_tool이 기대하는 것입니다. 실수로 분류 단계를 건너뛰거나 원시 쿼리를 도구 실행기에 전달할 수 없습니다.
#3. 적용 도메인: 데이터 처리 ETL
추출 → 변환 → 적재 파이프라인, 타입 단계로 구성.
// Types: RawRecord → ParsedRecord → EnrichedRecord → StorageResult
let etl = Axon::typed::<RawRecord, String>("etl-pipeline")
.then(parse_csv_row) // RawRecord → ParsedRecord
.then(enrich_with_lookup) // ParsedRecord → EnrichedRecord
.then(validate_schema) // EnrichedRecord → EnrichedRecord (filter)
.then(write_to_database); // EnrichedRecord → StorageResult#4. 적용 도메인: IoT 데이터 처리
센서 읽기 → 정규화 → 이상 탐지 → 알림.
// Types: SensorReading → NormalizedReading → AnalysisResult → AlertDecision
let iot_pipeline = Axon::typed::<SensorReading, String>("iot-analysis")
.then(normalize_reading) // SensorReading → NormalizedReading
.then(detect_anomalies) // NormalizedReading → AnalysisResult
.then(decide_alert); // AnalysisResult → AlertDecision#5. Streaming Pipeline
실시간 출력(SSE, 청크 응답)을 위해 then_stream() 사용:
use ranvier_macros::streaming_transition;
#[streaming_transition]
async fn generate_tokens(
input: ClassifiedQuery,
) -> Result<impl Stream<Item = TextChunk> + Send, String> {
let stream = async_stream::stream! {
for token in llm_inference(input).await {
yield TextChunk { text: token };
}
};
Ok(stream)
}
let streaming = Axon::typed::<Query, String>("chat-stream")
.then(classify_intent)
.then_stream(generate_tokens)
.map_items(|chunk| sanitize(chunk));#Stream 조합
then_stream(): Stream을 생성하는 transitionmap_items(): Stream 구조를 변경하지 않고 각 스트리밍 아이템을 변환- 결과는 HTTP SSE 엔드포인트를 위한
post_sse_typed()와 통합됨
#6. Pipeline에서 오류 처리
각 단계는 독립적으로 fault를 발생시킬 수 있으며, 남은 단계들을 단락시킵니다:
#[transition]
async fn classify_intent(query: Query) -> Outcome<ClassifiedQuery, String> {
if query.text.is_empty() {
return Outcome::Fault("Empty query cannot be classified".into());
}
let intent = classifier.classify(&query.text).await;
if intent.confidence < 0.3 {
return Outcome::Fault(format!(
"Low confidence ({:.0}%) — cannot route",
intent.confidence * 100.0
));
}
Outcome::Next(ClassifiedQuery {
text: query.text,
intent: intent.label,
confidence: intent.confidence,
})
}#7. Bus를 통한 횡단 관심사 데이터
타입 시그니처를 변경하지 않고 단계 간 메타데이터를 전달하려면 Bus를 사용하세요:
#[transition]
async fn classify_intent(
query: Query,
bus: &mut Bus,
) -> Outcome<ClassifiedQuery, String> {
let confidence = 0.92;
bus.insert(confidence); // Available to downstream stages
Outcome::Next(ClassifiedQuery { /* ... */ })
}
#[transition]
async fn format_response(
result: ToolResult,
bus: &mut Bus,
) -> Outcome<Response, String> {
let confidence = bus.get_cloned::<f64>().unwrap_or(0.0);
Outcome::Next(Response {
answer: result.output,
confidence,
})
}#빠른 참조
Start Template: ranvier new my-app --template pipeline
Example: cargo run -p llm-agent-pipeline
Pattern Tag: pipeline| 필요 사항 | 사용 |
|---|---|
| 선형 변환 체인 | Axon::typed::<Input, Error>().then(...) |
| 스트리밍 출력 | .then_stream(streaming_transition) |
| 아이템별 변환 | .map_items(fn) |
| 횡단 관심사 메타데이터 | bus.insert() / bus.get_cloned() |
#참고 자료
llm-agent-pipelineexample — 순수 Axon pipeline 데모sensor-decision-loopexample — IoT pipeline 데모cookbook_streaming_patterns.md— 고급 streaming 패턴cookbook_llm_pipeline.md— LLM 특화 pipeline 패턴