#LLM 게이트웨이 Cookbook

버전: 0.40.0 | 업데이트: 2026-03-23 | 적용 대상: ranvier-core 0.40+ | 카테고리: 쿡북


#Overview

cookbook_llm_pipeline이 단일 프로바이더 LLM 에이전트 패턴을 다룬다면, 이 쿡북은 여러 LLM 프로바이더 앞에 위치하는 게이트웨이 계층을 다룬다 — 요청 라우팅, 토큰 카운팅, 비용 예산 관리, 스트리밍 프록시, 자동 폴백.

모든 패턴은 기존 Ranvier 프리미티브(Transition, StreamingTransition, Guard, Bus)와 생태계 crate(async-openai, tiktoken-rs, reqwest)를 조합하여 구현한다.


#1. 프로바이더 추상화

모든 LLM 프로바이더를 위한 공통 인터페이스를 정의한다. 각 프로바이더는 동일한 입력/출력 타입을 가진 Transition이다.

use serde::{Deserialize, Serialize};

/// Provider-agnostic chat-completion request: the common input type
/// shared by every provider `Transition` in this gateway.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LlmRequest {
    pub model: String,
    pub messages: Vec<ChatMessage>,
    pub max_tokens: u32,
    pub temperature: f32,
}

/// One chat turn. `role` is free-form here; the providers below expect
/// "system", "user" or "assistant".
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChatMessage {
    pub role: String,     // "system", "user", "assistant"
    pub content: String,
}

/// Provider-agnostic completion result, tagged with the provider that
/// actually served the request (used for cost attribution/auditing).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LlmResponse {
    pub content: String,
    pub model: String,
    pub provider: String,
    pub usage: TokenUsage,
}

/// Token accounting as reported by the provider (zeros when unknown).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TokenUsage {
    pub input_tokens: u32,
    pub output_tokens: u32,
    pub total_tokens: u32,
}

#2. 프로바이더 Transition

#OpenAI 프로바이더

use async_openai::{Client, types::CreateChatCompletionRequestArgs};

/// OpenAI-backed provider built on async-openai's default client.
// NOTE(review): the default config presumably reads OPENAI_API_KEY
// from the environment — confirm against the async-openai version in use.
pub struct OpenAiProvider {
    client: Client<async_openai::config::OpenAIConfig>,
}

impl OpenAiProvider {
    /// Creates a provider with the default async-openai configuration.
    pub fn new() -> Self {
        Self { client: Client::new() }
    }
}

#[async_trait]
impl Transition<LlmRequest, LlmResponse> for OpenAiProvider {
    type Error = String;
    type Resources = ();

    /// Forwards the request to the OpenAI chat-completions API and maps
    /// the first choice into the gateway's provider-agnostic response.
    ///
    /// Faults (as `String`) on request-build errors, transport errors,
    /// or an empty `choices` array. Missing usage numbers default to 0.
    async fn run(
        &self,
        req: LlmRequest,
        _resources: &Self::Resources,
        _bus: &mut Bus,
    ) -> Outcome<LlmResponse, Self::Error> {
        // Build per-message payloads, propagating builder errors instead
        // of panicking — the original `.unwrap()` would abort the whole
        // gateway task on an invalid role string.
        let messages = req
            .messages
            .iter()
            .map(|m| {
                async_openai::types::ChatCompletionRequestMessageArgs::default()
                    .role(m.role.as_str())
                    .content(m.content.clone())
                    .build()
                    .map_err(|e| e.to_string())
            })
            .collect::<Result<Vec<_>, String>>()?;

        let request = CreateChatCompletionRequestArgs::default()
            .model(&req.model)
            .max_tokens(req.max_tokens)
            .temperature(req.temperature)
            .messages(messages)
            .build()
            .map_err(|e| e.to_string())?;

        let response = self.client.chat().create(request).await
            .map_err(|e| e.to_string())?;

        let choice = response.choices.first()
            .ok_or("응답 없음")?;

        // `usage` is optional in the API response; fall back to zeros so
        // downstream cost accounting degrades gracefully.
        let usage = response.usage.as_ref();

        Outcome::next(LlmResponse {
            content: choice.message.content.clone().unwrap_or_default(),
            model: req.model,
            provider: "openai".into(),
            usage: TokenUsage {
                input_tokens: usage.map(|u| u.prompt_tokens).unwrap_or(0),
                output_tokens: usage.map(|u| u.completion_tokens).unwrap_or(0),
                total_tokens: usage.map(|u| u.total_tokens).unwrap_or(0),
            },
        })
    }
}

