Rust walkthroughs
futures::stream::unfold, how do you manage state between iterations?

futures::stream::unfold creates a stream from an initial state and a function that produces the next element along with an updated state. The state is passed from one iteration to the next through the function's return value: the closure receives the current state and returns a future that resolves to Option<(Item, State)>, and the returned state becomes the input for the next call. This creates a functional iteration pattern where state is explicitly threaded through each step rather than stored in a mutable variable. The stream terminates when the future resolves to None; the state passed into that final call is dropped.
use futures::stream::unfold;

fn main() {
    // unfold takes an initial state and a closure that returns a future.
    // Closure shape: State -> Future<Output = Option<(Item, NextState)>>
    let stream = unfold(0, |state| async move {
        if state < 5 {
            // Continue: return Some((item, next_state))
            Some((state, state + 1))
        } else {
            // Terminate: return None
            None
        }
    });

    // State flows: 0 -> 1 -> 2 -> 3 -> 4 -> 5 -> None
    // Items emitted: 0, 1, 2, 3, 4
    futures::executor::block_on(async {
        futures::pin_mut!(stream);
        while let Some(value) = futures::StreamExt::next(&mut stream).await {
            println!("Value: {}", value);
        }
    });
}

The current state is the closure's only argument, and the next state is returned as the second element of the tuple.
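To make the threading concrete, here is a simplified synchronous model of the same loop that uses only the standard library. `unfold_sync` and `count_to_five` are hypothetical helpers written for this illustration. They are not part of the futures crate and are not a description of how the crate is implemented. The sketch only mirrors the contract: take the state, call the function, keep the returned state for the next call.

```rust
/// Hypothetical synchronous analogue of `unfold`, for illustration only.
/// The state lives in an `Option` between calls so it can be moved into `f`.
fn unfold_sync<S, T>(
    init: S,
    mut f: impl FnMut(S) -> Option<(T, S)>,
) -> impl Iterator<Item = T> {
    let mut state = Some(init);
    std::iter::from_fn(move || {
        // Move the current state out; if it is gone, the sequence has ended.
        let current = state.take()?;
        // `None` from `f` ends the sequence, and the state is dropped for good.
        let (item, next) = f(current)?;
        // The returned state becomes the input of the next call.
        state = Some(next);
        Some(item)
    })
}

/// The same counter as above, written against the synchronous model.
fn count_to_five() -> Vec<i32> {
    unfold_sync(0, |n| if n < 5 { Some((n, n + 1)) } else { None }).collect()
}
```

Calling `count_to_five()` yields the same items as the async counter. The only thing the async version adds is that each call to the closure produces a future that must be awaited before the next state is known.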
use futures::stream::unfold;

fn main() {
    // Track a counter and a running sum together as the state
    let stream = unfold((0i32, 0i32), |(count, sum)| async move {
        if count < 10 {
            let new_sum = sum + count;
            Some((new_sum, (count + 1, new_sum)))
        } else {
            None
        }
    });

    futures::executor::block_on(async {
        futures::pin_mut!(stream);
        while let Some(value) = futures::StreamExt::next(&mut stream).await {
            println!("Running sum: {}", value);
        }
    });
}

Multiple pieces of state can be combined into a tuple.
use futures::stream::unfold;

struct PageState {
    page: u32,
    total_items: u32,
}

async fn fetch_page(page: u32) -> Vec<String> {
    // Simulate an async fetch: three pages of data, then an empty page
    if page < 3 {
        vec![format!("Page {} data", page)]
    } else {
        Vec::new()
    }
}

fn main() {
    let stream = unfold(PageState { page: 0, total_items: 0 }, |state| async move {
        let items = fetch_page(state.page).await;
        if items.is_empty() {
            // An empty page signals the end of the data
            None
        } else {
            let new_total = state.total_items + items.len() as u32;
            let next_state = PageState {
                page: state.page + 1,
                total_items: new_total,
            };
            Some((items, next_state))
        }
    });
    // State includes both current page and accumulated count
    // Each async iteration can perform I/O before updating state
}

State can be updated after async operations complete.
use futures::stream::unfold;

enum PaginationState {
    Starting,
    Fetching { page: u32, items: Vec<String> },
    Done,
}

fn main() {
    let stream = unfold(PaginationState::Starting, |state| async move {
        match state {
            PaginationState::Starting => {
                // Transition to fetching the first page
                let items = vec!["item1".to_string(), "item2".to_string()];
                let next_state = PaginationState::Fetching { page: 1, items };
                Some((vec!["starting".to_string()], next_state))
            }
            PaginationState::Fetching { page, items } => {
                if page < 3 {
                    // The emitted item and the next state each need their own copy
                    let mut new_items = items.clone();
                    new_items.push(format!("item_page_{}", page));
                    let next_state = PaginationState::Fetching {
                        page: page + 1,
                        items: new_items,
                    };
                    Some((items, next_state))
                } else {
                    // Emit the final batch, then move to Done
                    let next_state = PaginationState::Done;
                    Some((items, next_state))
                }
            }
            PaginationState::Done => None,
        }
    });
}

Enums encode different states with different data requirements.
use futures::stream::unfold;

// State machine pattern
enum MachineState {
    Idle,
    Processing { data: Vec<i32>, step: usize },
    Complete { result: i32 },
}

fn main() {
    let stream = unfold(MachineState::Idle, |state| async move {
        match state {
            MachineState::Idle => {
                // Initialize and transition
                let next = MachineState::Processing {
                    data: vec![1, 2, 3, 4, 5],
                    step: 0,
                };
                Some((0, next))
            }
            MachineState::Processing { data, step } => {
                if step < data.len() {
                    let value = data[step];
                    let next = MachineState::Processing {
                        data, // Owned by this arm, so it is moved, not cloned
                        step: step + 1,
                    };
                    Some((value, next))
                } else {
                    let sum: i32 = data.iter().sum();
                    let next = MachineState::Complete { result: sum };
                    Some((sum, next))
                }
            }
            MachineState::Complete { .. } => None,
        }
    });
}

State machines with explicit transitions work naturally with unfold.
use futures::stream::unfold;
use std::sync::Arc;
use tokio::sync::Mutex;

#[tokio::main]
async fn main() {
    // External shared state
    let counter = Arc::new(Mutex::new(0u32));

    // The closure captures a clone of the Arc; the threaded state stays a plain u32
    let counter_clone = counter.clone();
    let stream = unfold(0u32, move |local_state| {
        // Each future needs its own handle to the shared counter
        let counter = counter_clone.clone();
        async move {
            if local_state < 5 {
                // Update shared state
                let mut guard = counter.lock().await;
                *guard += 1;
                let shared_value = *guard;

                // Combine shared and local state
                let item = (local_state, shared_value);
                let next_state = local_state + 1;
                Some((item, next_state))
            } else {
                None
            }
        }
    });

    futures::pin_mut!(stream);
    while let Some((local, shared)) = futures::StreamExt::next(&mut stream).await {
        println!("Local: {}, Shared: {}", local, shared);
    }
}

The closure can capture shared external state through an Arc while the threaded state stays local.
use futures::stream::unfold;

struct OwnedState {
    buffer: Vec<u8>,
    position: usize,
}

fn main() {
    let stream = unfold(
        OwnedState {
            buffer: vec![1, 2, 3, 4, 5],
            position: 0,
        },
        |state| async move {
            // State is moved into the closure
            // Must either return it or consume it
            if state.position < state.buffer.len() {
                let item = state.buffer[state.position];
                let next_state = OwnedState {
                    buffer: state.buffer, // Move buffer forward
                    position: state.position + 1,
                };
                Some((item, next_state))
            } else {
                // State is dropped when returning None
                None
            }
        },
    );
}

State is moved into each iteration; ownership transfers through the return value.
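One way to check that moving the state does not copy the buffer is to run the transition by hand and compare heap pointers. The sketch below pulls the closure body into a hypothetical `step` function and adds a hypothetical `drain_checking_pointer` driver, neither of which is in the original example, so that it runs without an executor.

```rust
struct OwnedState {
    buffer: Vec<u8>,
    position: usize,
}

/// Hypothetical stand-in for the closure body passed to `unfold`.
fn step(state: OwnedState) -> Option<(u8, OwnedState)> {
    if state.position < state.buffer.len() {
        let item = state.buffer[state.position];
        // The Vec is moved into the next state: its heap allocation is
        // reused and only the small (pointer, length, capacity) handle moves.
        let next = OwnedState {
            buffer: state.buffer,
            position: state.position + 1,
        };
        Some((item, next))
    } else {
        // `state` and its buffer are dropped here.
        None
    }
}

/// Runs every transition and reports whether the buffer stayed in the
/// same heap allocation the whole time.
fn drain_checking_pointer(buffer: Vec<u8>) -> (Vec<u8>, bool) {
    let original_ptr = buffer.as_ptr();
    let mut state = OwnedState { buffer, position: 0 };
    let mut items = Vec::new();
    let mut same_allocation = true;
    while let Some((item, next)) = step(state) {
        same_allocation &= next.buffer.as_ptr() == original_ptr;
        items.push(item);
        state = next;
    }
    (items, same_allocation)
}
```

With `unfold`, the same body would sit inside `|state| async move { ... }`; the ownership story is unchanged because the async block takes the state by value.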
use futures::stream::unfold;

#[derive(Clone)]
struct CloneableState {
    data: Vec<String>,
    index: usize,
}

fn main() {
    let initial = CloneableState {
        data: vec!["a".into(), "b".into(), "c".into()],
        index: 0,
    };

    // Clone once up front so `initial` stays usable after the stream takes its copy
    let stream = unfold(initial.clone(), |state| async move {
        if state.index < state.data.len() {
            let item = state.data[state.index].clone();
            let next_state = CloneableState {
                data: state.data, // Moved, not cloned, between iterations
                index: state.index + 1,
            };
            Some((item, next_state))
        } else {
            None
        }
    });

    // `initial` is still owned here
    println!("Original has {} entries", initial.data.len());

    // For expensive-to-clone state, consider:
    // - Arc<State> for shared ownership
    // - References with lifetimes (complex)
    // - Restructuring to avoid cloning
}

Clone the state once, before building the stream, when the original needs to persist beyond the stream.
use futures::stream::unfold;
use std::sync::Arc;

struct HeavyState {
    large_data: Arc<Vec<u8>>, // Shared buffer, never copied
    cursor: usize,
}

fn main() {
    let shared = Arc::new(vec![0u8; 1_000_000]);
    let heavy = HeavyState {
        large_data: Arc::clone(&shared), // Bumps a reference count only
        cursor: 0,
    };

    let stream = unfold(heavy, |state| async move {
        if state.cursor < state.large_data.len() {
            let item = state.large_data[state.cursor];
            let next_state = HeavyState {
                large_data: state.large_data, // Move the Arc forward; the Vec is untouched
                cursor: state.cursor + 1,
            };
            Some((item, next_state))
        } else {
            None
        }
    });
    // `shared` can still be read elsewhere while the stream runs
}

Arc lets expensive data be shared with other owners while only a pointer moves between iterations.
use futures::stream::unfold;
use std::fs::File;
use std::io::{BufRead, BufReader};

fn main() {
    // The file handle lives inside the state, so dropping the state closes the file
    let file = File::open("data.txt").expect("file");
    let reader = BufReader::new(file);

    // Note: std::fs reads block the thread. That is fine for a demo, but an
    // async file API is preferable inside a real executor.
    let stream = unfold(reader.lines(), |mut lines| async move {
        match lines.next() {
            // Hand the same iterator back as the next state
            Some(Ok(line)) => Some((line, lines)),
            // A read error terminates the stream
            Some(Err(_)) => None,
            // End of file
            None => None,
        }
    });
    // When the stream ends or is dropped, the state (including the file handle) is dropped
}

State cleanup happens automatically when the stream terminates or is dropped.
use futures::stream::{unfold, StreamExt};

fn main() {
    let numbers = unfold(0, |n| async move {
        if n < 5 {
            Some((n, n + 1))
        } else {
            None
        }
    });

    // Transform the items; the unfold state keeps driving the stream underneath
    let doubled = numbers.map(|n| n * 2);

    futures::executor::block_on(async {
        let values: Vec<i32> = doubled.collect().await;
        println!("Doubled: {:?}", values); // [0, 2, 4, 6, 8]

        // Or use fold to compute a final accumulated value
        let sum: i32 = unfold(1, |n| async move {
            if n <= 5 {
                Some((n, n + 1))
            } else {
                None
            }
        })
        .fold(0, |acc, n| async move { acc + n })
        .await;
        println!("Sum: {}", sum); // 15
    });
}

State can be accumulated downstream using combinators like fold.
use futures::stream::unfold;

struct Config {
    prefix: String,
    max_items: usize,
}

fn main() {
    let config = Config {
        prefix: "item-".to_string(),
        max_items: 10,
    };

    let stream = unfold((config, 0), |(config, count)| async move {
        if count < config.max_items {
            let item = format!("{}{}", config.prefix, count);
            Some((item, (config, count + 1)))
        } else {
            None
        }
    });
    // Config is carried through all iterations unchanged
    // Only count changes
}

Configuration can be part of the state tuple, passed unchanged.
use futures::stream::{unfold, StreamExt};
use std::time::Duration;

#[tokio::main]
async fn main() {
    // Infinite stream: never returns None
    let infinite = unfold(0u32, |n| async move {
        tokio::time::sleep(Duration::from_millis(100)).await;
        Some((n, n + 1)) // Always continue
    });

    // Consumption must be bounded (break, take, timeout) to avoid running forever
    futures::pin_mut!(infinite);

    // Take the first 5 items by breaking out of the loop
    let mut count = 0;
    while let Some(n) = futures::StreamExt::next(&mut infinite).await {
        println!("Value: {}", n);
        count += 1;
        if count >= 5 {
            break;
        }
    }

    // Or use the take combinator (needs StreamExt in scope)
    let finite = unfold(0u32, |n| async move {
        tokio::time::sleep(Duration::from_millis(100)).await;
        Some((n, n + 1))
    })
    .take(5);
    let values: Vec<u32> = finite.collect().await;
    println!("Collected: {:?}", values); // [0, 1, 2, 3, 4]
}

State can evolve indefinitely; terminate with external controls.
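use futures::stream::unfold;
use std::collections::VecDeque;

struct BackpressureState {
    pending: VecDeque<i32>,
    max_pending: usize,
    next_value: i32, // Next value the simulated source will produce
    limit: i32,      // The simulated source is exhausted at this value
}

fn main() {
    let stream = unfold(
        BackpressureState {
            pending: VecDeque::new(),
            max_pending: 3,
            next_value: 0,
            limit: 10,
        },
        |mut state| async move {
            // Refill the buffer, but never beyond max_pending
            while state.pending.len() < state.max_pending && state.next_value < state.limit {
                state.pending.push_back(state.next_value);
                state.next_value += 1;
            }
            if let Some(item) = state.pending.pop_front() {
                // Emit the oldest buffered item and keep the rest in the state
                Some((item, state))
            } else {
                // Buffer is empty and the source is exhausted
                None
            }
        },
    );
}

State can carry a bounded buffer, so at most max_pending items are staged ahead of the consumer.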
use futures::stream::unfold;

enum StreamState<T> {
    Active(T),
    Error(String),
    Done,
}

fn main() {
    let stream = unfold(StreamState::Active(0i32), |state| async move {
        match state {
            StreamState::Active(n) => {
                if n < 3 {
                    // Simulate a potential error
                    if n == 2 {
                        // Emit the error as an item and remember it in the state
                        Some((Err("simulated error"), StreamState::Error("failed at 2".into())))
                    } else {
                        Some((Ok(n), StreamState::Active(n + 1)))
                    }
                } else {
                    None
                }
            }
            StreamState::Error(_reason) => {
                // The error item was already emitted; terminate here
                // (a different design could recover and continue instead)
                None
            }
            StreamState::Done => None,
        }
    });
}

State can track error conditions across iterations.
use futures::stream::{unfold, StreamExt};
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Approach 1: unfold (functional, state in parameter)
    let stream1 = unfold(0, |n| async move {
        if n < 5 {
            Some((n, n + 1))
        } else {
            None
        }
    });
    let from_unfold: Vec<i32> = stream1.collect().await;

    // Approach 2: channel (imperative, state in variable)
    let (tx, mut rx) = mpsc::channel(10);
    tokio::spawn(async move {
        let mut n = 0;
        while n < 5 {
            tx.send(n).await.unwrap();
            n += 1;
        }
        // tx is dropped here, which closes the channel
    });
    let mut from_channel = Vec::new();
    while let Some(n) = rx.recv().await {
        from_channel.push(n);
    }
    assert_eq!(from_unfold, from_channel);

    // unfold advantages:
    // - State is explicit and type-safe
    // - Termination is clear (return None)
    // - No separate task management
    // - Easier to reason about
    // Channel advantages:
    // - Can have multiple producers
    // - Can be driven by external events
    // - Supports buffering
}

unfold is simpler for self-contained state machines.
use futures::stream::unfold;

struct OuterState {
    outer_count: u32,
    max_outer: u32, // Number of outer rounds before the stream ends
    inner_state: Option<InnerState>,
}

struct InnerState {
    inner_count: u32,
    max: u32,
}

fn main() {
    let stream = unfold(
        OuterState {
            outer_count: 0,
            max_outer: 2,
            inner_state: Some(InnerState { inner_count: 0, max: 3 }),
        },
        |state| async move {
            match state.inner_state {
                Some(inner) if inner.inner_count < inner.max => {
                    // Process inner state
                    let next_inner = InnerState {
                        inner_count: inner.inner_count + 1,
                        max: inner.max,
                    };
                    let next_outer = OuterState {
                        outer_count: state.outer_count,
                        max_outer: state.max_outer,
                        inner_state: Some(next_inner),
                    };
                    Some((inner.inner_count, next_outer))
                }
                Some(inner) => {
                    // Inner complete: start the next outer round, or stop after the last one
                    let outer_count = state.outer_count + 1;
                    let inner_state = if outer_count < state.max_outer {
                        Some(InnerState { inner_count: 0, max: inner.max })
                    } else {
                        None
                    };
                    let next_outer = OuterState {
                        outer_count,
                        max_outer: state.max_outer,
                        inner_state,
                    };
                    Some((state.outer_count, next_outer))
                }
                // No inner state left: every outer round has finished
                None => None,
            }
        },
    );
}

Complex state machines can nest state structures.
use futures::stream::unfold;
use serde::Deserialize;

#[derive(Clone)]
struct PaginationState {
    url: String,
    page: u32,
    per_page: u32,
    has_more: bool,
}

#[derive(Deserialize)]
struct ApiResponse {
    items: Vec<String>,
    total_pages: u32,
}

async fn fetch_page(url: &str, page: u32, per_page: u32) -> Result<ApiResponse, reqwest::Error> {
    // Simulated async HTTP request
    Ok(ApiResponse {
        items: vec![format!("item_{}", page)],
        total_pages: 10,
    })
}

#[tokio::main]
async fn main() {
    let initial_state = PaginationState {
        url: "https://api.example.com/items".to_string(),
        page: 1,
        per_page: 100,
        has_more: true,
    };

    let stream = unfold(initial_state, |state| async move {
        if !state.has_more {
            return None;
        }
        let response = fetch_page(&state.url, state.page, state.per_page).await;
        match response {
            Ok(api) => {
                let has_more = api.total_pages > state.page;
                let next_state = PaginationState {
                    page: state.page + 1,
                    has_more,
                    ..state
                };
                Some((api.items, next_state))
            }
            Err(_) => {
                // Could retry or terminate
                None
            }
        }
    });
    // Stream produces Vec<String> for each page
    // State tracks pagination position
}

Pagination is a common pattern for unfold with state tracking.
| State Approach | Use Case | Trade-off |
|----------------|----------|-----------|
| Simple value | Counter, index | Minimal overhead |
| Tuple | Multiple values | Destructuring needed |
| Struct | Named fields | More boilerplate |
| Enum | State machine | Clear transitions |
| Arc<State> | Shared/cheap clone | Reference counting overhead |
| Option<State> | Optional phases | Handle None cases |
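
The `Option<State>` row is the one approach in the table that the examples above do not use as the state itself. One use for it, sketched here under my own assumptions, is emitting a final summary item after the data runs out: the last transition returns `None` as the next state, and the following call sees that `None` and ends the stream. `Output`, `Running`, `step`, and `run` are hypothetical names. With the futures crate, the body of `step` is what would go inside `unfold(Some(init), |state| async move { ... })`.

```rust
#[derive(Debug, PartialEq)]
enum Output {
    Value(u32),
    Summary { total: u32 },
}

struct Running {
    next: u32,
    limit: u32,
    total: u32,
}

/// Hypothetical transition function; `None` as the incoming state means
/// the summary was already emitted, so the sequence terminates.
fn step(state: Option<Running>) -> Option<(Output, Option<Running>)> {
    let s = state?;
    if s.next < s.limit {
        let following = Running {
            next: s.next + 1,
            limit: s.limit,
            total: s.total + s.next,
        };
        Some((Output::Value(s.next), Some(following)))
    } else {
        // Last phase: emit the summary and hand `None` to the next call.
        Some((Output::Summary { total: s.total }, None))
    }
}

/// Drives `step` with a plain loop, the way a stream consumer would.
fn run(limit: u32) -> Vec<Output> {
    let mut state = Some(Running { next: 0, limit, total: 0 });
    let mut out = Vec::new();
    while let Some((item, next)) = step(state) {
        out.push(item);
        state = next;
    }
    out
}
```

The extra `None` step is the "Handle None cases" trade-off from the table: the stream takes one more poll to finish, in exchange for being able to emit something after the last data item.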
State management in futures::stream::unfold follows a functional pattern where state flows through return values rather than being mutated in place:
The core mechanism: Each iteration receives the current state as a parameter and returns a future that resolves to Option<(Item, NextState)>. The NextState becomes the input to the next iteration. Threading state through return values makes the flow explicit. When the closure has no side effects it is also deterministic: the same input state always produces the same item and next state. Closures that perform I/O or touch shared data do not get that guarantee.
State evolution: The state's type is fixed for the lifetime of the stream, but an enum lets its shape change from phase to phase. Start with Initializing, transition to Running { data }, then Completed. Each variant carries different data appropriate to that phase. The closure becomes a state transition function.
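The Initializing, Running, Completed progression might look like the following sketch. The variant names come from the paragraph above; the payloads, the `Event` item type, and the `step` and `run` functions are assumptions made for illustration, written without an executor so the transitions can be read in isolation.

```rust
#[derive(Debug, PartialEq)]
enum Phase {
    Initializing,
    Running { data: Vec<i32>, index: usize },
    Completed,
}

#[derive(Debug, PartialEq)]
enum Event {
    Started,
    Item(i32),
    Finished { count: usize },
}

/// Hypothetical transition function: one match arm per phase.
fn step(phase: Phase) -> Option<(Event, Phase)> {
    match phase {
        Phase::Initializing => {
            // Acquire the data this stream will walk over.
            let data = vec![10, 20, 30];
            Some((Event::Started, Phase::Running { data, index: 0 }))
        }
        Phase::Running { data, index } => {
            if index < data.len() {
                let item = data[index];
                // `data` is moved into the next phase, not cloned.
                Some((Event::Item(item), Phase::Running { data, index: index + 1 }))
            } else {
                Some((Event::Finished { count: data.len() }, Phase::Completed))
            }
        }
        Phase::Completed => None,
    }
}

/// Drives the state machine to completion and records every event.
fn run() -> Vec<Event> {
    let mut phase = Phase::Initializing;
    let mut events = Vec::new();
    while let Some((event, next)) = step(phase) {
        events.push(event);
        phase = next;
    }
    events
}
```

Because each variant owns exactly the data its phase needs, the compiler rejects any attempt to read `data` before initialization or after completion.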
Memory considerations: State is moved into the closure and either returned for the next iteration or dropped when returning None. Moving is cheap regardless of how much heap data the state owns, because only the owning handle moves. When large data must also stay visible outside the stream, Arc allows cheap sharing. Cloning large state per iteration can be avoided by keeping unchanged data behind an Arc and updating only small delta values in the state.
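A small sketch of the "unchanged data in Arc, small delta in the state" layout, with a check that sharing really happens. `Cursor`, `step`, and `read_all` are hypothetical names; only `std::sync::Arc` and its `strong_count` function are real library API here.

```rust
use std::sync::Arc;

struct Cursor {
    data: Arc<Vec<u8>>, // large and unchanged: shared, never copied
    position: usize,    // small delta: the only part that changes
}

/// Hypothetical transition function for reading one byte per step.
fn step(state: Cursor) -> Option<(u8, Cursor)> {
    // Past the end: `?` returns None and `state` (one Arc handle) is dropped.
    let item = *state.data.get(state.position)?;
    Some((
        item,
        Cursor {
            data: state.data, // the Arc handle moves; no refcount change
            position: state.position + 1,
        },
    ))
}

/// Reads everything and reports the highest strong count seen on the way.
fn read_all(shared: &Arc<Vec<u8>>) -> (Vec<u8>, usize) {
    let mut state = Cursor { data: Arc::clone(shared), position: 0 };
    let mut items = Vec::new();
    let mut max_count = Arc::strong_count(shared);
    while let Some((item, next)) = step(state) {
        max_count = max_count.max(Arc::strong_count(shared));
        items.push(item);
        state = next;
    }
    (items, max_count)
}
```

The strong count never rises above two (the caller's handle plus the one inside the state), which shows that iterating creates neither extra copies of the buffer nor extra handles.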
Key insight: unfold provides functional state iteration in which the state itself is never mutated from outside. The state machine is defined declaratively through the return value pattern. This contrasts with imperative approaches using mutable variables and loops: unfold makes state transitions explicit and testable. The cost is that complex state requires careful structuring to avoid excessive cloning or Arc overhead. Choose unfold when you want explicit state threading; choose channels or imperative loops when state needs external mutation or complex lifetime management.
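One practical payoff of explicit transitions is that the state-update rule can be tested without an executor or a network. The sketch below assumes the pagination example's rule that more pages exist while total_pages is greater than the current page. `PageCursor`, `advance`, and `pages_visited` are hypothetical names, and the idea is that the `unfold` closure would do the I/O and then call `advance` to build the next state.

```rust
#[derive(Debug, Clone, PartialEq)]
struct PageCursor {
    page: u32,
    has_more: bool,
}

/// Pure part of the transition: given the total reported by the server
/// for the page just fetched, compute the next cursor.
fn advance(current: &PageCursor, total_pages: u32) -> PageCursor {
    PageCursor {
        page: current.page + 1,
        has_more: total_pages > current.page,
    }
}

/// Replays the cursor logic without any fetching to list the pages
/// that would be requested.
fn pages_visited(total_pages: u32) -> Vec<u32> {
    let mut cursor = PageCursor { page: 1, has_more: true };
    let mut visited = Vec::new();
    while cursor.has_more {
        visited.push(cursor.page);
        cursor = advance(&cursor, total_pages);
    }
    visited
}
```

Keeping the rule in a plain function means off-by-one mistakes at the last page show up in a unit test rather than in a live request loop.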