How does futures::stream::unfold create streams from stateful async functions?
futures::stream::unfold creates a stream by repeatedly calling an async function with state that evolves with each element yielded, returning Some((item, next_state)) to continue the stream or None to terminate it. This pattern lets you generate streams from computation rather than static data, maintaining state across iterations without explicit mutable state management, making it ideal for pagination, polling, sequences, and any scenario where each element depends on previous computation.
Basic unfold Syntax
use futures::stream::{self, Stream};
fn basic_unfold() {
// unfold takes:
// 1. Initial state (T)
// 2. Async function: |state| -> impl Future<Output = Option<(Item, NextState)>>
let stream = stream::unfold(0, |count| async move {
if count < 3 {
// Yield an item and the next state
Some((format!("Item {}", count), count + 1))
} else {
// None terminates the stream
None
}
});
// This creates a stream that yields:
// "Item 0", "Item 1", "Item 2"
// Then terminates
}unfold takes initial state and an async closure that returns Some((item, new_state)) to continue or None to end.
The State Transition Pattern
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn state_transitions() {
// State transitions: 0 -> 1 -> 2 -> 3 -> Done
let stream = stream::unfold(0, |state| async move {
match state {
n if n < 3 => {
println!("State: {}, yielding item", n);
Some((n, n + 1)) // yield n, next state is n+1
}
_ => {
println!("State: {}, terminating", state);
None // terminate
}
}
});
let collected: Vec<_> = stream.collect().await;
println!("Collected: {:?}", collected); // [0, 1, 2]
}Each iteration receives the previous state and returns the next state along with an item.
Generating Sequences
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn sequences() {
// Fibonacci sequence
let fib_stream = stream::unfold((0, 1), |(a, b)| async move {
Some((a, (b, a + b)))
});
// This would run forever, so take first 10
let first_10: Vec<_> = fib_stream.take(10).collect().await;
println!("Fibonacci: {:?}", first_10);
// [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
}unfold can generate infinite sequences when the closure always returns Some.
Pagination Pattern
use futures::stream::{self, StreamExt};
#[derive(Debug)]
struct Page {
items: Vec<String>,
has_next: bool,
next_token: Option<String>,
}
async fn fetch_page(token: Option<String>) -> Page {
// Simulated API call
match token {
None => Page {
items: vec!["a".to_string(), "b".to_string()],
has_next: true,
next_token: Some("page2".to_string()),
},
Some(t) if t == "page2" => Page {
items: vec!["c".to_string(), "d".to_string()],
has_next: true,
next_token: Some("page3".to_string()),
},
Some(t) if t == "page3" => Page {
items: vec!["e".to_string()],
has_next: false,
next_token: None,
},
_ => panic!("unexpected token"),
}
}
#[tokio::main]
async fn pagination_example() {
// State: Option<String> (pagination token)
let all_items: Vec<String> = stream::unfold(None, |token| async move {
let page = fetch_page(token.clone()).await;
if page.has_next {
// Continue with items and next token
Some((page.items, page.next_token))
} else if !page.items.is_empty() {
// Last page with items, but no next page
Some((page.items, None)) // Yield items, then None triggers termination
} else {
// No more items, terminate
None
}
})
.flat_map(|items| stream::iter(items))
.collect()
.await;
println!("All items: {:?}", all_items);
// ["a", "b", "c", "d", "e"]
}unfold naturally handles pagination with the continuation token as state.
Proper Pagination Termination
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn pagination_termination() {
// Cleaner pagination pattern
let stream = stream::unfold(None as Option<String>, |token| async move {
let page = fetch_page(token).await;
let next_token = if page.has_next {
page.next_token
} else {
None // This will cause termination on next iteration
};
if page.items.is_empty() && next_token.is_none() {
// Terminate immediately
None
} else {
// Yield items and continue (or terminate next time)
Some((page.items, next_token))
}
});
// Each stream item is a Vec<String>
// Use flat_map to flatten pages into items
let items: Vec<String> = stream
.flat_map(|items| stream::iter(items))
.collect()
.await;
}Handle empty pages and final termination explicitly in the unfold logic.
Polling Pattern
use futures::stream::{self, StreamExt};
use std::time::Duration;
#[derive(Debug)]
struct PollResult {
status: String,
done: bool,
}
async fn check_status() -> PollResult {
// Simulated polling
static mut COUNT: u32 = 0;
unsafe {
COUNT += 1;
if COUNT >= 3 {
PollResult { status: "completed".into(), done: true }
} else {
PollResult { status: "pending".into(), done: false }
}
}
}
#[tokio::main]
async fn polling_example() {
let poll_stream = stream::unfold((), |_| async move {
tokio::time::sleep(Duration::from_millis(100)).await;
let result = check_status().await;
if result.done {
Some((result, None)) // Yield final result, then terminate
} else {
Some((result, Some(()))) // Yield, continue polling
}
});
// Take until we see done
let statuses: Vec<_> = poll_stream
.take_while(|r| futures::future::ready(!r.done))
.collect()
.await;
println!("Statuses: {:?}", statuses);
}Use unfold for polling scenarios where you check status repeatedly.
Async State Operations
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn async_state() {
// State can be complex and async operations can update it
let stream = stream::unfold(
vec![1, 2, 3, 4, 5],
|mut numbers| async move {
if numbers.is_empty() {
None
} else {
// Async operation on state
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
let number = numbers.remove(0);
let doubled = number * 2;
Some((doubled, numbers))
}
}
);
let results: Vec<_> = stream.collect().await;
println!("Doubled: {:?}", results); // [2, 4, 6, 8, 10]
}The state mutation happens inside the async closure, allowing async state updates.
Comparison with iter and successors
use futures::stream::{self, StreamExt, Successors};
#[tokio::main]
async fn comparison() {
// stream::iter: static collection
let static_stream = stream::iter(vec![1, 2, 3]);
// Yields elements from existing collection
// No state, no async
// stream::successors: pure function, no async
let successor_stream = stream::successors(Some(1), |n| Some(n + 1));
// Yields 1, 2, 3, 4, ... infinitely
// Function is synchronous: fn(Option<T>) -> Option<T>
// stream::unfold: async function with state
let unfold_stream = stream::unfold(1, |n| async move {
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
if n <= 5 {
Some((n, n + 1))
} else {
None
}
});
// Key difference:
// - iter: known collection, no computation
// - successors: synchronous computation
// - unfold: async computation
}Use iter for static data, successors for sync computation, unfold for async computation.
Error Handling in unfold
use futures::stream::{self, StreamExt};
#[derive(Debug)]
enum StreamError {
Failed(String),
}
#[tokio::main]
async fn error_handling() {
// unfold produces Stream, so use StreamExt methods for errors
let stream = stream::unfold(0, |count| async move {
if count >= 5 {
None // Terminate normally
} else if count == 3 {
// Can't return Err from unfold directly
// Must handle errors in stream items or use separate stream type
None // Or yield an error variant
} else {
Some((count, count + 1))
}
});
// Option 1: Yield Result types
let result_stream = stream::unfold(0, |count| async move {
if count >= 5 {
None
} else if count == 3 {
Some((Err(StreamError::Failed("at 3".into())), count + 1))
} else {
Some((Ok(count), count + 1))
}
});
// Filter/collect results
let results: Vec<_> = result_stream.collect().await;
println!("Results: {:?}", results);
}Handle errors by yielding Result items, then filter or handle them downstream.
Multiple State Components
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn complex_state() {
// State can be a struct or tuple
let stream = stream::unfold(
(0, 1.0), // (counter, factor)
|(count, factor)| async move {
if count >= 5 {
None
} else {
let value = count as f64 * factor;
Some((value, (count + 1, factor * 1.5)))
}
}
);
let values: Vec<_> = stream.collect().await;
println!("Values: {:?}", values);
// [0.0, 1.5, 4.5, 10.125, 20.25]
// count * factor at each step
}Use tuples or structs to maintain multiple pieces of state.
Rate Limiting with State
use futures::stream::{self, StreamExt};
use std::time::{Duration, Instant};
#[tokio::main]
async fn rate_limiting() {
// State tracks rate limiting
let stream = stream::unfold(
(0u32, Instant::now()),
|(count, last_time)| async move {
// Ensure at least 100ms between items
let elapsed = last_time.elapsed();
if elapsed < Duration::from_millis(100) {
tokio::time::sleep(Duration::from_millis(100) - elapsed).await;
}
if count >= 10 {
None
} else {
Some((count, (count + 1, Instant::now())))
}
}
);
// Each item is yielded with rate limiting
let items: Vec<_> = stream.collect().await;
}Use state to track timing, counts, or other rate-limiting information.
Unfold with External State
use futures::stream::{self, StreamExt};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
#[tokio::main]
async fn external_state() {
// External state can be captured by the closure
let counter = Arc::new(AtomicU32::new(0));
let stream = stream::unfold(0, {
let counter = counter.clone();
move |n| {
let counter = counter.clone();
async move {
counter.fetch_add(1, Ordering::SeqCst);
if n >= 5 {
None
} else {
Some((n, n + 1))
}
}
}
});
// External state is modified as stream progresses
let items: Vec<_> = stream.collect().await;
println!("Counter: {}", counter.load(Ordering::SeqCst)); // 6 (0-5 iterations)
}The closure can capture external state for coordination with other code.
Creating Custom Stream Types
use futures::stream::{Stream, Unfold};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
// unfold returns an Unfold type
// You can use it as a building block for custom streams
#[tokio::main]
async fn custom_stream() {
// Create a reusable "stream generator" pattern
fn make_counter_stream(start: u32, end: u32) -> impl Stream<Item = u32> {
stream::unfold(start, move |current| async move {
if current < end {
Some((current, current + 1))
} else {
None
}
})
}
let stream1 = make_counter_stream(0, 3);
let stream2 = make_counter_stream(10, 15);
let v1: Vec<_> = stream1.collect().await;
let v2: Vec<_> = stream2.collect().await;
println!("Stream 1: {:?}", v1); // [0, 1, 2]
println!("Stream 2: {:?}", v2); // [10, 11, 12, 13, 14]
}unfold lets you create reusable stream generators with parameters.
Cancellation and Cleanup
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn cancellation() {
let stream = stream::unfold(0, |count| async move {
// Simulate work
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
println!("Generating item {}", count);
Some((count, count + 1))
});
// Use take() to limit, or take_until() for conditional
let items: Vec<_> = stream.take(3).collect().await;
println!("Collected: {:?}", items);
// When take() is satisfied, the unfold closure is no longer called
// No explicit cleanup - the future is just dropped
}Streams created with unfold can be cancelled using standard StreamExt methods.
Synthesis
The unfold pattern:
stream::unfold(initial_state, |state| async move {
// Compute next value and next state
if should_continue(&state) {
Some((next_item, next_state)) // Yield and continue
} else {
None // Terminate
}
})When to use unfold:
// Pagination
stream::unfold(None, |token| async move {
let page = fetch_page(token).await;
if page.has_next {
Some((page.items, page.next_token))
} else {
Some((page.items, None)) // Last page
}
});
// Polling
stream::unfold((), |_| async move {
sleep(Duration::from_secs(1)).await;
let status = check_status().await;
Some((status, ()))
});
// Sequences (async)
stream::unfold(0, |n| async move {
let next = async_computation(n).await;
Some((n, n + 1))
});
// State machines
stream::unfold(State::Initial, |state| async move {
match state.transition().await {
Some((output, next_state)) => Some((output, next_state)),
None => None, // Terminal state
}
});Key insight: unfold bridges the gap between computation and streams. It's the asynchronous equivalent of iterator adaptersâyou provide a state transformation function, and unfold turns it into a stream. The state machine pattern (yield Some to continue, None to stop) naturally fits pagination, polling, sequence generation, and any scenario where each output depends on previous computation. Because the closure is async, you can perform I/O between itemsâfetching pages, polling APIs, or waiting between emissionsâwhile maintaining clean state management without explicit mutable variables or complex control flow.
