How does futures::stream::BufferUnordered control concurrency compared to buffer?

futures::stream::BufferUnordered (created by the StreamExt::buffer_unordered method) buffers futures and runs them concurrently up to a specified limit, yielding results in completion order rather than the order they were submitted. Its ordered counterpart is the Buffered stream, created by StreamExt::buffered (note: the method is named buffered, not buffer). buffered maintains submission order: results are yielded in the same order their corresponding futures appeared in the source stream, even if later futures complete earlier. Both limit concurrency to the specified buffer size, but buffer_unordered allows faster results to propagate immediately without waiting for earlier futures to complete, while buffered enforces strict ordering at the cost of head-of-line blocking, where a slow future delays all subsequent results.

Basic Buffered Behavior

use futures::future::{BoxFuture, FutureExt};
use futures::stream::{self, StreamExt};
use std::time::{Duration, Instant};

#[tokio::main]
async fn main() {
    // Each async block has its own anonymous type, so box the futures
    // to store them in a single Vec
    let futures: Vec<BoxFuture<'static, &str>> = vec![
        async {
            tokio::time::sleep(Duration::from_millis(300)).await;
            "first"
        }
        .boxed(),
        async {
            tokio::time::sleep(Duration::from_millis(100)).await;
            "second"
        }
        .boxed(),
        async {
            tokio::time::sleep(Duration::from_millis(200)).await;
            "third"
        }
        .boxed(),
    ];
    
    let start = Instant::now();
    let results: Vec<_> = stream::iter(futures)
        .buffered(2)  // Run at most 2 futures concurrently
        .collect()
        .await;
    
    println!("Results: {:?}", results);
    println!("Total time: {:?}", start.elapsed());
}

buffered limits concurrency to 2 and yields results in submission order.

Basic BufferUnordered Behavior

use futures::future::{BoxFuture, FutureExt};
use futures::stream::{self, StreamExt};
use std::time::{Duration, Instant};

#[tokio::main]
async fn main() {
    // Box the async blocks so the Vec holds a single concrete type
    let futures: Vec<BoxFuture<'static, &str>> = vec![
        async {
            tokio::time::sleep(Duration::from_millis(300)).await;
            "first"
        }
        .boxed(),
        async {
            tokio::time::sleep(Duration::from_millis(100)).await;
            "second"
        }
        .boxed(),
        async {
            tokio::time::sleep(Duration::from_millis(200)).await;
            "third"
        }
        .boxed(),
    ];
    
    let start = Instant::now();
    let results: Vec<_> = stream::iter(futures)
        .buffer_unordered(2)  // Run at most 2 futures concurrently
        .collect()
        .await;
    
    println!("Results: {:?}", results);
    println!("Total time: {:?}", start.elapsed());
}

buffer_unordered yields results as they complete, not in submission order.

Ordering Difference Demonstration

use futures::future::{BoxFuture, FutureExt};
use futures::stream::{self, StreamExt};
use std::time::Duration;

#[tokio::main]
async fn main() {
    // Boxed async blocks are not Clone, so build the same set of
    // futures twice with a closure
    let make_futures = || -> Vec<BoxFuture<'static, &str>> {
        vec![
            async { "A" }.boxed(),  // Completes immediately
            async {
                tokio::time::sleep(Duration::from_millis(100)).await;
                "B"
            }
            .boxed(),
            async { "C" }.boxed(),  // Completes immediately
            async {
                tokio::time::sleep(Duration::from_millis(50)).await;
                "D"
            }
            .boxed(),
        ]
    };
    
    // buffered: maintains order A, B, C, D
    let buffered_results: Vec<_> = stream::iter(make_futures())
        .buffered(2)
        .collect()
        .await;
    println!("buffered order: {:?}", buffered_results);
    // Output: ["A", "B", "C", "D"]
    
    // buffer_unordered: yields as they complete
    let unordered_results: Vec<_> = stream::iter(make_futures())
        .buffer_unordered(2)
        .collect()
        .await;
    println!("buffer_unordered order: {:?}", unordered_results);
    // Output: ["A", "C", "D", "B"] (or similar - A and C complete first)
}

buffered preserves order; buffer_unordered yields on completion.

