How does futures::stream::unfold create streams from async state machines?

futures::stream::unfold creates a stream by repeatedly calling an async closure on a state value, producing elements until the closure signals the end with None. The closure takes the current state by value and returns a future that resolves to Option<(Item, NextState)>: Some yields an element and continues with the new state, while None terminates the stream. This pattern turns stateful computation into a stream, enabling lazy, pull-based iteration over values generated from previous results. Because the state value carries everything the next step needs, unfold is well suited to pagination, recursive traversal, and stateful generators.

What is unfold?

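use futures::stream::Unfold;
use std::future::Future;
 
// Signature of futures::stream::unfold (futures 0.3), body elided:
pub fn unfold<T, F, Fut, Item>(init: T, f: F) -> Unfold<T, F, Fut>
where
    F: FnMut(T) -> Fut,
    Fut: Future<Output = Option<(Item, T)>>,
{
    // ...
}
 
// T    = state type, moved into f on every call
// F    = closure that takes the state and returns a future
// Fut  = future resolving to Option<(Item, new_state)>
// Item = stream element type
// Some((item, new_state)) = yield item, continue with new_state
// None = stream ends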

unfold creates a stream from an initial state and an async function that produces elements and new states.
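Since the contract is simply "state in, optional (item, next state) out", it can be mirrored without async. The following is a synchronous analog built on std::iter::from_fn, offered only to make the state threading concrete; unfold_iter is a hypothetical helper, not part of futures or std.

```rust
// Hypothetical synchronous analog of futures::stream::unfold (not a real API).
// The closure takes the state by value and returns Some((item, next_state))
// to continue or None to stop.
fn unfold_iter<T, Item, F>(init: T, mut f: F) -> impl Iterator<Item = Item>
where
    F: FnMut(T) -> Option<(Item, T)>,
{
    // None in this slot means the sequence has already ended.
    let mut state = Some(init);
    std::iter::from_fn(move || {
        // Move the state out; once it is gone, keep returning None.
        let current = state.take()?;
        let (item, next) = f(current)?;
        state = Some(next);
        Some(item)
    })
}

fn main() {
    let values: Vec<i32> =
        unfold_iter(0, |n: i32| if n < 5 { Some((n, n + 1)) } else { None }).collect();
    println!("{:?}", values); // [0, 1, 2, 3, 4]
}
```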

Basic Stream Creation

use futures::stream::{self, StreamExt};
 
#[tokio::main]
async fn main() {
    // Create a stream of numbers 0, 1, 2, 3, 4
    let stream = stream::unfold(0, |state| async move {
        if state < 5 {
            Some((state, state + 1))
        } else {
            None // Stream ends
        }
    });
    
    let values: Vec<i32> = stream.collect().await;
    println!("{:?}", values); // [0, 1, 2, 3, 4]
}

The stream starts with state 0, yields each value, and increments state until state >= 5.

How the State Machine Works

use futures::stream::{self, StreamExt};
 
#[tokio::main]
async fn main() {
    // Each iteration:
    // 1. Call the unfold function with current state
    // 2. Await the resulting future
    // 3. If Some((item, new_state)):
    //    - Yield item to the stream
    //    - Set state = new_state
    //    - Repeat from step 1
    // 4. If None: Stream ends
    
    let stream = stream::unfold(1, |state| async move {
        println!("Current state: {}", state);
        if state <= 3 {
            let next_state = state * 2;
            Some((state, next_state))
        } else {
            None
        }
    });
    
    // Output (the closure still runs for state 4 and prints it,
    // then returns None, so 4 is never yielded):
    // Current state: 1
    // Current state: 2
    // Current state: 4
    // [1, 2]
    
    let values: Vec<i32> = stream.collect().await;
    println!("{:?}", values);
}

The state carries information between iterations—the next state can depend on the current state.
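A detail worth noting in this loop: the closure runs once more than the number of yielded elements, because the last call receives the terminating state and returns None. That is why the example prints "Current state: 4" even though 4 is never yielded. The synchronous sketch below counts calls to make this visible; doubling_with_calls is a hypothetical helper using std::iter::from_fn as a stand-in for the async closure.

```rust
// Hypothetical helper: runs the doubling sequence from the example above
// (yield `state` while state <= limit) and counts step-function calls.
fn doubling_with_calls(limit: i32) -> (Vec<i32>, usize) {
    let mut calls = 0;
    let mut state = Some(1);
    let items: Vec<i32> = std::iter::from_fn(|| {
        let s = state.take()?; // None: the sequence already ended
        calls += 1; // one "call of the unfold closure"
        if s <= limit {
            state = Some(s * 2);
            Some(s)
        } else {
            None // terminating state: consumed but not yielded
        }
    })
    .collect();
    (items, calls)
}

fn main() {
    let (items, calls) = doubling_with_calls(3);
    println!("{:?} after {} calls", items, calls); // [1, 2] after 3 calls
}
```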

Finite vs Infinite Streams

use futures::stream::{self, Stream, StreamExt};
 
// Finite stream: ends when function returns None
fn finite_stream() -> impl Stream<Item = i32> {
    stream::unfold(0, |n| async move {
        if n < 10 {
            Some((n, n + 1))
        } else {
            None
        }
    })
}
 
// Infinite stream: never returns None
fn infinite_stream() -> impl Stream<Item = i32> {
    stream::unfold(0, |n| async move {
        Some((n, n + 1))
    })
}
 
#[tokio::main]
async fn main() {
    // Finite: collect all
    let finite: Vec<i32> = finite_stream().collect().await;
    println!("Finite: {:?}", finite);
    
    // Infinite: must limit somehow
    let infinite: Vec<i32> = infinite_stream()
        .take(10)
        .collect()
        .await;
    println!("Infinite (first 10): {:?}", infinite);
}

unfold can create both finite and infinite streams—the difference is whether the function can return None.

State Can Be Complex

use futures::stream::{self, Stream, StreamExt};
 
#[derive(Debug)]
struct FibonacciState {
    current: u64,
    next: u64,
}
 
fn fibonacci_stream() -> impl Stream<Item = u64> {
    stream::unfold(
        FibonacciState { current: 0, next: 1 },
        |state| async move {
            let current = state.current;
            let next = state.next;
            
            Some((
                current,
                FibonacciState {
                    current: next,
                    next: current + next,
                },
            ))
        },
    )
}
 
#[tokio::main]
async fn main() {
    let fibs: Vec<u64> = fibonacci_stream()
        .take(10)
        .collect()
        .await;
    println!("Fibonacci: {:?}", fibs);
    // Fibonacci: [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
}

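State can be any type (unfold puts no trait bounds on it; it is simply moved into each call and returned with the next element), enabling complex stateful computations.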
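Note that fibonacci_stream never returns None, so current + next eventually overflows u64 (a panic in debug builds) if the consumer keeps pulling past roughly the 90th element; take(10) keeps the example safe. Encoding termination in the state avoids relying on the consumer. The sketch below uses a tuple state and checked_add so the sequence yields every Fibonacci number that fits in a u64 and then ends. It is a synchronous std::iter::from_fn analog, fib_checked is a hypothetical name, and the same step logic could sit inside the async block passed to stream::unfold.

```rust
// Hypothetical sync analog: Fibonacci with a tuple state that ends the
// sequence instead of overflowing u64. `next` is None once it would overflow.
fn fib_checked() -> impl Iterator<Item = u64> {
    let mut state: Option<(u64, Option<u64>)> = Some((0, Some(1)));
    std::iter::from_fn(move || {
        let (current, next) = state?;
        // Shift the pair; checked_add yields None on overflow, so the
        // following step becomes the last one.
        state = next.map(|n| (n, current.checked_add(n)));
        Some(current)
    })
}

fn main() {
    let first: Vec<u64> = fib_checked().take(10).collect();
    println!("{:?}", first); // [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
    println!("{} terms fit in a u64", fib_checked().count());
}
```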

Capturing External State

use futures::stream::{self, Stream, StreamExt};
 
fn countdown_stream(start: i32) -> impl Stream<Item = i32> {
    // `start` becomes the initial state; it is not captured by the closure
    stream::unfold(start, |current| async move {
        if current >= 0 {
            Some((current, current - 1))
        } else {
            None
        }
    })
}
 
#[tokio::main]
async fn main() {
    let values: Vec<i32> = countdown_stream(5).collect().await;
    println!("{:?}", values); // [5, 4, 3, 2, 1, 0]
}

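Here start is not captured by the closure: it becomes the initial state, and each call receives the current state by value. To use other outer-scope values inside the async block, capture them with a move closure; because the closure runs once per element, non-Copy values must be cloned on each call or carried in the state.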

Async Operations in unfold

use futures::stream::{self, Stream, StreamExt};
use std::time::Duration;
 
async fn fetch_page(page: u32) -> Vec<String> {
    // Simulate async fetch
    tokio::time::sleep(Duration::from_millis(100)).await;
    (0..3).map(|i| format!("Page {} Item {}", page, i)).collect()
}
 
fn paginated_stream() -> impl Stream<Item = String> {
    stream::unfold(1, |page| async move {
        if page > 3 {
            // Stop before requesting a page that would be discarded
            None
        } else {
            let items = fetch_page(page).await;
            if items.is_empty() {
                None
            } else {
                // Yield this page's items as a sub-stream; flatten() below
                // turns the stream of pages into a stream of items
                Some((stream::iter(items), page + 1))
            }
        }
    })
    .flatten()
}
 
#[tokio::main]
async fn main() {
    let items: Vec<String> = paginated_stream().collect().await;
    for item in items {
        println!("{}", item);
    }
}

The unfold function can perform async operations, making it ideal for I/O-driven streams.

Pagination Pattern

use futures::stream::{self, Stream, StreamExt};
 
#[derive(Debug)]
struct Page<T> {
    items: Vec<T>,
    next_token: Option<String>,
}
 
async fn fetch_items(token: Option<String>) -> Page<String> {
    // Simulated API call
    let start = token
        .and_then(|t| t.parse::<usize>().ok())
        .unwrap_or(0);
    
    if start >= 10 {
        Page {
            items: vec![],
            next_token: None,
        }
    } else {
        Page {
            items: (start..start + 3).map(|i| format!("Item {}", i)).collect(),
            next_token: Some((start + 3).to_string()),
        }
    }
}
 
fn paginated_items() -> impl Stream<Item = String> {
    stream::unfold(None, |token| async move {
        let page = fetch_items(token).await;
        
        if page.items.is_empty() {
            None
        } else {
            Some((
                futures::stream::iter(page.items),
                page.next_token,
            ))
        }
    })
    .flatten()
}
 
#[tokio::main]
async fn main() {
    let items: Vec<String> = paginated_items().collect().await;
    println!("{:?}", items);
    // ["Item 0", "Item 1", "Item 2", "Item 3", ...]
}

Pagination uses state to track the continuation token between API calls.
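The example uses a None state both for "first request" and, implicitly, for "no more pages". That works here because fetch_items always returns a token alongside items, but against an API whose last page has items and no next_token, the unfold would call fetch_items(None) again and restart from the first page, never ending. A three-way state avoids that. The sketch below is a synchronous analog (std::iter::from_fn plus flatten instead of stream::unfold plus StreamExt::flatten); PageState, Page, fetch and all_items are hypothetical names, and the same enum could serve as the unfold state.

```rust
// Hypothetical paging state: distinguishes "not started" from "finished".
enum PageState {
    Start,
    Next(String),
    Done,
}

struct Page {
    items: Vec<String>,
    next_token: Option<String>,
}

// Simulated API (hypothetical): 7 items, 3 per page. The last page has
// items but no next_token, as many real APIs do.
fn fetch(token: Option<&str>) -> Page {
    let start: usize = token.and_then(|t| t.parse().ok()).unwrap_or(0);
    let end = (start + 3).min(7);
    Page {
        items: (start..end).map(|i| format!("Item {}", i)).collect(),
        next_token: if end < 7 { Some(end.to_string()) } else { None },
    }
}

// Sync analog of the unfold + flatten pattern above.
fn all_items() -> impl Iterator<Item = String> {
    let mut state = PageState::Start;
    std::iter::from_fn(move || {
        // Take the state, leaving Done unless a next token is found.
        let page = match std::mem::replace(&mut state, PageState::Done) {
            PageState::Start => fetch(None),
            PageState::Next(token) => fetch(Some(token.as_str())),
            PageState::Done => return None,
        };
        if let Some(token) = page.next_token {
            state = PageState::Next(token);
        }
        Some(page.items)
    })
    .flatten()
}

fn main() {
    let items: Vec<String> = all_items().collect();
    println!("{:?}", items); // 7 items: "Item 0" through "Item 6"
}
```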

Error Handling with unfold

use futures::stream::{self, Stream, StreamExt};
 
#[derive(Debug)]
enum StreamError {
    Failed(usize),
}
 
// Stream that can produce errors
fn fallible_stream() -> impl Stream<Item = Result<String, StreamError>> {
    stream::unfold(0, |count| async move {
        if count >= 5 {
            None // Stream ends
        } else if count == 3 {
            // Emit error but continue
            Some((Err(StreamError::Failed(count)), count + 1))
        } else {
            Some((Ok(format!("Item {}", count)), count + 1))
        }
    })
}
 
#[tokio::main]
async fn main() {
    let results: Vec<Result<String, StreamError>> = fallible_stream().collect().await;
    for result in results {
        match result {
            Ok(item) => println!("Success: {}", item),
            Err(e) => println!("Error: {:?}", e),
        }
    }
}

The stream can yield Result values to handle errors without terminating.

Early Termination with Errors

use futures::stream::{self, Stream, TryStreamExt};
 
// Stream that yields one error and then ends.
// State is Option<i32>: Some(count) = keep going; None = an error was
// already yielded, so the next call ends the stream.
fn error_terminating_stream() -> impl Stream<Item = Result<String, String>> {
    stream::unfold(Some(0), |state| async move {
        match state {
            // The previous step yielded an error: end the stream
            None => None,
            Some(count) if count >= 5 => None,
            // Yield the error and switch to the "done" state
            Some(count) if count == 3 => Some((Err(format!("Failed at {}", count)), None)),
            Some(count) => Some((Ok(format!("Item {}", count)), Some(count + 1))),
        }
    })
}
 
#[tokio::main]
async fn main() {
    // Consumer-side alternative: try_collect() stops at the first Err,
    // even if the stream itself would keep going.
    let result: Result<Vec<String>, String> = error_terminating_stream().try_collect().await;
    println!("{:?}", result); // Err("Failed at 3")
}

To end a stream after an error, encode "done" in the state so the next call returns None. On the consumer side, TryStreamExt::try_collect stops at the first Err, and try_next returns it so that ? can end a consuming loop.

Combining with Other Stream Methods

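use futures::stream::{self, Stream, StreamExt};
 
fn enhanced_stream() -> impl Stream<Item = i32> {
    stream::unfold(0, |n| async move {
        Some((n, n + 1))
    })
    // filter's closure receives &i32; destructuring with |&n| lets the
    // returned future own the value instead of borrowing it
    .filter(|&n| async move { n % 2 == 0 }) // Keep only even
    .take(5)                                 // Take first 5
    .map(|n| n * 2)                          // Double each
}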
 
#[tokio::main]
async fn main() {
    let values: Vec<i32> = enhanced_stream().collect().await;
    println!("{:?}", values); // [0, 4, 8, 12, 16]
}

unfold streams work with all standard stream combinators.
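Combinator order matters with infinite unfold streams: filtering before take(5) keeps five even numbers, while taking first keeps only the evens among the first five values. The synchronous sketch below uses std::iter::successors as a stand-in for the infinite unfold counter; naturals, filter_then_take and take_then_filter are hypothetical names.

```rust
// Infinite counter 0, 1, 2, ... (a sync stand-in for the unfold stream above).
fn naturals() -> impl Iterator<Item = i32> {
    std::iter::successors(Some(0), |n| Some(n + 1))
}

// filter -> take -> map, as in enhanced_stream
fn filter_then_take() -> Vec<i32> {
    naturals().filter(|n| n % 2 == 0).take(5).map(|n| n * 2).collect()
}

// take -> filter -> map: only the first five naturals are considered
fn take_then_filter() -> Vec<i32> {
    naturals().take(5).filter(|n| n % 2 == 0).map(|n| n * 2).collect()
}

fn main() {
    println!("{:?}", filter_then_take()); // [0, 4, 8, 12, 16]
    println!("{:?}", take_then_filter()); // [0, 4, 8]
}
```

With an infinite source, a filter placed before take that never matches would keep polling forever, so a bounding combinator only helps where it actually limits the work.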

State Machine Implementation

use futures::stream::{Stream, Unfold};
use std::pin::Pin;
use std::task::{Context, Poll};
 
// The Unfold type returned by stream::unfold is roughly:
// struct Unfold<T, F, Fut> {
//     state: Option<T>,    // None means stream ended
//     f: F,                // The unfold function
//     fut: Option<Fut>,    // In-progress future
// }
 
// impl<T, F, Fut, Item> Stream for Unfold<T, F, Fut>
// where
//     F: FnMut(T) -> Fut,
//     Fut: Future<Output = Option<(Item, T)>>,
// {
//     type Item = Item;
//
//     fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Item>> {
//         // 1. If state is None, return None (stream ended)
//         // 2. Poll the future
//         // 3. If ready and Some((item, new_state)):
//         //    - Update state to new_state
//         //    - Return Poll::Ready(Some(item))
//         // 4. If ready and None:
//         //    - Set state to None
//         //    - Return Poll::Ready(None)
//         // 5. If pending: return Poll::Pending
//     }
// }

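Unfold alternates between two modes: holding the state while idle, and holding the future created from that state while a step is in progress. The closure is called only when the stream is polled and no future is pending.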
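To make the poll_next steps concrete, here is a std-only sketch of an unfold-like type. It is an illustration under simplifying assumptions, not the futures implementation: MyUnfold, State, NoopWaker and collect_all are hypothetical names, the future is boxed to avoid pin projection, poll_next is an inherent method rather than an implementation of futures::Stream, and collect_all busy-polls instead of using a real executor.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Either idle with a state value, or running the future made from it.
enum State<T, Fut> {
    Value(T),
    Running(Pin<Box<Fut>>),
    Done,
}

// Hypothetical unfold-like type (not the futures implementation).
struct MyUnfold<T, F, Fut> {
    f: F,
    state: State<T, Fut>,
}

impl<T, F, Fut, Item> MyUnfold<T, F, Fut>
where
    F: FnMut(T) -> Fut,
    Fut: Future<Output = Option<(Item, T)>>,
{
    fn new(init: T, f: F) -> Self {
        MyUnfold { f, state: State::Value(init) }
    }

    // Same contract as Stream::poll_next; boxing the future avoids pin projection.
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Item>> {
        // 1. Idle: move the state into f to create the next future.
        if matches!(self.state, State::Value(_)) {
            if let State::Value(value) = std::mem::replace(&mut self.state, State::Done) {
                self.state = State::Running(Box::pin((self.f)(value)));
            }
        }
        // 2. Poll the in-flight future; Done means the stream has ended.
        let polled = match &mut self.state {
            State::Running(fut) => fut.as_mut().poll(cx),
            _ => return Poll::Ready(None),
        };
        match polled {
            // 3. Store the new state and yield the item.
            Poll::Ready(Some((item, next))) => {
                self.state = State::Value(next);
                Poll::Ready(Some(item))
            }
            // 4. The closure signalled the end: no state remains.
            Poll::Ready(None) => {
                self.state = State::Done;
                Poll::Ready(None)
            }
            // 5. Not ready: keep the future for the next poll.
            Poll::Pending => Poll::Pending,
        }
    }
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Busy-polls to completion; only suitable for futures that never wait on
// real I/O or timers, such as the async blocks below.
fn collect_all<T, F, Fut, Item>(mut s: MyUnfold<T, F, Fut>) -> Vec<Item>
where
    F: FnMut(T) -> Fut,
    Fut: Future<Output = Option<(Item, T)>>,
{
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut items = Vec::new();
    loop {
        match s.poll_next(&mut cx) {
            Poll::Ready(Some(item)) => items.push(item),
            Poll::Ready(None) => return items,
            Poll::Pending => {}
        }
    }
}

fn main() {
    let counter = MyUnfold::new(0, |n: i32| async move {
        if n < 5 { Some((n, n + 1)) } else { None }
    });
    println!("{:?}", collect_all(counter)); // [0, 1, 2, 3, 4]
}
```

Boxing costs one allocation per element; the futures crate avoids it by pinning the future in place inside Unfold, which is why it relies on pin projection.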

Recursive State Transitions

use futures::stream::{self, Stream, StreamExt};
 
// Tree structure to traverse
#[derive(Debug)]
struct TreeNode {
    value: i32,
    children: Vec<TreeNode>,
}
 
fn tree_nodes(root: TreeNode) -> impl Stream<Item = i32> {
    stream::unfold(vec![root], |mut stack| async move {
        // Pop from stack
        if let Some(node) = stack.pop() {
            // Push children in reverse so the first child is popped next (pre-order DFS)
            for child in node.children.into_iter().rev() {
                stack.push(child);
            }
            Some((node.value, stack))
        } else {
            None
        }
    })
}
 
#[tokio::main]
async fn main() {
    let tree = TreeNode {
        value: 1,
        children: vec![
            TreeNode {
                value: 2,
                children: vec![
                    TreeNode { value: 4, children: vec![] },
                    TreeNode { value: 5, children: vec![] },
                ],
            },
            TreeNode {
                value: 3,
                children: vec![
                    TreeNode { value: 6, children: vec![] },
                ],
            },
        ],
    };
    
    let values: Vec<i32> = tree_nodes(tree).collect().await;
    println!("{:?}", values); // [1, 2, 4, 5, 3, 6]
}

The state can be a stack or queue to implement traversal algorithms.
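Swapping the stack for a FIFO queue turns the same loop into a breadth-first traversal. The sketch below is a synchronous analog using std::iter::from_fn and VecDeque; bfs_values, leaf and sample_tree are hypothetical helpers, and the same VecDeque could serve as the state passed to stream::unfold.

```rust
use std::collections::VecDeque;

struct TreeNode {
    value: i32,
    children: Vec<TreeNode>,
}

// Breadth-first traversal: the state is a FIFO queue instead of a stack.
fn bfs_values(root: TreeNode) -> impl Iterator<Item = i32> {
    let mut queue = VecDeque::from([root]);
    std::iter::from_fn(move || {
        let node = queue.pop_front()?; // empty queue: traversal finished
        queue.extend(node.children); // enqueue children left to right
        Some(node.value)
    })
}

fn leaf(value: i32) -> TreeNode {
    TreeNode { value, children: vec![] }
}

// Same shape as the tree in the example above.
fn sample_tree() -> TreeNode {
    TreeNode {
        value: 1,
        children: vec![
            TreeNode { value: 2, children: vec![leaf(4), leaf(5)] },
            TreeNode { value: 3, children: vec![leaf(6)] },
        ],
    }
}

fn main() {
    let values: Vec<i32> = bfs_values(sample_tree()).collect();
    println!("{:?}", values); // [1, 2, 3, 4, 5, 6]
}
```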

Multiple State Values

use futures::stream::{self, Stream, StreamExt};
 
// State with multiple tracked values
struct CounterState {
    current: i32,
    step: i32,
    max: i32,
}
 
fn step_counter(start: i32, step: i32, max: i32) -> impl Stream<Item = i32> {
    stream::unfold(
        CounterState { current: start, step, max },
        |state| async move {
            if state.current <= state.max {
                Some((state.current, CounterState {
                    current: state.current + state.step,
                    step: state.step,
                    max: state.max,
                }))
            } else {
                None
            }
        },
    )
}
 
#[tokio::main]
async fn main() {
    let values: Vec<i32> = step_counter(0, 5, 20).collect().await;
    println!("{:?}", values); // [0, 5, 10, 15, 20]
}

Group related state variables into a struct for clarity.
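An alternative design keeps only the value that changes in the state and captures the fixed parameters in the closure. With stream::unfold that means a move closure, which works directly for Copy values such as i32. The synchronous sketch below shows the split with std::iter::from_fn; step_counter_sync is a hypothetical name.

```rust
// Hypothetical sync analog: only `current` is state; `step` and `max` are
// captured by the move closure because they never change.
fn step_counter_sync(start: i32, step: i32, max: i32) -> impl Iterator<Item = i32> {
    let mut current = Some(start);
    std::iter::from_fn(move || {
        // Take the state; stop (and stay stopped) once it exceeds max.
        let value = current.take().filter(|&v| v <= max)?;
        current = Some(value + step);
        Some(value)
    })
}

fn main() {
    let values: Vec<i32> = step_counter_sync(0, 5, 20).collect();
    println!("{:?}", values); // [0, 5, 10, 15, 20]
}
```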

Comparison with Other Stream Creation Methods

use futures::stream::{self, Stream, StreamExt};
 
// iter: from known collection
fn from_iter() -> impl Stream<Item = i32> {
    stream::iter(vec![1, 2, 3])
}
 
// once: single value
fn from_once() -> impl Stream<Item = i32> {
    stream::once(async { 42 })
}
 
// repeat: infinite repetition
fn from_repeat() -> impl Stream<Item = i32> {
    stream::repeat(5).take(3)
}
 
// unfold: stateful generation
fn from_unfold() -> impl Stream<Item = i32> {
    stream::unfold(0, |n| async move {
        if n < 3 {
            Some((n, n + 1))
        } else {
            None
        }
    })
}
 
// When to use unfold:
// - Next value depends on previous state
// - Async computation needed for each value
// - Infinite or unknown length stream
// - State carries across iterations

Use unfold when stream values depend on previous state or require async computation.

Backpressure and Laziness

use futures::stream::{self, StreamExt};
use std::time::Duration;
 
#[tokio::main]
async fn main() {
    // unfold is lazy - nothing happens until polled
    let stream = stream::unfold(0, |n| async move {
        println!("Computing {}", n);
        tokio::time::sleep(Duration::from_millis(100)).await;
        
        if n < 3 {
            Some((n, n + 1))
        } else {
            None
        }
    });
    
    println!("Stream created, nothing computed yet");
    
    // Each value is computed on demand
    let mut stream = std::pin::pin!(stream);
    
    while let Some(value) = stream.next().await {
        println!("Got: {}", value);
    }
}

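unfold is lazy: the closure runs only when the stream is polled for its next element, one step at a time. Because production is driven by polling, a slow consumer automatically slows the producer (pull-based backpressure).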
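The same pull-based behavior can be observed without a runtime. This synchronous sketch records events in order: nothing is computed before the first pull, each value is computed just before it is consumed, and the final call that returns None still runs ("computing 3"), as in the async example. lazy_trace is a hypothetical helper built on std::iter::from_fn.

```rust
use std::cell::RefCell;

// Hypothetical helper: records when values are computed and consumed.
fn lazy_trace() -> Vec<String> {
    let log = RefCell::new(Vec::new());
    let mut state = Some(0);
    let values = std::iter::from_fn(|| {
        let n = state.take()?;
        log.borrow_mut().push(format!("computing {}", n));
        if n < 3 {
            state = Some(n + 1);
            Some(n)
        } else {
            None
        }
    });
    // Creating the iterator has not run the closure yet.
    log.borrow_mut().push("created".to_string());
    for value in values {
        log.borrow_mut().push(format!("got {}", value));
    }
    log.into_inner()
}

fn main() {
    for line in lazy_trace() {
        println!("{}", line);
    }
}
```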

Real-World Example: Database Pagination

use futures::stream::{self, Stream, StreamExt};
 
#[derive(Debug, Clone)]
struct Record {
    id: u64,
    data: String,
}
 
struct QueryResult {
    records: Vec<Record>,
    last_key: Option<u64>,
}
 
async fn query_page(last_key: Option<u64>) -> QueryResult {
    // Simulated database query
    let start = last_key.map(|k| k + 1).unwrap_or(0);
    
    if start >= 100 {
        QueryResult {
            records: vec![],
            last_key: None,
        }
    } else {
        QueryResult {
            records: (start..start + 10)
                .map(|id| Record {
                    id,
                    data: format!("Data for {}", id),
                })
                .collect(),
            last_key: Some(start + 9),
        }
    }
}
 
fn query_all() -> impl Stream<Item = Record> {
    stream::unfold(None, |last_key| async move {
        let result = query_page(last_key).await;
        
        if result.records.is_empty() {
            None
        } else {
            Some((
                futures::stream::iter(result.records),
                result.last_key,
            ))
        }
    })
    .flatten()
}
 
#[tokio::main]
async fn main() {
    let records: Vec<Record> = query_all().collect().await;
    println!("Fetched {} records", records.len());
}

Keyset pagination, where each query resumes after the last key seen, maps naturally to unfold: the state is that last key, and an empty page ends the stream.

Real-World Example: File Line Reader

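use futures::stream::{self, Stream};
use tokio::io::{AsyncBufReadExt, BufReader};
 
// Plain fn: building the stream needs no .await.
// tokio's async reader keeps file I/O off the executor thread; std's
// BufRead::lines() would block that thread inside an async task.
fn read_lines(file: tokio::fs::File) -> impl Stream<Item = std::io::Result<String>> {
    let lines = BufReader::new(file).lines();
    
    // State: Some(lines) while reading; None after an error was yielded
    stream::unfold(Some(lines), |state| async move {
        let mut lines = match state {
            Some(lines) => lines,
            None => return None, // the previous step yielded an error
        };
        match lines.next_line().await {
            Ok(Some(line)) => Some((Ok(line), Some(lines))),
            Ok(None) => None,               // EOF
            Err(e) => Some((Err(e), None)), // report the error, then end
        }
    })
}

The state is the line reader itself: it is moved into each call and handed back with every line. Yielding io::Result lets the caller see a read error instead of the stream silently stopping.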

Real-World Example: Event Source with Timeout

use futures::stream::{self, Stream, StreamExt};
use std::time::{Duration, Instant};
 
struct Event {
    id: u64,
    timestamp: Instant,
}
 
async fn poll_event(last_id: u64) -> Option<Event> {
    // Simulate event polling
    tokio::time::sleep(Duration::from_millis(50)).await;
    
    if last_id < 10 {
        Some(Event {
            id: last_id + 1,
            timestamp: Instant::now(),
        })
    } else {
        None
    }
}
 
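fn event_stream() -> impl Stream<Item = Event> {
    stream::unfold(0, |last_id| async move {
        // End the stream if no event arrives within an (illustrative)
        // 1-second deadline
        match tokio::time::timeout(Duration::from_secs(1), poll_event(last_id)).await {
            Ok(Some(event)) => {
                // Copy the id before `event` is moved into the tuple
                let id = event.id;
                Some((event, id))
            }
            // Source exhausted (Ok(None)) or deadline elapsed (Err(_))
            Ok(None) | Err(_) => None,
        }
    })
}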
 
#[tokio::main]
async fn main() {
    let events: Vec<Event> = event_stream().collect().await;
    for event in events {
        println!("Event {} at {:?}", event.id, event.timestamp);
    }
}

Event polling uses state to track the last seen event ID.

Synthesis

unfold components:

Component              Purpose
Initial state          Starting point for computation
Async function         Produces elements and new states
Some((item, state))    Yield item, continue with new state
None                   End the stream

When to use unfold:

Use case               Why unfold
Pagination             State tracks continuation token
Stateful generation    State carries between iterations
Async-driven streams   Function can await futures
Tree/graph traversal   State can be a work queue
Event polling          State tracks position/timestamp
Infinite sequences     Never returns None
Lazy evaluation        Computation on demand only

Common patterns:

Pattern                State type
Counter                i32 or usize
Pagination             Option<Cursor>
Fibonacci              (u64, u64) or struct
Tree traversal         Vec<Node> (stack)
Time-based             Instant or Duration

Key insight: futures::stream::unfold bridges stateful computation and the stream abstraction by threading an explicit state value through an async closure. Each element is generated lazily: when the stream is polled, the closure is called with the current state, and the future it returns decides both the yielded element and the next state. Pagination, recursive traversal, and stateful sequences can therefore be written without a hand-written Stream implementation, because unfold handles the polling and hands the state from one step to the next. Since the closure returns a future, each step can perform I/O, which suits database pagination, event polling, and other async-driven sequences. The stream ends when the closure's future resolves to None, giving precise control over termination. Combined with combinators such as filter, map, and take, unfold provides a solid foundation for lazy, async, stateful iteration.