#Streaming Patterns Cookbook

Version: 0.38.0 | Updated: 2026-03-22 | Applies to: ranvier-core 0.37+, ranvier-runtime 0.37+, ranvier-http 0.37+, ranvier-macros 0.38+ (streaming feature) | Category: Cookbook


#Overview

StreamingTransition is the streaming counterpart to Transition. Where a regular Transition produces a single Outcome, a StreamingTransition produces a Stream<Item> -- an async iterator of typed values emitted incrementally.

Use cases: LLM token streaming, large-file chunk processing, real-time event feeds, and any scenario where consumers should receive partial results before the full computation completes.


#1. Defining a StreamingTransition

Implement the StreamingTransition<In> trait. The run_stream method returns a pinned Stream of items instead of a single Outcome.

use async_trait::async_trait;
use futures_core::Stream;
use ranvier_core::bus::Bus;
use ranvier_core::streaming::StreamingTransition;
use std::pin::Pin;

#[derive(Debug, Clone)]
struct ChatChunk {
    text: String,
    index: u32,
}

#[derive(Clone)]
struct TokenStream;

#[async_trait]
impl StreamingTransition<ClassifiedChat> for TokenStream {
    type Item = ChatChunk;
    type Error = String;
    type Resources = ();

    async fn run_stream(
        &self,
        input: ClassifiedChat,
        _resources: &(),
        _bus: &mut Bus,
    ) -> Result<Pin<Box<dyn Stream<Item = ChatChunk> + Send>>, String> {
        let stream = async_stream::stream! {
            for (i, token) in generate_tokens(&input.message).enumerate() {
                yield ChatChunk { text: token, index: i as u32 };
            }
        };
        Ok(Box::pin(stream))
    }
}

The trait mirrors the regular Transition signature but replaces the return type: instead of Outcome<Out, E>, it returns Result<Pin<Box<dyn Stream<Item> + Send>>, E>.


#2. Building a StreamingAxon

Use then_stream() to attach a StreamingTransition to an Axon pipeline. This call is terminal -- no further .then() calls are allowed after it, because the pipeline output is now a stream rather than a single value.

use ranvier_runtime::Axon;

let pipeline = Axon::typed::<ChatRequest, String>("chat-stream")
    .then(ClassifyIntent)       // ChatRequest -> ClassifiedChat
    .then_stream(TokenStream);  // ClassifiedChat -> Stream<ChatChunk>

The result is a StreamingAxon<ChatRequest, ChatChunk, String, ()> that can be passed to SSE endpoints or collected for testing.


#3. `#[streaming_transition]` Macro (v0.38.0+)

Instead of implementing the StreamingTransition trait manually, use the #[streaming_transition] attribute macro. It generates the struct and trait impl from a plain async function, cutting about 15 lines of boilerplate.

use ranvier_macros::streaming_transition;
use futures_core::Stream;

#[streaming_transition]
async fn synthesize_stream(
    input: ClassifiedChat,
    resources: &AppResources,
    bus: &mut Bus,
) -> Result<impl Stream<Item = ChatChunk> + Send, LlmError> {
    let stream = resources.llm.chat_stream(&input.prompt).await?;
    Ok(stream)
}

The macro:

  • Creates a zero-sized struct named synthesize_stream
  • Implements StreamingTransition<ClassifiedChat> with Item = ChatChunk, Error = LlmError, Resources = AppResources
  • Auto-wraps the return value with Box::pin() for the trait signature
  • Preserves the original function as __ranvier_fn_synthesize_stream for direct calls

Parameter binding follows the same rules as #[transition]:

  • 1 param: (input) โ€” Resources = ()
  • 2 params: (input, &Resources) or (input, &mut Bus)
  • 3 params: (input, &Resources, &mut Bus)

Feature gate: Requires ranvier-macros with the streaming feature enabled.

ranvier-macros = { version = "0.38", features = ["streaming"] }

#4. `map_items()` Per-Item Transform (v0.38.0+)

Use map_items() to transform each item in the stream after then_stream(). This is useful for PII filtering, token counting, format normalization, and other item-level post-processing.

let pipeline = Axon::typed::<ChatRequest, String>("chat-stream")
    .then(ClassifyIntent)
    .then_stream(synthesize_stream)
    .map_items(|chunk: ChatChunk| {
        ChatChunk {
            text: redact_pii(&chunk.text),
            ..chunk
        }
    });

