#LLM 게이트웨이 Cookbook
버전: 0.40.0 | 업데이트: 2026-03-23 | 적용 대상: ranvier-core 0.40+ | 카테고리: 쿡북
#Overview
cookbook_llm_pipeline이 단일 프로바이더 LLM 에이전트 패턴을 다룬다면, 이 쿡북은
여러 LLM 프로바이더 앞에 위치하는 게이트웨이 계층을 다룬다 — 요청 라우팅,
토큰 카운팅, 비용 예산 관리, 스트리밍 프록시, 자동 폴백.
모든 패턴은 기존 Ranvier 프리미티브(Transition, StreamingTransition, Guard, Bus)와 생태계 crate(async-openai, tiktoken-rs, reqwest)를 조합하여 구현한다.
#1. 프로바이더 추상화
모든 LLM 프로바이더를 위한 공통 인터페이스를 정의한다. 각 프로바이더는 동일한 입력/출력 타입을 가진 Transition이다.
use serde::{Deserialize, Serialize};
/// Provider-agnostic chat-completion request; the shared input type for every
/// provider Transition in this gateway.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LlmRequest {
// Model identifier (e.g. "gpt-4o", "claude-..."); also drives Auto routing.
pub model: String,
// Conversation history in order; roles are "system" / "user" / "assistant".
pub messages: Vec<ChatMessage>,
// Upper bound on generated tokens; also counted against the context budget.
pub max_tokens: u32,
// Sampling temperature, forwarded to the provider as-is.
pub temperature: f32,
}
/// A single chat turn in the provider-agnostic message format.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChatMessage {
pub role: String, // "system", "user", "assistant"
pub content: String,
}
/// Normalized completion returned by every provider Transition.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LlmResponse {
// Assistant text of the first choice.
pub content: String,
// Model that actually served the request (may differ after failover).
pub model: String,
// Which backend produced this response: "openai" or "anthropic".
pub provider: String,
pub usage: TokenUsage,
}
/// Token accounting reported by the provider (zeroed when unavailable,
/// e.g. for streaming responses).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TokenUsage {
pub input_tokens: u32,
pub output_tokens: u32,
pub total_tokens: u32,
}#2. 프로바이더 Transition
#OpenAI 프로바이더
use async_openai::{Client, types::CreateChatCompletionRequestArgs};
/// Provider Transition backed by the async-openai client.
pub struct OpenAiProvider {
// NOTE(review): Client::new() presumably picks up OPENAI_API_KEY from the
// environment — confirm against async-openai's config documentation.
client: Client<async_openai::config::OpenAIConfig>,
}
impl OpenAiProvider {
/// Creates a provider with a default-configured OpenAI client.
pub fn new() -> Self {
Self { client: Client::new() }
}
}
#[async_trait]
impl Transition<LlmRequest, LlmResponse> for OpenAiProvider {
type Error = String;
type Resources = ();
/// Forwards the request to OpenAI's chat-completions API and normalizes the
/// payload into the gateway's `LlmResponse` shape.
async fn run(
&self,
req: LlmRequest,
_resources: &Self::Resources,
_bus: &mut Bus,
) -> Outcome<LlmResponse, Self::Error> {
// Build provider-specific messages, propagating builder errors instead of
// panicking: the original used .unwrap() here, which would abort the whole
// gateway task on a malformed role string — inconsistent with the
// map_err-based error style used everywhere else in this impl.
let mut messages = Vec::with_capacity(req.messages.len());
for m in &req.messages {
let msg = async_openai::types::ChatCompletionRequestMessageArgs::default()
.role(m.role.as_str())
.content(m.content.clone())
.build()
.map_err(|e| e.to_string())?;
messages.push(msg);
}
let request = CreateChatCompletionRequestArgs::default()
.model(&req.model)
.max_tokens(req.max_tokens)
.temperature(req.temperature)
.messages(messages)
.build()
.map_err(|e| e.to_string())?;
let response = self.client.chat().create(request).await
.map_err(|e| e.to_string())?;
let choice = response.choices.first()
.ok_or("응답 없음")?;
// Read the usage block once instead of three separate as_ref() chains.
let usage = response.usage.as_ref();
Outcome::next(LlmResponse {
content: choice.message.content.clone().unwrap_or_default(),
model: req.model,
provider: "openai".into(),
usage: TokenUsage {
input_tokens: usage.map(|u| u.prompt_tokens).unwrap_or(0),
output_tokens: usage.map(|u| u.completion_tokens).unwrap_or(0),
total_tokens: usage.map(|u| u.total_tokens).unwrap_or(0),
},
})
}
}#Anthropic 프로바이더 (reqwest)
/// Provider Transition that calls the Anthropic Messages API over reqwest.
pub struct AnthropicProvider {
// Reused HTTP client.
client: reqwest::Client,
// API key sent via the x-api-key header.
api_key: String,
}
impl AnthropicProvider {
/// Creates a provider with a fresh reqwest client and the given API key.
pub fn new(api_key: String) -> Self {
Self {
client: reqwest::Client::new(),
api_key,
}
}
}
#[async_trait]
impl Transition<LlmRequest, LlmResponse> for AnthropicProvider {
type Error = String;
type Resources = ();
/// Calls the Anthropic Messages API directly over reqwest and maps the JSON
/// payload into the gateway's `LlmResponse`.
async fn run(
&self,
req: LlmRequest,
_resources: &Self::Resources,
_bus: &mut Bus,
) -> Outcome<LlmResponse, Self::Error> {
// Anthropic takes the system prompt as a top-level field, not a message.
let messages: Vec<_> = req.messages.iter()
.filter(|m| m.role != "system")
.map(|m| serde_json::json!({"role": m.role, "content": m.content}))
.collect();
let system = req.messages.iter()
.find(|m| m.role == "system")
.map(|m| m.content.clone())
.unwrap_or_default();
let body = serde_json::json!({
"model": req.model,
"max_tokens": req.max_tokens,
"system": system,
"messages": messages,
});
let response = self.client
.post("https://api.anthropic.com/v1/messages")
.header("x-api-key", &self.api_key)
.header("anthropic-version", "2023-06-01")
.json(&body)
.send()
.await
.map_err(|e| e.to_string())?;
// Fail loudly on HTTP errors: previously an Anthropic error body would
// silently decode into an empty completion via the unwrap_or("") below.
let response = response.error_for_status().map_err(|e| e.to_string())?;
let json: serde_json::Value = response.json().await
.map_err(|e| e.to_string())?;
let content = json["content"][0]["text"].as_str()
.unwrap_or("").to_string();
// Extract each usage counter once and derive the total from them.
let input_tokens = json["usage"]["input_tokens"].as_u64().unwrap_or(0) as u32;
let output_tokens = json["usage"]["output_tokens"].as_u64().unwrap_or(0) as u32;
Outcome::next(LlmResponse {
content,
model: req.model,
provider: "anthropic".into(),
usage: TokenUsage {
input_tokens,
output_tokens,
total_tokens: input_tokens.saturating_add(output_tokens),
},
})
}
}#3. 프로바이더 라우팅
모델 이름 또는 명시적 선택에 따라 요청을 다른 프로바이더로 라우팅한다.
use ranvier_core::prelude::*;
use ranvier_runtime::Axon;
/// Which backend should serve a request.
#[derive(Debug, Clone)]
pub enum ProviderSelection {
OpenAi,
Anthropic,
Auto, // auto-select by model-name prefix
}
/// Bus type: controls which provider handles the request.
#[derive(Debug, Clone)]
pub struct ProviderConfig(pub ProviderSelection);
// Gateway entry pipeline: resolves the target provider (explicit config on the
// Bus, or model-name prefix when Auto) and dispatches the request.
let gateway_pipeline = Axon::typed::<LlmRequest, LlmResponse>("llm-gateway")
.then_fn("route-provider", |req, _res, bus| async move {
let config = bus.get_cloned::<ProviderConfig>()
.map(|c| c.0)
.unwrap_or(ProviderSelection::Auto);
// Resolve Auto up front so the dispatch below is a plain two-way branch.
// The original mapped OpenAi=>OpenAi / Anthropic=>Anthropic (identity) and
// then needed a `_ => unreachable!()` arm because the unresolved Auto
// variant leaked into a second match by type even though logic excluded it.
let use_anthropic = match config {
ProviderSelection::OpenAi => false,
ProviderSelection::Anthropic => true,
// gpt*/o* and any unrecognized prefix default to OpenAI, matching the
// original Auto fallback; only claude* routes to Anthropic.
ProviderSelection::Auto => req.model.starts_with("claude"),
};
if use_anthropic {
let key = std::env::var("ANTHROPIC_API_KEY")
.map_err(|_| "ANTHROPIC_API_KEY 미설정".to_string())?;
AnthropicProvider::new(key).run(req, &(), bus).await
} else {
OpenAiProvider::new().run(req, &(), bus).await
}
});#4. 토큰 카운트 Guard
요청이 모델의 컨텍스트 윈도우를 초과하지 않는지 사전 검증한다.
/// Guard that rejects requests whose token footprint exceeds the configured
/// context window. Generic over the pipeline message type via PhantomData.
pub struct TokenCountGuard<T> {
// Context-window budget: counted input tokens + requested max output.
max_tokens: u32,
_marker: std::marker::PhantomData<T>,
}
impl<T> TokenCountGuard<T> {
/// Creates a guard with the given total-token limit.
pub fn with_limit(max_tokens: u32) -> Self {
Self { max_tokens, _marker: std::marker::PhantomData }
}
}
#[async_trait]
impl Transition<LlmRequest, LlmRequest> for TokenCountGuard<LlmRequest> {
type Error = String;
type Resources = ();
/// Pre-flight guard: counts input tokens with tiktoken-rs and faults when
/// input + requested output would exceed the configured window.
async fn run(
&self,
req: LlmRequest,
_resources: &Self::Resources,
bus: &mut Bus,
) -> Outcome<LlmRequest, Self::Error> {
// Exact token count via tiktoken-rs; unknown models fall back to the
// cl100k_base encoding. The expect states an invariant: cl100k_base is a
// bundled encoding and cannot fail to load.
let bpe = tiktoken_rs::get_bpe_from_model(&req.model)
.unwrap_or_else(|_| tiktoken_rs::cl100k_base().expect("cl100k_base is bundled"));
let total_tokens: usize = req.messages.iter()
.map(|m| bpe.encode_with_special_tokens(&m.content).len())
.sum();
// Publish the count for downstream cost tracking (CostBudgetGuard).
bus.insert(InputTokenCount(total_tokens as u32));
// Compare in u64 so enormous inputs cannot wrap: the original
// `total_tokens as u32 + req.max_tokens` could overflow (panic in debug,
// silently pass the check in release).
if total_tokens as u64 + req.max_tokens as u64 > self.max_tokens as u64 {
Outcome::fault(format!(
"413 토큰 한도 초과: {} 입력 + {} 최대출력 > {} 한도",
total_tokens, req.max_tokens, self.max_tokens
))
} else {
Outcome::next(req)
}
}
}
/// Input token count published to the Bus by TokenCountGuard and consumed by
/// CostBudgetGuard for cost estimation.
#[derive(Debug, Clone)]
pub struct InputTokenCount(pub u32);#5. 비용 예산 Guard
테넌트별 누적 비용을 추적하고 일별/월별 예산을 강제한다.
use std::sync::Arc;
use dashmap::DashMap;
/// Per-tenant daily spend guard; the shared ledger clones cheaply via Arc.
#[derive(Debug, Clone)]
pub struct CostBudgetGuard<T> {
/// tenant_id -> (accumulated cost in cents, date of last reset)
ledger: Arc<DashMap<String, (u64, chrono::NaiveDate)>>,
// Daily ceiling per tenant, in cents.
daily_limit_cents: u64,
_marker: std::marker::PhantomData<T>,
}
#[async_trait]
impl Transition<LlmRequest, LlmRequest> for CostBudgetGuard<LlmRequest> {
type Error = String;
type Resources = ();
/// Enforces the per-tenant daily spend ceiling. Cost is estimated up front
/// from the counted input tokens plus the requested max output tokens.
async fn run(
&self,
req: LlmRequest,
_resources: &Self::Resources,
bus: &mut Bus,
) -> Outcome<LlmRequest, Self::Error> {
let tenant = bus.get_cloned::<super::TenantContext>()
.map(|t| t.tenant_id)
.unwrap_or_else(|_| "default".into());
let today = chrono::Utc::now().date_naive();
// NOTE(review): the DashMap entry guard is held for the rest of this
// function; that is fine here because nothing below awaits.
let mut entry = self.ledger.entry(tenant.clone())
.or_insert((0, today));
// Roll the ledger over on the first request of a new (UTC) day.
if entry.1 < today {
entry.0 = 0;
entry.1 = today;
}
// Input token count published earlier by TokenCountGuard (0 if absent).
let input_tokens = bus.get_cloned::<InputTokenCount>()
.map(|t| t.0)
.unwrap_or(0);
// Rough pricing: $0.01 (= 1 cent) per 1K tokens. Round UP so small
// requests are still charged — the original floor division priced every
// request under 1000 tokens at 0 cents, bypassing the budget entirely.
let estimated_cost = (input_tokens as u64 + req.max_tokens as u64).div_ceil(1000);
if entry.0 + estimated_cost > self.daily_limit_cents {
Outcome::fault(format!(
"429 예산 초과: 테넌트 {}가 오늘 {}센트 사용 (한도: {})",
tenant, entry.0, self.daily_limit_cents
))
} else {
// Charge optimistically before the provider call; a downstream
// failure still counts against the budget.
entry.0 += estimated_cost;
Outcome::next(req)
}
}
}#6. SSE 스트리밍 프록시
StreamingTransition을 사용하여 LLM 스트리밍 응답을 SSE 이벤트로 프록시한다.
use ranvier_core::streaming::StreamingTransition;
use futures_util::StreamExt;
/// Streaming proxy: forwards OpenAI SSE deltas to the client as they arrive.
pub struct StreamingLlmProxy;
#[async_trait]
impl StreamingTransition for StreamingLlmProxy {
type Input = LlmRequest;
type Item = String; // SSE text chunk forwarded to the client
type Error = String;
type Result = LlmResponse;
/// Proxies an OpenAI streaming completion: each content delta is forwarded
/// over `sender` as it arrives; the accumulated text is returned at the end.
async fn run_streaming(
&self,
req: Self::Input,
_resources: &(),
bus: &mut Bus,
sender: tokio::sync::mpsc::Sender<Self::Item>,
) -> Outcome<Self::Result, Self::Error> {
let client = reqwest::Client::new();
let api_key = std::env::var("OPENAI_API_KEY")
.map_err(|_| "OPENAI_API_KEY 미설정".to_string())?;
let body = serde_json::json!({
"model": req.model,
"messages": req.messages.iter().map(|m| {
serde_json::json!({"role": m.role, "content": m.content})
}).collect::<Vec<_>>(),
"max_tokens": req.max_tokens,
"stream": true,
});
let response = client
.post("https://api.openai.com/v1/chat/completions")
.bearer_auth(&api_key)
.json(&body)
.send()
.await
.map_err(|e| e.to_string())?
// Surface HTTP-level failures instead of streaming an error body.
.error_for_status()
.map_err(|e| e.to_string())?;
let mut full_content = String::new();
let mut stream = response.bytes_stream();
// Line buffer: network chunks are NOT aligned to SSE event boundaries, so
// a `data:` line can arrive split across two chunks. The original parsed
// each chunk in isolation and silently dropped such split deltas.
let mut buf = String::new();
'outer: while let Some(chunk) = stream.next().await {
let bytes = chunk.map_err(|e| e.to_string())?;
buf.push_str(&String::from_utf8_lossy(&bytes));
// Consume only complete lines; keep any trailing partial line in buf.
while let Some(pos) = buf.find('\n') {
let line: String = buf.drain(..=pos).collect();
let line = line.trim_end();
if let Some(data) = line.strip_prefix("data: ") {
// End-of-stream sentinel: stop reading entirely. The original
// `break` only exited the per-chunk line loop, so the outer
// read loop kept polling the socket.
if data == "[DONE]" { break 'outer; }
if let Ok(json) = serde_json::from_str::<serde_json::Value>(data) {
if let Some(content) = json["choices"][0]["delta"]["content"].as_str() {
full_content.push_str(content);
// A closed receiver means the client disconnected; keep
// accumulating so the final LlmResponse is complete.
let _ = sender.send(content.to_string()).await;
}
}
}
}
}
Outcome::next(LlmResponse {
content: full_content,
model: req.model,
provider: "openai".into(),
// Streaming responses carry no usage block by default; left zeroed.
usage: TokenUsage { input_tokens: 0, output_tokens: 0, total_tokens: 0 },
})
}
}SSE 엔드포인트 등록:
use ranvier_http::prelude::*;
// Register the streaming pipeline behind an SSE endpoint.
Ranvier::http()
.post_sse_typed("/api/chat/stream", streaming_gateway_pipeline())
.run(())
.await?;#7. 자동 폴백
1차 프로바이더 실패 시 폴백 프로바이더로 투명하게 재시도한다.
// Failover wrapper: try the provider implied by the model name first; on a
// Fault, retry once on the other provider with an equivalent model.
let failover_pipeline = Axon::typed::<LlmRequest, LlmResponse>("llm-failover")
.then_fn("try-primary-then-fallback", |req, _res, bus| async move {
// Primary attempt, routed by model-name prefix.
let primary_is_anthropic = req.model.starts_with("claude");
let primary_result = if primary_is_anthropic {
let key = std::env::var("ANTHROPIC_API_KEY").unwrap_or_default();
AnthropicProvider::new(key).run(req.clone(), &(), bus).await
} else {
OpenAiProvider::new().run(req.clone(), &(), bus).await
};
match primary_result {
Outcome::Next(resp) => Outcome::next(resp),
Outcome::Fault(err) => {
tracing::warn!(error = %err, "1차 프로바이더 실패, 폴백 시도");
// Cross-map the model and dispatch to the opposite provider.
// Branching once on `primary_is_anthropic` keeps the model mapping
// and the provider choice in lockstep — the original re-derived the
// provider from the fallback model string, duplicating the
// condition and risking a misroute if the mapping changed.
if primary_is_anthropic {
let fallback_req = LlmRequest { model: "gpt-4o".into(), ..req };
OpenAiProvider::new().run(fallback_req, &(), bus).await
} else {
let fallback_req = LlmRequest {
model: "claude-sonnet-4-5-20250929".into(),
..req
};
let key = std::env::var("ANTHROPIC_API_KEY").unwrap_or_default();
AnthropicProvider::new(key).run(fallback_req, &(), bus).await
}
}
other => other,
}
});#8. 비용 추적 및 감사
비용 귀속을 위해 모든 LLM 호출을 감사 해시 체인에 기록한다.
use ranvier_audit::prelude::*;
// Audited gateway: every successful LLM call is appended to the audit hash
// chain with tenant attribution and token usage for cost accounting.
let audited_gateway = Axon::typed::<LlmRequest, LlmResponse>("audited-llm")
.then(TokenCountGuard::with_limit(128_000))
.then_fn("call-and-audit", |req, _res, bus| async move {
let result = OpenAiProvider::new().run(req, &(), bus).await;
// Only successful completions are recorded; faults pass through untouched.
if let Outcome::Next(ref resp) = result {
// Best-effort: a missing AuditTrail on the Bus silently skips auditing.
if let Ok(audit) = bus.get_cloned::<AuditTrail>() {
let tenant = bus.get_cloned::<super::TenantContext>()
.map(|t| t.tenant_id)
.unwrap_or_else(|_| "unknown".into());
audit.record(AuditEntry {
action: "llm.call".into(),
actor: tenant,
resource: format!(
"provider={}, model={}, tokens={}",
resp.provider, resp.model, resp.usage.total_tokens
),
timestamp: chrono::Utc::now(),
}).await;
}
}
result
});#9. 전체 게이트웨이 서버 예제
use ranvier_http::prelude::*;
use ranvier_guard::prelude::*;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Full gateway server; guards apply to every registered route.
Ranvier::http()
.bind("0.0.0.0:8080")
.guard(RequestIdGuard::new())
.guard(AccessLogGuard::new())
// NOTE(review): permissive CORS combined with bearer auth — confirm this
// is intended outside of local development.
.guard(CorsGuard::<()>::permissive())
// Single shared gateway key; a missing env var aborts startup via `?`.
.guard(AuthGuard::bearer(vec![
std::env::var("GATEWAY_API_KEY")?,
]))
.guard(RateLimitGuard::new(60, 60_000)) // 60 req/min
// Synchronous completion
.post_typed("/api/chat", gateway_pipeline())
// SSE streaming
.post_sse_typed("/api/chat/stream", streaming_gateway_pipeline())
.run(())
.await?;
Ok(())
}
/// Builds the non-streaming gateway pipeline: token-window guard followed by
/// model-name-prefix routing to OpenAI or Anthropic.
fn gateway_pipeline() -> Axon<LlmRequest, LlmResponse, String, ()> {
// Fix: Axon::typed takes <Input, Output>; the original passed the error type
// (String) in the output slot, contradicting both the declared return type
// above and the identical pipeline in §3.
Axon::typed::<LlmRequest, LlmResponse>("llm-gateway")
.then(TokenCountGuard::with_limit(128_000))
.then_fn("route", |req, _res, bus| async move {
// Auto-route by model name: claude* -> Anthropic, everything else -> OpenAI.
if req.model.starts_with("claude") {
// NOTE(review): a missing ANTHROPIC_API_KEY degrades to an empty key
// here (unlike §3, which faults) — confirm that is intended.
let key = std::env::var("ANTHROPIC_API_KEY").unwrap_or_default();
AnthropicProvider::new(key).run(req, &(), bus).await
} else {
OpenAiProvider::new().run(req, &(), bus).await
}
})
}#See Also
- LLM Pipeline Cookbook — 단일 프로바이더 에이전트 파이프라인 패턴
- Streaming Patterns Cookbook — StreamingTransition 상세
- Guard Patterns Cookbook — 토큰/비용 제한 Guard 조합
- Multi-Tenant Isolation Cookbook — 테넌트별 비용 예산