#Anthropic 프로바이더 (reqwest)

/// Anthropic-backed provider calling the Messages API directly over
/// reqwest (no SDK dependency).
pub struct AnthropicProvider {
    client: reqwest::Client,
    // Sent as the `x-api-key` header on every request.
    api_key: String,
}

impl AnthropicProvider {
    /// Creates a provider with a fresh reqwest client and the given key.
    pub fn new(api_key: String) -> Self {
        Self {
            client: reqwest::Client::new(),
            api_key,
        }
    }
}

#[async_trait]
impl Transition<LlmRequest, LlmResponse> for AnthropicProvider {
    type Error = String;
    type Resources = ();

    /// Forwards the request to the Anthropic Messages API.
    ///
    /// Fixes over the original sketch:
    /// - `req.temperature` is now forwarded (it was silently dropped,
    ///   unlike the OpenAI provider).
    /// - Non-2xx responses fault instead of being parsed as an
    ///   empty-content success, so failover logic can actually trigger.
    async fn run(
        &self,
        req: LlmRequest,
        _resources: &Self::Resources,
        _bus: &mut Bus,
    ) -> Outcome<LlmResponse, Self::Error> {
        // Anthropic takes the system prompt as a top-level field, not a
        // message, so split it out of the message list.
        let messages: Vec<_> = req.messages.iter()
            .filter(|m| m.role != "system")
            .map(|m| serde_json::json!({"role": m.role, "content": m.content}))
            .collect();

        let system = req.messages.iter()
            .find(|m| m.role == "system")
            .map(|m| m.content.clone())
            .unwrap_or_default();

        let body = serde_json::json!({
            "model": req.model,
            "max_tokens": req.max_tokens,
            "temperature": req.temperature,
            "system": system,
            "messages": messages,
        });

        let response = self.client
            .post("https://api.anthropic.com/v1/messages")
            .header("x-api-key", &self.api_key)
            .header("anthropic-version", "2023-06-01")
            .json(&body)
            .send()
            .await
            .map_err(|e| e.to_string())?;

        // Read the status before consuming the body so API errors can be
        // reported with their JSON payload.
        let status = response.status();
        let json: serde_json::Value = response.json().await
            .map_err(|e| e.to_string())?;

        // Surface API errors (auth, rate limit, unknown model, …) as
        // faults rather than returning a bogus empty completion.
        if !status.is_success() {
            return Outcome::fault(format!("Anthropic API {status}: {json}"));
        }

        let content = json["content"][0]["text"].as_str()
            .unwrap_or("").to_string();

        let input = json["usage"]["input_tokens"].as_u64().unwrap_or(0);
        let output = json["usage"]["output_tokens"].as_u64().unwrap_or(0);

        Outcome::next(LlmResponse {
            content,
            model: req.model,
            provider: "anthropic".into(),
            usage: TokenUsage {
                input_tokens: input as u32,
                output_tokens: output as u32,
                total_tokens: (input + output) as u32,
            },
        })
    }
}

#3. 프로바이더 라우팅

모델 이름 또는 명시적 선택에 따라 요청을 다른 프로바이더로 라우팅한다.

use ranvier_core::prelude::*;
use ranvier_runtime::Axon;

#[derive(Debug, Clone)]
pub enum ProviderSelection {
    OpenAi,
    Anthropic,
    Auto,  // pick by model-name prefix: "gpt*"/"o*" → OpenAI, "claude*" → Anthropic
}

/// Bus value: controls which provider the gateway routes to.
#[derive(Debug, Clone)]
pub struct ProviderConfig(pub ProviderSelection);