Constraints:

  • map_items() preserves the item type โ€” FnMut(Item) -> Item
  • Multiple map_items() calls can be chained
  • The closure must be Send + Sync + 'static (shared across requests)
  • A map_items node is registered in the Schematic for visualization

For type-changing transformations, use manual stream wrapping with StreamExt::map.


#5. SSE HTTP Endpoints

Register a StreamingAxon with post_sse_typed() to expose it as a Server-Sent Events endpoint. Each stream item is serialized as a JSON SSE frame.

use ranvier_http::prelude::*;
use ranvier_guard::prelude::*;

Ranvier::http()
    .bind("127.0.0.1:3000")
    .guard(CorsGuard::<()>::new(cors_config))
    .guard(AccessLogGuard::<()>::new())
    .post_sse_typed::<ChatRequest, _, _>("/api/chat/stream", streaming_pipeline)
    .post_typed::<ChatRequest, _, _>("/api/chat", batch_pipeline)
    .run(())
    .await?;

Each ChatChunk item is emitted as an SSE frame:

data: {"text":"Hello","index":0}

data: {"text":" world","index":1}

The response uses Content-Type: text/event-stream and keeps the connection open until the stream completes or the client disconnects.


#6. Stream Timeouts

Use then_stream_with_timeout() with StreamTimeoutConfig to enforce time limits on streaming pipelines. Three independent timeout types are supported.

use ranvier_core::streaming::StreamTimeoutConfig;
use ranvier_runtime::Axon;
use std::time::Duration;

let pipeline = Axon::typed::<ChatRequest, String>("chat-timeout")
    .then(ClassifyIntent)
    .then_stream_with_timeout(TokenStream, StreamTimeoutConfig {
        init_timeout: Some(Duration::from_secs(10)),
        idle_timeout: Some(Duration::from_secs(30)),
        total_timeout: Some(Duration::from_secs(300)),
    });
  • init_timeout: Maximum time to wait for the first item. Guards against streams that never start producing.
  • idle_timeout: Maximum gap between consecutive items. Detects stalled producers mid-stream.
  • total_timeout: Maximum wall-clock duration for the entire stream, regardless of item cadence.

When any timeout fires, the stream terminates and a TimelineEvent::StreamTimeout is recorded indicating which timeout type was triggered.


#7. Collecting Streams (Testing)

Convert a StreamingAxon into a regular Axon that collects all items into a Vec. This is useful in integration tests where you need the complete output.

let collecting_pipeline = streaming_pipeline.collect_into_vec();
// Type: Axon<ChatRequest, Vec<ChatChunk>, String, ()>

let mut bus = Bus::new();
let outcome = collecting_pipeline.run(request, &(), &mut bus).await;
match outcome {
    Outcome::Next(chunks) => assert_eq!(chunks.len(), 15),
    Outcome::Fault(e) => panic!("stream failed: {e}"),
}

collect_into_vec() buffers all stream items in memory, so use it only for bounded streams in test scenarios.


#8. Testing with ranvier-test

The ranvier-test crate provides TestAxon::run_stream() for streaming pipelines and the assert_stream_items! macro for concise assertions.

use ranvier_test::{TestBus, TestAxon, assert_stream_items};

#[tokio::test]
async fn test_streaming_pipeline() {
    let items = TestAxon::run_stream(
        streaming_pipeline,
        input,
        &(),
        TestBus::new(),
    )
    .await
    .unwrap();

    // Assert item count
    assert_stream_items!(items, 15);

    // Assert count + inspect first item
    assert_stream_items!(items, 15, |first| {
        assert_eq!(first.text, "Hello");
    });
}

TestAxon::run_stream() collects all items internally and returns Vec<Item>, making it easy to verify both count and content.


#9. Feature Flags

Streaming support requires the streaming feature flag. Enable it on the crates you use:

[dependencies]
ranvier-http = { version = "0.37", features = ["streaming"] }

# Or enable on individual crates:
ranvier-runtime = { version = "0.37", features = ["streaming"] }
ranvier-core = { version = "0.37", features = ["streaming"] }

The streaming feature pulls in futures-core and async-stream as transitive dependencies. Without this feature, StreamingTransition, StreamingAxon, post_sse_typed(), and related APIs are not available.


See also: DD-4 StreamingTransition Architecture