#스트리밍 패턴 쿡북
버전: 0.38.0 | 업데이트: 2026-03-22 | 적용 대상: ranvier-core 0.37+, ranvier-runtime 0.37+, ranvier-http 0.37+, ranvier-macros 0.38+ (streaming feature) | 카테고리: 쿡북
#개요
StreamingTransition은 단일 Outcome을 반환하는 대신 Stream<Item>을 생성하는
트레이트입니다. 일반 Transition이 요청-응답 패턴에 적합한 반면, StreamingTransition은
데이터를 점진적으로 전달하는 시나리오에 맞게 설계되었습니다.
주요 활용 사례:
- LLM 토큰 스트리밍: 생성된 토큰을 실시간으로 클라이언트에 전송
- 대용량 파일 처리: 메모리 제약 없이 청크 단위로 데이터 처리
- 실시간 이벤트 피드: WebSocket 또는 SSE를 통한 지속적 이벤트 전달
#1. StreamingTransition 정의
StreamingTransition<In> 트레이트는 세 가지 연관 타입과 run_stream 메서드를
요구합니다. 반환 타입은 Pin<Box<dyn Stream<Item = Self::Item> + Send>>이며,
비동기 스트림을 힙에 고정하여 반환합니다.
use async_trait::async_trait;
use futures_core::Stream;
use ranvier_core::bus::Bus;
use ranvier_core::streaming::StreamingTransition;
use std::pin::Pin;
#[derive(Clone)]
struct TokenStream;
#[async_trait]
impl StreamingTransition<ClassifiedChat> for TokenStream {
type Item = ChatChunk;
type Error = String;
type Resources = ();
async fn run_stream(
&self,
input: ClassifiedChat,
_resources: &(),
_bus: &mut Bus,
) -> Result<Pin<Box<dyn Stream<Item = ChatChunk> + Send>>, String> {
let stream = async_stream::stream! {
for token in generate_tokens(&input.message) {
yield ChatChunk { text: token, index: 0 };
}
};
Ok(Box::pin(stream))
}
}Item은 스트림이 산출하는 개별 요소의 타입이며, Error와 Resources는
일반 Transition과 동일한 역할을 합니다.
#2. StreamingAxon 구성
then_stream()을 호출하면 일반 Axon이 StreamingAxon으로 변환됩니다.
이 호출은 터미널이므로 이후 .then()을 추가할 수 없습니다.
use ranvier_runtime::Axon;
let pipeline = Axon::typed::<ChatRequest, String>("chat-stream")
.then(ClassifyIntent) // ChatRequest -> ClassifiedChat
.then_stream(TokenStream); // ClassifiedChat -> Stream<ChatChunk>
// 결과 타입: StreamingAxon<ChatRequest, ChatChunk, String, ()>then_stream()은 파이프라인의 마지막 단계에서만 사용합니다. 스트리밍 이전에
필요한 전처리(분류, 검증 등)는 일반 .then()으로 구성하십시오.
#3. `#[streaming_transition]` 매크로 (v0.38.0+)
StreamingTransition 트레이트를 수동으로 구현하는 대신, #[streaming_transition]
어트리뷰트 매크로를 사용할 수 있습니다. 일반 async 함수에서 구조체와 트레이트
구현을 자동 생성하여 보일러플레이트를 ~15줄 줄여줍니다.
use ranvier_macros::streaming_transition;
use futures_core::Stream;
#[streaming_transition]
async fn synthesize_stream(
input: ClassifiedChat,
resources: &AppResources,
bus: &mut Bus,
) -> Result<impl Stream<Item = ChatChunk> + Send, LlmError> {
let stream = resources.llm.chat_stream(&input.prompt).await?;
Ok(stream)
}매크로의 동작:
- 함수 이름(
synthesize_stream)으로 제로 사이즈 구조체를 생성 StreamingTransition<ClassifiedChat>트레이트를 구현 (Item = ChatChunk,Error = LlmError,Resources = AppResources)- 반환값을
Box::pin()으로 자동 래핑하여 트레이트 시그니처에 맞춤 - 원본 함수는
__ranvier_fn_synthesize_stream으로 보존 (직접 호출 가능)
파라미터 바인딩은 #[transition]과 동일한 규칙을 따릅니다:
- 1개 파라미터:
(input)— Resources =() - 2개 파라미터:
(input, &Resources)또는(input, &mut Bus) - 3개 파라미터:
(input, &Resources, &mut Bus)
피처 게이트: ranvier-macros에 streaming 피처를 활성화해야 합니다.
ranvier-macros = { version = "0.38", features = ["streaming"] }#4. `map_items()` 항목별 변환 (v0.38.0+)
map_items()는 then_stream() 이후 스트림의 각 아이템에 변환을 적용합니다.
PII 필터링, 토큰 카운팅, 포맷 정규화 등 아이템 수준의 후처리에 유용합니다.
let pipeline = Axon::typed::<ChatRequest, String>("chat-stream")
.then(ClassifyIntent)
.then_stream(synthesize_stream)
.map_items(|chunk: ChatChunk| {
ChatChunk {
text: redact_pii(&chunk.text),
..chunk
}
});제약 조건:
map_items()는 아이템 타입을 유지합니다 —FnMut(Item) -> Item- 여러
map_items()호출을 이어서 연결할 수 있습니다 - 클로저는
Send + Sync + 'static이어야 합니다 (요청 간 공유) map_items노드가 Schematic에 등록되어 시각화에 표시됩니다
타입을 변경하는 변환이 필요한 경우, StreamExt::map을 사용한 수동 스트림
래핑을 활용하십시오.
#5. SSE HTTP 엔드포인트
post_sse_typed()는 StreamingAxon을 Server-Sent Events 엔드포인트로 노출합니다.
각 스트림 아이템은 SSE data: 프레임으로 직렬화되어 전송됩니다.
use ranvier_http::prelude::*;
Ranvier::http()
.bind("127.0.0.1:3000")
.guard(CorsGuard::<()>::new(cors_config))
.guard(AccessLogGuard::<()>::new())
.post_sse_typed::<ChatRequest, _, _>("/api/chat/stream", streaming_pipeline)
.post_typed::<ChatRequest, _, _>("/api/chat", batch_pipeline)
.run(())
.await?;클라이언트는 text/event-stream 응답을 수신하며, 각 아이템은 다음 형식으로
전달됩니다:
data: {"text":"Hello","index":0}
data: {"text":" world","index":1}
각 data: 라인 뒤에 빈 줄(\n\n)이 위치하여 SSE 프레임 경계를 표시합니다.
#6. 스트림 타임아웃
then_stream_with_timeout()은 StreamTimeoutConfig를 통해 세 가지 수준의
타임아웃을 적용합니다. 타임아웃 초과 시 스트림이 종료되고 오류가 반환됩니다.
use ranvier_core::streaming::StreamTimeoutConfig;
use std::time::Duration;
let pipeline = Axon::typed::<ChatRequest, String>("chat-timeout")
.then(ClassifyIntent)
.then_stream_with_timeout(TokenStream, StreamTimeoutConfig {
init_timeout: Some(Duration::from_secs(10)),
idle_timeout: Some(Duration::from_secs(30)),
total_timeout: Some(Duration::from_secs(300)),
});세 가지 타임아웃 유형:
init_timeout-- 첫 번째 아이템이 산출될 때까지의 최대 대기 시간. LLM cold start 지연을 감지하는 데 유용합니다.idle_timeout-- 연속된 아이템 간 최대 간격. 스트림 정체를 방지합니다.total_timeout-- 스트림 전체의 최대 지속 시간. 무한 스트림에 대한 상한을 설정합니다.
모든 필드는 Option이며, None으로 설정하면 해당 타임아웃을 비활성화합니다.
#7. 스트림 수집 (테스트)
collect_into_vec()는 StreamingAxon을 일반 Axon으로 변환하여 모든 스트림
아이템을 Vec로 수집합니다. 테스트 또는 배치 처리에 유용합니다.
let collecting_pipeline = streaming_pipeline.collect_into_vec();
// 타입: Axon<ChatRequest, Vec<ChatChunk>, String, ()>
// 일반 Axon처럼 실행 가능
let result = collecting_pipeline.run(input, &(), &mut bus).await?;
assert_eq!(result.len(), 15);프로덕션에서는 SSE 스트리밍을, 테스트에서는 수집 후 일괄 검증을 수행하는 이중 전략을 권장합니다.
#8. ranvier-test로 테스트
TestAxon::run_stream()은 스트리밍 파이프라인을 테스트 환경에서 실행하고,
assert_stream_items! 매크로는 수집된 아이템의 개수와 내용을 검증합니다.
use ranvier_test::{TestBus, TestAxon, assert_stream_items};
#[tokio::test]
async fn test_streaming_pipeline() {
let items = TestAxon::run_stream(
streaming_pipeline,
input,
&(),
TestBus::new(),
).await.unwrap();
// item count assertion
assert_stream_items!(items, 15);
// first item content assertion
assert_stream_items!(items, 15, |first| {
assert_eq!(first.text, "Hello");
});
}assert_stream_items!의 세 번째 인자는 첫 번째 아이템에 대한 클로저로,
스트림 시작 시점의 데이터를 검증할 수 있습니다.
#9. 피처 플래그
스트리밍 기능은 streaming 피처 플래그로 게이트되어 있습니다. 필요한 크레이트에
해당 피처를 활성화해야 합니다.
[dependencies]
ranvier-http = { version = "0.37", features = ["streaming"] }
# 또는 개별적으로 활성화:
ranvier-runtime = { version = "0.37", features = ["streaming"] }
ranvier-core = { version = "0.37", features = ["streaming"] }ranvier-http의 streaming 피처는 ranvier-runtime/streaming과
ranvier-core/streaming을 전이적으로 활성화합니다. HTTP 계층에서 SSE를
사용하는 경우 ranvier-http에만 피처를 지정하면 충분합니다.
참고: DD-4 StreamingTransition 아키텍처