How does futures::stream::StreamExt::buffer_unordered handle concurrent stream item processing?
futures::stream::StreamExt::buffer_unordered transforms a stream into one that processes multiple futures concurrently, maintaining a buffer of in-flight futures up to a specified limit while yielding results in completion order rather than submission order. It enables controlled concurrency where you specify the maximum number of concurrent operations, and results are yielded as they complete regardless of which future was started first.
Basic buffer_unordered Usage
use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration};
async fn basic_example() {
let stream = stream::iter(vec![1, 2, 3, 4, 5]);
// Process up to 2 futures concurrently:
let results: Vec<_> = stream
.buffer_unordered(2)
.map(|item| async move {
sleep(Duration::from_millis(100)).await;
item * 2
})
.collect()
.await;
println!("{:?}", results); // Results in completion order, not input order
}buffer_unordered takes a concurrency limit and processes that many futures simultaneously.
Understanding Concurrency Limits
use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration, Instant};
async fn concurrency_limit() {
let start = Instant::now();
let stream = stream::iter(0..10);
let results: Vec<_> = stream
.map(|i| async move {
sleep(Duration::from_millis(100)).await;
i
})
.buffer_unordered(3) // Max 3 concurrent
.collect()
.await;
let elapsed = start.elapsed();
println!("Time: {:?}", elapsed);
// ~333ms: 10 items / 3 concurrent = ~4 batches * 100ms
}The concurrency limit controls how many futures run simultaneously.
Completion Order vs Submission Order
use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration};
async fn completion_order() {
let stream = stream::iter(vec![
(1, 100), // Task 1: 100ms
(2, 50), // Task 2: 50ms
(3, 10), // Task 3: 10ms
]);
let results: Vec<_> = stream
.map(|(id, delay)| async move {
sleep(Duration::from_millis(delay)).await;
id
})
.buffer_unordered(3)
.collect()
.await;
// Results arrive in completion order:
// 3 (10ms) -> 2 (50ms) -> 1 (100ms)
println!("Completion order: {:?}", results);
// Output: Completion order: [3, 2, 1]
}Results are yielded as futures complete, not in input order.
Buffering vs Concurrent Execution
use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration};
async fn buffering_explanation() {
// Without buffer_unordered (sequential):
let stream = stream::iter(0..3);
let results: Vec<_> = stream
.then(|i| async move {
sleep(Duration::from_millis(100)).await;
i
})
.collect()
.await;
// Total time: 300ms (3 * 100ms)
// With buffer_unordered (concurrent):
let stream = stream::iter(0..3);
let results: Vec<_> = stream
.map(|i| async move {
sleep(Duration::from_millis(100)).await;
i
})
.buffer_unordered(3)
.collect()
.await;
// Total time: ~100ms (all run simultaneously)
}buffer_unordered starts futures as items arrive, not waiting for previous completions.
Concurrency Limit Behavior
use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration, Instant};
async fn limit_behavior() {
let start = Instant::now();
let stream = stream::iter(0..6);
// Buffer size 2 means only 2 concurrent futures
let results: Vec<_> = stream
.map(|i| async move {
let start = Instant::now();
sleep(Duration::from_millis(100)).await;
(i, start.elapsed())
})
.buffer_unordered(2)
.collect()
.await;
println!("Total time: {:?}", start.elapsed());
// ~300ms: 6 items / 2 concurrent = 3 batches * 100ms
}When buffer is full, new items wait for slots to become available.
Error Handling
use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration};
async fn error_handling() -> Result<(), Box<dyn std::error::Error>> {
let stream = stream::iter(vec![1, 2, 3, 4, 5]);
let results: Result<Vec<_>, _> = stream
.map(|i| async move {
if i == 3 {
Err("Item 3 failed")
} else {
Ok(i * 2)
}
})
.buffer_unordered(2)
.try_collect()
.await;
match results {
Ok(items) => println!("Success: {:?}", items),
Err(e) => println!("Error: {}", e),
}
Ok(())
}Use try_collect to stop on first error, or handle errors per-item.
Per-Item Error Handling
use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration};
async fn per_item_errors() {
let stream = stream::iter(vec![1, 2, 3, 4, 5]);
let results: Vec<_> = stream
.map(|i| async move {
if i == 3 {
Err(format!("Item {} failed", i))
} else {
Ok(i * 2)
}
})
.buffer_unordered(2)
.collect()
.await;
for result in results {
match result {
Ok(value) => println!("Success: {}", value),
Err(e) => println!("Error: {}", e),
}
}
}Collect Result values and handle each individually instead of failing fast.
Backpressure
use futures::stream::{self, StreamExt};
use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};
async fn backpressure() {
// Simulate a slow consumer:
let (tx, rx) = mpsc::channel(3); // Small buffer
let producer = async {
let stream = stream::iter(0..10);
stream
.map(|i| async move {
println!("Processing item {}", i);
sleep(Duration::from_millis(50)).await;
i
})
.buffer_unordered(2) // Max 2 concurrent
.for_each(|i| async {
println!("Sending item {}", i);
tx.send(i).await.unwrap();
})
.await;
};
let consumer = async {
while let Some(i) = rx.recv().await {
println!("Consuming item {}", i);
sleep(Duration::from_millis(100)).await;
}
};
tokio::join!(producer, consumer);
}buffer_unordered naturally provides backpressure based on concurrency limit.
Ordered Alternative: buffer_ordered
use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration};
async fn ordered_vs_unordered() {
// buffer_unordered: yields in completion order
let stream = stream::iter(vec![(1, 100), (2, 10), (3, 50)]);
let unordered: Vec<_> = stream
.map(|(id, delay)| async move {
sleep(Duration::from_millis(delay)).await;
id
})
.buffer_unordered(3)
.collect()
.await;
println!("Unordered: {:?}", unordered); // [2, 3, 1]
// buffer_ordered: yields in submission order
let stream = stream::iter(vec![(1, 100), (2, 10), (3, 50)]);
let ordered: Vec<_> = stream
.map(|(id, delay)| async move {
sleep(Duration::from_millis(delay)).await;
id
})
.buffered(3) // Note: buffered, not buffer_unordered
.collect()
.await;
println!("Ordered: {:?}", ordered); // [1, 2, 3]
}Use buffered when you need results in submission order.
Dynamic Concurrency
use futures::stream::{self, StreamExt};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::time::{sleep, Duration};
async fn dynamic_concurrency() {
let active_count = Arc::new(AtomicUsize::new(0));
let max_concurrent = Arc::new(AtomicUsize::new(0));
let stream = stream::iter(0..20);
let results: Vec<_> = stream
.map(|i| {
let active = active_count.clone();
let max = max_concurrent.clone();
async move {
let current = active.fetch_add(1, Ordering::SeqCst) + 1;
max.fetch_max(current, Ordering::SeqCst);
sleep(Duration::from_millis(50)).await;
active.fetch_sub(1, Ordering::SeqCst);
i
}
})
.buffer_unordered(5)
.collect()
.await;
println!("Max concurrent: {}", max_concurrent.load(Ordering::SeqCst));
// Should be ~5 (the buffer size)
}Concurrency limit is respected even with varying workloads.
HTTP Requests Example
use futures::stream::{self, StreamExt};
use reqwest::Client;
use std::time::Duration;
async fn http_requests() -> Result<(), Box<dyn std::error::Error>> {
let client = Client::new();
let urls = vec![
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/delay/1",
"https://httpbin.org/get",
];
// Fetch URLs with max 2 concurrent connections:
let responses: Vec<_> = stream::iter(urls)
.map(|url| {
let client = client.clone();
async move {
client.get(url)
.timeout(Duration::from_secs(5))
.send()
.await
}
})
.buffer_unordered(2)
.collect()
.await;
for (i, response) in responses.into_iter().enumerate() {
match response {
Ok(resp) => println!("Request {}: {}", i, resp.status()),
Err(e) => println!("Request {} failed: {}", i, e),
}
}
Ok(())
}Control concurrent HTTP requests to avoid overwhelming servers.
Database Queries Example
use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration};
struct Database;
impl Database {
async fn query(&self, id: u32) -> String {
sleep(Duration::from_millis(50)).await;
format!("Result for {}", id)
}
}
async fn database_queries() {
let db = Database;
let user_ids = 0..100;
// Process queries with bounded concurrency:
let results: Vec<_> = stream::iter(user_ids)
.map(|id| {
let db = &db;
async move {
db.query(id).await
}
})
.buffer_unordered(10) // Max 10 concurrent DB queries
.collect()
.await;
println!("Got {} results", results.len());
}Limit concurrent database connections to stay within pool limits.
File Processing Example
use futures::stream::{self, StreamExt};
use tokio::fs;
use tokio::io::AsyncReadExt;
async fn process_files() -> Result<(), Box<dyn std::error::Error>> {
let files = vec!["file1.txt", "file2.txt", "file3.txt"];
let contents: Vec<_> = stream::iter(files)
.map(|path| async move {
let mut file = fs::File::open(path).await?;
let mut contents = String::new();
file.read_to_string(&mut contents).await?;
Ok::<_, std::io::Error>((path, contents))
})
.buffer_unordered(3)
.try_collect()
.await?;
for (path, content) in contents {
println!("{}: {} bytes", path, content.len());
}
Ok(())
}Process multiple files concurrently with bounded parallelism.
Combining with then
use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration};
async fn combined_operations() {
let stream = stream::iter(0..5);
// then: sequential, map + buffer_unordered: concurrent
let results: Vec<_> = stream
.map(|i| async move {
// First operation
sleep(Duration::from_millis(10)).await;
let doubled = i * 2;
doubled
})
.buffer_unordered(5)
.then(|doubled| async move {
// Second operation (sequential after buffer)
sleep(Duration::from_millis(20)).await;
doubled + 1
})
.collect()
.await;
println!("{:?}", results);
}Combine sequential and concurrent stages in a pipeline.
Cancellation
use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration, timeout};
async fn cancellation() {
let stream = stream::iter(0..);
// Take first 5 items, then drop stream
let results: Vec<_> = stream
.map(|i| async move {
sleep(Duration::from_millis(50)).await;
i
})
.buffer_unordered(10)
.take(5)
.collect()
.await;
println!("Results: {:?}", results);
// Remaining in-flight futures are cancelled
}Dropping the stream cancels in-flight futures.
Timeout Per Item
use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration, timeout};
async fn per_item_timeout() {
let stream = stream::iter(0..5);
let results: Vec<_> = stream
.map(|i| async move {
let sleep_time = if i == 2 { 200 } else { 50 };
sleep(Duration::from_millis(sleep_time)).await;
i
})
.buffer_unordered(3)
.map(|result| async {
match timeout(Duration::from_millis(100), async { result }).await {
Ok(value) => Ok(value),
Err(_) => Err("timeout"),
}
})
.collect()
.await;
println!("{:?}", results);
}Add timeouts after concurrent execution for per-item cancellation.
Collecting Into HashMap
use futures::stream::{self, StreamExt};
use std::collections::HashMap;
async fn collect_to_map() {
let stream = stream::iter(vec![
("a", 1),
("b", 2),
("c", 3),
("d", 4),
]);
let map: HashMap<_, _> = stream
.map(|(key, value)| async move {
// Simulate async processing
(key.to_uppercase(), value * 2)
})
.buffer_unordered(2)
.collect()
.await;
println!("{:?}", map);
}collect can aggregate into any FromIterator type.
Real-World: Rate-Limited API Client
use futures::stream::{self, StreamExt};
use tokio::time::{Duration, Instant};
struct RateLimitedClient {
requests_per_second: u32,
}
impl RateLimitedClient {
async fn fetch(&self, id: u32) -> String {
format!("Data for {}", id)
}
async fn fetch_batch(&self, ids: Vec<u32>) -> Vec<(u32, Result<String, String>)> {
// Max concurrent requests respecting rate limits
let results: Vec<_> = stream::iter(&ids)
.map(|&id| async move {
let result = self.fetch(id).await;
(id, Ok(result))
})
.buffer_unordered(self.requests_per_second as usize)
.collect()
.await;
results
}
}Combine rate limiting with concurrency control.
Memory Considerations
use futures::stream::{self, StreamExt};
async fn memory_usage() {
// Large buffer: more concurrent, higher memory
let results: Vec<_> = stream::iter(0..1000)
.map(|i| async move {
// Each future uses memory
vec![i; 1000]
})
.buffer_unordered(100) // 100 concurrent allocations
.collect()
.await;
// Smaller buffer: less memory, longer time
let results: Vec<_> = stream::iter(0..1000)
.map(|i| async move {
vec![i; 1000]
})
.buffer_unordered(10) // Only 10 concurrent allocations
.collect()
.await;
}Choose buffer size balancing throughput and memory.
Comparison: buffer_unordered vs join!
use futures::stream::{self, StreamExt};
use futures::future::join_all;
use tokio::time::{sleep, Duration};
async fn comparison() {
// buffer_unordered: processes items as they arrive from stream
let stream = stream::iter(0..5);
let results: Vec<_> = stream
.map(|i| async move {
sleep(Duration::from_millis(50)).await;
i
})
.buffer_unordered(3)
.collect()
.await;
// join_all: processes all known futures at once
let futures: Vec<_> = (0..5)
.map(|i| async move {
sleep(Duration::from_millis(50)).await;
i
})
.collect();
let results: Vec<_> = join_all(futures).await;
// Key difference: buffer_unordered works with streams,
// join_all works with collections of futures
}buffer_unordered enables incremental processing of infinite streams.
Key Points
fn key_points() {
// 1. buffer_unordered limits concurrent futures
// 2. Results arrive in completion order, not submission order
// 3. New futures start when buffer has capacity
// 4. Use buffered() for submission order
// 5. Concurrency limit provides natural backpressure
// 6. try_collect stops on first error
// 7. collect<Result> continues on errors
// 8. Cancellation drops in-flight futures
// 9. Works with infinite streams
// 10. Memory usage scales with buffer size
// 11. Combine with take() for early termination
// 12. Use for HTTP, database, file I/O
// 13. then() after buffer_unordered for sequential stages
// 14. Map creates futures, buffer_unordered runs them
// 15. Buffer size is max concurrent, not queue size
}Key insight: buffer_unordered provides controlled concurrency for stream processing by maintaining a bounded pool of in-flight futures. The key distinction from sequential processing is that new futures start as soon as there's capacity in the buffer, not after previous futures completeāthis means with a buffer size of 10, you could have up to 10 futures running simultaneously, each at different stages of execution. The completion-order delivery is essential for responsiveness: if you have 3 concurrent requests and the first takes 10 seconds while the others take 1 second, buffer_unordered yields the fast results immediately rather than waiting for the slow one. The concurrency limit serves dual purposes: it prevents resource exhaustion (too many concurrent connections, memory allocations, or database queries) and provides implicit backpressure to the stream source. When you need ordered results with concurrent execution, use buffered() insteadāit maintains the same concurrency benefits but reorders output to match input. The pattern of stream.map(...).buffer_unordered(n).collect() is idiomatic for turning a synchronous iterator into concurrent async operations while maintaining control over resource usage.
