Rust walkthroughs
Using futures::stream::FuturesUnordered to manage a collection of futures

futures::stream::FuturesUnordered is a collection type that stores an arbitrary number of futures and polls them concurrently, yielding results as each future completes rather than in insertion order. Unlike a Vec of futures polled one after another, FuturesUnordered maintains an internal data structure optimized for efficient insertion, removal, and completion tracking, so you can process each result as soon as any future finishes. This enables responsive, event-driven asynchronous code where you don't wait for all futures to complete before handling the first available result.
use futures::stream::{FuturesUnordered, StreamExt};
use std::time::Duration;
use tokio::time::sleep;

async fn basic_usage() {
    // One closure, so every pushed future has the same concrete type
    // (each distinct `async` block has its own anonymous type)
    let delayed = |ms: u64, name: &'static str| async move {
        sleep(Duration::from_millis(ms)).await;
        name
    };
    let futures = FuturesUnordered::new();
    // Add futures to the collection
    futures.push(delayed(100, "first"));
    futures.push(delayed(50, "second"));
    futures.push(delayed(150, "third"));
    // Results arrive in completion order, not insertion order:
    // "second" (50ms) completes before "first" (100ms) and "third" (150ms)
    let results: Vec<&str> = futures.collect().await;
    assert_eq!(results, vec!["second", "first", "third"]);
}

FuturesUnordered yields results as futures complete, enabling responsive handling.
use futures::stream::{FuturesUnordered, StreamExt};
use std::time::Duration;
use tokio::time::sleep;

async fn compare_with_vec() {
    // Vec<JoinHandle> requires manual polling or join_all
    let handles: Vec<tokio::task::JoinHandle<&str>> = vec![
        tokio::spawn(async {
            sleep(Duration::from_millis(100)).await;
            "first"
        }),
        tokio::spawn(async {
            sleep(Duration::from_millis(50)).await;
            "second"
        }),
    ];
    // Join in order - waits for "first" even though "second" finished earlier
    for handle in handles {
        let result = handle.await.unwrap();
        println!("Vec result: {}", result);
    }
    // FuturesUnordered yields as each completes
    let delayed = |ms: u64, name: &'static str| async move {
        sleep(Duration::from_millis(ms)).await;
        name
    };
    let mut futures = FuturesUnordered::new();
    futures.push(delayed(100, "first"));
    futures.push(delayed(50, "second"));
    while let Some(result) = futures.next().await {
        println!("FuturesUnordered result: {}", result);
    }
}

Vec processes in order; FuturesUnordered processes in completion order.
use futures::future::{BoxFuture, FutureExt};
use futures::stream::{FuturesUnordered, StreamExt};
use std::time::Duration;
use tokio::time::sleep;

async fn dynamic_management() {
    // Box the futures so async blocks from different call sites share one type
    let mut futures: FuturesUnordered<BoxFuture<'static, u64>> = FuturesUnordered::new();
    // Add futures dynamically
    for i in 0..5u64 {
        futures.push(
            async move {
                sleep(Duration::from_millis(i * 100)).await;
                i
            }
            .boxed(),
        );
    }
    // Process results as they arrive
    let mut count = 0;
    while let Some(result) = futures.next().await {
        println!("Completed: {}", result);
        count += 1;
        // Add more futures while processing; capture the counter by value
        // so the stored future doesn't borrow `count`
        if count < 3 {
            let tag = 100 + count;
            futures.push(
                async move {
                    sleep(Duration::from_millis(50)).await;
                    tag
                }
                .boxed(),
            );
        }
    }
}

Futures can be added while others are still running, enabling dynamic workloads.
use futures::stream::{FuturesUnordered, StreamExt};
use std::time::Duration;
use tokio::time::sleep;

async fn stream_interface() {
    let delayed = |ms: u64, n: i32| async move {
        sleep(Duration::from_millis(ms)).await;
        n
    };
    let mut futures = FuturesUnordered::new();
    futures.push(delayed(100, 1));
    futures.push(delayed(50, 2));
    futures.push(delayed(150, 3));
    // Use Stream methods:
    // take(n) - process only the first n completions
    let first_two: Vec<i32> = futures.by_ref().take(2).collect().await;
    println!("First two: {:?}", first_two);
    // The remaining future is still pending and can be drained afterwards
    let remaining: Vec<i32> = futures.collect().await;
    println!("Remaining: {:?}", remaining);
}

