What is the purpose of futures::stream::FuturesUnordered for managing a collection of futures?

futures::stream::FuturesUnordered is a collection type that stores an arbitrary number of futures and polls them concurrently, yielding results as each future completes rather than in insertion order. Unlike a Vec of futures that must be polled sequentially, FuturesUnordered maintains an internal data structure optimized for efficient insertion, removal, and completion tracking, allowing you to process results as soon as any future finishes. This enables responsive, event-driven asynchronous code where you don't wait for all futures to complete before handling the first available result.

Basic Usage

use futures::stream::{FuturesUnordered, StreamExt};
use std::time::Duration;
use tokio::time::sleep;
 
async fn basic_usage() {
    let mut futures = FuturesUnordered::new();
    
    // Add futures to the collection
    futures.push(async {
        sleep(Duration::from_millis(100)).await;
        "first"
    });
    
    futures.push(async {
        sleep(Duration::from_millis(50)).await;
        "second"
    });
    
    futures.push(async {
        sleep(Duration::from_millis(150)).await;
        "third"
    });
    
    // Results arrive in completion order, not insertion order
    let results: Vec<&str> = futures.collect().await;
    
    // "second" (50ms) completes before "first" (100ms) and "third" (150ms)
    // Order: "second", "first", "third"
}

FuturesUnordered yields results as futures complete, enabling responsive handling.

Comparison with Vec

use futures::stream::{FuturesUnordered, StreamExt};
use std::time::Duration;
use tokio::time::sleep;
 
async fn compare_with_vec() {
    // Vec<JoinHandle> requires manual polling or join_all
    let handles: Vec<tokio::task::JoinHandle<&str>> = vec![
        tokio::spawn(async {
            sleep(Duration::from_millis(100)).await;
            "first"
        }),
        tokio::spawn(async {
            sleep(Duration::from_millis(50)).await;
            "second"
        }),
    ];
    
    // Join in order - waits for "first" even though "second" finished
    for handle in handles {
        let result = handle.await.unwrap();
        println!("Vec result: {}", result);
    }
    
    // FuturesUnordered yields as each completes
    let mut futures = FuturesUnordered::new();
    futures.push(async {
        sleep(Duration::from_millis(100)).await;
        "first"
    });
    futures.push(async {
        sleep(Duration::from_millis(50)).await;
        "second"
    });
    
    while let Some(result) = futures.next().await {
        println!("FuturesUnordered result: {}", result);
    }
}

Vec processes in order; FuturesUnordered processes in completion order.

Dynamic Addition and Removal

use futures::stream::{FuturesUnordered, StreamExt};
use std::time::Duration;
use tokio::time::sleep;
 
async fn dynamic_management() {
    let mut futures = FuturesUnordered::new();
    
    // Add futures dynamically
    for i in 0..5 {
        futures.push(async move {
            sleep(Duration::from_millis(i * 100)).await;
            i
        });
    }
    
    // Process results as they arrive
    let mut count = 0;
    while let Some(result) = futures.next().await {
        println!("Completed: {}", result);
        count += 1;
        
        // Add more futures while processing
        if count < 3 {
            futures.push(async {
                sleep(Duration::from_millis(50)).await;
                100 + count
            });
        }
    }
}

Futures can be added while others are still running, enabling dynamic workloads.

Stream Interface

use futures::stream::{FuturesUnordered, StreamExt};
use std::time::Duration;
use tokio::time::sleep;
 
async fn stream_interface() {
    let mut futures = FuturesUnordered::new();
    
    futures.push(async {
        sleep(Duration::from_millis(100)).await;
        1
    });
    futures.push(async {
        sleep(Duration::from_millis(50)).await;
        2
    });
    futures.push(async {
        sleep(Duration::from_millis(150)).await;
        3
    });
    
    // Use Stream methods
    // take(n) - process only first n completions
    let first_two: Vec<i32> = futures.by_ref().take(2).collect().await;
    println!("First two: {:?}", first_two);
    
    // Remaining futures continue running
    let remaining: Vec<i32> = futures.collect().await;
    println!("Remaining: {:?}", remaining);
}

FuturesUnordered implements Stream, providing all stream combinators.

Processing Results Incrementally

use futures::stream::{FuturesUnordered, StreamExt};
use std::time::Duration;
use tokio::time::{sleep, timeout};
 
