#LLM 파이프라인 Cookbook

버전: 0.36.0 | 업데이트: 2026-03-20 | 적용 대상: ranvier-runtime 0.36+ | 카테고리: 쿡북


#개요

Ranvier는 LLM 워크플로우를 타입 기반 파이프라인으로 구성합니다. 의도 분류, 도구 실행, 응답 합성 등 각 단계가 명시적 입출력 타입을 가진 Transition입니다. 이 Cookbook에서는 병렬 도구 호출, PII 필터링, 재시도/타임아웃을 갖춘 LLM 에이전트 파이프라인을 구축하는 방법을 소개합니다.


#1. 기본 LLM 파이프라인 구조

일반적인 LLM 에이전트 파이프라인은 네 단계로 구성됩니다:

graph LR
    CI["의도 분류"] --> ET["도구 실행"] --> SR["응답 합성"] --> FO["출력 포매팅"]

각 단계는 Transition이며, Axon 파이프라인으로 구성됩니다:

use ranvier_runtime::Axon;

fn chat_pipeline() -> Axon<ChatRequest, ChatResponse, LlmError, AppResources> {
    Axon::typed::<ChatRequest, LlmError>("llm-chat")
        .then(ClassifyIntent)      // ChatRequest -> Intent
        .then(ExecuteTools)        // Intent -> ToolResults
        .then(SynthesizeResponse)  // ToolResults -> RawResponse
        .then(FormatOutput)        // RawResponse -> ChatResponse
}

#2. 프로바이더 호출을 위한 LlmTransition

LlmTransition은 LLM 프로바이더 API 호출을 Transition으로 래핑합니다. 직렬화, 재시도, 오류 매핑을 처리합니다.

use ranvier_runtime::llm::{LlmTransition, LlmConfig};

let classify = LlmTransition::new(LlmConfig {
    model: "gpt-4o".into(),
    temperature: 0.0,
    max_tokens: 100,
    system_prompt: "사용자 의도를 question, action, chitchat 중 하나로 분류하세요".into(),
});

let pipeline = Axon::typed::<ChatRequest, LlmError>("classify")
    .then(classify);

#3. 병렬 도구 실행

여러 도구가 동시에 실행될 수 있을 때 Axon::parallel()을 사용합니다. 각 브랜치는 독립적인 Transition이며, 결과가 단일 출력으로 수집됩니다.

use ranvier_runtime::Axon;

#[derive(Clone)]
struct SearchWeb;
#[derive(Clone)]
struct QueryDatabase;
#[derive(Clone)]
struct FetchWeather;
#[derive(Clone)]
struct CalculateMath;

fn tool_executor() -> Axon<Intent, ToolResults, LlmError, AppResources> {
    Axon::typed::<Intent, LlmError>("tool-router")
        .then_fn("route-tools", |intent: Intent, bus: &mut Bus| {
            // 선택된 도구를 Bus에 저장하여 병렬 실행
            bus.insert(SelectedTools(intent.required_tools.clone()));
            Outcome::next(intent)
        })
        .then(Axon::parallel("parallel-tools", vec![
            Box::new(SearchWeb),
            Box::new(QueryDatabase),
            Box::new(FetchWeather),
            Box::new(CalculateMath),
        ]))
        .then_fn("aggregate", |results: Vec<ToolOutput>, bus: &mut Bus| {
            let tool_results = ToolResults {
                outputs: results,
                execution_time: bus.get_cloned::<ExecutionTimer>()
                    .map(|t| t.elapsed())
                    .unwrap_or_default(),
            };
            Outcome::next(tool_results)
        })
}

#4. Sensitive\를 이용한 PII 필터링

ranvier_compliance::Sensitive<T>를 사용하여 로그에서 개인 식별 정보를 리다크션하면서 파이프라인에서는 전체 값을 유지합니다.

use ranvier_compliance::Sensitive;

#[derive(Debug, Clone, serde::Serialize)]
struct UserMessage {
    text: Sensitive<String>,
    user_id: Sensitive<String>,
    session_id: String, // 민감하지 않음
}

// 릴리스 모드에서 Debug/Display 출력이 리다크션됨:
// UserMessage { text: [REDACTED], user_id: [REDACTED], session_id: "sess-123" }

// Transition 내에서 실제 값에 접근:
let actual_text: &str = &message.text;  // Deref로 내부 값 접근

#13개 카테고리 PII 감지

Ranvier의 컴플라이언스 모듈은 13개 PII 카테고리를 감지합니다:

카테고리 예시
이메일 user@example.com
전화번호 +1-555-123-4567
사회보장번호 123-45-6789
신용카드 4111-1111-1111-1111
IP 주소 192.168.1.1
생년월일 1990-01-15
여권 AB1234567
운전면허 D12345678
은행계좌 1234567890
주소 123 Main St, City, ST 12345
이름 (문맥 의존)
의료 ID (패턴 기반)
세금 ID 12-3456789
use ranvier_compliance::pii::{PiiDetector, PiiCategory};

let detector = PiiDetector::new();
let findings = detector.scan("user@example.com 또는 555-0123으로 연락하세요");
// findings: [PiiCategory::Email, PiiCategory::Phone]

// 파이프라인에서 자동 리다크션
let redacted = detector.redact("555-0123으로 전화하세요");
// "자세한 내용은 [PHONE]으로"

#5. LLM 호출을 위한 타임아웃과 재시도

LLM API 호출은 느리고 불안정합니다. 복원력을 위해 then_with_timeoutthen_with_retry를 사용하세요.

use ranvier_runtime::Axon;
use std::time::Duration;

