#Multi-Step Pipeline Cookbook

버전: 0.43.0 | 업데이트: 2026-03-28 | 적용 대상: ranvier-runtime 0.36+ | 카테고리: 쿡북


#개요

Multi-step pipeline 패턴은 각 단계가 서로 다른 출력 타입을 생성하고, 그것이 다음 단계의 입력이 되는 변환 체인입니다. Ranvier의 Axon 빌더는 이러한 타입 상태 진행을 컴파일 타임에 강제합니다 — 만약 단계 A가 Foo를 출력하고 단계 B가 Bar를 기대한다면, 코드는 컴파일되지 않습니다.


#1. 패턴 구조

Input[A] → Transform[A→B] → Transform[B→C] → Transform[C→D] → Output[D]

주요 특징:

  • 타입 진행: 각 transition은 명확히 구분되는 Input/Output 타입을 가짐
  • 컴파일 타임 안전성: 타입 불일치를 컴파일러가 포착
  • 조합 가능성: 각 단계는 독립적으로 테스트 가능한 transition
  • 스트리밍 지원: then_stream()을 통해 SSE/청크 출력 가능

#2. 적용 도메인: AI Agent

사용자 의도 분류 → 도구 선택 → 실행 → 응답 포맷팅.

// Types: Query → ClassifiedQuery → ToolResult → Response
let pipeline = Axon::typed::<Query, String>("agent-pipeline")
    .then(classify_intent)    // Query → ClassifiedQuery
    .then(select_tool)        // ClassifiedQuery → ToolSelection
    .then(execute_tool)       // ToolSelection → ToolResult
    .then(format_response);   // ToolResult → Response

#왜 타입 진행인가?

컴파일러는 classify_intentClassifiedQuery를 생성하는지 확인하며, 이것이 정확히 select_tool이 기대하는 것입니다. 실수로 분류 단계를 건너뛰거나 원시 쿼리를 도구 실행기에 전달할 수 없습니다.


#3. 적용 도메인: 데이터 처리 ETL

추출 → 변환 → 적재 파이프라인, 타입 단계로 구성.

// Types: RawRecord → ParsedRecord → EnrichedRecord → StorageResult
let etl = Axon::typed::<RawRecord, String>("etl-pipeline")
    .then(parse_csv_row)       // RawRecord → ParsedRecord
    .then(enrich_with_lookup)  // ParsedRecord → EnrichedRecord
    .then(validate_schema)     // EnrichedRecord → EnrichedRecord (filter)
    .then(write_to_database);  // EnrichedRecord → StorageResult

#4. 적용 도메인: IoT 데이터 처리

센서 읽기 → 정규화 → 이상 탐지 → 알림.

// Types: SensorReading → NormalizedReading → AnalysisResult → AlertDecision
let iot_pipeline = Axon::typed::<SensorReading, String>("iot-analysis")
    .then(normalize_reading)     // SensorReading → NormalizedReading
    .then(detect_anomalies)      // NormalizedReading → AnalysisResult
    .then(decide_alert);         // AnalysisResult → AlertDecision

#5. Streaming Pipeline

실시간 출력(SSE, 청크 응답)을 위해 then_stream() 사용:

use ranvier_macros::streaming_transition;

#[streaming_transition]
async fn generate_tokens(
    input: ClassifiedQuery,
) -> Result<impl Stream<Item = TextChunk> + Send, String> {
    let stream = async_stream::stream! {
        for token in llm_inference(input).await {
            yield TextChunk { text: token };
        }
    };
    Ok(stream)
}

let streaming = Axon::typed::<Query, String>("chat-stream")
    .then(classify_intent)
    .then_stream(generate_tokens)
    .map_items(|chunk| sanitize(chunk));

#Stream 조합

  • then_stream(): Stream을 생성하는 transition
  • map_items(): Stream 구조를 변경하지 않고 각 스트리밍 아이템을 변환
  • 결과는 HTTP SSE 엔드포인트를 위한 post_sse_typed()와 통합됨

#6. Pipeline에서 오류 처리

각 단계는 독립적으로 fault를 발생시킬 수 있으며, 남은 단계들을 단락시킵니다:

#[transition]
async fn classify_intent(query: Query) -> Outcome<ClassifiedQuery, String> {
    if query.text.is_empty() {
        return Outcome::Fault("Empty query cannot be classified".into());
    }

    let intent = classifier.classify(&query.text).await;
    if intent.confidence < 0.3 {
        return Outcome::Fault(format!(
            "Low confidence ({:.0}%) — cannot route",
            intent.confidence * 100.0
        ));
    }

    Outcome::Next(ClassifiedQuery {
        text: query.text,
        intent: intent.label,
        confidence: intent.confidence,
    })
}

#7. Bus를 통한 횡단 관심사 데이터

타입 시그니처를 변경하지 않고 단계 간 메타데이터를 전달하려면 Bus를 사용하세요:

#[transition]
async fn classify_intent(
    query: Query,
    bus: &mut Bus,
) -> Outcome<ClassifiedQuery, String> {
    let confidence = 0.92;
    bus.insert(confidence);  // Available to downstream stages
    Outcome::Next(ClassifiedQuery { /* ... */ })
}

#[transition]
async fn format_response(
    result: ToolResult,
    bus: &mut Bus,
) -> Outcome<Response, String> {
    let confidence = bus.get_cloned::<f64>().unwrap_or(0.0);
    Outcome::Next(Response {
        answer: result.output,
        confidence,
    })
}

#빠른 참조

Start Template:  ranvier new my-app --template pipeline
Example:         cargo run -p llm-agent-pipeline
Pattern Tag:     pipeline
필요 사항 사용
선형 변환 체인 Axon::typed::<Input, Error>().then(...)
스트리밍 출력 .then_stream(streaming_transition)
아이템별 변환 .map_items(fn)
횡단 관심사 메타데이터 bus.insert() / bus.get_cloned()

#참고 자료

  • llm-agent-pipeline example — 순수 Axon pipeline 데모
  • sensor-decision-loop example — IoT pipeline 데모
  • cookbook_streaming_patterns.md — 고급 streaming 패턴
  • cookbook_llm_pipeline.md — LLM 특화 pipeline 패턴