Head-of-Line Blocking

use futures::future::{BoxFuture, FutureExt};
use futures::stream::{self, StreamExt};
use std::time::{Duration, Instant};

#[tokio::main]
async fn main() {
    // Boxed futures are not Clone, so build the set twice
    let make_futures = || -> Vec<BoxFuture<'static, &str>> {
        vec![
            async {
                tokio::time::sleep(Duration::from_secs(1)).await;
                "slow"
            }
            .boxed(),
            async { "fast1" }.boxed(),
            async { "fast2" }.boxed(),
            async { "fast3" }.boxed(),
        ]
    };
    
    // With buffered: fast results wait for the slow result
    let start = Instant::now();
    let results: Vec<_> = stream::iter(make_futures())
        .buffered(10)
        .collect()
        .await;
    println!("buffered: {:?} in {:?}", results, start.elapsed());
    // Waits ~1 second, then yields all in order: ["slow", "fast1", "fast2", "fast3"]
    
    // With buffer_unordered: fast results yield immediately
    let start = Instant::now();
    let results: Vec<_> = stream::iter(make_futures())
        .buffer_unordered(10)
        .collect()
        .await;
    println!("buffer_unordered: {:?} in {:?}", results, start.elapsed());
    // Fast results first, slow last (order among the fast ones may vary)
}

buffered suffers head-of-line blocking; buffer_unordered doesn't.

Concurrency Limiting

use futures::stream::{self, StreamExt};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
 
#[tokio::main]
async fn main() {
    let concurrent_count = Arc::new(AtomicUsize::new(0));
    let max_seen = Arc::new(AtomicUsize::new(0));
    
    let make_future = |id: usize, concurrent: Arc<AtomicUsize>, max: Arc<AtomicUsize>| {
        async move {
            let current = concurrent.fetch_add(1, Ordering::SeqCst) + 1;
            max.fetch_max(current, Ordering::SeqCst);
            println!("Starting {}", id);
            
            tokio::time::sleep(Duration::from_millis(50)).await;
            
            concurrent.fetch_sub(1, Ordering::SeqCst);
            println!("Finishing {}", id);
            id
        }
    };
    
    let cc = concurrent_count.clone();
    let mm = max_seen.clone();
    
    // buffer_unordered limits concurrency to 3
    let results: Vec<_> = stream::iter(0..10)
        .map(move |id| make_future(id, cc.clone(), mm.clone()))
        .buffer_unordered(3)
        .collect()
        .await;
    
    println!("Max concurrent: {}", max_seen.load(Ordering::SeqCst));
    // Max concurrent: 3 (never exceeds the limit)
    println!("Results: {:?}", results);
}

Both buffered and buffer_unordered enforce the concurrency limit.

Buffer Size and Backpressure

use futures::stream::{self, StreamExt};
use std::time::{Duration, Instant};
 
#[tokio::main]
async fn main() {
    // When the buffer is full, the source stream is not polled for more futures
    let futures: Vec<_> = (0..10)
        .map(|i| async move {
            tokio::time::sleep(Duration::from_millis(100)).await;
            i
        })
        .collect();
    
    let start = Instant::now();
    
    // Buffer size of 2 means:
    // - At most 2 futures run concurrently
    // - The stream is polled for new futures only when a slot opens
    let results: Vec<_> = stream::iter(futures)
        .buffer_unordered(2)
        .collect()
        .await;
    
    println!("Results: {:?}", results);
    println!("Time: {:?}", start.elapsed());
    // 10 futures / 2 concurrent * 100ms = ~500ms total
}

Buffer size controls both concurrency and backpressure.

Sequential Execution and the Zero Limit

use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    // buffer_unordered(1) runs futures one at a time (sequential).
    // A limit of 0 is a degenerate case: no future can ever enter the
    // buffer, so the stream never yields anything. Avoid it.
    let results: Vec<_> = stream::iter(0..5)
        .map(|i| async move {
            println!("Processing {}", i);
            i * 2
        })
        .buffer_unordered(1)
        .collect()
        .await;
    
    println!("Results: {:?}", results);
    // Output: [0, 2, 4, 6, 8] (processed sequentially, in order)
}

