Rust walkthroughs
How does futures::stream::BufferUnordered control concurrency compared to buffered?

futures::stream::BufferUnordered (created by StreamExt::buffer_unordered) buffers futures and runs them concurrently up to a specified limit, yielding results in completion order rather than the order they were submitted. The buffered combinator (StreamExt::buffered), by contrast, maintains submission order: results are yielded in the same order their corresponding futures appeared in the source stream, even if later futures complete earlier. Note that the ordered stream combinator is named buffered; StreamExt has no buffer method (SinkExt::buffer is an unrelated combinator for sinks). Both limit concurrency to the specified buffer size, but buffer_unordered lets faster results propagate immediately without waiting for earlier futures to complete, while buffered enforces strict ordering at the cost of potential head-of-line blocking, where a slow future delays all subsequent results.

use futures::stream::{self, StreamExt};
use std::time::{Duration, Instant};

#[tokio::main]
async fn main() {
    // (delay in ms, label) pairs with varying completion times.
    // Mapping each pair to a future gives every element the same type;
    // a vec! of separate async blocks would not compile, because each
    // async block has its own anonymous type.
    let jobs = vec![(300, "first"), (100, "second"), (200, "third")];

    let start = Instant::now();
    let results: Vec<_> = stream::iter(jobs)
        .map(|(ms, label)| async move {
            tokio::time::sleep(Duration::from_millis(ms)).await;
            label
        })
        .buffered(2) // Process at most 2 futures concurrently
        .collect()
        .await;

    println!("Results: {:?}", results);
    println!("Total time: {:?}", start.elapsed());
}

buffered limits concurrency to 2 and yields results in submission order.

use futures::stream::{self, StreamExt};
use std::time::{Duration, Instant};

#[tokio::main]
async fn main() {
    // (delay in ms, label) pairs; each is mapped to a future of the same type
    let jobs = vec![(300, "first"), (100, "second"), (200, "third")];

    let start = Instant::now();
    let results: Vec<_> = stream::iter(jobs)
        .map(|(ms, label)| async move {
            tokio::time::sleep(Duration::from_millis(ms)).await;
            label
        })
        .buffer_unordered(2) // Process at most 2 futures concurrently
        .collect()
        .await;

    println!("Results: {:?}", results);
    println!("Total time: {:?}", start.elapsed());
}

buffer_unordered yields results as they complete, not in submission order.

use futures::stream::{self, StreamExt};
use std::time::Duration;

