Rust walkthroughs
futures::stream::Fold and async fn for stateful stream aggregation?
futures::stream::Fold is the future returned by StreamExt::fold: it threads an accumulator through an async closure that runs once per stream item, and it resolves to the final accumulator when the stream ends. An async fn with a manual while let loop achieves the same result using local variables for state. The trade-offs center on composability versus control. Fold slots naturally into stream pipelines through .fold() method chaining and yields a self-contained Fold struct that implements Future. An async fn gives you explicit control over the loop: early termination, error recovery, and the ability to interleave other operations. Fold suits simple aggregation where the same async function is applied to each element, but it can't easily express patterns like "aggregate until a condition, then switch behavior" or "maintain multiple independent state variables with complex interactions."
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn main() {
// Fold aggregates a stream into a single value
let stream = stream::iter(vec![1, 2, 3, 4, 5]);
let sum = stream
.fold(0, |acc, item| async move {
acc + item
})
.await;
println!("Sum: {}", sum); // 15
// Fold with async operation on each item
let stream = stream::iter(vec![1, 2, 3, 4, 5]);
async fn process_item(item: i32) -> i32 {
// Simulate async work
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
item * 2
}
let sum = stream
.fold(0, |acc, item| async move {
let processed = process_item(item).await;
acc + processed
})
.await;
println!("Processed sum: {}", sum); // 30
}
Fold takes an initial value and a closure that receives the accumulator and each item and returns a future; that future's output becomes the next accumulator.
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn main() {
// Same aggregation with async fn
let stream = stream::iter(vec![1, 2, 3, 4, 5]);
let sum = aggregate_stream(stream).await;
println!("Sum: {}", sum); // 15
// With async processing
let stream = stream::iter(vec![1, 2, 3, 4, 5]);
let sum = aggregate_with_processing(stream).await;
println!("Processed sum: {}", sum); // 30
}
// Defined at module level (not inside main) so aggregate_with_processing can call it
async fn process_item(item: i32) -> i32 {
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
item * 2
}
async fn aggregate_stream(mut stream: impl futures::Stream<Item = i32> + Unpin) -> i32 {
let mut acc = 0;
while let Some(item) = stream.next().await {
acc += item;
}
acc
}
async fn aggregate_with_processing(mut stream: impl futures::Stream<Item = i32> + Unpin) -> i32 {
let mut acc = 0;
while let Some(item) = stream.next().await {
let processed = process_item(item).await;
acc += processed;
}
acc
}
async fn uses local variables for state and explicit loop control.
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn main() {
// Fold integrates into stream pipelines
let result = stream::iter(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
.filter(|x| futures::future::ready(x % 2 == 0)) // Keep evens; ready() avoids borrowing x inside the future
.map(|x| x * 2) // Double them
.take(3) // Take first 3
.fold(0, |acc, x| async move { acc + x }); // Sum
println!("Pipeline result: {}", result.await); // 24 (4+8+12)
// async fn requires manual pipeline steps
let stream = stream::iter(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
let result = manual_pipeline(stream).await;
println!("Manual result: {}", result); // 24
}
async fn manual_pipeline(mut stream: impl futures::Stream<Item = i32> + Unpin) -> i32 {
let mut acc = 0;
let mut count = 0;
while let Some(item) = stream.next().await {
// Filter
if item % 2 != 0 {
continue;
}
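// Map
let mapped = item * 2;
// Fold
acc += mapped;
// Take: stop right after the third kept item, as .take(3) does,
// rather than pulling a fourth item from the stream first
count += 1;
if count == 3 {
break;
}
}
acc
}
Fold naturally composes with other stream combinators; async fn must implement each step manually.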
use futures::stream::{self, StreamExt};
#[derive(Debug, Default)]
struct Aggregation {
sum: i32,
count: usize,
min: Option<i32>,
max: Option<i32>,
}
#[tokio::main]
async fn main() {
// Fold with multiple state values requires a struct or tuple
let stream = stream::iter(vec![3, 1, 4, 1, 5, 9, 2, 6]);
let result = stream
.fold(Aggregation::default(), |mut acc, item| async move {
acc.sum += item;
acc.count += 1;
acc.min = Some(acc.min.map_or(item, |m| m.min(item)));
acc.max = Some(acc.max.map_or(item, |m| m.max(item)));
acc
})
.await;
println!("Fold result: {:?}", result);
// async fn can use multiple local variables naturally
let stream = stream::iter(vec![3, 1, 4, 1, 5, 9, 2, 6]);
let result = manual_aggregate(stream).await;
println!("async fn result: {:?}", result);
}
async fn manual_aggregate(mut stream: impl futures::Stream<Item = i32> + Unpin) -> Aggregation {
let mut sum = 0;
let mut count = 0;
let mut min: Option<i32> = None;
let mut max: Option<i32> = None;
while let Some(item) = stream.next().await {
sum += item;
count += 1;
min = Some(min.map_or(item, |m| m.min(item)));
max = Some(max.map_or(item, |m| m.max(item)));
}
Aggregation { sum, count, min, max }
}
async fn can use multiple local variables naturally; Fold must bundle state into a single accumulator.
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn main() {
// Fold always processes the entire stream
let stream = stream::iter(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
let result = stream
.fold(0, |acc, item| async move {
// Can't break early here: return only exits this item's async block,
// and fold goes on to poll every remaining item
if item > 5 {
return acc; // Skips the item, but the stream keeps running
}
acc + item
})
.await;
println!("Fold result: {}", result); // 15, after pulling all 10 items
// async fn can break early
let stream = stream::iter(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
let result = early_termination(stream).await;
println!("Early termination result: {}", result);
}
async fn early_termination(mut stream: impl futures::Stream<Item = i32> + Unpin) -> i32 {
let mut sum = 0;
while let Some(item) = stream.next().await {
if item > 5 {
break; // Stop processing
}
sum += item;
}
sum
}
async fn can break out of the loop early; Fold on its own always consumes the entire stream.
use futures::stream::{self, StreamExt};
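use std::error::Error;
use std::fmt;
#[derive(Debug)]
enum ProcessingError {
InvalidItem(i32),
Overflow,
}
// Display + Error are required for ? to convert ProcessingError into Box<dyn Error> in main
impl fmt::Display for ProcessingError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ProcessingError::InvalidItem(item) => write!(f, "invalid item: {}", item),
ProcessingError::Overflow => write!(f, "sum exceeded the limit"),
}
}
}
impl Error for ProcessingError {}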
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
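// Fold threads a Result accumulator through every item
let stream = stream::iter(vec![1, 2, 3, 4, 5]);
let result: Result<i32, ProcessingError> = stream
.fold(Ok(0), |acc, item| async move {
// Once acc is Err, ? returns it unchanged for each remaining item,
// but fold keeps pulling items until the stream ends
let acc = acc?;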
if item > 10 {
return Err(ProcessingError::InvalidItem(item));
}
if acc > 100 {
return Err(ProcessingError::Overflow);
}
Ok(acc + item)
})
.await;
println!("Fold result: {:?}", result);
// async fn with ? operator for early error return
let stream = stream::iter(vec![1, 2, 3, 4, 5]);
let result = process_with_errors(stream).await?;
println!("async fn result: {}", result);
Ok(())
}
async fn process_with_errors(
mut stream: impl futures::Stream<Item = i32> + Unpin
) -> Result<i32, ProcessingError> {
let mut sum = 0;
while let Some(item) = stream.next().await {
if item > 10 {
return Err(ProcessingError::InvalidItem(item));
}
if sum > 100 {
return Err(ProcessingError::Overflow);
}
sum += item;
}
Ok(sum)
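}
Both approaches can carry errors in a Result, but only the async fn stops reading the stream at the first error: return Err(...) (or ?) exits the loop, while Fold keeps consuming items and passing the error along.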
use futures::stream::{self, StreamExt};
#[derive(Debug, Clone, PartialEq)]
enum State {
Collecting,
ThresholdReached,
Finalizing,
}
#[tokio::main]
async fn main() {
// Complex state machine with async fn
let stream = stream::iter(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
let result = state_machine_aggregate(stream).await;
println!("State machine result: {:?}", result);
// Fold can't easily express state transitions
// You'd need to encode everything in the accumulator
let stream = stream::iter(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
let (final_state, sum) = stream
.fold((State::Collecting, 0), |(state, acc), item| async move {
match state {
State::Collecting => {
let new_sum = acc + item;
if new_sum >= 15 {
(State::ThresholdReached, new_sum)
} else {
(State::Collecting, new_sum)
}
}
State::ThresholdReached => {
(State::Finalizing, acc + item / 2) // Different behavior
}
State::Finalizing => {
(State::Finalizing, acc) // Stop accumulating
}
}
})
.await;
println!("Fold state machine: {:?}, sum: {}", final_state, sum);
}
async fn state_machine_aggregate(mut stream: impl futures::Stream<Item = i32> + Unpin) -> i32 {
let mut state = State::Collecting;
let mut sum = 0;
while let Some(item) = stream.next().await {
match state {
State::Collecting => {
sum += item;
if sum >= 15 {
state = State::ThresholdReached;
}
}
State::ThresholdReached => {
sum += item / 2; // Different behavior after threshold
state = State::Finalizing;
}
State::Finalizing => {
// Could do cleanup, log, etc.
break; // Can exit early!
}
}
}
sum
}
async fn naturally expresses state machines with early exits; Fold must encode all logic in the accumulator.
use futures::stream::{self, StreamExt};
use std::time::Instant;
#[tokio::main]
async fn main() {
// async fn can interleave other async operations
let stream = stream::iter(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
let result = aggregate_with_logging(stream).await;
println!("Result with logging: {}", result);
// With fold, any extra work has to live inside the per-item closure
let stream = stream::iter(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
let start = Instant::now();
// Awkward with fold:
// - Logging every N items means carrying a counter in the accumulator
// - A timeout can be checked per item, but nothing can stop the stream
// - Updating external state requires interior mutability (Cell, Mutex, atomics)
let result = stream
.fold(0, |acc, item| async move {
// Per-item work fits here; counters must travel in the accumulator
acc + item
})
.await;
println!("Fold result: {}, elapsed: {:?}", result, start.elapsed());
}
async fn aggregate_with_logging(mut stream: impl futures::Stream<Item = i32> + Unpin) -> i32 {
let mut sum = 0;
let mut count = 0;
let start = Instant::now();
while let Some(item) = stream.next().await {
sum += item;
count += 1;
// Log progress every 3 items
if count % 3 == 0 {
println!("Progress: {} items, sum = {}", count, sum);
}
// Check for timeout
if start.elapsed().as_secs() > 5 {
println!("Timeout reached, stopping early");
break;
}
// Could also do other async work
// tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
}
sum
}
async fn allows interleaving arbitrary operations between iterations; with Fold, everything must happen inside the per-item closure.
use futures::stream::Stream;
use std::pin::Pin;
// StreamExt::fold returns the named future type futures::stream::Fold:
// pub struct Fold<St, Fut, T, F> { ... }
// where
// St: Stream,
// F: FnMut(T, St::Item) -> Fut,
// Fut: Future<Output = T>,
// async fn returns an anonymous Future impl
#[tokio::main]
async fn main() {
// Both can be used as futures
use futures::stream::{self, StreamExt};
// Fold produces a named type (its closure and future parameters are still anonymous)
let stream = stream::iter(vec![1, 2, 3]);
let fold_future = stream.fold(0, |acc, x| async move { acc + x });
// Can store in a Box<dyn Future> if needed
let boxed: Pin<Box<dyn std::future::Future<Output = i32>>> = Box::pin(fold_future);
let result = boxed.await;
println!("Boxed fold result: {}", result);
// async fn also produces a future
async fn fold_fn(stream: impl Stream<Item = i32> + Unpin) -> i32 {
use futures::StreamExt;
let mut sum = 0;
let mut stream = stream;
while let Some(x) = stream.next().await {
sum += x;
}
sum
}
let stream = stream::iter(vec![1, 2, 3]);
let result = fold_fn(stream).await;
println!("async fn result: {}", result);
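}
Fold is a named struct implementing Future, but its F and Fut parameters are usually anonymous closure and async-block types, so in practice you still name it as impl Future or box it; async fn creates an anonymous future type outright.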
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn main() {
// Fold creates a state machine that holds:
// - The accumulator
// - The stream
// - The async closure's state
// async fn creates a state machine that holds:
// - All local variables
// - The stream
// - The current execution state
// For simple aggregation, they're similar
// The difference is in how state is structured
// Fold's state is explicitly passed through each iteration
// async fn's state is implicit in local variables
// Example: Large accumulator
#[derive(Debug)]
struct LargeAccumulator {
data: Vec<String>,
counts: Vec<usize>,
flags: Vec<bool>,
}
impl LargeAccumulator {
fn new() -> Self {
Self {
data: Vec::new(),
counts: vec![0; 1000],
flags: vec![false; 1000],
}
}
}
// Fold: the accumulator is moved into each async block and back out;
// each move copies only the three Vec headers, not their heap buffers
let stream = stream::iter(vec!["a", "b", "c"]);
let result = stream
.fold(LargeAccumulator::new(), |mut acc, item| async move {
acc.data.push(item.to_string());
acc
})
.await;
println!("Fold data: {:?}", result.data);
// async fn: accumulator stays in place
async fn accumulate_large(
mut stream: impl futures::Stream<Item = &'static str> + Unpin
) -> LargeAccumulator {
let mut acc = LargeAccumulator::new();
while let Some(item) = stream.next().await {
acc.data.push(item.to_string());
}
acc
}
let stream = stream::iter(vec!["a", "b", "c"]);
let result = accumulate_large(stream).await;
println!("async fn data: {:?}", result.data);
}
Both compile to state machines; the difference is in how state is structured and moved, and for heap-backed accumulators like this one each move is cheap.
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn main() {
// USE FOLD WHEN:
// 1. Simple aggregation with no early termination
// 2. Want to compose with other stream combinators
// 3. State can be expressed as a single accumulator
// 4. Same operation applied to each element
let sum = stream::iter(1..=10)
.filter(|x| futures::future::ready(x % 2 == 0))
.fold(0, |acc, x| async move { acc + x })
.await;
println!("Even sum: {}", sum);
// USE async fn WHEN:
// 1. Need early termination (break, return)
// 2. Complex state with multiple variables
// 3. State machine transitions
// 4. Interleaved operations (logging, timeouts)
// 5. Complex error handling with ?
async fn complex_processing(
mut stream: impl futures::Stream<Item = i32> + Unpin
) -> i32 {
let mut sum = 0;
let mut count = 0;
let mut last_even = None;
while let Some(item) = stream.next().await {
count += 1;
// Early termination condition
if count > 100 {
break;
}
// Complex logic
if item % 2 == 0 {
last_even = Some(item);
sum += item;
} else if let Some(last) = last_even {
sum += last; // Use last even
}
}
sum
}
let result = complex_processing(stream::iter(1..=20)).await;
println!("Complex result: {}", result);
}
Choose based on complexity: Fold for simple pipelines, async fn for complex logic.
Comparison table:
| Aspect | Fold | async fn |
|--------|------|----------|
| Early termination | No (consumes the full stream unless it is truncated upstream, e.g. with take_while) | Yes (break, return) |
| State management | Single accumulator | Multiple local variables |
| Composability | Excellent (stream pipeline) | Manual (implement each step) |
| State transitions | Must encode in accumulator | Natural match expressions |
| Interleaved operations | Limited to fold function | Anywhere in the function |
| Error handling | Result in accumulator (keeps consuming after an Err) | Natural ? propagation (stops at the first Err) |
| Type | Fold<St, Fut, T, F> | Anonymous future |
| Clarity | Declarative | Imperative |
Use Fold when:

- The aggregation is simple and never needs to stop early
- You are composing with other stream combinators such as .filter(), .map(), etc.
- The state fits in a single accumulator
- The same operation is applied to each element

Use async fn when:

- You need break or early return
- The state spans several variables or moves through distinct phases
- You want ? error propagation
- You interleave logging or timeout checks, or .await multiple different futures per iteration

Key insight: Fold and async fn represent two paradigms for the same problem. Fold is the functional approach: you declare what happens to each element through composition, and the combinator handles iteration. This works well for simple aggregations that fit the "combine each element with the accumulator" pattern. async fn is the imperative approach: you control every aspect of the loop, from iteration to state management to termination. This flexibility becomes essential when your aggregation logic doesn't fit the fold pattern: when you need to break early, transition between phases, or interleave operations that don't belong in the fold function. Since both compile to a state machine, the performance difference is unlikely to matter for most use cases; the real trade-off is expressiveness versus composability. A good rule of thumb: start with Fold when building stream pipelines, and reach for async fn when the aggregation logic becomes complex enough that you're fighting the fold pattern rather than benefiting from it.