// Routing pipeline: resolves the provider choice, then dispatches.
let gateway_pipeline = Axon::typed::<LlmRequest, LlmResponse>("llm-gateway")
    .then_fn("route-provider", |req, _res, bus| async move {
        // Provider choice comes from the Bus; absent → Auto.
        let selection = bus.get_cloned::<ProviderConfig>()
            .map(|c| c.0)
            .unwrap_or(ProviderSelection::Auto);

        // Resolve Auto up front so dispatch is a plain boolean. This
        // removes the original identity match and its `_ =>
        // unreachable!()` catch-all. Auto routes "claude*" to Anthropic
        // and everything else (including "gpt*"/"o*") to OpenAI —
        // identical to the original prefix logic.
        let use_anthropic = match selection {
            ProviderSelection::OpenAi => false,
            ProviderSelection::Anthropic => true,
            ProviderSelection::Auto => req.model.starts_with("claude"),
        };

        if use_anthropic {
            let key = std::env::var("ANTHROPIC_API_KEY")
                .map_err(|_| "ANTHROPIC_API_KEY 미설정".to_string())?;
            AnthropicProvider::new(key).run(req, &(), bus).await
        } else {
            OpenAiProvider::new().run(req, &(), bus).await
        }
    });

#4. 토큰 카운트 Guard

요청이 모델의 컨텍스트 윈도우를 초과하지 않는지 사전 검증한다.

/// Pre-flight guard that rejects requests whose input tokens plus
/// requested output tokens would exceed the model's context window.
pub struct TokenCountGuard<T> {
    max_tokens: u32,
    // Ties the guard to a pipeline payload type without storing one.
    _marker: std::marker::PhantomData<T>,
}

impl<T> TokenCountGuard<T> {
    /// Creates a guard with the given context-window limit.
    pub fn with_limit(max_tokens: u32) -> Self {
        Self { max_tokens, _marker: std::marker::PhantomData }
    }
}

#[async_trait]
impl Transition<LlmRequest, LlmRequest> for TokenCountGuard<LlmRequest> {
    type Error = String;
    type Resources = ();

    /// Counts the input tokens with tiktoken, publishes the count on the
    /// Bus (for downstream cost tracking), and faults when input +
    /// requested output would not fit in `self.max_tokens`.
    async fn run(
        &self,
        req: LlmRequest,
        _resources: &Self::Resources,
        bus: &mut Bus,
    ) -> Outcome<LlmRequest, Self::Error> {
        // Exact tokenizer when the model is known to tiktoken-rs;
        // cl100k_base is a reasonable approximation otherwise.
        let bpe = tiktoken_rs::get_bpe_from_model(&req.model)
            .unwrap_or_else(|_| {
                tiktoken_rs::cl100k_base()
                    .expect("embedded cl100k_base vocabulary must load")
            });

        let total_tokens: usize = req.messages.iter()
            .map(|m| bpe.encode_with_special_tokens(&m.content).len())
            .sum();

        // Saturate rather than truncate absurdly large counts — the
        // original `as u32` cast would silently wrap.
        let input_tokens = u32::try_from(total_tokens).unwrap_or(u32::MAX);

        // Publish for the cost-budget guard later in the pipeline.
        bus.insert(InputTokenCount(input_tokens));

        // Compare in u64 so `input + max_tokens` cannot overflow u32
        // (the original u32 addition could wrap in release builds and
        // let an oversized request through).
        if input_tokens as u64 + req.max_tokens as u64 > self.max_tokens as u64 {
            Outcome::fault(format!(
                "413 토큰 한도 초과: {} 입력 + {} 최대출력 > {} 한도",
                total_tokens, req.max_tokens, self.max_tokens
            ))
        } else {
            Outcome::next(req)
        }
    }
}

/// Input token count published on the Bus by `TokenCountGuard`.
#[derive(Debug, Clone)]
pub struct InputTokenCount(pub u32);

#5. 비용 예산 Guard

테넌트별 누적 비용을 추적하고 일별/월별 예산을 강제한다.

use std::sync::Arc;
use dashmap::DashMap;