let pipeline = Axon::typed::<ChatRequest, LlmError>("resilient-chat")
    .then(ClassifyIntent)
    .then_with_timeout(
        ExecuteTools,
        Duration::from_secs(30),  // 도구 실행 타임아웃
    )
    .then_with_retry(
        SynthesizeResponse,
        3,                         // 최대 3회 시도
        Duration::from_secs(2),    // 재시도 간 지연
    );

#타임아웃 동작

Transition이 타임아웃을 초과하면:

  1. tokio::time::timeout을 통해 작업 취소
  2. Axon의 타임라인에 TimelineEvent::NodeTimeout 기록
  3. Outcome이 타임아웃 메시지와 함께 Fault로 변환

#재시도 동작

Transition이 Fault를 반환하고 재시도 횟수가 남아 있으면:

  1. 런타임이 설정된 지연 시간을 대기
  2. 동일한 입력으로 같은 Transition을 재실행
  3. 타임라인에 각 시도를 기록

재시도는 멱등적 작업에만 안전합니다. LLM API 호출은 일반적으로 멱등적입니다(같은 프롬프트가 다르지만 유효한 응답을 생성).


#6. Bus를 통한 대화 히스토리

멀티턴 상호작용을 위해 Bus에 대화 히스토리를 저장합니다:

#[derive(Debug, Clone)]
struct ConversationHistory(Vec<Message>);

#[derive(Debug, Clone)]
struct Message {
    role: String,  // "user", "assistant", "system"
    content: String,
}

#[async_trait]
impl Transition<ChatRequest, Intent> for ClassifyIntent {
    type Error = LlmError;
    type Resources = AppResources;

    async fn run(
        &self,
        input: ChatRequest,
        resources: &Self::Resources,
        bus: &mut Bus,
    ) -> Outcome<Intent, Self::Error> {
        // Bus에서 대화 히스토리 읽기
        let history = bus.get_cloned::<ConversationHistory>()
            .unwrap_or(ConversationHistory(vec![]));

        // 히스토리 컨텍스트로 프롬프트 구성
        let mut messages = history.0.clone();
        messages.push(Message {
            role: "user".into(),
            content: input.message.clone(),
        });

        let response = resources.llm.classify(&messages).await?;

        // Bus에서 히스토리 업데이트
        let mut updated = history;
        updated.0.push(Message { role: "user".into(), content: input.message });
        updated.0.push(Message { role: "assistant".into(), content: response.raw.clone() });
        bus.insert(updated);

        Outcome::next(response.intent)
    }
}

#7. 도구 호출 라우팅 패턴

LLM 분류를 기반으로 Bus 데이터를 사용하여 다른 도구로 라우팅합니다:

#[async_trait]
impl Transition<Intent, ToolResults> for ToolRouter {
    type Error = LlmError;
    type Resources = AppResources;

    async fn run(
        &self,
        input: Intent,
        resources: &Self::Resources,
        bus: &mut Bus,
    ) -> Outcome<ToolResults, Self::Error> {
        let mut results = Vec::new();

        for tool_name in &input.required_tools {
            let result = match tool_name.as_str() {
                "search" => search_web(&input.query, resources).await,
                "database" => query_database(&input.query, resources).await,
                "weather" => fetch_weather(&input.location, resources).await,
                "calculator" => calculate(&input.expression).await,
                unknown => {
                    tracing::warn!("알 수 없는 도구: {}", unknown);
                    ToolOutput::error(format!("알 수 없는 도구: {}", unknown))
                }
            };
            results.push(result);
        }

        bus.insert(ToolCallCount(results.len()));
        Outcome::next(ToolResults { outputs: results })
    }
}

#8. 전체 파이프라인 예제

모든 패턴을 조합한 프로덕션 LLM 에이전트:

use ranvier_http::prelude::*;
use ranvier_guard::prelude::*;
use ranvier_runtime::Axon;
use std::time::Duration;

fn agent_pipeline() -> Axon<ChatRequest, ChatResponse, LlmError, AppResources> {
    Axon::typed::<ChatRequest, LlmError>("llm-agent")
        // 단계 1: 입력 검증 + PII 스캔
        .then_fn("validate", |req: ChatRequest, bus: &mut Bus| {
            if req.message.is_empty() {
                return Outcome::fault(LlmError::InvalidInput("빈 메시지".into()));
            }
            let pii = PiiDetector::new().scan(&req.message);
            if !pii.is_empty() {
                bus.insert(PiiFindings(pii));
            }
            Outcome::next(req)
        })
        // 단계 2: 의도 분류 (재시도 포함)
        .then_with_retry(ClassifyIntent, 2, Duration::from_secs(1))
        // 단계 3: 도구 실행 (타임아웃 포함)
        .then_with_timeout(ToolRouter, Duration::from_secs(30))
        // 단계 4: 응답 합성
        .then_with_retry(SynthesizeResponse, 2, Duration::from_secs(1))
        // 단계 5: 포매팅 및 리다크션
        .then_fn("format", |raw: RawResponse, bus: &mut Bus| {
            let pii = bus.get_cloned::<PiiFindings>();
            let response = if pii.is_ok() {
                PiiDetector::new().redact(&raw.text)
            } else {
                raw.text
            };
            Outcome::next(ChatResponse { message: response })
        })
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    Ranvier::http::<AppResources>()
        .bind("0.0.0.0:8080")
        .guard(RequestIdGuard::new())
        .guard(AccessLogGuard::new())
        .guard(RateLimitGuard::new(60, 60_000))
        .post_typed("/api/chat", agent_pipeline())
        .run(app_resources)
        .await
}

#관련 문서

  • Guard 패턴 Cookbook -- Guard 구성 패턴
  • Bus 접근 패턴 Cookbook -- Bus 사용 패턴
  • Saga 보상 Cookbook -- 파이프라인에서의 오류 복구