Loading pageā¦
Rust walkthroughs
Loading pageā¦
futures::stream::unfold create streams from async state machines?futures::stream::unfold creates a stream by repeatedly applying an async function to a state value, producing elements until the function returns None. The function takes the current state and returns Option<(Item, NextState)>āSome yields an element and continues with the new state, while None terminates the stream. This pattern transforms stateful computation into a stream abstraction, enabling lazy, pull-based iteration over values generated from previous results. The state machine captures all variables needed for future iterations, making unfold ideal for pagination, recursive traversal, and stateful generators.
use futures::stream::{Stream, unfold};
// unfold signature:
pub fn unfold<T, F, Fut, Item>(init: T, f: F) -> Unfold<T, F, Fut>
where
F: FnMut(T) -> Fut,
Fut: Future<Output = Option<(Item, T)>>,
{
// ...
}
// T = state type
// F = async function that takes state, returns Option<(Item, new_state)>
// Item = stream element type
// None = stream endsunfold creates a stream from an initial state and an async function that produces elements and new states.
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn main() {
// Create a stream of numbers 0, 1, 2, 3, 4
let stream = stream::unfold(0, |state| async move {
if state < 5 {
Some((state, state + 1))
} else {
None // Stream ends
}
});
let values: Vec<i32> = stream.collect().await;
println!("{:?}", values); // [0, 1, 2, 3, 4]
}The stream starts with state 0, yields each value, and increments state until state >= 5.
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn main() {
// Each iteration:
// 1. Call the unfold function with current state
// 2. Await the resulting future
// 3. If Some((item, new_state)):
// - Yield item to the stream
// - Set state = new_state
// - Repeat from step 1
// 4. If None: Stream ends
let stream = stream::unfold(1, |state| async move {
println!("Current state: {}", state);
if state <= 3 {
let next_state = state * 2;
Some((state, next_state))
} else {
None
}
});
// Output:
// Current state: 1
// Current state: 2
// Current state: 4
// [1, 2, 4]
let values: Vec<i32> = stream.collect().await;
println!("{:?}", values);
}The state carries information between iterationsāthe next state can depend on the current state.
use futures::stream::{self, StreamExt};
// Finite stream: ends when function returns None
fn finite_stream() -> impl Stream<Item = i32> {
stream::unfold(0, |n| async move {
if n < 10 {
Some((n, n + 1))
} else {
None
}
})
}
// Infinite stream: never returns None
fn infinite_stream() -> impl Stream<Item = i32> {
stream::unfold(0, |n| async move {
Some((n, n + 1))
})
}
#[tokio::main]
async fn main() {
// Finite: collect all
let finite: Vec<i32> = finite_stream().collect().await;
println!("Finite: {:?}", finite);
// Infinite: must limit somehow
let infinite: Vec<i32> = infinite_stream()
.take(10)
.collect()
.await;
println!("Infinite (first 10): {:?}", infinite);
}unfold can create both finite and infinite streamsāthe difference is whether the function can return None.
use futures::stream::{self, StreamExt};
#[derive(Debug)]
struct FibonacciState {
current: u64,
next: u64,
}
fn fibonacci_stream() -> impl Stream<Item = u64> {
stream::unfold(
FibonacciState { current: 0, next: 1 },
|state| async move {
let current = state.current;
let next = state.next;
Some((
current,
FibonacciState {
current: next,
next: current + next,
},
))
},
)
}
#[tokio::main]
async fn main() {
let fibs: Vec<u64> = fibonacci_stream()
.take(10)
.collect()
.await;
println!("Fibonacci: {:?}", fibs);
// Fibonacci: [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
}State can be any type, enabling complex stateful computations.
use futures::stream::{self, StreamExt};
fn countdown_stream(start: i32) -> impl Stream<Item = i32> {
// The start value is captured by the async block
stream::unfold(start, |current| async move {
if current >= 0 {
Some((current, current - 1))
} else {
None
}
})
}
#[tokio::main]
async fn main() {
let values: Vec<i32> = countdown_stream(5).collect().await;
println!("{:?}", values); // [5, 4, 3, 2, 1, 0]
}Variables from the outer scope are captured into the state machine.
use futures::stream::{self, StreamExt};
use std::time::Duration;
async fn fetch_page(page: u32) -> Vec<String> {
// Simulate async fetch
tokio::time::sleep(Duration::from_millis(100)).await;
(0..3).map(|i| format!("Page {} Item {}", page, i)).collect()
}
fn paginated_stream() -> impl Stream<Item = String> {
stream::unfold(1, |page| async move {
let items = fetch_page(page).await;
if items.is_empty() || page > 3 {
None
} else {
// Create a stream of items, then return the next page state
let next_page = page + 1;
Some((
futures::stream::iter(items),
next_page,
))
}
})
.flatten()
}
#[tokio::main]
async fn main() {
let items: Vec<String> = paginated_stream().collect().await;
for item in items {
println!("{}", item);
}
}The unfold function can perform async operations, making it ideal for I/O-driven streams.
use futures::stream::{self, StreamExt};
#[derive(Debug)]
struct Page<T> {
items: Vec<T>,
next_token: Option<String>,
}
async fn fetch_items(token: Option<String>) -> Page<String> {
// Simulated API call
let start = token
.and_then(|t| t.parse::<usize>().ok())
.unwrap_or(0);
if start >= 10 {
Page {
items: vec![],
next_token: None,
}
} else {
Page {
items: (start..start + 3).map(|i| format!("Item {}", i)).collect(),
next_token: Some((start + 3).to_string()),
}
}
}
fn paginated_items() -> impl Stream<Item = String> {
stream::unfold(None, |token| async move {
let page = fetch_items(token).await;
if page.items.is_empty() {
None
} else {
Some((
futures::stream::iter(page.items),
page.next_token,
))
}
})
.flatten()
}
#[tokio::main]
async fn main() {
let items: Vec<String> = paginated_items().collect().await;
println!("{:?}", items);
// ["Item 0", "Item 1", "Item 2", "Item 3", ...]
}Pagination uses state to track the continuation token between API calls.
use futures::stream::{self, StreamExt};
enum StreamError {
Failed(usize),
}
// Stream that can produce errors
fn fallible_stream() -> impl Stream<Item = Result<String, StreamError>> {
stream::unfold(0, |count| async move {
if count >= 5 {
None // Stream ends
} else if count == 3 {
// Emit error but continue
Some((Err(StreamError::Failed(count)), count + 1))
} else {
Some((Ok(format!("Item {}", count)), count + 1))
}
})
}
#[tokio::main]
async fn main() {
let results: Vec<Result<String, StreamError>> = fallible_stream().collect().await;
for result in results {
match result {
Ok(item) => println!("Success: {}", item),
Err(e) => println!("Error: {:?}", e),
}
}
}The stream can yield Result values to handle errors without terminating.
use futures::stream::{self, StreamExt};
// Stream that ends on error
fn error_terminating_stream() -> impl Stream<Item = Result<String, String>> {
stream::unfold(0, |count| async move {
if count >= 5 {
None
} else if count == 3 {
// Return None to end stream, but we need a different approach
// for errors that terminate
None
} else {
Some((Ok(format!("Item {}", count)), count + 1))
}
})
}
// Alternative: use TryStream and try_take_while
use futures::stream::TryStreamExt;
fn try_stream_example() -> impl futures::stream::TryStream<Ok = String, Error = String> {
stream::unfold(0, |count| async move {
if count >= 5 {
None
} else if count == 3 {
Some((Err("Failed at 3".to_string()), count + 1))
} else {
Some((Ok(format!("Item {}", count)), count + 1))
}
})
.map_err(|e: Result<String, String>| e.unwrap_err())
}For error-terminating streams, consider using TryStream extensions.
use futures::stream::{self, StreamExt};
fn enhanced_stream() -> impl Stream<Item = i32> {
stream::unfold(0, |n| async move {
Some((n, n + 1))
})
.filter(|n| async move { n % 2 == 0 }) // Keep only even
.take(5) // Take first 5
.map(|n| n * 2) // Double each
}
#[tokio::main]
async fn main() {
let values: Vec<i32> = enhanced_stream().collect().await;
println!("{:?}", values); // [0, 4, 8, 12, 16]
}unfold streams work with all standard stream combinators.
use futures::stream::{Stream, Unfold};
use std::pin::Pin;
use std::task::{Context, Poll};
// The Unfold type returned by stream::unfold is roughly:
// struct Unfold<T, F, Fut> {
// state: Option<T>, // None means stream ended
// f: F, // The unfold function
// fut: Option<Fut>, // In-progress future
// }
// impl<T, F, Fut, Item> Stream for Unfold<T, F, Fut>
// where
// F: FnMut(T) -> Fut,
// Fut: Future<Output = Option<(Item, T)>>,
// {
// type Item = Item;
//
// fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Item>> {
// // 1. If state is None, return None (stream ended)
// // 2. Poll the future
// // 3. If ready and Some((item, new_state)):
// // - Update state to new_state
// // - Return Poll::Ready(Some(item))
// // 4. If ready and None:
// // - Set state to None
// // - Return Poll::Ready(None)
// // 5. If pending: return Poll::Pending
// }
// }Unfold stores the state and manages the async computation lifecycle.
use futures::stream::{self, StreamExt};
// Tree structure to traverse
#[derive(Debug)]
struct TreeNode {
value: i32,
children: Vec<TreeNode>,
}
fn tree_nodes(root: TreeNode) -> impl Stream<Item = i32> {
stream::unfold(vec![root], |mut stack| async move {
// Pop from stack
if let Some(node) = stack.pop() {
// Push children in reverse order for depth-first
for child in node.children.into_iter().rev() {
stack.push(child);
}
Some((node.value, stack))
} else {
None
}
})
}
#[tokio::main]
async fn main() {
let tree = TreeNode {
value: 1,
children: vec![
TreeNode {
value: 2,
children: vec![
TreeNode { value: 4, children: vec![] },
TreeNode { value: 5, children: vec![] },
],
},
TreeNode {
value: 3,
children: vec![
TreeNode { value: 6, children: vec![] },
],
},
],
};
let values: Vec<i32> = tree_nodes(tree).collect().await;
println!("{:?}", values); // [1, 2, 4, 5, 3, 6]
}The state can be a stack or queue to implement traversal algorithms.
use futures::stream::{self, StreamExt};
// State with multiple tracked values
struct CounterState {
current: i32,
step: i32,
max: i32,
}
fn step_counter(start: i32, step: i32, max: i32) -> impl Stream<Item = i32> {
stream::unfold(
CounterState { current: start, step, max },
|state| async move {
if state.current <= state.max {
Some((state.current, CounterState {
current: state.current + state.step,
step: state.step,
max: state.max,
}))
} else {
None
}
},
)
}
#[tokio::main]
async fn main() {
let values: Vec<i32> = step_counter(0, 5, 20).collect().await;
println!("{:?}", values); // [0, 5, 10, 15, 20]
}Group related state variables into a struct for clarity.
use futures::stream::{self, StreamExt};
// iter: from known collection
fn from_iter() -> impl Stream<Item = i32> {
stream::iter(vec![1, 2, 3])
}
// once: single value
fn from_once() -> impl Stream<Item = i32> {
stream::once(async { 42 })
}
// repeat: infinite repetition
fn from_repeat() -> impl Stream<Item = i32> {
stream::repeat(5).take(3)
}
// unfold: stateful generation
fn from_unfold() -> impl Stream<Item = i32> {
stream::unfold(0, |n| async move {
if n < 3 {
Some((n, n + 1))
} else {
None
}
})
}
// When to use unfold:
// - Next value depends on previous state
// - Async computation needed for each value
// - Infinite or unknown length stream
// - State carries across iterationsUse unfold when stream values depend on previous state or require async computation.
use futures::stream::{self, StreamExt};
use std::time::Duration;
#[tokio::main]
async fn main() {
// unfold is lazy - nothing happens until polled
let stream = stream::unfold(0, |n| async move {
println!("Computing {}", n);
tokio::time::sleep(Duration::from_millis(100)).await;
if n < 3 {
Some((n, n + 1))
} else {
None
}
});
println!("Stream created, nothing computed yet");
// Each value is computed on demand
let mut stream = std::pin::pin!(stream);
while let Some(value) = stream.next().await {
println!("Got: {}", value);
}
}unfold is lazyācomputation only happens when the stream is polled for the next value.
use futures::stream::{self, StreamExt};
#[derive(Debug, Clone)]
struct Record {
id: u64,
data: String,
}
struct QueryResult {
records: Vec<Record>,
last_key: Option<u64>,
}
async fn query_page(last_key: Option<u64>) -> QueryResult {
// Simulated database query
let start = last_key.map(|k| k + 1).unwrap_or(0);
if start >= 100 {
QueryResult {
records: vec![],
last_key: None,
}
} else {
QueryResult {
records: (start..start + 10)
.map(|id| Record {
id,
data: format!("Data for {}", id),
})
.collect(),
last_key: Some(start + 9),
}
}
}
fn query_all() -> impl Stream<Item = Record> {
stream::unfold(None, |last_key| async move {
let result = query_page(last_key).await;
if result.records.is_empty() {
None
} else {
Some((
futures::stream::iter(result.records),
result.last_key,
))
}
})
.flatten()
}
#[tokio::main]
async fn main() {
let records: Vec<Record> = query_all().collect().await;
println!("Fetched {} records", records.len());
}Database pagination with continuation tokens maps naturally to unfold.
use futures::stream::{self, StreamExt};
use std::io::BufRead;
async fn read_lines(file: std::fs::File) -> impl Stream<Item = String> {
let reader = std::io::BufReader::new(file);
let lines = reader.lines();
// Convert to stream
stream::unfold(lines, |mut lines| async move {
match lines.next() {
Some(Ok(line)) => Some((line, lines)),
Some(Err(_)) => None, // Error ends stream
None => None, // EOF
}
})
}File reading with state tracking for position and buffered reader.
use futures::stream::{self, StreamExt};
use std::time::{Duration, Instant};
struct Event {
id: u64,
timestamp: Instant,
}
async fn poll_event(last_id: u64) -> Option<Event> {
// Simulate event polling
tokio::time::sleep(Duration::from_millis(50)).await;
if last_id < 10 {
Some(Event {
id: last_id + 1,
timestamp: Instant::now(),
})
} else {
None
}
}
fn event_stream() -> impl Stream<Item = Event> {
stream::unfold(0, |last_id| async move {
match poll_event(last_id).await {
Some(event) => Some((event, event.id)),
None => None,
}
})
}
#[tokio::main]
async fn main() {
let events: Vec<Event> = event_stream().collect().await;
for event in events {
println!("Event {} at {:?}", event.id, event.timestamp);
}
}Event polling uses state to track the last seen event ID.
unfold components:
| Component | Purpose |
|-----------|---------|
| Initial state | Starting point for computation |
| Async function | Produces elements and new states |
| Some((item, state)) | Yield item, continue with new state |
| None | End the stream |
When to use unfold:
| Use Case | Why unfold | |----------|------------| | Pagination | State tracks continuation token | | Stateful generation | State carries between iterations | | Async-driven streams | Function can await futures | | Tree/graph traversal | State can be a work queue | | Event polling | State tracks position/timestamp | | Infinite sequences | Never returns None | | Lazy evaluation | Computation on demand only |
Common patterns:
| Pattern | State Type |
|---------|------------|
| Counter | i32 or usize |
| Pagination | Option<Cursor> |
| Fibonacci | (u64, u64) or struct |
| Tree traversal | Vec<Node> (stack) |
| Time-based | Instant or Duration |
Key insight: futures::stream::unfold bridges stateful computation and stream abstraction by encoding the state machine in the closure's captured state. Each element is generated lazily by invoking the async function with the current state, and the function controls both the yielded element and the next state. This enables elegant expression of pagination, recursive traversal, and stateful sequences without explicit state managementāthe state is implicitly threaded through each iteration. The async nature allows I/O operations between elements, making it ideal for database pagination, event polling, and other async-driven sequences. The stream ends when the function returns None, giving precise control over termination. Combined with stream combinators like filter, map, and take, unfold provides a powerful foundation for lazy, async, stateful iteration.