When using futures::stream::unfold, how do you manage state between iterations?

futures::stream::unfold creates a stream from an initial state and a closure that produces the next element along with an updated state. The state is passed from one iteration to the next through the closure's return value: the closure receives the current state by value and returns a future that resolves to Option<(Item, State)>; the returned state becomes the input for the next call. This creates a functional iteration pattern where state is explicitly threaded through each step rather than stored in a mutable variable. The stream terminates when the future resolves to None, at which point the final state is dropped.

Basic unfold State Pattern

use futures::stream::{Stream, unfold};
 
fn main() {
    // unfold takes: initial_state, closure returning a future
    // Closure shape: State -> Future<Output = Option<(Item, NextState)>>
    
    let stream = unfold(0, |state| async move {
        if state < 5 {
            // Continue: return Some((item, next_state))
            Some((state, state + 1))
        } else {
            // Terminate: return None
            None
        }
    });
    
    // State flows: 0 -> 1 -> 2 -> 3 -> 4 -> 5 -> None
    // Items emitted: 0, 1, 2, 3, 4
    
    futures::executor::block_on(async {
        futures::pin_mut!(stream);
        while let Some(value) = futures::StreamExt::next(&mut stream).await {
            println!("Value: {}", value);
        }
    });
}

The current state is passed as the closure's only argument, and the next state is returned as the second element of the tuple.
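
To see the handoff without any async machinery, the loop that unfold performs can be modeled in plain synchronous code. This is only a sketch of the idea, not the futures crate's implementation; drive is a hypothetical helper introduced here for illustration.

```rust
/// Synchronous model of the state handoff in `unfold`.
/// Illustration only: `drive` is a hypothetical helper, not part of `futures`.
fn drive<S, T>(init: S, mut step: impl FnMut(S) -> Option<(T, S)>) -> Vec<T> {
    let mut items = Vec::new();
    let mut state = init;
    // Each call consumes the current state by value. On Some, the returned
    // state replaces it; on None, the loop ends and nothing is carried over.
    while let Some((item, next)) = step(state) {
        items.push(item);
        state = next;
    }
    items
}
```

Calling drive(0, |n| if n < 5 { Some((n, n + 1)) } else { None }) collects 0 through 4, the same items the stream above emits.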

State as Accumulator

use futures::stream::{Stream, unfold};
 
fn main() {
    // Track running sum as state
    let stream = unfold((0i32, 0i32), |(count, sum)| async move {
        if count < 10 {
            let new_sum = sum + count;
            Some((new_sum, (count + 1, new_sum)))
        } else {
            None
        }
    });
    
    futures::executor::block_on(async {
        futures::pin_mut!(stream);
        while let Some(value) = futures::StreamExt::next(&mut stream).await {
            println!("Running sum: {}", value);
        }
    });
}

Multiple pieces of state can be combined into a tuple.

Async State Updates

use futures::stream::{Stream, unfold};
 
struct PageState {
    page: u32,
    total_items: u32,
}
 
async fn fetch_page(page: u32) -> Vec<String> {
    // Simulate an async fetch: three pages of data, then an empty page
    if page < 3 {
        vec![format!("Page {} data", page)]
    } else {
        Vec::new()
    }
}
 
fn main() {
    let stream = unfold(PageState { page: 0, total_items: 0 }, |state| async move {
        let items = fetch_page(state.page).await;
        
        if items.is_empty() {
            None
        } else {
            let new_total = state.total_items + items.len() as u32;
            let next_state = PageState {
                page: state.page + 1,
                total_items: new_total,
            };
            Some((items, next_state))
        }
    });
    
    // State includes both current page and accumulated count
    // Each async iteration can perform I/O before updating state
}

State can be updated after async operations complete.

Complex State with Enums

use futures::stream::{Stream, unfold};
 
enum PaginationState {
    Starting,
    Fetching { page: u32, items: Vec<String> },
    Done,
}
 
struct PaginatedData {
    all_items: Vec<String>,
}
 
fn main() {
    let stream = unfold(PaginationState::Starting, |state| async move {
        match state {
            PaginationState::Starting => {
                // Transition to fetching first page
                let items = vec!["item1".to_string(), "item2".to_string()];
                let next_state = PaginationState::Fetching { page: 1, items };
                Some((vec!["starting".to_string()], next_state))
            }
            PaginationState::Fetching { page, items } => {
                if page < 3 {
                    let mut new_items = items.clone();
                    new_items.push(format!("item_page_{}", page));
                    let next_state = PaginationState::Fetching { 
                        page: page + 1, 
                        items: new_items 
                    };
                    Some((items, next_state))
                } else {
                    let next_state = PaginationState::Done;
                    Some((items, next_state))
                }
            }
            PaginationState::Done => {
                None
            }
        }
    });
}

Enums encode different states with different data requirements.

State Transition Patterns

use futures::stream::{Stream, unfold};
 
// State machine pattern
enum MachineState {
    Idle,
    Processing { data: Vec<i32>, step: usize },
    Complete { result: i32 },
}
 
fn main() {
    let stream = unfold(MachineState::Idle, |state| async move {
        match state {
            MachineState::Idle => {
                // Initialize and transition
                let next = MachineState::Processing { 
                    data: vec![1, 2, 3, 4, 5], 
                    step: 0 
                };
                Some((0, next))
            }
            MachineState::Processing { data, step } => {
                if step < data.len() {
                    let value = data[step];
                    let next = MachineState::Processing { 
                        data, // move the Vec forward; no clone needed
                        step: step + 1 
                    };
                    Some((value, next))
                } else {
                    let sum: i32 = data.iter().sum();
                    let next = MachineState::Complete { result: sum };
                    Some((sum, next))
                }
            }
            MachineState::Complete { .. } => {
                None
            }
        }
    });
}

State machines with explicit transitions work naturally with unfold.

Capturing External State

use futures::stream::{Stream, unfold};
use std::sync::Arc;
use tokio::sync::Mutex;
 
#[tokio::main]
async fn main() {
    // External shared state
    let counter = Arc::new(Mutex::new(0u32));
    
    // The closure captures an Arc handle; the unfold state itself stays a plain u32
    let counter_clone = counter.clone();
    let stream = unfold(0u32, move |local_state| {
        let counter = counter_clone.clone();
        async move {
            if local_state < 5 {
                // Update shared state
                let mut guard = counter.lock().await;
                *guard += 1;
                let shared_value = *guard;
                
                // Combine shared and local state
                let item = (local_state, shared_value);
                let next_state = local_state + 1;
                Some((item, next_state))
            } else {
                None
            }
        }
    });
    
    futures::pin_mut!(stream);
    while let Some((local, shared)) = futures::StreamExt::next(&mut stream).await {
        println!("Local: {}, Shared: {}", local, shared);
    }
}

The closure can capture an Arc handle to external shared state while the unfold state tracks only local progress.

State Ownership and Moves

use futures::stream::{Stream, unfold};
 
struct OwnedState {
    buffer: Vec<u8>,
    position: usize,
}
 
fn main() {
    let stream = unfold(
        OwnedState { 
            buffer: vec![1, 2, 3, 4, 5], 
            position: 0 
        },
        |state| async move {
            // State is moved into the closure
            // Must either return it or consume it
            
            if state.position < state.buffer.len() {
                let item = state.buffer[state.position];
                let next_state = OwnedState {
                    buffer: state.buffer,  // Move buffer forward
                    position: state.position + 1,
                };
                Some((item, next_state))
            } else {
                // State is dropped when returning None
                None
            }
        }
    );
}

State is moved into each iteration; ownership transfers through return value.

Cloning State When Needed

use futures::stream::{Stream, unfold};
 
#[derive(Clone)]
struct CloneableState {
    data: Vec<String>,
    index: usize,
}
 
fn main() {
    let initial = CloneableState {
        data: vec!["a".into(), "b".into(), "c".into()],
        index: 0,
    };
    
    let stream = unfold(initial, |state| async move {
        if state.index < state.data.len() {
            let item = state.data[state.index].clone();
            let next_state = CloneableState {
                data: state.data,  // Moved, not cloned; clone only if the data is also needed elsewhere
                index: state.index + 1,
            };
            Some((item, next_state))
        } else {
            None
        }
    });
    
    // For expensive-to-clone state, consider:
    // - Arc<State> for shared ownership
    // - References with lifetimes (complex)
    // - Restructuring to avoid cloning
}

Moving state between iterations needs no clone; clone only the parts that must also live outside the stream, such as emitted items.

Using Arc for Shared State in Stream

use futures::stream::{Stream, unfold};
use std::sync::Arc;
 
struct HeavyState {
    large_data: Arc<Vec<u8>>,  // shared, read-only payload
    cursor: usize,
}

fn main() {
    let large_data = Arc::new(vec![0u8; 1_000_000]);
    
    // The caller keeps its own handle; the stream gets a cheap Arc clone
    let heavy = HeavyState {
        large_data: Arc::clone(&large_data),
        cursor: 0,
    };
    
    let stream = unfold(heavy, |state| async move {
        if state.cursor < state.large_data.len() {
            let item = state.large_data[state.cursor];
            let next_state = HeavyState {
                large_data: state.large_data,  // Moves the Arc; the Vec is never copied
                cursor: state.cursor + 1,
            };
            Some((item, next_state))
        } else {
            None
        }
    });
}

Arc lets the stream share expensive data with other owners without copying it; within the stream, moving the state forward is already cheap.

State with Cleanup

use futures::stream::{Stream, unfold};
use std::fs::File;
use std::io::{BufReader, BufRead};
 
fn main() {
    // The Lines iterator owns the BufReader, which owns the File
    let file = File::open("data.txt").expect("file");
    let reader = BufReader::new(file);
    
    // Note: std::fs reads block the executor thread; acceptable for a small demo only
    let stream = unfold(reader.lines(), |mut lines| async move {
        match lines.next() {
            // Move the iterator into the next state; Lines is not Clone
            Some(Ok(line)) => Some((line, lines)),
            // A read error or end of file terminates the stream
            Some(Err(_)) | None => None,
        }
    });
    
    // When the closure returns None, the state (including the file handle) is dropped
}

State cleanup happens automatically when the stream terminates.

Combining with Other Streams

use futures::stream::{Stream, unfold, StreamExt};
 
fn main() {
    let numbers = unfold(0, |n| async move {
        if n < 5 {
            Some((n, n + 1))
        } else {
            None
        }
    });
    
    // Transform items downstream; the unfold state itself is untouched
    let doubled = numbers.map(|n| n * 2);
    
    // Or use fold to reduce the items to a single value
    futures::executor::block_on(async {
        let sum: i32 = unfold(1, |n| async move {
            if n <= 5 {
                Some((n, n + 1))
            } else {
                None
            }
        })
        .fold(0, |acc, n| async move { acc + n })
        .await;
        
        println!("Sum: {}", sum);  // 15
    });
}

State can be accumulated downstream using combinators like fold.

State as Configuration

use futures::stream::{Stream, unfold};
 
struct Config {
    prefix: String,
    max_items: usize,
}
 
fn main() {
    let config = Config {
        prefix: "item-".to_string(),
        max_items: 10,
    };
    
    let stream = unfold((config, 0), |(config, count)| async move {
        if count < config.max_items {
            let item = format!("{}{}", config.prefix, count);
            Some((item, (config, count + 1)))
        } else {
            None
        }
    });
    
    // Config is carried through all iterations unchanged
    // Only count changes
}

Configuration can be part of the state tuple, passed unchanged.

Infinite Streams with State

use futures::stream::{unfold, StreamExt};  // StreamExt provides take()
use std::time::Duration;
 
#[tokio::main]
async fn main() {
    // Infinite stream: never returns None
    let infinite = unfold(0u32, |n| async move {
        tokio::time::sleep(Duration::from_millis(100)).await;
        Some((n, n + 1))  // Always continue
    });
    
    // Must use timeout or take to avoid running forever
    futures::pin_mut!(infinite);
    
    // Take first 5 items
    let mut count = 0;
    while let Some(n) = futures::StreamExt::next(&mut infinite).await {
        println!("Value: {}", n);
        count += 1;
        if count >= 5 {
            break;
        }
    }
    
    // Or use take combinator
    let finite = unfold(0u32, |n| async move {
        tokio::time::sleep(Duration::from_millis(100)).await;
        Some((n, n + 1))
    }).take(5);
}

State can evolve indefinitely; terminate with external controls.

State Backpressure

use futures::stream::{Stream, unfold};
 
struct BackpressureState {
    pending: Vec<i32>,
    max_pending: usize,
}
 
fn main() {
    let stream = unfold(
        BackpressureState { 
            pending: Vec::new(), 
            max_pending: 3 
        },
        |mut state| async move {
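            // Refill only while the buffer is under capacity
            if state.pending.len() < state.max_pending {
                state.pending.push(state.pending.len() as i32);
            }
            
            if let Some(item) = state.pending.pop() {
                // Return item, keep state
                Some((item, state))
            } else {
                // Unreachable in this demo: every call refills before popping,
                // so the stream is infinite unless the consumer stops polling
                None
            }
        }
    );
}

State can track buffer capacity. Because streams are pull-based, the closure runs only when the consumer asks for the next item, so a slow consumer naturally throttles the producer.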

Error Handling in State

use futures::stream::{Stream, unfold};
 
enum StreamState<T> {
    Active(T),
    Error(String),
    Done,
}
 
fn main() {
    let stream = unfold(
        StreamState::Active(0i32),
        |state| async move {
            match state {
                StreamState::Active(n) => {
                    if n < 3 {
                        // Simulate potential error
                        if n == 2 {
                            Some((Err("simulated error"), StreamState::Error("failed at 2".into())))
                        } else {
                            Some((Ok(n), StreamState::Active(n + 1)))
                        }
                    } else {
                        None
                    }
                }
                StreamState::Error(_reason) => {
                    // The Err item was already emitted on the previous step; terminate now
                    None
                }
                StreamState::Done => None,
            }
        }
    );
}

State can track error conditions across iterations.

Comparison: unfold vs Loop with Channel

use futures::stream::{Stream, unfold};
use tokio::sync::mpsc;
 
#[tokio::main]
async fn main() {
    // Approach 1: unfold (functional, state in parameter)
    let stream1 = unfold(0, |n| async move {
        if n < 5 {
            Some((n, n + 1))
        } else {
            None
        }
    });
    
    // Approach 2: channel (imperative, state in variable)
    let (tx, mut rx) = mpsc::channel(10);
    tokio::spawn(async move {
        let mut n = 0;
        while n < 5 {
            tx.send(n).await.unwrap();
            n += 1;
        }
    });
    
    // unfold advantages:
    // - State is explicit and type-safe
    // - Termination is clear (return None)
    // - No separate task management
    // - Easier to reason about
    
    // Channel advantages:
    // - Can have multiple producers
    // - Can be driven by external events
    // - Supports buffering
}

unfold is simpler for self-contained state machines.

Nested State Management

use futures::stream::{Stream, unfold};
 
struct OuterState {
    outer_count: u32,
    inner_state: Option<InnerState>,
}
 
struct InnerState {
    inner_count: u32,
    max: u32,
}
 
fn main() {
    let stream = unfold(
        OuterState { 
            outer_count: 0, 
            inner_state: Some(InnerState { inner_count: 0, max: 3 }) 
        },
        |state| async move {
            match state.inner_state {
                Some(inner) if inner.inner_count < inner.max => {
                    // Process inner state
                    let next_inner = InnerState { 
                        inner_count: inner.inner_count + 1, 
                        max: inner.max 
                    };
                    let next_outer = OuterState {
                        outer_count: state.outer_count,
                        inner_state: Some(next_inner),
                    };
                    Some((inner.inner_count, next_outer))
                }
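                Some(_) => {
                    // Inner complete: advance the outer counter and stop after 3 outer rounds
                    // (without a bound, the None arm below is never reached)
                    let next_outer = OuterState {
                        outer_count: state.outer_count + 1,
                        inner_state: (state.outer_count + 1 < 3)
                            .then(|| InnerState { inner_count: 0, max: 3 }),
                    };
                    Some((state.outer_count, next_outer))
                }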
                None => {
                    None
                }
            }
        }
    );
}

Complex state machines can nest state structures.

Real-World Example: Pagination

use futures::stream::{Stream, unfold};
use serde::Deserialize;
 
#[derive(Clone)]
struct PaginationState {
    url: String,
    page: u32,
    per_page: u32,
    has_more: bool,
}
 
#[derive(Deserialize)]
struct ApiResponse {
    items: Vec<String>,
    total_pages: u32,
}
 
async fn fetch_page(url: &str, page: u32, per_page: u32) -> Result<ApiResponse, reqwest::Error> {
    // Simulated async HTTP request
    Ok(ApiResponse { 
        items: vec![format!("item_{}", page)], 
        total_pages: 10 
    })
}
 
#[tokio::main]
async fn main() {
    let initial_state = PaginationState {
        url: "https://api.example.com/items".to_string(),
        page: 1,
        per_page: 100,
        has_more: true,
    };
    
    let stream = unfold(initial_state, |state| async move {
        if !state.has_more {
            return None;
        }
        
        let response = fetch_page(&state.url, state.page, state.per_page).await;
        
        match response {
            Ok(api) => {
                let has_more = api.total_pages > state.page;
                let next_state = PaginationState {
                    page: state.page + 1,
                    has_more,
                    ..state
                };
                Some((api.items, next_state))
            }
            Err(_) => {
                // Could retry or terminate
                None
            }
        }
    });
    
    // Stream produces Vec<String> for each page
    // State tracks pagination position
}

Pagination is a common pattern for unfold with state tracking.

Summary Table

| State Approach | Use Case           | Trade-off                   |
|----------------|--------------------|-----------------------------|
| Simple value   | Counter, index     | Minimal overhead            |
| Tuple          | Multiple values    | Destructuring needed        |
| Struct         | Named fields       | More boilerplate            |
| Enum           | State machine      | Clear transitions           |
| Arc<State>     | Shared/cheap clone | Reference counting overhead |
| Option<State>  | Optional phases    | Handle None cases           |

Synthesis

State management in futures::stream::unfold follows a functional pattern where state flows through return values rather than being mutated in place:

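The core mechanism: Each iteration receives the current state as a parameter and returns a future resolving to Option<(Item, NextState)>. The NextState becomes the input to the next iteration. This threading of state through return values makes the flow explicit. When the closure is pure (no I/O, clocks, or shared mutable data), the same input state always produces the same result; closures that await external work, such as the pagination example, are only as deterministic as that work.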

State evolution: The state type is fixed for the lifetime of the stream, but an enum lets the active variant change from step to step. Start with Initializing, transition to Running { data }, then Completed. Each variant carries the data appropriate to that phase. The closure becomes a state transition function.
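
One way to sketch this is to write the transition as an ordinary function over an enum, using the variant names mentioned above. The Phase type, the step function, and the emitted strings are all assumptions made for illustration; the closing comment shows how such a function would plausibly be handed to unfold.

```rust
/// Hypothetical phases, named after the variants mentioned in the text.
#[derive(Debug, PartialEq)]
enum Phase {
    Initializing,
    Running { data: Vec<u32> },
    Completed,
}

/// Plain transition function: consumes the current phase and returns the
/// emitted item together with the next phase, or None to end the stream.
fn step(phase: Phase) -> Option<(String, Phase)> {
    match phase {
        Phase::Initializing => {
            Some(("init".to_string(), Phase::Running { data: vec![1, 2] }))
        }
        Phase::Running { mut data } => match data.pop() {
            // Items remain: emit the last one and stay in Running
            Some(n) => Some((format!("item {}", n), Phase::Running { data })),
            // Buffer exhausted: emit a marker and move to Completed
            None => Some(("done".to_string(), Phase::Completed)),
        },
        Phase::Completed => None,
    }
}

// With the futures crate, this would be wired up roughly as:
//     unfold(Phase::Initializing, |phase| async move { step(phase) })
```

Because step is an ordinary function, each transition can be asserted in a unit test without an executor.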

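Memory considerations: State is moved into the closure and either returned for the next iteration or dropped when returning None. Moving a struct that owns a Vec or String copies only a pointer and its lengths, so threading large owned state is already cheap. Arc becomes useful when the same data must also be held outside the stream, for example by the caller or by emitted items. Cloning large state per iteration can be avoided by moving it, or by keeping unchanged data behind an Arc and updating only small values such as a cursor.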
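
A minimal sketch of that layout, assuming a read-only byte buffer: the large data sits behind an Arc and only a cursor changes per step. Cursor and next_byte are hypothetical names, and the transition is written as a plain function so the sharing can be checked with Arc::ptr_eq.

```rust
use std::sync::Arc;

/// Hypothetical state: a shared read-only buffer plus a small cursor.
struct Cursor {
    data: Arc<[u8]>,
    pos: usize,
}

/// Emits the byte at the cursor and advances it; returns None past the end.
fn next_byte(state: Cursor) -> Option<(u8, Cursor)> {
    let byte = *state.data.get(state.pos)?;
    // Moving `state.data` hands the same Arc to the next step: the buffer is
    // not copied and the reference count does not change.
    Some((byte, Cursor { data: state.data, pos: state.pos + 1 }))
}

// With the futures crate, this would be wired up roughly as:
//     unfold(Cursor { data, pos: 0 }, |state| async move { next_byte(state) })
```

The caller can keep its own Arc handle to the buffer while the stream runs; the only per-step cost is moving a pointer and a usize.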

Key insight: unfold provides functional state iteration: each step consumes the old state and returns the new one instead of mutating a shared variable. The state machine is defined by the return value pattern. This contrasts with imperative approaches using mutable variables and loops; unfold makes state transitions explicit and testable. The cost is that complex state requires careful structuring to avoid excessive cloning or Arc overhead. Choose unfold when you want explicit state threading; choose channels or imperative loops when state needs external mutation or complex lifetime management.