FuturesUnordered implements Stream, providing all stream combinators.
use futures::stream::{FuturesUnordered, StreamExt};
use std::time::Duration;
use tokio::time::sleep;

async fn incremental_processing() {
    let mut futures = FuturesUnordered::new();
    for i in 0..10u64 {
        futures.push(async move {
            sleep(Duration::from_millis(i * 50)).await;
            i * i
        });
    }
    // Process each result as it arrives
    while let Some(result) = futures.next().await {
        println!("Got result: {}", result);
        // Can do work between results;
        // the other futures continue running concurrently
    }
}

Process results one at a time while other futures continue executing.
use futures::stream::{FuturesUnordered, StreamExt};
use std::time::Duration;
use tokio::time::{sleep, timeout};

async fn with_timeouts() {
    let delayed = |d: Duration, name: &'static str| async move {
        sleep(d).await;
        name
    };
    let mut futures = FuturesUnordered::new();
    futures.push(delayed(Duration::from_millis(100), "fast"));
    futures.push(delayed(Duration::from_secs(10), "slow"));
    // Bound the wait for each completion by wrapping next() itself in a timeout
    loop {
        match timeout(Duration::from_secs(1), futures.next()).await {
            Ok(Some(value)) => println!("Completed: {:?}", value),
            Ok(None) => break, // all futures finished
            Err(_) => {
                println!("Timeout waiting for the next result");
                break;
            }
        }
    }
}

async fn overall_timeout() {
    let futures = FuturesUnordered::new();
    for i in 0..5u64 {
        futures.push(async move {
            sleep(Duration::from_millis(i * 100)).await;
            i
        });
    }
    // Overall timeout for all operations
    match timeout(Duration::from_millis(250), futures.collect::<Vec<_>>()).await {
        Ok(results) => println!("All completed: {:?}", results),
        Err(_) => println!("Not all futures completed in time"),
    }
}

Use timeouts with FuturesUnordered to handle slow or hung futures.
use futures::stream::{FuturesUnordered, StreamExt};
use std::time::Duration;
use tokio::time::sleep;

async fn error_handling() {
    let delayed = |ms: u64, res: Result<&'static str, &'static str>| async move {
        sleep(Duration::from_millis(ms)).await;
        res
    };
    let mut futures = FuturesUnordered::new();
    futures.push(delayed(50, Ok("success 1")));
    futures.push(delayed(100, Err("error 1")));
    futures.push(delayed(75, Ok("success 2")));
    // Process results, handling errors individually
    while let Some(result) = futures.next().await {
        match result {
            Ok(value) => println!("Success: {}", value),
            Err(e) => println!("Error: {}", e),
        }
    }
}

async fn collect_all_results() {
    let ready = |res: Result<i32, &'static str>| async move { res };
    let futures = FuturesUnordered::new();
    futures.push(ready(Ok(1)));
    futures.push(ready(Err("failure")));
    futures.push(ready(Ok(3)));
    // collect() does not stop at the first error:
    // the Vec contains all outcomes, errors included
    let results: Vec<Result<i32, &str>> = futures.collect().await;
    for result in results {
        println!("{:?}", result);
    }
}

Handle errors per-future without stopping the entire collection.
use futures::stream::{FuturesOrdered, FuturesUnordered, StreamExt};
use std::time::Duration;
use tokio::time::sleep;

async fn ordered_vs_unordered() {
    let delayed = |ms: u64, n: i32| async move {
        sleep(Duration::from_millis(ms)).await;
        n
    };
    // FuturesOrdered: yields results in insertion order
    let mut ordered = FuturesOrdered::new();
    ordered.push_back(delayed(100, 1));
    ordered.push_back(delayed(50, 2));
    // Results: 1, 2 (insertion order, not completion order)
    let ordered_results: Vec<i32> = ordered.collect().await;
    assert_eq!(ordered_results, vec![1, 2]);
    // FuturesUnordered: yields results in completion order
    let unordered = FuturesUnordered::new();
    unordered.push(delayed(100, 1));
    unordered.push(delayed(50, 2));
    // Results: 2, 1 (completion order)
    let unordered_results: Vec<i32> = unordered.collect().await;
    assert_eq!(unordered_results, vec![2, 1]);
}

