How does futures::stream::StreamExt::fold accumulate values from a stream asynchronously?
StreamExt::fold asynchronously consumes a stream by applying a folding function to each element, maintaining an accumulator that evolves with each item and producing a final result when the stream ends. Unlike Iterator::fold, which runs synchronously to completion, StreamExt::fold yields control between items, allowing other tasks to run while waiting for the next stream element.
The Method Signature
use futures::stream::{Stream, StreamExt};
// The fold method signature:
fn fold<T, Fut, F>(self, init: T, f: F) -> Fold<Self, Fut, T, F>
where
F: FnMut(T, Self::Item) -> Fut,
Fut: Future<Output = T>,
Self: Sized;
// Parameters:
// - init: Initial accumulator value
// - f: Async function that takes accumulator and item, returns new accumulator
// - Returns: Fold future that resolves to the final accumulator
The fold function receives the current accumulator and stream item, returning a future that produces the next accumulator value.
Basic Usage Pattern
use futures::stream::{self, StreamExt};
async fn basic_fold() {
let stream = stream::iter(vec![1, 2, 3, 4, 5]);
let sum = stream
.fold(0, |acc, x| async move {
acc + x
})
.await;
println!("Sum: {}", sum); // Sum: 15
}
// Compare with synchronous fold:
fn sync_fold() {
let sum = vec![1, 2, 3, 4, 5]
.into_iter()
.fold(0, |acc, x| acc + x);
println!("Sum: {}", sum); // Sum: 15
}
The async version allows awaiting between items while maintaining the same accumulation pattern.
Async Folding with I/O Operations
use futures::stream::{self, StreamExt};
use tokio::fs;
use std::path::PathBuf;
async fn fold_with_io() -> std::io::Result<u64> {
let paths = vec![
"file1.txt".to_string(),
"file2.txt".to_string(),
"file3.txt".to_string(),
];
let stream = stream::iter(paths);
// Accumulate total file sizes
let total_size = stream
.fold(0u64, |acc, path| async move {
// Async I/O in the fold function
match fs::metadata(&path).await {
Ok(metadata) => acc + metadata.len(),
Err(_) => acc, // Skip files that can't be read
}
})
.await;
Ok(total_size)
}
The fold function can perform async operations like file I/O between items.
Concurrent vs Sequential Processing
use futures::stream::{self, StreamExt};
// fold processes items SEQUENTIALLY
async fn sequential_example() {
let stream = stream::iter(vec![1, 2, 3, 4, 5]);
let result = stream.fold(0, |acc, x| async move {
println!("Processing {}", x);
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
acc + x
}).await;
// Total time: ~500ms (5 items * 100ms each)
// Items processed one after another
}
// For CONCURRENT processing, use different methods:
async fn concurrent_example() {
let stream = stream::iter(vec![1, 2, 3, 4, 5]);
// Use buffer or fold with join
let futures: Vec<_> = stream
.map(|x| async move {
println!("Processing {}", x);
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
x
})
.buffer_unordered(5) // Process up to 5 concurrently
.collect()
.await;
// Or use try_fold for fallible operations with concurrency control
}
fold is inherently sequential: each item's processing waits for the previous one to complete.
State Management in Fold
use futures::stream::{self, StreamExt};
#[derive(Debug)]
struct Accumulator {
sum: i32,
count: usize,
max: Option<i32>,
min: Option<i32>,
}
async fn complex_accumulation() {
let stream = stream::iter(vec![5, 3, 8, 1, 9, 4]);
let init = Accumulator {
sum: 0,
count: 0,
max: None,
min: None,
};
let result = stream
.fold(init, |mut acc, x| async move {
acc.sum += x;
acc.count += 1;
acc.max = Some(acc.max.map_or(x, |m| m.max(x)));
acc.min = Some(acc.min.map_or(x, |m| m.min(x)));
acc
})
.await;
println!("Stats: {:?}", result);
// Stats: Accumulator { sum: 30, count: 6, max: Some(9), min: Some(1) }
}
The accumulator can hold complex state that evolves across stream items.
Error Handling with try_fold
use futures::stream::{self, StreamExt, TryStreamExt};
// For fallible streams, use try_fold:
async fn fallible_fold() -> Result<i32, String> {
let stream = stream::iter(vec![
Ok(1),
Ok(2),
Err("failure at 3".to_string()),
Ok(4), // Never processed
]);
let result = stream
.try_fold(0, |acc, x| async move {
Ok(acc + x)
})
.await;
match result {
Ok(sum) => println!("Sum: {}", sum),
Err(e) => println!("Error: {}", e), // Error: failure at 3
}
result
}
// try_fold short-circuits on the first error
try_fold is the fallible version that stops on the first Err.
Combining with Other Stream Combinators
use futures::stream::{self, StreamExt};
async fn combined_operations() {
let stream = stream::iter(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
let result = stream
.filter(|&x| async move { x % 2 == 0 }) // Copy the item out of the reference; keep evens: 2, 4, 6, 8, 10
.map(|x| x * 2) // Synchronous map; double: 4, 8, 12, 16, 20
.fold(Vec::new(), |mut acc, x| async move {
acc.push(x);
acc
})
.await;
println!("Result: {:?}", result); // [4, 8, 12, 16, 20]
}
// Common pattern: filter -> transform -> fold
async fn pipeline_pattern() {
let stream = stream::iter(vec!["a", "bb", "ccc", "dd", "e"]);
let total_length = stream
.filter(|&s| async move { s.len() > 1 }) // "bb", "ccc", "dd"
.fold(0, |acc, s| async move { acc + s.len() })
.await;
println!("Total length: {}", total_length); // 7 (2 + 3 + 2)
}
fold integrates seamlessly into stream pipelines with other combinators.
Working with Timeouts
use futures::stream::{self, StreamExt};
use tokio::time::{timeout, Duration};
async fn fold_with_timeout() {
let stream = stream::iter(vec![1, 2, 3, 4, 5]);
let result = stream
.fold(0, |acc, x| async move {
// Simulate variable processing time
if x == 3 {
tokio::time::sleep(Duration::from_secs(10)).await;
}
acc + x
});
// Wrap the entire fold with a timeout
match timeout(Duration::from_secs(5), result).await {
Ok(sum) => println!("Sum: {}", sum),
Err(_) => println!("Fold timed out"),
}
}
// Or timeout individual items:
async fn fold_per_item_timeout() {
let stream = stream::iter(vec![1, 2, 3, 4, 5]);
let result = stream
.then(|x| async move {
timeout(Duration::from_millis(100), async {
// Some async work
x * 2
})
.await
.unwrap_or(0) // Default on timeout
})
.fold(0, |acc, x| async move { acc + x })
.await;
println!("Per-item result: {}", result); // 30 when no item times out
}
Timeouts can wrap the entire fold or individual item processing.
Comparison with Other Accumulation Methods
use futures::stream::{self, StreamExt};
async fn accumulation_comparison() {
let stream = stream::iter(vec![1, 2, 3, 4, 5]);
// fold: Single accumulator, sequential
let sum = stream
.fold(0, |acc, x| async move { acc + x })
.await;
println!("fold: {}", sum); // 15
// collect: Gather all items into a collection
let stream = stream::iter(vec![1, 2, 3, 4, 5]);
let vec: Vec<i32> = stream.collect().await;
println!("collect: {:?}", vec); // [1, 2, 3, 4, 5]
// StreamExt has no built-in reduce; emulate fold-without-initial-value
// by folding over an Option<T>:
let stream = stream::iter(vec![1, 2, 3, 4, 5]);
let max = stream
.fold(None, |acc: Option<i32>, x| async move {
Some(acc.map_or(x, |m| m.max(x)))
})
.await;
println!("max: {:?}", max); // Some(5)
// try_fold: Fallible accumulation
let stream = stream::iter(vec![Ok(1), Ok(2), Err("oops")]);
let result = stream
.try_fold(0, |acc, x| async move { Ok(acc + x) })
.await;
println!("try_fold: {:?}", result); // Err("oops")
}
Each of these accumulation methods serves a different need.
Practical Example: Building a Map from Stream
use futures::stream::{self, StreamExt};
use std::collections::HashMap;
async fn build_map() {
let data = vec![
("a", 1),
("b", 2),
("c", 3),
("a", 4), // Duplicate key
];
let stream = stream::iter(data);
let map = stream
.fold(HashMap::new(), |mut acc, (k, v)| async move {
// Aggregate values for duplicate keys
acc.entry(k.to_string())
.and_modify(|e| *e += v)
.or_insert(v);
acc
})
.await;
println!("Map: {:?}", map); // {"a": 5, "b": 2, "c": 3}
}
Fold can build complex data structures from stream items.
Async Closures and Captures
use futures::stream::{self, StreamExt};
async fn closures_and_captures() {
let multiplier = 10;
let stream = stream::iter(vec![1, 2, 3, 4, 5]);
// Capture external values
let result = stream
.fold(0, move |acc, x| {
let m = multiplier; // Capture by move
async move {
acc + x * m
}
})
.await;
println!("Result: {}", result); // 150 (0 + 10 + 20 + 30 + 40 + 50)
// Mutable state requires careful handling
let stream = stream::iter(vec![1, 2, 3]);
let mut counter = 0;
// This won't compile: the returned future would borrow `counter`
// from the enclosing scope, but fold needs a future that owns its data
// stream.fold(0, |acc, x| async {
//     counter += 1; // Error: borrowed data escapes the closure
//     acc + x
// }).await;
// Instead, carry state in the accumulator:
let stream = stream::iter(vec![1, 2, 3]);
let (sum, count) = stream
.fold((0, 0), |(sum, count), x| async move {
(sum + x, count + 1)
})
.await;
println!("Sum: {}, Count: {}", sum, count);
}
Captures and mutable state require careful handling in async folds.
Cancellation and Dropping
use futures::stream::{self, StreamExt};
use tokio::time::{timeout, Duration};
async fn cancellation_behavior() {
let stream = stream::iter(vec![1, 2, 3, 4, 5]);
let fold_future = stream.fold(0, |acc, x| async move {
if x == 3 {
// Long delay on item 3
tokio::time::sleep(Duration::from_secs(10)).await;
}
acc + x
});
// If cancelled before completion:
match timeout(Duration::from_millis(100), fold_future).await {
Ok(result) => println!("Completed: {}", result),
Err(_) => {
println!("Cancelled before completion");
// Stream is dropped, no further items processed
}
}
}
// Dropping the fold future drops the stream; the accumulator is lost
The fold future can be cancelled at any point; partial progress (the accumulator) is lost.
Real-World Example: Processing Kafka Messages
use futures::stream::{StreamExt, TryStreamExt};
// Simulated Kafka message
struct Message {
key: String,
value: Vec<u8>,
partition: i32,
}
async fn process_messages(messages: impl futures::stream::Stream<Item = Message>) -> usize {
messages
.fold(0usize, |count, msg| async move {
// Process each message
process_single_message(&msg).await;
count + 1
})
.await
}
async fn process_single_message(msg: &Message) {
// Async processing: database write, HTTP call, etc.
}
// With error handling (Error is a placeholder type for this example):
#[derive(Debug)]
struct Error;
async fn process_fallible(
messages: impl futures::stream::Stream<Item = Result<Message, Error>>
) -> Result<usize, Error> {
messages
.try_fold(0usize, |count, msg| async move {
process_single_message(&msg).await;
Ok(count + 1)
})
.await
}
Fold is natural for counting or accumulating results from message streams.
Performance Considerations
use futures::stream::{self, StreamExt};
async fn performance_patterns() {
// BAD: Collecting then folding
let stream = stream::iter(0..1000);
let vec: Vec<i32> = stream.collect().await;
let sum: i32 = vec.into_iter().fold(0, |a, b| a + b);
// Requires storing all items in memory
// GOOD: Fold directly on stream
let stream = stream::iter(0..1000);
let sum = stream.fold(0, |acc, x| async move { acc + x }).await;
// No intermediate allocation
// When to use collect:
// - Need multiple passes over data
// - Need to sort or shuffle
// - Need to know all values before processing
// When to use fold:
// - Single pass is sufficient
// - Computing a single result
// - Memory efficiency matters
}
Fold avoids intermediate allocations compared to collect-and-fold.
Implementation Details
use futures::stream::{Stream, StreamExt};
use std::future::Future;
use std::pin::Pin;
// Simplified implementation of fold:
// The real version in futures also stores the closure and pin-projects:
struct Fold<St, T, Fut, F> {
stream: Option<St>,
acc: Option<T>,
future: Option<Fut>,
f: F,
}
impl<St, T, Fut, F> Future for Fold<St, T, Fut, F>
where
St: Stream,
F: FnMut(T, St::Item) -> Fut,
Fut: Future<Output = T>,
{
type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>)
-> std::task::Poll<T>
{
// Schematic polling logic (the real impl uses pin projection):
// loop {
//   if a fold future is in flight, poll it;
//     on Ready(new_acc), store the accumulator and clear the future
//   otherwise poll the stream for the next item;
//     on Some(item), start f(acc, item) as the new fold future
//     on None, return Poll::Ready(acc)
//   if either poll returned Pending, return Poll::Pending
// }
unimplemented!()
}
}
The fold future internally manages state between stream items and fold-function results.
Key Points Summary
fn key_points() {
// 1. fold accumulates stream items into a single result
// 2. The fold function is async: FnMut(T, Item) -> Future<Output = T>
// 3. Items are processed SEQUENTIALLY, not concurrently
// 4. The accumulator is passed through each async operation
// 5. Returns the final accumulator when stream ends
// 6. Use try_fold for fallible streams with error short-circuiting
// 7. Use collect to gather all items into a collection
// 8. Fold over an Option<T> when you have no initial value
// 9. fold is memory-efficient: no intermediate collection
// 10. Cancellation drops the stream and loses partial progress
// 11. Captures require careful handling across await points
// 12. fold integrates with other stream combinators (filter, map)
// 13. The fold function can perform async I/O between items
// 14. For concurrent processing, use buffer_unordered instead
}
Key insight: StreamExt::fold provides the same accumulation pattern as Iterator::fold but with async support, processing items sequentially while yielding control between each item. The fold function receives the current accumulator and stream item, returns a future that produces the next accumulator value, and the final result is returned when the stream ends. Unlike concurrent stream processing methods like buffer_unordered, fold's sequential nature makes it appropriate for operations where each item depends on the accumulated state or when async side effects need to be ordered. For fallible streams, try_fold short-circuits on the first error, stopping the stream and returning the error. The primary advantage over collect-then-fold is memory efficiency: no intermediate allocation is needed when the accumulator can be updated incrementally. The pattern naturally fits counting, summing, building maps, or any operation where a single result aggregates stream contents while performing async work between items.