// Resolves to `label` after `ms` milliseconds (immediately when `ms` is 0).
// A named async fn gives every future the same type and lets the same
// inputs be turned into futures twice; async blocks cannot be cloned.
async fn job(ms: u64, label: &'static str) -> &'static str {
    if ms > 0 {
        tokio::time::sleep(Duration::from_millis(ms)).await;
    }
    label
}

#[tokio::main]
async fn main() {
    let jobs: Vec<(u64, &str)> = vec![(0, "A"), (100, "B"), (0, "C"), (50, "D")];

    // buffered: maintains order A, B, C, D
    let buffered_results: Vec<_> = stream::iter(jobs.clone())
        .map(|(ms, label)| job(ms, label))
        .buffered(2)
        .collect()
        .await;
    println!("buffered order: {:?}", buffered_results);
    // Output: ["A", "B", "C", "D"]

    // buffer_unordered: yields as they complete
    let unordered_results: Vec<_> = stream::iter(jobs)
        .map(|(ms, label)| job(ms, label))
        .buffer_unordered(2)
        .collect()
        .await;
    println!("buffer_unordered order: {:?}", unordered_results);
    // Output: ["A", "C", "D", "B"] (or similar - A and C complete first)
}

buffered preserves order; buffer_unordered yields on completion.

use futures::stream::{self, StreamExt};
use std::time::{Duration, Instant};

// Resolves to `label` after `ms` milliseconds (immediately when `ms` is 0)
async fn job(ms: u64, label: &'static str) -> &'static str {
    if ms > 0 {
        tokio::time::sleep(Duration::from_millis(ms)).await;
    }
    label
}

#[tokio::main]
async fn main() {
    let jobs: Vec<(u64, &str)> =
        vec![(1000, "slow"), (0, "fast1"), (0, "fast2"), (0, "fast3")];

    // With buffered: fast results wait for the slow result
    let start = Instant::now();
    let results: Vec<_> = stream::iter(jobs.clone())
        .map(|(ms, label)| job(ms, label))
        .buffered(10)
        .collect()
        .await;
    println!("buffered: {:?} in {:?}", results, start.elapsed());
    // Waits ~1 second, then yields all in order: ["slow", "fast1", "fast2", "fast3"]

    // With buffer_unordered: fast results are yielded immediately
    let start = Instant::now();
    let results: Vec<_> = stream::iter(jobs)
        .map(|(ms, label)| job(ms, label))
        .buffer_unordered(10)
        .collect()
        .await;
    println!("buffer_unordered: {:?} in {:?}", results, start.elapsed());
    // Yields fast results first, slow last: ["fast1", "fast2", "fast3", "slow"]
}

buffered suffers head-of-line blocking; buffer_unordered doesn't.

use futures::stream::{self, StreamExt};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let concurrent_count = Arc::new(AtomicUsize::new(0));
    let max_seen = Arc::new(AtomicUsize::new(0));

    // Each future bumps the in-flight counter on start, records the
    // highest value seen, and decrements the counter when it finishes.
    let make_future = |id: usize, concurrent: Arc<AtomicUsize>, max: Arc<AtomicUsize>| {
        async move {
            let current = concurrent.fetch_add(1, Ordering::SeqCst) + 1;
            max.fetch_max(current, Ordering::SeqCst);
            println!("Starting {}", id);
            tokio::time::sleep(Duration::from_millis(50)).await;
            concurrent.fetch_sub(1, Ordering::SeqCst);
            println!("Finishing {}", id);
            id
        }
    };

    let cc = concurrent_count.clone();
    let mm = max_seen.clone();

    // buffer_unordered limits concurrency to 3
    let results: Vec<_> = stream::iter(0..10)
        .map(move |id| make_future(id, cc.clone(), mm.clone()))
        .buffer_unordered(3)
        .collect()
        .await;

    println!("Max concurrent: {}", max_seen.load(Ordering::SeqCst));
    // Max concurrent: 3 (never exceeds the limit)
    println!("Results: {:?}", results);
}

Both buffered and buffer_unordered enforce the concurrency limit.

use futures::stream::{self, StreamExt};
use std::time::{Duration, Instant};

#[tokio::main]
async fn main() {
    // When the buffer is full, the source stream is not polled for more futures
    let futures: Vec<_> = (0..10)
        .map(|i| async move {
            tokio::time::sleep(Duration::from_millis(100)).await;
            i
        })
        .collect();

    let start = Instant::now();
    // Buffer size of 2 means:
    // - At most 2 futures run concurrently
    // - The stream is polled for new futures only when a slot opens
    let results: Vec<_> = stream::iter(futures)
        .buffer_unordered(2)
        .collect()
        .await;

    println!("Results: {:?}", results);
    println!("Time: {:?}", start.elapsed());
    // 10 futures / 2 concurrent * 100ms = ~500ms total
}

Buffer size controls both concurrency and backpressure.

use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    // buffer_unordered(1) keeps a single future in flight,
    // so the futures run one at a time (sequentially).
    // Do not use 0 for this: depending on the futures version, a limit
    // of 0 can stall the stream or be treated as no limit at all.
    let results: Vec<_> = stream::iter(0..5)
        .map(|i| async move {
            println!("Processing {}", i);
            i * 2
        })
        .buffer_unordered(1)
        .collect()
        .await;

    println!("Results: {:?}", results);
    // Output: [0, 2, 4, 6, 8] (processed sequentially)
}

A buffer size of 1 means sequential execution; avoid a buffer size of 0.

use futures::stream::{self, StreamExt};
use std::time::Duration;