A limit of 1 gives sequential execution; a limit of 0 stalls the stream.

Stream vs Iterator Source

use futures::future::FutureExt;
use futures::stream::{self, StreamExt};
use std::time::Duration;

#[tokio::main]
async fn main() {
    // From iterator - all futures known upfront, boxed so the Vec
    // holds a single concrete type
    let results: Vec<_> = stream::iter(vec![
        async { "a" }.boxed(),
        async { "b" }.boxed(),
        async { "c" }.boxed(),
    ])
        .buffer_unordered(2)
        .collect()
        .await;
    
    // From stream - futures produced on demand
    let stream_results: Vec<_> = stream::unfold(0, |count| async move {
        if count < 3 {
            let future = async move {
                tokio::time::sleep(Duration::from_millis(100)).await;
                count
            };
            Some((future, count + 1))
        } else {
            None
        }
    })
        .buffer_unordered(2)
        .collect()
        .await;
    
    println!("Iterator results: {:?}", results);
    println!("Stream results: {:?}", stream_results);
}

Both work with iterators and on-demand streams.

Error Handling

use futures::future::{BoxFuture, FutureExt};
use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    // To buffered and buffer_unordered, a Result is just another item:
    // errors are yielded like any other value, and the remaining futures
    // keep running. (TryStreamExt::try_buffered and try_buffer_unordered
    // exist for streams that should short-circuit on the first error.)
    let futures: Vec<BoxFuture<'static, Result<i32, &str>>> = vec![
        async { Ok(1) }.boxed(),
        async { Err("error") }.boxed(),
        async { Ok(3) }.boxed(),
    ];
    
    // With buffer_unordered, where the error surfaces depends on
    // completion order; stop consuming manually at the first error:
    let results: Vec<_> = stream::iter(futures)
        .buffer_unordered(2)
        .take_while(|r| futures::future::ready(r.is_ok()))
        .collect()
        .await;
    
    println!("Results until error: {:?}", results);
}

Neither combinator stops on errors by itself; with buffer_unordered, where an error surfaces depends on completion order.

Collecting Results in Order

use futures::future::FutureExt;
use futures::stream::{self, StreamExt};
use std::time::Duration;

#[tokio::main]
async fn main() {
    // When you need concurrency but ordered results:
    
    // Option 1: Use buffered() for ordered results
    let ordered: Vec<_> = stream::iter(vec![
        async { tokio::time::sleep(Duration::from_millis(300)).await; "first" }.boxed(),
        async { "second" }.boxed(),
        async { "third" }.boxed(),
    ])
        .buffered(3)  // All concurrent, but ordered results
        .collect()
        .await;
    
    println!("Ordered: {:?}", ordered);
    
    // Option 2: Use buffer_unordered and sort afterward
    let mut unordered: Vec<_> = stream::iter(vec![
        async { tokio::time::sleep(Duration::from_millis(300)).await; ("first", 0) }.boxed(),
        async { ("second", 1) }.boxed(),
        async { ("third", 2) }.boxed(),
    ])
        .buffer_unordered(3)
        .collect()
        .await;
    
    unordered.sort_by_key(|(_, i)| *i);
    println!("Sorted: {:?}", unordered);
}

Choose based on whether you need order preserved.

Use Case: HTTP Requests

use futures::stream::{self, StreamExt};
use std::time::Duration;
 
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let urls = vec![
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/get",
    ];
    
    // buffer_unordered: process responses as they arrive
    let start = std::time::Instant::now();
    
    let client = reqwest::Client::new();
    
    let results: Vec<_> = stream::iter(urls)
        .map(|url| {
            let client = &client;
            async move {
                let resp = client.get(url)
                    .timeout(Duration::from_secs(5))
                    .send()
                    .await?;
                let text = resp.text().await?;
                Ok::<_, Box<dyn std::error::Error + Send + Sync>>(text.len())
            }
        })
        .buffer_unordered(2)  // Max 2 concurrent requests
        .collect()
        .await;
    
    println!("Got {} responses in {:?}", results.len(), start.elapsed());
    
    Ok(())
}

buffer_unordered is ideal for I/O where order doesn't matter.

Use Case: Ordered Processing Pipeline

