#스트리밍 패턴 쿡북

버전: 0.38.0 | 업데이트: 2026-03-22 | 적용 대상: ranvier-core 0.37+, ranvier-runtime 0.37+, ranvier-http 0.37+, ranvier-macros 0.38+ (streaming feature) | 카테고리: 쿡북


#개요

StreamingTransition은 단일 Outcome을 반환하는 대신 Stream<Item>을 생성하는 트레이트입니다. 일반 Transition이 요청-응답 패턴에 적합한 반면, StreamingTransition은 데이터를 점진적으로 전달하는 시나리오에 맞게 설계되었습니다.

주요 활용 사례:

  • LLM 토큰 스트리밍: 생성된 토큰을 실시간으로 클라이언트에 전송
  • 대용량 파일 처리: 메모리 제약 없이 청크 단위로 데이터 처리
  • 실시간 이벤트 피드: WebSocket 또는 SSE를 통한 지속적 이벤트 전달

#1. StreamingTransition 정의

StreamingTransition<In> 트레이트는 세 가지 연관 타입과 run_stream 메서드를 요구합니다. 반환 타입은 Pin<Box<dyn Stream<Item = Self::Item> + Send>>이며, 비동기 스트림을 힙에 고정하여 반환합니다.

use async_trait::async_trait;
use futures_core::Stream;
use ranvier_core::bus::Bus;
use ranvier_core::streaming::StreamingTransition;
use std::pin::Pin;

#[derive(Clone)]
struct TokenStream;

#[async_trait]
impl StreamingTransition<ClassifiedChat> for TokenStream {
    type Item = ChatChunk;
    type Error = String;
    type Resources = ();

    async fn run_stream(
        &self,
        input: ClassifiedChat,
        _resources: &(),
        _bus: &mut Bus,
    ) -> Result<Pin<Box<dyn Stream<Item = ChatChunk> + Send>>, String> {
        let stream = async_stream::stream! {
            for token in generate_tokens(&input.message) {
                yield ChatChunk { text: token, index: 0 };
            }
        };
        Ok(Box::pin(stream))
    }
}

Item은 스트림이 산출하는 개별 요소의 타입이며, ErrorResources는 일반 Transition과 동일한 역할을 합니다.


#2. StreamingAxon 구성

then_stream()을 호출하면 일반 AxonStreamingAxon으로 변환됩니다. 이 호출은 터미널이므로 이후 .then()을 추가할 수 없습니다.

use ranvier_runtime::Axon;

let pipeline = Axon::typed::<ChatRequest, String>("chat-stream")
    .then(ClassifyIntent)       // ChatRequest -> ClassifiedChat
    .then_stream(TokenStream);  // ClassifiedChat -> Stream<ChatChunk>
// 결과 타입: StreamingAxon<ChatRequest, ChatChunk, String, ()>

then_stream()은 파이프라인의 마지막 단계에서만 사용합니다. 스트리밍 이전에 필요한 전처리(분류, 검증 등)는 일반 .then()으로 구성하십시오.


#3. `#[streaming_transition]` 매크로 (v0.38.0+)

StreamingTransition 트레이트를 수동으로 구현하는 대신, #[streaming_transition] 어트리뷰트 매크로를 사용할 수 있습니다. 일반 async 함수에서 구조체와 트레이트 구현을 자동 생성하여 보일러플레이트를 ~15줄 줄여줍니다.

use ranvier_macros::streaming_transition;
use futures_core::Stream;

#[streaming_transition]
async fn synthesize_stream(
    input: ClassifiedChat,
    resources: &AppResources,
    bus: &mut Bus,
) -> Result<impl Stream<Item = ChatChunk> + Send, LlmError> {
    let stream = resources.llm.chat_stream(&input.prompt).await?;
    Ok(stream)
}

매크로의 동작:

  • 함수 이름(synthesize_stream)으로 제로 사이즈 구조체를 생성
  • StreamingTransition<ClassifiedChat> 트레이트를 구현 (Item = ChatChunk, Error = LlmError, Resources = AppResources)
  • 반환값을 Box::pin()으로 자동 래핑하여 트레이트 시그니처에 맞춤
  • 원본 함수는 __ranvier_fn_synthesize_stream으로 보존 (직접 호출 가능)

파라미터 바인딩#[transition]과 동일한 규칙을 따릅니다:

  • 1개 파라미터: (input) — Resources = ()
  • 2개 파라미터: (input, &Resources) 또는 (input, &mut Bus)
  • 3개 파라미터: (input, &Resources, &mut Bus)

피처 게이트: ranvier-macrosstreaming 피처를 활성화해야 합니다.

ranvier-macros = { version = "0.38", features = ["streaming"] }

#4. `map_items()` 항목별 변환 (v0.38.0+)

map_items()then_stream() 이후 스트림의 각 아이템에 변환을 적용합니다. PII 필터링, 토큰 카운팅, 포맷 정규화 등 아이템 수준의 후처리에 유용합니다.

let pipeline = Axon::typed::<ChatRequest, String>("chat-stream")
    .then(ClassifyIntent)
    .then_stream(synthesize_stream)
    .map_items(|chunk: ChatChunk| {
        ChatChunk {
            text: redact_pii(&chunk.text),
            ..chunk
        }
    });