async fn incremental_processing() {
    let mut futures = FuturesUnordered::new();
    
    for i in 0..10 {
        futures.push(async move {
            sleep(Duration::from_millis(i * 50)).await;
            i * i
        });
    }
    
    // Process each result as it arrives
    while let Some(result) = futures.next().await {
        println!("Got result: {}", result);
        
        // Can do work between results
        // The other futures continue running concurrently
    }
}

Process results one at a time while other futures continue executing.

Timeout Handling

use futures::stream::{FuturesUnordered, StreamExt};
use std::time::Duration;
use tokio::time::{sleep, timeout};
 
async fn with_timeouts() {
    let mut futures = FuturesUnordered::new();
    
    futures.push(async {
        sleep(Duration::from_millis(100)).await;
        Ok("fast")
    });
    
    futures.push(async {
        sleep(Duration::from_secs(10)).await;
        Ok("slow")
    });
    
    // Process with timeout for each result
    while let Some(result) = futures.next().await {
        match timeout(Duration::from_secs(1), async { result }).await {
            Ok(value) => println!("Completed: {:?}", value),
            Err(_) => println!("Timeout waiting for result"),
        }
    }
}
 
async fn overall_timeout() {
    let mut futures = FuturesUnordered::new();
    
    for i in 0..5 {
        futures.push(async move {
            sleep(Duration::from_millis(i * 100)).await;
            i
        });
    }
    
    // Overall timeout for all operations
    match timeout(Duration::from_millis(250), futures.collect::<Vec<_>>()).await {
        Ok(results) => println!("All completed: {:?}", results),
        Err(_) => println!("Not all futures completed in time"),
    }
}

Use timeouts with FuturesUnordered to handle slow or hung futures.

Error Handling with Results

use futures::stream::{FuturesUnordered, StreamExt};
use std::time::Duration;
use tokio::time::sleep;
 
async fn error_handling() {
    let mut futures = FuturesUnordered::new();
    
    futures.push(async {
        sleep(Duration::from_millis(50)).await;
        Ok::<_, &str>("success 1")
    });
    
    futures.push(async {
        sleep(Duration::from_millis(100)).await;
        Err::<&str, _>("error 1")
    });
    
    futures.push(async {
        sleep(Duration::from_millis(75)).await;
        Ok("success 2")
    });
    
    // Process results, handling errors individually
    while let Some(result) = futures.next().await {
        match result {
            Ok(value) => println!("Success: {}", value),
            Err(e) => println!("Error: {}", e),
        }
    }
}
 
async fn collect_until_error() {
    let mut futures = FuturesUnordered::new();
    
    futures.push(async { Ok(1) });
    futures.push(async { Err("failure") as Result<i32, &str> });
    futures.push(async { Ok(3) });
    
    // Collect until first error
    let results: Vec<Result<i32, &str>> = futures.collect().await;
    
    // Results contains all outcomes, including errors
    for result in results {
        println!("{:?}", result);
    }
}

Handle errors per-future without stopping the entire collection.

FuturesOrdered vs FuturesUnordered

use futures::stream::{FuturesUnordered, FuturesOrdered, StreamExt};
use std::time::Duration;
use tokio::time::sleep;
 
async fn ordered_vs_unordered() {
    // FuturesOrdered: Yields results in insertion order
    let mut ordered = FuturesOrdered::new();
    ordered.push(async {
        sleep(Duration::from_millis(100)).await;
        1
    });
    ordered.push(async {
        sleep(Duration::from_millis(50)).await;
        2
    });
    
    // Results: 1, 2 (insertion order, not completion order)
    let ordered_results: Vec<i32> = ordered.collect().await;
    assert_eq!(ordered_results, vec![1, 2]);
    
    // FuturesUnordered: Yields results in completion order
    let mut unordered = FuturesUnordered::new();
    unordered.push(async {
        sleep(Duration::from_millis(100)).await;
        1
    });
    unordered.push(async {
        sleep(Duration::from_millis(50)).await;
        2
    });
    
    // Results: 2, 1 (completion order)
    let unordered_results: Vec<i32> = unordered.collect().await;
    assert_eq!(unordered_results, vec![2, 1]);
}

FuturesOrdered preserves insertion order; FuturesUnordered yields in completion order.