#[tokio::main]
async fn main() {
    // From an iterator - all inputs known upfront.
    // Each input is mapped to a future so that all futures share one type.
    let results: Vec<_> = stream::iter(vec!["a", "b", "c"])
        .map(|label| async move { label })
        .buffer_unordered(2)
        .collect()
        .await;

    // From a stream - futures produced on demand
    let stream_results: Vec<_> = stream::unfold(0, |count| async move {
        if count < 3 {
            let future = async move {
                tokio::time::sleep(Duration::from_millis(100)).await;
                count
            };
            Some((future, count + 1))
        } else {
            None
        }
    })
    .buffer_unordered(2)
    .collect()
    .await;

    println!("Iterator results: {:?}", results);
    println!("Stream results: {:?}", stream_results);
}

Both work with iterators and on-demand streams.

use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    // With Result items, both combinators yield each Result as an ordinary
    // item; neither stops at an Err on its own.
    let inputs: Vec<Result<i32, &str>> = vec![Ok(1), Err("error"), Ok(3)];

    // buffered: order is preserved, so the Err arrives at position 1
    // buffer_unordered: the Err may arrive at any point, depending on completion
    // Neither cancels the other buffered futures when an Err is yielded;
    // stopping early is up to the consumer, here via take_while
    let results: Vec<_> = stream::iter(inputs)
        .map(|r| async move { r })
        .buffer_unordered(2)
        // Stop at the first Err (the Err itself is not included)
        .take_while(|r| futures::future::ready(r.is_ok()))
        .collect()
        .await;

    println!("Results until error: {:?}", results);
}

With buffer_unordered, error handling depends on when errors occur in completion order.

use futures::stream::{self, StreamExt};
use std::time::Duration;

// Resolves to `label` after `ms` milliseconds (immediately when `ms` is 0)
async fn job(ms: u64, label: &'static str) -> &'static str {
    if ms > 0 {
        tokio::time::sleep(Duration::from_millis(ms)).await;
    }
    label
}

#[tokio::main]
async fn main() {
    let jobs: Vec<(u64, &str)> = vec![(300, "first"), (0, "second"), (0, "third")];

    // When you need concurrency but ordered results:
    // Option 1: Use buffered() for ordered results
    let ordered: Vec<_> = stream::iter(jobs.clone())
        .map(|(ms, label)| job(ms, label))
        .buffered(3) // All concurrent, but ordered results
        .collect()
        .await;
    println!("Ordered: {:?}", ordered);

    // Option 2: Tag each result with its index, use buffer_unordered, sort afterward
    let mut unordered: Vec<_> = stream::iter(jobs)
        .enumerate()
        .map(|(i, (ms, label))| async move { (job(ms, label).await, i) })
        .buffer_unordered(3)
        .collect()
        .await;
    unordered.sort_by_key(|(_, i)| *i);
    println!("Sorted: {:?}", unordered);
}

Choose based on whether you need order preserved.

use futures::stream::{self, StreamExt};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let urls = vec![
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/get",
    ];

    // buffer_unordered: process responses as they arrive
    let start = std::time::Instant::now();
    let client = reqwest::Client::new();
    let results: Vec<_> = stream::iter(urls)
        .map(|url| {
            let client = &client;
            async move {
                let resp = client
                    .get(url)
                    .timeout(Duration::from_secs(5))
                    .send()
                    .await?;
                let text = resp.text().await?;
                Ok::<_, Box<dyn std::error::Error + Send + Sync>>(text.len())
            }
        })
        .buffer_unordered(2) // Max 2 concurrent requests
        .collect()
        .await;

    println!("Got {} responses in {:?}", results.len(), start.elapsed());
    Ok(())
}

buffer_unordered is ideal for I/O where order doesn't matter.

use futures::stream::{self, StreamExt};
use std::time::Duration;