FuturesOrdered preserves insertion order; FuturesUnordered yields in completion order.
use futures::stream::FuturesUnordered;

fn growth_and_clearing() {
    // FuturesUnordered is an intrusive linked structure, not a Vec:
    // there is no capacity to pre-allocate, and it grows as futures are pushed
    let mut futures = FuturesUnordered::new();
    for i in 0..200 {
        futures.push(async move { i });
    }
    assert_eq!(futures.len(), 200);
    // clear() drops every pending future at once
    futures.clear();
    assert!(futures.is_empty());
}

FuturesUnordered grows on demand; unlike Vec, it has no capacity to pre-allocate.
use futures::stream::{FuturesUnordered, StreamExt};

async fn length_tracking() {
    let ready = |n: i32| async move { n };
    let mut futures = FuturesUnordered::new();
    assert!(futures.is_empty());
    assert_eq!(futures.len(), 0);
    futures.push(ready(1));
    futures.push(ready(2));
    assert_eq!(futures.len(), 2);
    assert!(!futures.is_empty());
    // After polling, the length decreases
    futures.next().await;
    assert_eq!(futures.len(), 1);
    futures.next().await;
    assert!(futures.is_empty());
}

Track the number of pending futures with len() and is_empty().
use futures::future::{BoxFuture, FutureExt};
use futures::stream::{FuturesUnordered, StreamExt};

async fn clear_and_extend() {
    // Box the futures so async blocks of different types can share one collection
    let mut futures: FuturesUnordered<BoxFuture<'static, i32>> = FuturesUnordered::new();
    // Add multiple futures at once (async blocks are not Clone,
    // so map over a range rather than using iter::repeat)
    futures.extend((0..5).map(|i| async move { i * 2 }.boxed()));
    assert_eq!(futures.len(), 5);
    // Remove all pending futures
    futures.clear();
    assert!(futures.is_empty());
    // Extend again
    futures.extend((0..3).map(|_| async { 42 }.boxed()));
    let results: Vec<i32> = futures.collect().await;
    assert_eq!(results.len(), 3);
}

Use extend to add multiple futures, clear to remove all pending futures.
use futures::stream::{FuturesUnordered, StreamExt};
use reqwest::Client;

#[derive(Debug)]
struct FetchResult {
    url: String,
    status: u16,
    duration_ms: u64,
}

async fn fetch_urls(urls: Vec<String>) -> Vec<FetchResult> {
    let client = Client::new();
    let mut futures = FuturesUnordered::new();
    for url in urls {
        let client = client.clone();
        futures.push(async move {
            let start = std::time::Instant::now();
            let response = client.get(&url).send().await?;
            let duration = start.elapsed();
            Ok::<_, reqwest::Error>(FetchResult {
                url,
                status: response.status().as_u16(),
                duration_ms: duration.as_millis() as u64,
            })
        });
    }
    let mut results = Vec::new();
    while let Some(result) = futures.next().await {
        match result {
            Ok(fetch_result) => {
                println!("Completed: {} ({}ms)", fetch_result.url, fetch_result.duration_ms);
                results.push(fetch_result);
            }
            Err(e) => {
                eprintln!("Failed: {}", e);
            }
        }
    }
    results
}

Fetch multiple URLs concurrently, processing results as each request completes.
use futures::stream::{FuturesUnordered, StreamExt};
use std::sync::Arc;
use tokio::sync::Semaphore;

async fn rate_limited_processing(items: Vec<i32>, max_concurrent: usize) -> Vec<i32> {
    let semaphore = Arc::new(Semaphore::new(max_concurrent));
    let futures = FuturesUnordered::new();
    for item in items {
        let semaphore = semaphore.clone();
        futures.push(async move {
            let permit = semaphore.acquire().await.unwrap();
            let result = process_item(item).await;
            drop(permit); // Release the permit
            result
        });
    }
    futures.collect().await
}

async fn process_item(item: i32) -> i32 {
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    item * 2
}

#[tokio::main]
async fn main() {
    let items: Vec<i32> = (0..20).collect();
    let results = rate_limited_processing(items, 5).await;
    println!("Processed {} items", results.len());
}

Combine FuturesUnordered with a semaphore for rate-limited concurrent execution.
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::sync::broadcast;