Capacity Management

use futures::stream::FuturesUnordered;
 
fn capacity_management() {
    // Create with pre-allocated capacity
    let mut futures: FuturesUnordered<_> = FuturesUnordered::with_capacity(100);
    
    // Capacity doesn't limit the number of futures
    // It's just pre-allocation for efficiency
    for i in 0..200 {
        futures.push(async move { i });
    }
    
    // shrink_to_fit() reduces memory after removing futures
    futures.clear();
    futures.shrink_to_fit();
}

Use with_capacity to pre-allocate for known sizes, similar to Vec.

Length and IsEmpty

use futures::stream::{FuturesUnordered, StreamExt};
 
async fn length_tracking() {
    let mut futures = FuturesUnordered::new();
    
    assert!(futures.is_empty());
    assert_eq!(futures.len(), 0);
    
    futures.push(async { 1 });
    futures.push(async { 2 });
    
    assert_eq!(futures.len(), 2);
    assert!(!futures.is_empty());
    
    // After polling, length decreases
    futures.next().await;
    assert_eq!(futures.len(), 1);
    
    futures.next().await;
    assert!(futures.is_empty());
}

Track the number of pending futures with len() and is_empty().

Clear and Extend

use futures::stream::{FuturesUnordered, StreamExt};
use std::iter;
 
async fn clear_and_extend() {
    let mut futures = FuturesUnordered::new();
    
    // Add multiple futures at once
    futures.extend((0..5).map(|i| async move { i * 2 }));
    
    assert_eq!(futures.len(), 5);
    
    // Remove all futures
    futures.clear();
    
    assert!(futures.is_empty());
    
    // Extend again
    futures.extend(iter::repeat(async { 42 }).take(3));
    
    let results: Vec<i32> = futures.collect().await;
    assert_eq!(results.len(), 3);
}

Use extend to add multiple futures, clear to remove all pending futures.

Real-World Example: Concurrent HTTP Requests

use futures::stream::{FuturesUnordered, StreamExt};
use reqwest::Client;
use std::collections::HashMap;
 
#[derive(Debug)]
struct FetchResult {
    url: String,
    status: u16,
    duration_ms: u64,
}
 
async fn fetch_urls(urls: Vec<String>) -> Vec<FetchResult> {
    let client = Client::new();
    let mut futures = FuturesUnordered::new();
    
    for url in urls {
        let client = client.clone();
        futures.push(async move {
            let start = std::time::Instant::now();
            let response = client.get(&url).send().await?;
            let duration = start.elapsed();
            
            Ok::<_, reqwest::Error>(FetchResult {
                url,
                status: response.status().as_u16(),
                duration_ms: duration.as_millis() as u64,
            })
        });
    }
    
    let mut results = Vec::new();
    while let Some(result) = futures.next().await {
        match result {
            Ok(fetch_result) => {
                println!("Completed: {} ({}ms)", fetch_result.url, fetch_result.duration_ms);
                results.push(fetch_result);
            }
            Err(e) => {
                eprintln!("Failed: {}", e);
            }
        }
    }
    
    results
}

Fetch multiple URLs concurrently, processing results as each request completes.

Real-World Example: Rate-Limited Concurrent Processing

use futures::stream::{FuturesUnordered, StreamExt};
use std::sync::Arc;
use tokio::sync::Semaphore;
 
async fn rate_limited_processing(items: Vec<i32>, max_concurrent: usize) -> Vec<i32> {
    let semaphore = Arc::new(Semaphore::new(max_concurrent));
    let mut futures = FuturesUnordered::new();
    
    for item in items {
        let semaphore = semaphore.clone();
        futures.push(async move {
            let permit = semaphore.acquire().await.unwrap();
            let result = process_item(item).await;
            drop(permit); // Release permit
            result
        });
    }
    
    futures.collect().await
}
 
async fn process_item(item: i32) -> i32 {
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    item * 2
}
 
async fn main() {
    let items: Vec<i32> = (0..20).collect();
    let results = rate_limited_processing(items, 5).await;
    println!("Processed {} items", results.len());
}

Combine FuturesUnordered with a semaphore for rate-limited concurrent execution.

Real-World Example: Task Pool with Cancellation

use futures::stream::{FuturesUnordered, StreamExt};
use tokio::sync::broadcast;
 
