# Streaming Patterns Cookbook
Version: 0.38.0 | Updated: 2026-03-22 | Applies to: ranvier-core 0.37+, ranvier-runtime 0.37+, ranvier-http 0.37+, ranvier-macros 0.38+ (streaming feature) | Category: Cookbook
# Overview
StreamingTransition is the streaming counterpart to Transition. Where a regular
Transition produces a single Outcome, a StreamingTransition produces a
Stream<Item> -- an async iterator of typed values emitted incrementally.
Use cases: LLM token streaming, large-file chunk processing, real-time event feeds, and any scenario where consumers should receive partial results before the full computation completes.
# 1. Defining a StreamingTransition
Implement the StreamingTransition<In> trait. The run_stream method returns a
pinned Stream of items instead of a single Outcome.
```rust
use async_trait::async_trait;
use futures_core::Stream;
use ranvier_core::bus::Bus;
use ranvier_core::streaming::StreamingTransition;
use std::pin::Pin;

#[derive(Debug, Clone)]
struct ChatChunk {
    text: String,
    index: u32,
}

#[derive(Clone)]
struct TokenStream;

#[async_trait]
impl StreamingTransition<ClassifiedChat> for TokenStream {
    type Item = ChatChunk;
    type Error = String;
    type Resources = ();

    async fn run_stream(
        &self,
        input: ClassifiedChat,
        _resources: &(),
        _bus: &mut Bus,
    ) -> Result<Pin<Box<dyn Stream<Item = ChatChunk> + Send>>, String> {
        let stream = async_stream::stream! {
            for (i, token) in generate_tokens(&input.message).enumerate() {
                yield ChatChunk { text: token, index: i as u32 };
            }
        };
        Ok(Box::pin(stream))
    }
}
```

The trait mirrors the regular Transition signature but replaces the return type: instead of `Outcome<Out, E>`, it returns `Result<Pin<Box<dyn Stream<Item> + Send>>, E>`.
# 2. Building a StreamingAxon
Use then_stream() to attach a StreamingTransition to an Axon pipeline.
This call is terminal -- no further .then() calls are allowed after it,
because the pipeline output is now a stream rather than a single value.
```rust
use ranvier_runtime::Axon;

let pipeline = Axon::typed::<ChatRequest, String>("chat-stream")
    .then(ClassifyIntent)       // ChatRequest -> ClassifiedChat
    .then_stream(TokenStream);  // ClassifiedChat -> Stream<ChatChunk>
```

The result is a `StreamingAxon<ChatRequest, ChatChunk, String, ()>` that can be passed to SSE endpoints or collected for testing.
# 3. `#[streaming_transition]` Macro (v0.38.0+)
Instead of implementing the StreamingTransition trait manually, use the
#[streaming_transition] attribute macro. It generates the struct and trait
impl from a plain async function, cutting about 15 lines of boilerplate.
```rust
use ranvier_macros::streaming_transition;
use futures_core::Stream;

#[streaming_transition]
async fn synthesize_stream(
    input: ClassifiedChat,
    resources: &AppResources,
    bus: &mut Bus,
) -> Result<impl Stream<Item = ChatChunk> + Send, LlmError> {
    let stream = resources.llm.chat_stream(&input.prompt).await?;
    Ok(stream)
}
```

The macro:

- Creates a zero-sized struct named `synthesize_stream`
- Implements `StreamingTransition<ClassifiedChat>` with `Item = ChatChunk`, `Error = LlmError`, `Resources = AppResources`
- Auto-wraps the return value with `Box::pin()` for the trait signature
- Preserves the original function as `__ranvier_fn_synthesize_stream` for direct calls
Parameter binding follows the same rules as `#[transition]`:

- 1 param: `(input)` → `Resources = ()`
- 2 params: `(input, &Resources)` or `(input, &mut Bus)`
- 3 params: `(input, &Resources, &mut Bus)`
Feature gate: Requires ranvier-macros with the `streaming` feature enabled.

```toml
ranvier-macros = { version = "0.38", features = ["streaming"] }
```

# 4. `map_items()` Per-Item Transform (v0.38.0+)
Use map_items() to transform each item in the stream after then_stream().
This is useful for PII filtering, token counting, format normalization, and
other item-level post-processing.
```rust
let pipeline = Axon::typed::<ChatRequest, String>("chat-stream")
    .then(ClassifyIntent)
    .then_stream(synthesize_stream)
    .map_items(|chunk: ChatChunk| {
        ChatChunk {
            text: redact_pii(&chunk.text),
            ..chunk
        }
    });
```

Constraints:

- `map_items()` preserves the item type: the closure is `FnMut(Item) -> Item`
- Multiple `map_items()` calls can be chained
- The closure must be `Send + Sync + 'static` (shared across requests)
- A `map_items` node is registered in the Schematic for visualization
For type-changing transformations, use manual stream wrapping with
StreamExt::map.
# 5. SSE HTTP Endpoints
Register a StreamingAxon with post_sse_typed() to expose it as a Server-Sent
Events endpoint. Each stream item is serialized as a JSON SSE frame.
```rust
use ranvier_http::prelude::*;
use ranvier_guard::prelude::*;

Ranvier::http()
    .bind("127.0.0.1:3000")
    .guard(CorsGuard::<()>::new(cors_config))
    .guard(AccessLogGuard::<()>::new())
    .post_sse_typed::<ChatRequest, _, _>("/api/chat/stream", streaming_pipeline)
    .post_typed::<ChatRequest, _, _>("/api/chat", batch_pipeline)
    .run(())
    .await?;
```

Each ChatChunk item is emitted as an SSE frame:

```
data: {"text":"Hello","index":0}
data: {"text":" world","index":1}
```

The response uses `Content-Type: text/event-stream` and keeps the connection open until the stream completes or the client disconnects.
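The framing shown above follows the standard SSE wire format: each event is a `data:` line terminated by a blank line. This can be sketched in a few lines of plain Rust; it models the wire format only and is not ranvier-http's actual serializer:

```rust
// Sketch of standard SSE framing (assumption: ranvier-http uses the
// standard text/event-stream format): each serialized item becomes one
// "data:" line followed by a blank line that terminates the event.
fn sse_frame(json: &str) -> String {
    format!("data: {json}\n\n")
}

fn main() {
    let frame = sse_frame(r#"{"text":"Hello","index":0}"#);
    assert_eq!(frame, "data: {\"text\":\"Hello\",\"index\":0}\n\n");
    print!("{frame}");
}
```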
# 6. Stream Timeouts
Use then_stream_with_timeout() with StreamTimeoutConfig to enforce time
limits on streaming pipelines. Three independent timeout types are supported.
```rust
use ranvier_core::streaming::StreamTimeoutConfig;
use ranvier_runtime::Axon;
use std::time::Duration;

let pipeline = Axon::typed::<ChatRequest, String>("chat-timeout")
    .then(ClassifyIntent)
    .then_stream_with_timeout(TokenStream, StreamTimeoutConfig {
        init_timeout: Some(Duration::from_secs(10)),
        idle_timeout: Some(Duration::from_secs(30)),
        total_timeout: Some(Duration::from_secs(300)),
    });
```

- `init_timeout`: Maximum time to wait for the first item. Guards against streams that never start producing.
- `idle_timeout`: Maximum gap between consecutive items. Detects stalled producers mid-stream.
- `total_timeout`: Maximum wall-clock duration for the entire stream, regardless of item cadence.
When any timeout fires, the stream terminates and a TimelineEvent::StreamTimeout
is recorded indicating which timeout type was triggered.
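To make the three semantics concrete, here is a std-only model of which timeout would fire first for a given pattern of item arrival times. This is an illustration of the semantics described above, not the runtime's implementation; the function and field names are my own:

```rust
use std::time::Duration;

// Given item arrival offsets (measured from stream start), return the
// name of the first timeout that would fire, if any.
// Illustrative model only; the real checks happen as the stream is polled.
fn first_violation(
    arrivals: &[Duration],
    init: Duration,
    idle: Duration,
    total: Duration,
) -> Option<&'static str> {
    match arrivals.first() {
        // Stream never produced a first item within init.
        None => return Some("init_timeout"),
        Some(first) if *first > init => return Some("init_timeout"),
        _ => {}
    }
    // A gap between consecutive items exceeds idle.
    for pair in arrivals.windows(2) {
        if pair[1] - pair[0] > idle {
            return Some("idle_timeout");
        }
    }
    // The whole stream ran longer than total.
    if *arrivals.last().unwrap() > total {
        return Some("total_timeout");
    }
    None
}

fn main() {
    let (init, idle, total) = (
        Duration::from_secs(10),
        Duration::from_secs(30),
        Duration::from_secs(300),
    );
    let secs = |xs: &[u64]| {
        xs.iter().map(|s| Duration::from_secs(*s)).collect::<Vec<_>>()
    };

    // First item at t=12s: init_timeout (10s) fires.
    assert_eq!(first_violation(&secs(&[12, 13]), init, idle, total), Some("init_timeout"));
    // 40s gap between items: idle_timeout (30s) fires.
    assert_eq!(first_violation(&secs(&[1, 41]), init, idle, total), Some("idle_timeout"));
    // Steady cadence within all limits: no timeout.
    assert_eq!(first_violation(&secs(&[1, 2, 3]), init, idle, total), None);
    println!("ok");
}
```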
# 7. Collecting Streams (Testing)
Convert a StreamingAxon into a regular Axon that collects all items into
a Vec. This is useful in integration tests where you need the complete output.
```rust
let collecting_pipeline = streaming_pipeline.collect_into_vec();
// Type: Axon<ChatRequest, Vec<ChatChunk>, String, ()>

let mut bus = Bus::new();
let outcome = collecting_pipeline.run(request, &(), &mut bus).await;
match outcome {
    Outcome::Next(chunks) => assert_eq!(chunks.len(), 15),
    Outcome::Fault(e) => panic!("stream failed: {e}"),
}
```

collect_into_vec() buffers all stream items in memory, so use it only for bounded streams in test scenarios.
# 8. Testing with ranvier-test
The ranvier-test crate provides TestAxon::run_stream() for streaming
pipelines and the assert_stream_items! macro for concise assertions.
```rust
use ranvier_test::{TestBus, TestAxon, assert_stream_items};

#[tokio::test]
async fn test_streaming_pipeline() {
    let items = TestAxon::run_stream(
        streaming_pipeline,
        input,
        &(),
        TestBus::new(),
    )
    .await
    .unwrap();

    // Assert item count
    assert_stream_items!(items, 15);

    // Assert count + inspect first item
    assert_stream_items!(items, 15, |first| {
        assert_eq!(first.text, "Hello");
    });
}
```

TestAxon::run_stream() collects all items internally and returns `Vec<Item>`, making it easy to verify both count and content.
# 9. Feature Flags
Streaming support requires the streaming feature flag. Enable it on the
crates you use:
```toml
[dependencies]
ranvier-http = { version = "0.37", features = ["streaming"] }

# Or enable on individual crates:
ranvier-runtime = { version = "0.37", features = ["streaming"] }
ranvier-core = { version = "0.37", features = ["streaming"] }
```

The `streaming` feature pulls in futures-core and async-stream as transitive dependencies. Without this feature, StreamingTransition, StreamingAxon, post_sse_typed(), and related APIs are not available.
See also: DD-4 StreamingTransition Architecture