/// Per-tenant daily cost budget guard. The ledger is shared via `Arc`,
/// so clones of the guard see (and charge against) the same budgets.
#[derive(Debug, Clone)]
pub struct CostBudgetGuard<T> {
    /// tenant_id -> (accumulated cost in cents, date of last reset)
    ledger: Arc<DashMap<String, (u64, chrono::NaiveDate)>>,
    daily_limit_cents: u64,
    _marker: std::marker::PhantomData<T>,
}

#[async_trait]
impl Transition<LlmRequest, LlmRequest> for CostBudgetGuard<LlmRequest> {
    type Error = String;
    type Resources = ();

    /// Charges an estimated cost against the tenant's daily ledger and
    /// faults when the daily limit would be exceeded.
    ///
    /// NOTE(review): the charge is an estimate made before the call
    /// (input tokens + max_tokens at a flat rate); actual usage is
    /// never reconciled afterwards.
    async fn run(
        &self,
        req: LlmRequest,
        _resources: &Self::Resources,
        bus: &mut Bus,
    ) -> Outcome<LlmRequest, Self::Error> {
        // Tenant identity from the Bus; anonymous requests share the
        // "default" budget bucket.
        let tenant = bus.get_cloned::<super::TenantContext>()
            .map(|t| t.tenant_id)
            .unwrap_or_else(|_| "default".into());

        let today = chrono::Utc::now().date_naive();

        // DashMap entry guard: holds this tenant's shard lock until it
        // drops at the end of the function. No `.await` occurs while it
        // is held, so this cannot stall other tasks across suspension.
        let mut entry = self.ledger.entry(tenant.clone())
            .or_insert((0, today));

        // Reset the accumulator when the UTC day rolls over.
        if entry.1 < today {
            entry.0 = 0;
            entry.1 = today;
        }

        // Input token count published by TokenCountGuard earlier in the
        // pipeline; 0 when that guard did not run.
        let input_tokens = bus.get_cloned::<InputTokenCount>()
            .map(|t| t.0)
            .unwrap_or(0);

        // Flat rate: $0.01 per 1K tokens = 1 cent. Integer division, so
        // requests under 1K total tokens round down to 0 cents.
        let estimated_cost = (input_tokens as u64 + req.max_tokens as u64) / 1000;

        if entry.0 + estimated_cost > self.daily_limit_cents {
            Outcome::fault(format!(
                "429 예산 초과: 테넌트 {}가 오늘 {}센트 사용 (한도: {})",
                tenant, entry.0, self.daily_limit_cents
            ))
        } else {
            // Charge only when admitted.
            entry.0 += estimated_cost;
            Outcome::next(req)
        }
    }
}

#6. SSE 스트리밍 프록시

StreamingTransition을 사용하여 LLM 스트리밍 응답을 SSE 이벤트로 프록시한다.

use ranvier_core::streaming::StreamingTransition;
use futures_util::StreamExt;

/// Streams an OpenAI chat completion, forwarding each text delta over
/// `sender` and returning the assembled full response at the end.
pub struct StreamingLlmProxy;

#[async_trait]
impl StreamingTransition for StreamingLlmProxy {
    type Input = LlmRequest;
    type Item = String;       // one SSE text chunk
    type Error = String;
    type Result = LlmResponse;