제약 조건:

  • map_items()는 아이템 타입을 유지합니다 — FnMut(Item) -> Item
  • 여러 map_items() 호출을 이어서 연결할 수 있습니다
  • 클로저는 Send + Sync + 'static이어야 합니다 (요청 간 공유)
  • map_items 노드가 Schematic에 등록되어 시각화에 표시됩니다

타입을 변경하는 변환이 필요한 경우, StreamExt::map을 사용한 수동 스트림 래핑을 활용하십시오.


#5. SSE HTTP 엔드포인트

post_sse_typed()는 StreamingAxon을 Server-Sent Events 엔드포인트로 노출합니다. 각 스트림 아이템은 SSE data: 프레임으로 직렬화되어 전송됩니다.

use ranvier_http::prelude::*;

Ranvier::http()
    .bind("127.0.0.1:3000")
    .guard(CorsGuard::<()>::new(cors_config))
    .guard(AccessLogGuard::<()>::new())
    .post_sse_typed::<ChatRequest, _, _>("/api/chat/stream", streaming_pipeline)
    .post_typed::<ChatRequest, _, _>("/api/chat", batch_pipeline)
    .run(())
    .await?;

클라이언트는 text/event-stream 응답을 수신하며, 각 아이템은 다음 형식으로 전달됩니다:

data: {"text":"Hello","index":0}

data: {"text":" world","index":1}

data: 라인 뒤에 빈 줄(\n\n)이 위치하여 SSE 프레임 경계를 표시합니다.


#6. 스트림 타임아웃

then_stream_with_timeout()StreamTimeoutConfig를 통해 세 가지 수준의 타임아웃을 적용합니다. 타임아웃 초과 시 스트림이 종료되고 오류가 반환됩니다.

use ranvier_core::streaming::StreamTimeoutConfig;
use std::time::Duration;

let pipeline = Axon::typed::<ChatRequest, String>("chat-timeout")
    .then(ClassifyIntent)
    .then_stream_with_timeout(TokenStream, StreamTimeoutConfig {
        init_timeout: Some(Duration::from_secs(10)),
        idle_timeout: Some(Duration::from_secs(30)),
        total_timeout: Some(Duration::from_secs(300)),
    });

세 가지 타임아웃 유형:

  • init_timeout -- 첫 번째 아이템이 산출될 때까지의 최대 대기 시간. LLM cold start 지연을 감지하는 데 유용합니다.
  • idle_timeout -- 연속된 아이템 간 최대 간격. 스트림 정체를 방지합니다.
  • total_timeout -- 스트림 전체의 최대 지속 시간. 무한 스트림에 대한 상한을 설정합니다.

모든 필드는 Option이며, None으로 설정하면 해당 타임아웃을 비활성화합니다.


#7. 스트림 수집 (테스트)

collect_into_vec()는 StreamingAxon을 일반 Axon으로 변환하여 모든 스트림 아이템을 Vec로 수집합니다. 테스트 또는 배치 처리에 유용합니다.

let collecting_pipeline = streaming_pipeline.collect_into_vec();
// 타입: Axon<ChatRequest, Vec<ChatChunk>, String, ()>

// 일반 Axon처럼 실행 가능
let result = collecting_pipeline.run(input, &(), &mut bus).await?;
assert_eq!(result.len(), 15);

프로덕션에서는 SSE 스트리밍을, 테스트에서는 수집 후 일괄 검증을 수행하는 이중 전략을 권장합니다.


#8. ranvier-test로 테스트

TestAxon::run_stream()은 스트리밍 파이프라인을 테스트 환경에서 실행하고, assert_stream_items! 매크로는 수집된 아이템의 개수와 내용을 검증합니다.

use ranvier_test::{TestBus, TestAxon, assert_stream_items};

#[tokio::test]
async fn test_streaming_pipeline() {
    let items = TestAxon::run_stream(
        streaming_pipeline,
        input,
        &(),
        TestBus::new(),
    ).await.unwrap();

    // item count assertion
    assert_stream_items!(items, 15);

    // first item content assertion
    assert_stream_items!(items, 15, |first| {
        assert_eq!(first.text, "Hello");
    });
}

assert_stream_items!의 세 번째 인자는 첫 번째 아이템에 대한 클로저로, 스트림 시작 시점의 데이터를 검증할 수 있습니다.


#9. 피처 플래그

스트리밍 기능은 streaming 피처 플래그로 게이트되어 있습니다. 필요한 크레이트에 해당 피처를 활성화해야 합니다.

[dependencies]
ranvier-http = { version = "0.37", features = ["streaming"] }

# 또는 개별적으로 활성화:
ranvier-runtime = { version = "0.37", features = ["streaming"] }
ranvier-core = { version = "0.37", features = ["streaming"] }

ranvier-httpstreaming 피처는 ranvier-runtime/streamingranvier-core/streaming을 전이적으로 활성화합니다. HTTP 계층에서 SSE를 사용하는 경우 ranvier-http에만 피처를 지정하면 충분합니다.


참고: DD-4 StreamingTransition 아키텍처