#[tokio::main]
async fn main() {
    // When order matters (e.g., processing numbered files)
    let files = vec![
        (1, "file1.txt"),
        (2, "file2.txt"),
        (3, "file3.txt"),
    ];

    // buffered: maintains file order
    let results: Vec<_> = stream::iter(files)
        .map(|(id, _name)| async move {
            tokio::time::sleep(Duration::from_millis(100)).await;
            id
        })
        .buffered(2)
        .collect()
        .await;

    // Results are always [1, 2, 3]
    println!("Processed in order: {:?}", results);
}

Use buffered when sequence order matters for correctness.

use futures::stream::{self, StreamExt};
use std::time::{Duration, Instant};

// Resolves to `label` after `ms` milliseconds
async fn job(ms: u64, label: &'static str) -> &'static str {
    tokio::time::sleep(Duration::from_millis(ms)).await;
    label
}

#[tokio::main]
async fn main() {
    let jobs: Vec<(u64, &str)> =
        vec![(300, "A (slow)"), (100, "B (fast)"), (50, "C (fastest)")];

    // buffered: order preserved, head-of-line blocking
    let start = Instant::now();
    let buffered_results: Vec<_> = stream::iter(jobs.clone())
        .map(|(ms, label)| job(ms, label))
        .buffered(3)
        .collect()
        .await;
    println!("buffered:");
    println!(" Results: {:?}", buffered_results);
    println!(" Time: {:?}", start.elapsed());
    println!(" Order: A, B, C (submission order)");

    // buffer_unordered: completion order, no blocking
    let start = Instant::now();
    let unordered_results: Vec<_> = stream::iter(jobs)
        .map(|(ms, label)| job(ms, label))
        .buffer_unordered(3)
        .collect()
        .await;
    println!("\nbuffer_unordered:");
    println!(" Results: {:?}", unordered_results);
    println!(" Time: {:?}", start.elapsed());
    println!(" Order: C, B, A (completion order)");
}

Same concurrency limit, different ordering guarantees.

| Aspect | buffered | buffer_unordered |
|--------|----------|------------------|
| Concurrency limit | Yes | Yes |
| Result order | Submission order | Completion order |
| Head-of-line blocking | Yes | No |
| When to use | Order matters | Order doesn't matter |
| Latency for fast results | May wait for slow predecessors | Yields immediately |
| Total completion time | Same when the limit covers all futures; can be longer with a smaller limit | Same when the limit covers all futures |

Both combinators provide the same concurrency control but differ fundamentally in how they yield results:
buffered preserves submission order: The stream produces results in the same order the underlying futures were submitted. If future A is submitted before future B, A's result is yielded before B's, even if B completes first. This causes head-of-line blocking: fast results wait for slower predecessors. Use buffered when the sequence matters, for example when processing numbered files or ordered messages, or when downstream consumers expect a specific order.
buffer_unordered yields on completion: Results are produced as soon as any buffered future completes, regardless of its position in the input sequence. Fast results propagate immediately without waiting for earlier but slower futures. Use buffer_unordered when order doesn't matter, for example for parallel HTTP requests or concurrent computations whose results are independent, or when you can sort or associate results afterward using identifiers.
Key insight: Both enforce the same maximum number of in-flight futures. The difference lies in ordering semantics: with buffered, the stream's output order matches its input order; with buffer_unordered, the output order matches completion order. Ordering does have one side effect on concurrency: in buffered, a future that has completed keeps its slot until every earlier result has been yielded, so a slow head-of-line future can leave slots idle. If all futures complete at similar speeds, you might not notice the difference. The divergence appears when completion times vary unpredictably, which is exactly the case with I/O-bound workloads like network requests or file operations.
Performance consideration: When the buffer is large enough to hold every future at once, both methods finish in the same total time, bounded by the slowest future. With a smaller buffer, buffered can take longer overall, because finished results held behind a slow predecessor keep occupying their slots. The two also differ in when results become available: buffer_unordered yields results incrementally as they complete, while buffered may delay results until earlier futures finish. For streaming pipelines where downstream processing can start before all results are ready, buffer_unordered lets that processing begin sooner and often improves overall throughput.