async fn task_pool_with_cancellation() {
    let (cancel_tx, cancel_rx) = broadcast::channel::<()>(1);
    let mut futures = FuturesUnordered::new();
    
    // Spawn tasks that listen for cancellation
    for i in 0..10 {
        let mut cancel_rx = cancel_rx.resubscribe();
        futures.push(async move {
            tokio::select! {
                result = process_task(i) => result,
                _ = cancel_rx.recv() => {
                    println!("Task {} cancelled", i);
                    -1
                }
            }
        });
    }
    
    // Process results, cancel remaining on first error
    while let Some(result) = futures.next().await {
        if result < 0 {
            println!("Error detected, cancelling remaining tasks");
            let _ = cancel_tx.send(());
            break;
        }
        println!("Task result: {}", result);
    }
}
 
async fn process_task(id: i32) -> i32 {
    tokio::time::sleep(std::time::Duration::from_millis(id as u64 * 100)).await;
    id * id
}

Use broadcast channels with FuturesUnordered for coordinated cancellation.

Concurrency vs Parallelism

use futures::stream::{FuturesUnordered, StreamExt};
use std::time::Duration;
use tokio::time::{sleep, Instant};
 
async fn demonstrating_concurrency() {
    let start = Instant::now();
    
    let mut futures = FuturesUnordered::new();
    
    // These futures run concurrently, not necessarily in parallel
    // The runtime decides how to schedule them
    for i in 0..5 {
        futures.push(async move {
            sleep(Duration::from_millis(100)).await;
            i
        });
    }
    
    futures.collect::<Vec<_>>().await;
    
    let elapsed = start.elapsed();
    println!("Total time: {:?}", elapsed);
    
    // All 5 futures complete in ~100ms (concurrent)
    // If sequential, would take ~500ms
}

FuturesUnordered enables concurrency; the runtime handles parallelism.

Memory Efficiency

use futures::stream::{FuturesUnordered, StreamExt};
use std::time::Duration;
use tokio::time::sleep;
 
async fn memory_efficient() {
    // Process a large number of futures without keeping all in memory
    let mut futures = FuturesUnordered::new();
    
    // Start with initial batch
    for i in 0..10 {
        futures.push(process_item(i));
    }
    
    let mut next_item = 10;
    let total_items = 100;
    
    while let Some(result) = futures.next().await {
        println!("Got result: {}", result);
        
        // Add more work as items complete
        if next_item < total_items {
            futures.push(process_item(next_item));
            next_item += 1;
        }
    }
}
 
async fn process_item(id: i32) -> i32 {
    sleep(Duration::from_millis(50)).await;
    id * 2
}

Keep a bounded number of active futures to manage memory consumption.

Synthesis

Key characteristics of FuturesUnordered:

Property Behavior
Order Completion order (first completed, first yielded)
Growth Dynamic insertion at any time
Concurrency Polls all futures concurrently
Memory Stores futures until completion
Interface Implements Stream

Comparison with alternatives:

Collection Order Use Case
FuturesUnordered Completion order Process results as they arrive
FuturesOrdered Insertion order Results must match input order
Vec<JoinHandle> Manual order Spawned tasks, manual management
join_all! All complete Wait for all, don't stream

Common patterns:

  1. Fire-and-forget collection: Add futures, process results as they complete
  2. Dynamic work queue: Add new futures while processing existing ones
  3. Streaming pipeline: Chain with stream combinators for complex workflows
  4. Concurrent with limits: Combine with semaphore for bounded concurrency

Best practices:

  1. Use when completion order doesn't matter
  2. Add futures dynamically for work-queue patterns
  3. Handle errors per-future, not globally
  4. Use len() to track pending futures
  5. Combine with timeouts for resilient systems

Key insight: FuturesUnordered bridges the gap between individual futures and batch operations like join_all. It provides a streaming interface over a collection of futures, yielding each result as soon as it's ready. This is fundamentally different from Vec<Future> or join_all, which either require sequential polling or wait for all futures to complete. The ability to add futures dynamically, combined with the stream interface, makes FuturesUnordered ideal for scenarios where work arrives over time and results should be processed immediately upon completion. Whether building a job queue, handling multiple network requests, or implementing a concurrent task pool, FuturesUnordered provides the right abstraction for responsive, event-driven asynchronous code.