    /// Fixes over the original sketch:
    /// - SSE lines are reassembled in a carry-over buffer, so a `data:`
    ///   line split across two network chunks is no longer corrupted or
    ///   silently dropped.
    /// - `[DONE]` now terminates the whole read loop (the original
    ///   `break` only exited the inner per-chunk loop).
    /// - `req.temperature` is forwarded.
    async fn run_streaming(
        &self,
        req: Self::Input,
        _resources: &(),
        bus: &mut Bus,
        sender: tokio::sync::mpsc::Sender<Self::Item>,
    ) -> Outcome<Self::Result, Self::Error> {
        let client = reqwest::Client::new();
        let api_key = std::env::var("OPENAI_API_KEY")
            .map_err(|_| "OPENAI_API_KEY 미설정".to_string())?;

        let body = serde_json::json!({
            "model": req.model,
            "messages": req.messages.iter().map(|m| {
                serde_json::json!({"role": m.role, "content": m.content})
            }).collect::<Vec<_>>(),
            "max_tokens": req.max_tokens,
            "temperature": req.temperature,
            "stream": true,
        });

        let response = client
            .post("https://api.openai.com/v1/chat/completions")
            .bearer_auth(&api_key)
            .json(&body)
            .send()
            .await
            .map_err(|e| e.to_string())?;

        let mut full_content = String::new();
        let mut stream = response.bytes_stream();
        // Carry-over buffer for a partial SSE line at a chunk boundary.
        // NOTE(review): a multi-byte UTF-8 char split across chunks can
        // still be lossy-replaced; full correctness needs byte-level
        // buffering — acceptable for this cookbook example.
        let mut pending = String::new();

        'read: while let Some(chunk) = stream.next().await {
            let bytes = chunk.map_err(|e| e.to_string())?;
            pending.push_str(&String::from_utf8_lossy(&bytes));

            // Process only complete lines; keep the tail for next chunk.
            while let Some(newline) = pending.find('\n') {
                let line: String = pending.drain(..=newline).collect();
                let line = line.trim_end();
                if let Some(data) = line.strip_prefix("data: ") {
                    if data == "[DONE]" {
                        break 'read;
                    }
                    if let Ok(json) = serde_json::from_str::<serde_json::Value>(data) {
                        if let Some(content) = json["choices"][0]["delta"]["content"].as_str() {
                            full_content.push_str(content);
                            // Receiver may have hung up; dropping the
                            // chunk is fine — the assembled response is
                            // still returned below.
                            let _ = sender.send(content.to_string()).await;
                        }
                    }
                }
            }
        }

        // NOTE(review): streaming responses carry no usage block by
        // default (requires stream_options.include_usage), hence zeros.
        Outcome::next(LlmResponse {
            content: full_content,
            model: req.model,
            provider: "openai".into(),
            usage: TokenUsage { input_tokens: 0, output_tokens: 0, total_tokens: 0 },
        })
    }
}

SSE 엔드포인트 등록:

use ranvier_http::prelude::*;

// Registers the streaming pipeline on an SSE POST endpoint — each
// `sender.send(...)` in the proxy above becomes one SSE event.
Ranvier::http()
    .post_sse_typed("/api/chat/stream", streaming_gateway_pipeline())
    .run(())
    .await?;

#7. 자동 폴백

1차 프로바이더 실패 시 폴백 프로바이더로 투명하게 재시도한다.

// Failover pipeline: one primary attempt routed by model prefix, one
// cross-provider retry on Fault. Non-fault outcomes pass through.
let failover_pipeline = Axon::typed::<LlmRequest, LlmResponse>("llm-failover")
    .then_fn("try-primary-then-fallback", |req, _res, bus| async move {
        let is_claude = req.model.starts_with("claude");

        // Primary attempt, chosen by model-name prefix.
        let primary = if is_claude {
            let key = std::env::var("ANTHROPIC_API_KEY").unwrap_or_default();
            AnthropicProvider::new(key).run(req.clone(), &(), bus).await
        } else {
            OpenAiProvider::new().run(req.clone(), &(), bus).await
        };

        // Guard clause: only a Fault triggers failover; everything else
        // (including success) is returned unchanged.
        let err = match primary {
            Outcome::Fault(err) => err,
            other => return other,
        };

        tracing::warn!(error = %err, "1차 프로바이더 실패, 폴백 시도");

        // Map the model onto the other provider's family and retry.
        let fallback_model = if is_claude {
            "gpt-4o"
        } else {
            "claude-sonnet-4-5-20250929"
        };
        let fallback_req = LlmRequest { model: fallback_model.into(), ..req };

        if fallback_req.model.starts_with("gpt") {
            OpenAiProvider::new().run(fallback_req, &(), bus).await
        } else {
            let key = std::env::var("ANTHROPIC_API_KEY").unwrap_or_default();
            AnthropicProvider::new(key).run(fallback_req, &(), bus).await
        }
    });

#8. 비용 추적 및 감사

