How does futures::stream::unfold create streams from stateful async functions?

futures::stream::unfold creates a stream by repeatedly calling an async function with state that evolves with each element yielded, returning Some((item, next_state)) to continue the stream or None to terminate it. This pattern lets you generate streams from computation rather than static data, maintaining state across iterations without explicit mutable state management, making it ideal for pagination, polling, sequences, and any scenario where each element depends on previous computation.

Basic unfold Syntax

use futures::stream::{self, Stream};
 
fn basic_unfold() {
    // `stream::unfold(seed, step)` needs two things:
    //   * seed - the starting state (T)
    //   * step - a closure mapping the current state to a future that
    //            resolves to `Option<(Item, NextState)>`

    let stream = stream::unfold(0, |count| async move {
        // Past the last index: `None` ends the stream.
        if count >= 3 {
            return None;
        }

        // Otherwise emit a label and hand back the advanced counter.
        let label = format!("Item {}", count);
        Some((label, count + 1))
    });

    // Driving this stream to completion produces
    // "Item 0", "Item 1", "Item 2"
    // and then it finishes.
}

unfold takes initial state and an async closure that returns Some((item, new_state)) to continue or None to end.

The State Transition Pattern

use futures::stream::{self, StreamExt};
 
#[tokio::main]
async fn state_transitions() {
    // The state walks 0 -> 1 -> 2 -> 3; reaching 3 ends the stream.
    let stream = stream::unfold(0, |state| async move {
        if state < 3 {
            println!("State: {}, yielding item", state);
            // Emit the current state; the successor state is one higher.
            Some((state, state + 1))
        } else {
            println!("State: {}, terminating", state);
            // Returning `None` closes the stream.
            None
        }
    });

    let collected = stream.collect::<Vec<_>>().await;
    println!("Collected: {:?}", collected); // [0, 1, 2]
}

Each iteration receives the previous state and returns the next state along with an item.

Generating Sequences

use futures::stream::{self, StreamExt};
 
#[tokio::main]
async fn sequences() {
    // Fibonacci: the state is the pair (current, upcoming).
    let fib_stream = stream::unfold((0, 1), |(current, upcoming)| async move {
        let following = current + upcoming;
        // Emit `current`, then slide the window forward by one position.
        Some((current, (upcoming, following)))
    });

    // The closure never returns `None`, so bound the stream to ten items.
    let first_10 = fib_stream.take(10).collect::<Vec<_>>().await;
    println!("Fibonacci: {:?}", first_10);
    // [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
}

unfold can generate infinite sequences when the closure always returns Some.

Pagination Pattern

use futures::stream::{self, StreamExt};
 
/// One page of results as returned by `fetch_page`.
#[derive(Debug)]
struct Page {
    /// Items carried by this page.
    items: Vec<String>,
    /// Whether another page follows this one.
    has_next: bool,
    /// Token to pass to `fetch_page` for the following page; the simulated
    /// last page sets this to `None`.
    next_token: Option<String>,
}
 
/// Simulated paginated API call: maps a continuation token to a canned page.
///
/// `None` requests the first page; `"page2"` and `"page3"` request the later
/// ones. Any other token panics with "unexpected token".
async fn fetch_page(token: Option<String>) -> Page {
    // Builds a page from string literals; a page "has next" exactly when it
    // carries a follow-up token.
    fn page(items: &[&str], next: Option<&str>) -> Page {
        Page {
            items: items.iter().map(|item| item.to_string()).collect(),
            has_next: next.is_some(),
            next_token: next.map(|token| token.to_string()),
        }
    }

    match token.as_deref() {
        None => page(&["a", "b"], Some("page2")),
        Some("page2") => page(&["c", "d"], Some("page3")),
        Some("page3") => page(&["e"], None),
        _ => panic!("unexpected token"),
    }
}
 
#[tokio::main]
async fn pagination_example() {
    // State: Option<Option<String>>
    //   Some(token) -> fetch the page for `token` (`Some(None)` = first page)
    //   None        -> the last page has already been yielded; stop
    //
    // The bare token cannot be the state: a `None` token after the last page
    // is indistinguishable from the initial "first page" request, so the
    // stream would start over from page one and never terminate.
    let all_items: Vec<String> = stream::unfold(Some(None::<String>), |state| async move {
        // Outer `None` means the previous page was the final one.
        let token = match state {
            Some(token) => token,
            None => return None,
        };

        let page = fetch_page(token).await;

        // Only continue when the page both claims a successor and provides
        // a token for it; otherwise the next call ends the stream.
        let next_state = if page.has_next {
            page.next_token.map(Some)
        } else {
            None
        };

        if page.items.is_empty() && next_state.is_none() {
            // Empty final page: nothing left to yield, terminate right away.
            None
        } else {
            // Yield this page's items; `next_state` decides whether to go on.
            Some((page.items, next_state))
        }
    })
    // Each stream item is a whole page (Vec<String>); flatten into items.
    .flat_map(|items| stream::iter(items))
    .collect()
    .await;

    println!("All items: {:?}", all_items);
    // ["a", "b", "c", "d", "e"]
}

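unfold naturally handles pagination with the continuation token as state, as long as the state can tell "fetch the first page" apart from "no more pages"—a bare None token cannot mean both, or the stream would restart from the first page instead of ending.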

Proper Pagination Termination

use futures::stream::{self, StreamExt};
 
#[tokio::main]
async fn pagination_termination() {
    // Cleaner pagination pattern.
    //
    // The state wraps the token in an extra Option so that "done" is
    // explicit:
    //   Some(None)        -> fetch the first page
    //   Some(Some(token)) -> fetch the page identified by `token`
    //   None              -> terminate
    // Using the token itself as the state would make the final `None`
    // token look like a request for the first page again.
    let stream = stream::unfold(Some(None::<String>), |state| async move {
        let token = match state {
            Some(token) => token,
            // The previous iteration delivered the last page.
            None => return None,
        };

        let page = fetch_page(token).await;

        let next_state = if page.has_next {
            // A missing token on a page that claims a successor also ends
            // the stream rather than restarting it.
            page.next_token.map(Some)
        } else {
            None // Terminates on the next iteration
        };

        if page.items.is_empty() && next_state.is_none() {
            // Nothing to yield and nowhere to go: terminate immediately
            None
        } else {
            // Yield items and continue (or terminate next time)
            Some((page.items, next_state))
        }
    });

    // Each stream item is a Vec<String>
    // Use flat_map to flatten pages into items
    let items: Vec<String> = stream
        .flat_map(|items| stream::iter(items))
        .collect()
        .await;
}

Handle empty pages and final termination explicitly in the unfold logic.

Polling Pattern

use futures::stream::{self, StreamExt};
use std::time::Duration;
 
/// Outcome of a single `check_status` call.
#[derive(Debug)]
struct PollResult {
    /// Status label; `check_status` produces "pending" or "completed".
    status: String,
    /// `true` once the status is final; `check_status` sets it together
    /// with "completed".
    done: bool,
}
 
/// Simulated status check.
///
/// Reports "pending" for the first two calls and "completed" (with
/// `done: true`) from the third call onward. The call count is shared by
/// every caller in the process.
async fn check_status() -> PollResult {
    use std::sync::atomic::{AtomicU32, Ordering};

    // An atomic counter replaces `static mut`: no `unsafe` block is needed
    // and concurrent callers on a multi-threaded runtime cannot race.
    static CALLS: AtomicU32 = AtomicU32::new(0);

    // `fetch_add` returns the value before the increment, so add one to get
    // the number of this call.
    let call_number = CALLS.fetch_add(1, Ordering::SeqCst).saturating_add(1);

    if call_number >= 3 {
        PollResult { status: "completed".into(), done: true }
    } else {
        PollResult { status: "pending".into(), done: false }
    }
}
 
#[tokio::main]
async fn polling_example() {
    // State: `true` once a result with `done == true` has been yielded.
    //
    // The state type must be the same on every iteration, so the seed
    // cannot be `()` while the closure hands back `Option<()>`. A plain
    // bool lets the stream yield the final result and then terminate on
    // the following call.
    let poll_stream = stream::unfold(false, |finished| async move {
        if finished {
            // The final status was already delivered.
            return None;
        }

        tokio::time::sleep(Duration::from_millis(100)).await;

        let result = check_status().await;

        // Yield every result, including the final one; remember whether
        // polling should stop afterwards.
        let done = result.done;
        Some((result, done))
    });

    // The stream ends by itself after the `done` result, so no
    // `take_while` is needed (it would drop the final status).
    let statuses: Vec<_> = poll_stream.collect().await;

    println!("Statuses: {:?}", statuses);
}

Use unfold for polling scenarios where you check status repeatedly.

Async State Operations

use futures::stream::{self, StreamExt};
 
#[tokio::main]
async fn async_state() {
    // State can be complex and async operations can update it.
    //
    // Items are consumed from the front, so the state is a VecDeque:
    // `pop_front` is O(1), whereas `Vec::remove(0)` shifts every remaining
    // element on each call.
    let stream = stream::unfold(
        std::collections::VecDeque::from(vec![1, 2, 3, 4, 5]),
        |mut numbers| async move {
            // An empty queue ends the stream without doing any async work.
            let number = match numbers.pop_front() {
                Some(number) => number,
                None => return None,
            };

            // Async operation between items
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;

            let doubled = number * 2;

            // Hand the shortened queue back as the next state.
            Some((doubled, numbers))
        },
    );

    let results: Vec<_> = stream.collect().await;
    println!("Doubled: {:?}", results);  // [2, 4, 6, 8, 10]
}

The state mutation happens inside the async closure, allowing async state updates.

Comparison with iter and successors

use futures::stream::{self, StreamExt};

#[tokio::main]
async fn comparison() {
    // stream::iter: static collection
    let static_stream = stream::iter(vec![1, 2, 3]);
    // Yields elements from an existing collection
    // No state, no async

    // std::iter::successors: pure function, no async.
    // `futures` has no `stream::successors`; the synchronous generator comes
    // from the standard library and `stream::iter` turns it into a stream.
    let successor_stream =
        stream::iter(std::iter::successors(Some(1), |n: &i32| n.checked_add(1)));
    // Yields 1, 2, 3, 4, ... (`checked_add` ends the sequence at i32::MAX
    // instead of overflowing)
    // Function is synchronous: FnMut(&T) -> Option<T>

    // stream::unfold: async function with state
    let unfold_stream = stream::unfold(1, |n| async move {
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        if n <= 5 {
            Some((n, n + 1))
        } else {
            None
        }
    });

    // Key difference:
    // - iter: known collection, no computation
    // - successors (via iter): synchronous computation
    // - unfold: async computation
}

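Use stream::iter for existing data, stream::iter wrapped around std::iter::successors for synchronous computation (futures itself has no stream::successors), and unfold for async computation.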

Error Handling in unfold

use futures::stream::{self, StreamExt};
 
/// Error type carried by the `Err` items of the stream built in
/// `error_handling`.
#[derive(Debug)]
enum StreamError {
    /// A failure described by a free-form message.
    Failed(String),
}
 
#[tokio::main]
async fn error_handling() {
    // unfold yields plain items, so error handling goes through the item
    // type and the usual StreamExt combinators.
    let stream = stream::unfold(0, |count| async move {
        match count {
            // Reached the end: terminate normally.
            n if n >= 5 => None,
            // There is no way to return an `Err` from unfold itself; the
            // choices are to stop here or to yield an error-carrying item
            // (see the second stream below).
            3 => None,
            n => Some((n, n + 1)),
        }
    });

    // Option 1: make every item a Result
    let result_stream = stream::unfold(0, |count| async move {
        if count >= 5 {
            return None;
        }

        let item = if count == 3 {
            Err(StreamError::Failed("at 3".into()))
        } else {
            Ok(count)
        };
        Some((item, count + 1))
    });

    // Gather the Ok/Err items; filtering can happen downstream
    let results = result_stream.collect::<Vec<_>>().await;
    println!("Results: {:?}", results);
}

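Handle errors by yielding Result items, then filter or handle them downstream. When the stream should stop at the first error, futures::stream::try_unfold is the fallible variant: its closure returns Result<Option<(item, next_state)>, E>, the stream yields Result items, and it ends after the first Err.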

Multiple State Components

use futures::stream::{self, StreamExt};
 
#[tokio::main]
async fn complex_state() {
    // Several pieces of state travel together in one tuple (a struct works
    // just as well).
    let stream = stream::unfold(
        (0, 1.0), // (counter, factor)
        |(count, factor)| async move {
            if count < 5 {
                let value = count as f64 * factor;
                // Advance the counter and grow the factor for the next round.
                let next_state = (count + 1, factor * 1.5);
                Some((value, next_state))
            } else {
                None
            }
        },
    );

    let values = stream.collect::<Vec<_>>().await;
    println!("Values: {:?}", values);
    // [0.0, 1.5, 4.5, 10.125, 20.25]
    // i.e. count * factor at each step
}

Use tuples or structs to maintain multiple pieces of state.

Rate Limiting with State

use futures::stream::{self, StreamExt};
use std::time::{Duration, Instant};
 
#[tokio::main]
async fn rate_limiting() {
    /// Minimum spacing between two consecutive items.
    const MIN_INTERVAL: Duration = Duration::from_millis(100);
    /// Number of items the stream produces before terminating.
    const ITEM_COUNT: u32 = 10;

    // State tracks rate limiting: (items produced so far, time of last item)
    let stream = stream::unfold(
        (0u32, Instant::now()),
        |(count, last_time)| async move {
            // Decide on termination first so the stream does not wait out a
            // full interval just to report that it is finished.
            if count >= ITEM_COUNT {
                return None;
            }

            // Ensure at least MIN_INTERVAL between items. `checked_sub`
            // yields `None` when the interval has already elapsed, so no
            // sleep happens in that case.
            if let Some(remaining) = MIN_INTERVAL.checked_sub(last_time.elapsed()) {
                tokio::time::sleep(remaining).await;
            }

            Some((count, (count + 1, Instant::now())))
        },
    );

    // Each item is yielded with rate limiting
    let items: Vec<_> = stream.collect().await;
}

Use state to track timing, counts, or other rate-limiting information.

Unfold with External State

use futures::stream::{self, StreamExt};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
 
#[tokio::main]
async fn external_state() {
    // State living outside the stream: a shared call counter.
    let counter = Arc::new(AtomicU32::new(0));

    // The closure owns one handle and gives each future its own clone,
    // because every future it returns must own what it touches.
    let shared = Arc::clone(&counter);
    let stream = stream::unfold(0, move |n| {
        let calls = Arc::clone(&shared);
        async move {
            // Count every invocation, including the one that ends the stream.
            calls.fetch_add(1, Ordering::SeqCst);

            if n < 5 {
                Some((n, n + 1))
            } else {
                None
            }
        }
    });

    // Driving the stream updates the external counter as a side effect.
    let items = stream.collect::<Vec<_>>().await;
    println!("Counter: {}", counter.load(Ordering::SeqCst));  // 6 (0-5 iterations)
}

The closure can capture external state for coordination with other code.

Creating Custom Stream Types

use futures::stream::{self, Stream, StreamExt, Unfold};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
 
// unfold returns an Unfold type
// You can use it as a building block for custom streams
 
#[tokio::main]
async fn custom_stream() {
    // A parameterised "stream generator": counts from `start` up to, but
    // not including, `end`.
    fn make_counter_stream(start: u32, end: u32) -> impl Stream<Item = u32> {
        stream::unfold(start, move |current| async move {
            if current >= end {
                return None;
            }
            Some((current, current + 1))
        })
    }

    // Two independent streams built from the same generator.
    let stream1 = make_counter_stream(0, 3);
    let stream2 = make_counter_stream(10, 15);

    let v1 = stream1.collect::<Vec<_>>().await;
    let v2 = stream2.collect::<Vec<_>>().await;

    println!("Stream 1: {:?}", v1);  // [0, 1, 2]
    println!("Stream 2: {:?}", v2);  // [10, 11, 12, 13, 14]
}

unfold lets you create reusable stream generators with parameters.

Cancellation and Cleanup

use futures::stream::{self, StreamExt};
 
#[tokio::main]
async fn cancellation() {
    // An endless counter; every step pretends to do 10 ms of work.
    let stream = stream::unfold(0, |count| async move {
        let work = std::time::Duration::from_millis(10);
        tokio::time::sleep(work).await;

        println!("Generating item {}", count);
        let next = count + 1;
        Some((count, next))
    });

    // take() bounds the stream by count; take_until() would stop it on a
    // condition instead.
    let items = stream.take(3).collect::<Vec<_>>().await;
    println!("Collected: {:?}", items);

    // Once take() has its three items it stops polling the inner stream,
    // so the unfold closure is not called again. There is no explicit
    // cleanup step - the stream is simply dropped.
}

Streams created with unfold can be cancelled using standard StreamExt methods.

Synthesis

The unfold pattern:

stream::unfold(initial_state, |state| async move {
    // Decide first whether the stream is finished
    if !should_continue(&state) {
        return None;  // Terminate
    }
    // Otherwise produce the next value together with the next state
    Some((next_item, next_state))  // Yield and continue
})

When to use unfold:

// Pagination
// State: Some(token) = fetch that page (Some(None) = first page),
//        None = the last page was already yielded, so stop.
// The bare token cannot be the state: the `None` token after the last page
// would look like a request for the first page and the stream would restart.
stream::unfold(Some(None), |state| async move {
    let token = state?;  // None -> terminate
    let page = fetch_page(token).await;
    let next_state = if page.has_next {
        page.next_token.map(Some)
    } else {
        None  // Last page: yield it, then terminate on the next call
    };
    Some((page.items, next_state))
});
 
// Polling (never returns None, so bound it downstream, e.g. with take_while)
stream::unfold((), |()| async move {
    sleep(Duration::from_secs(1)).await;
    Some((check_status().await, ()))
});
 
// Sequences (async)
stream::unfold(0, |n| async move {
    // Yield the computed value; the index `n` is only the state.
    let next = async_computation(n).await;
    Some((next, n + 1))
});
 
// State machines
// `transition()` already resolves to Option<(output, next_state)>, with
// None marking the terminal state, so its result is the closure's result.
stream::unfold(State::Initial, |state| async move {
    state.transition().await
});

Key insight: unfold bridges the gap between computation and streams. It's the asynchronous counterpart of generator functions such as std::iter::successors and std::iter::from_fn—you provide a state transformation function, and unfold turns it into a stream. The state machine pattern (yield Some to continue, None to stop) naturally fits pagination, polling, sequence generation, and any scenario where each output depends on previous computation. Because the closure is async, you can perform I/O between items—fetching pages, polling APIs, or waiting between emissions—while maintaining clean state management without explicit mutable variables or complex control flow.