async fn task_pool_with_cancellation() {
    let (cancel_tx, cancel_rx) = broadcast::channel::<()>(1);
    let mut futures = FuturesUnordered::new();
    // Spawn tasks that listen for cancellation
    for i in 0..10 {
        let mut cancel_rx = cancel_rx.resubscribe();
        futures.push(async move {
            tokio::select! {
                result = process_task(i) => result,
                _ = cancel_rx.recv() => {
                    println!("Task {} cancelled", i);
                    -1
                }
            }
        });
    }
    // Process results, cancel remaining on first error
    while let Some(result) = futures.next().await {
        if result < 0 {
            println!("Error detected, cancelling remaining tasks");
            let _ = cancel_tx.send(());
            break;
        }
        println!("Task result: {}", result);
    }
}

async fn process_task(id: i32) -> i32 {
    tokio::time::sleep(std::time::Duration::from_millis(id as u64 * 100)).await;
    id * id
}

Use broadcast channels with FuturesUnordered for coordinated cancellation.
use futures::stream::{FuturesUnordered, StreamExt};
use std::time::Duration;
use tokio::time::{sleep, Instant};

async fn demonstrating_concurrency() {
    let start = Instant::now();
    let futures = FuturesUnordered::new();
    // These futures run concurrently, not necessarily in parallel;
    // the runtime decides how to schedule them
    for i in 0..5 {
        futures.push(async move {
            sleep(Duration::from_millis(100)).await;
            i
        });
    }
    futures.collect::<Vec<_>>().await;
    let elapsed = start.elapsed();
    println!("Total time: {:?}", elapsed);
    // All 5 futures complete in ~100ms (concurrent);
    // run sequentially, they would take ~500ms
}

FuturesUnordered enables concurrency; the runtime handles parallelism.
use futures::stream::{FuturesUnordered, StreamExt};
use std::time::Duration;
use tokio::time::sleep;

async fn memory_efficient() {
    // Process a large number of items without keeping all futures in memory
    let mut futures = FuturesUnordered::new();
    // Start with an initial batch
    for i in 0..10 {
        futures.push(process_item(i));
    }
    let mut next_item = 10;
    let total_items = 100;
    while let Some(result) = futures.next().await {
        println!("Got result: {}", result);
        // Add more work as items complete, keeping at most 10 in flight
        if next_item < total_items {
            futures.push(process_item(next_item));
            next_item += 1;
        }
    }
}

async fn process_item(id: i32) -> i32 {
    sleep(Duration::from_millis(50)).await;
    id * 2
}

Keep a bounded number of active futures to manage memory consumption.
Key characteristics of FuturesUnordered:
| Property | Behavior |
|----------|----------|
| Order | Completion order (first completed, first yielded) |
| Growth | Dynamic insertion at any time |
| Concurrency | Polls all futures concurrently |
| Memory | Stores futures until completion |
| Interface | Implements Stream |
Comparison with alternatives:
| Collection | Order | Use Case |
|------------|-------|----------|
| FuturesUnordered | Completion order | Process results as they arrive |
| FuturesOrdered | Insertion order | Results must match input order |
| Vec<JoinHandle> | Manual order | Spawned tasks, manual management |
| join_all | All complete, input order | Wait for all, don't stream |
Common patterns:
- Drain results with while let Some(result) = futures.next().await
- Push new work while draining to keep a bounded number of futures in flight
- Wrap next() or collect() in tokio::time::timeout to guard against slow or hung futures
- Limit concurrency with a Semaphore; cancel cooperatively with a broadcast channel and select!
Best practices:
- Treat errors as values in the stream so one failure doesn't stop the rest
- Bound the number of in-flight futures to control memory
- Use len() and is_empty() to track pending futures

Key insight: FuturesUnordered bridges the gap between individual futures and batch operations like join_all. It provides a streaming interface over a collection of futures, yielding each result as soon as it's ready. This is fundamentally different from Vec<Future> or join_all, which either require sequential polling or wait for all futures to complete. The ability to add futures dynamically, combined with the stream interface, makes FuturesUnordered ideal for scenarios where work arrives over time and results should be processed immediately upon completion. Whether building a job queue, handling multiple network requests, or implementing a concurrent task pool, FuturesUnordered provides the right abstraction for responsive, event-driven asynchronous code.