비용 귀속을 위해 모든 LLM 호출을 감사 해시 체인에 기록한다.

use ranvier_audit::prelude::*;

// Records every *successful* LLM call on the audit hash chain for
// per-tenant cost attribution. Failed calls are not audited, and a
// missing AuditTrail on the Bus silently skips auditing (best-effort).
let audited_gateway = Axon::typed::<LlmRequest, LlmResponse>("audited-llm")
    .then(TokenCountGuard::with_limit(128_000))
    .then_fn("call-and-audit", |req, _res, bus| async move {
        let result = OpenAiProvider::new().run(req, &(), bus).await;

        // Audit only the success path; borrow resp so `result` can
        // still be returned unchanged below.
        if let Outcome::Next(ref resp) = result {
            if let Ok(audit) = bus.get_cloned::<AuditTrail>() {
                // Actor defaults to "unknown" when no TenantContext is
                // on the Bus.
                let tenant = bus.get_cloned::<super::TenantContext>()
                    .map(|t| t.tenant_id)
                    .unwrap_or_else(|_| "unknown".into());

                audit.record(AuditEntry {
                    action: "llm.call".into(),
                    actor: tenant,
                    resource: format!(
                        "provider={}, model={}, tokens={}",
                        resp.provider, resp.model, resp.usage.total_tokens
                    ),
                    timestamp: chrono::Utc::now(),
                }).await;
            }
        }

        result
    });

#9. 전체 게이트웨이 서버 예제

use ranvier_http::prelude::*;
use ranvier_guard::prelude::*;

/// Gateway entry point: binds the HTTP server, stacks the cross-cutting
/// guards, and exposes one synchronous and one SSE chat endpoint.
// NOTE(review): assumed guards apply in registration order (request id →
// access log → CORS → auth → rate limit) — confirm in ranvier_http docs.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    Ranvier::http()
        .bind("0.0.0.0:8080")
        .guard(RequestIdGuard::new())
        .guard(AccessLogGuard::new())
        .guard(CorsGuard::<()>::permissive())
        // Single shared bearer key; a missing GATEWAY_API_KEY aborts
        // startup via `?` rather than running unauthenticated.
        .guard(AuthGuard::bearer(vec![
            std::env::var("GATEWAY_API_KEY")?,
        ]))
        .guard(RateLimitGuard::new(60, 60_000))  // 60 req/min
        // Synchronous completion
        .post_typed("/api/chat", gateway_pipeline())
        // SSE streaming
        .post_sse_typed("/api/chat/stream", streaming_gateway_pipeline())
        .run(())
        .await?;

    Ok(())
}

/// Builds the default gateway pipeline: token-count guard, then
/// prefix-based routing ("claude*" → Anthropic, everything else →
/// OpenAI).
fn gateway_pipeline() -> Axon<LlmRequest, LlmResponse, String, ()> {
    // Fixed: the original called `Axon::typed::<LlmRequest, String>`,
    // which puts the error type in the output position and cannot yield
    // the declared `Axon<LlmRequest, LlmResponse, …>` return type.
    // Section 3 uses the correct `<LlmRequest, LlmResponse>` form.
    Axon::typed::<LlmRequest, LlmResponse>("llm-gateway")
        .then(TokenCountGuard::with_limit(128_000))
        .then_fn("route", |req, _res, bus| async move {
            // Auto-route by model-name prefix.
            if req.model.starts_with("claude") {
                // NOTE(review): an empty fallback key defers auth
                // failure to the API call — consider faulting early
                // like the section-3 router does.
                let key = std::env::var("ANTHROPIC_API_KEY").unwrap_or_default();
                AnthropicProvider::new(key).run(req, &(), bus).await
            } else {
                OpenAiProvider::new().run(req, &(), bus).await
            }
        })
}

#See Also

  • LLM Pipeline Cookbook — 단일 프로바이더 에이전트 파이프라인 패턴
  • Streaming Patterns Cookbook — StreamingTransition 상세
  • Guard Patterns Cookbook — 토큰/비용 제한 Guard 조합
  • Multi-Tenant Isolation Cookbook — 테넌트별 비용 예산