use futures::stream::{self, StreamExt};
use std::time::Duration;
 
#[tokio::main]
async fn main() {
    // When order matters (e.g., processing numbered files)
    let files = vec![
        (1, "file1.txt"),
        (2, "file2.txt"),
        (3, "file3.txt"),
    ];
    
    // buffered: maintains file order
    let results: Vec<_> = stream::iter(files)
        .map(|(id, _name)| async move {
            tokio::time::sleep(Duration::from_millis(100)).await;
            id
        })
        .buffered(2)
        .collect()
        .await;
    
    // Results are always [1, 2, 3]
    println!("Processed in order: {:?}", results);
}

Use buffered when sequence order matters for correctness.

Comparison Summary

use futures::future::{BoxFuture, FutureExt};
use futures::stream::{self, StreamExt};
use std::time::{Duration, Instant};

#[tokio::main]
async fn main() {
    // Boxed futures are not Clone, so build the set twice
    let make_futures = || -> Vec<BoxFuture<'static, &str>> {
        vec![
            async {
                tokio::time::sleep(Duration::from_millis(300)).await;
                "A (slow)"
            }
            .boxed(),
            async {
                tokio::time::sleep(Duration::from_millis(100)).await;
                "B (fast)"
            }
            .boxed(),
            async {
                tokio::time::sleep(Duration::from_millis(50)).await;
                "C (fastest)"
            }
            .boxed(),
        ]
    };
    
    // buffered: order preserved, head-of-line blocking
    let start = Instant::now();
    let buffered_results: Vec<_> = stream::iter(make_futures())
        .buffered(3)
        .collect()
        .await;
    println!("buffered:");
    println!("  Results: {:?}", buffered_results);
    println!("  Time: {:?}", start.elapsed());
    println!("  Order: A, B, C (submission order)");
    
    // buffer_unordered: completion order, no blocking
    let start = Instant::now();
    let unordered_results: Vec<_> = stream::iter(make_futures())
        .buffer_unordered(3)
        .collect()
        .await;
    println!("\nbuffer_unordered:");
    println!("  Results: {:?}", unordered_results);
    println!("  Time: {:?}", start.elapsed());
    println!("  Order: C, B, A (completion order)");
}

Same concurrency limit, different ordering guarantees.

Summary Table

Aspect                    buffered                         buffer_unordered
Concurrency limit         Yes                              Yes
Result order              Submission order                 Completion order
Head-of-line blocking     Yes                              No
When to use               Order matters                    Order doesn't matter
Latency for fast results  May wait for slow predecessors   Yields immediately
Total completion time     Can be longer under blocking     Minimal for the given limit

Synthesis

Both combinators provide the same concurrency control but differ fundamentally in how they yield results:

buffered preserves submission order: The stream produces results in the same order the underlying futures were submitted. If future A is submitted before future B, then A's result is yielded before B's, even if B completes first. This causes head-of-line blocking: fast results wait for slower predecessors. Use buffered when the sequence matters, for example when processing numbered files or ordered messages, or when downstream consumers expect a specific order.

buffer_unordered yields on completion: Results are produced as soon as any buffered future completes, regardless of position in the input sequence. Fast results propagate immediately without waiting for earlier but slower futures. Use buffer_unordered when order doesn't matter—for example, parallel HTTP requests, concurrent computations whose results are independent, or when you can sort/associate results afterward using identifiers.

Key insight: Both enforce the same maximum number of in-flight futures. The difference is chiefly about ordering semantics: with buffered, the stream's output order matches its input order; with buffer_unordered, the output order matches completion order. If all futures complete at similar speeds or with fixed timing, you might not notice the difference. The divergence appears when completion times vary unpredictably, which is exactly the case with I/O-bound workloads like network requests or file operations.

Performance consideration: The two are not always equal in total execution time. In buffered, a completed result that cannot yet be yielded still counts against the buffer, so a slow head future can also delay admitting new futures from the source; buffer_unordered frees capacity as soon as any future completes. Just as importantly, buffer_unordered yields results incrementally as they complete, while buffered may hold results back until earlier futures finish. For streaming pipelines where downstream processing can start before all results are ready, buffer_unordered provides better overall throughput.