What are the trade-offs between futures::stream::select and select_all for merging multiple streams?

select merges exactly two streams, while select_all merges an arbitrary number of them. Neither one records which source produced an item: both yield the streams' Item type directly, so if you need source identity you tag items yourself (for example with map) before merging. The real differences lie elsewhere. futures::stream::select is a free function (not a StreamExt method) that takes two streams, which may have different concrete types as long as their Item types match. It does not require them to be Unpin, and it alternates between them round-robin. select_all (and the SelectAll type behind it) holds any number of streams of a single concrete, Unpin type, lets you push new streams at runtime, and only re-polls streams that have signaled a wake-up. The choice therefore depends on how many streams you have, whether that number is fixed, and whether the streams share a concrete type.

Basic select Usage

use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    // Create two streams
    let stream1 = stream::iter(vec!["A1", "A2", "A3"]);
    let stream2 = stream::iter(vec!["B1", "B2", "B3"]);

    // select is a free function that merges exactly two streams
    let merged = stream::select(stream1, stream2);

    // Both streams are always ready here, so select alternates between them
    let results: Vec<_> = merged.collect().await;
    // Results contain all items from both streams, interleaved
    println!("Merged: {:?}", results);
}

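select combines two streams into one. On each poll it alternates which stream it tries first, so items from two ready streams interleave; if the preferred stream is pending or finished, it yields from the other.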

Basic select_all Usage

use futures::stream::{self, StreamExt, SelectAll};
 
#[tokio::main]
async fn main() {
    // Create multiple streams
    let streams = vec![
        stream::iter(vec!["A"]),
        stream::iter(vec!["B"]),
        stream::iter(vec!["C"]),
    ];
    
    // select_all merges any number of streams
    let merged: SelectAll<_> = streams.into_iter().collect();
    let results: Vec<_> = merged.collect().await;
    
    println!("Merged: {:?}", results);
}

select_all accepts any number of streams, as long as they all share one concrete type and are Unpin.

Source Identification via Tagging

use futures::stream::{self, StreamExt};
 
#[tokio::main]
async fn main() {
    let sensor1 = stream::iter(vec![21.5, 22.1, 21.8]);
    let sensor2 = stream::iter(vec![23.0, 22.5, 23.2]);
    
    // select yields plain f64 values and does not record the source,
    // so tag each stream with an index before merging
    let mut merged = stream::select(
        sensor1.map(|v| (0, v)),
        sensor2.map(|v| (1, v)),
    );
    
    while let Some((stream_idx, value)) = merged.next().await {
        // stream_idx is 0 for sensor1, 1 for sensor2 (added by map above)
        println!("Sensor {}: {}°C", stream_idx, value);
    }
}

select itself yields the bare item type; the (stream_index, item) tuples here come from the map calls. The same tagging works with select_all.

Anonymous Merging with select_all

use futures::stream::{self, StreamExt, SelectAll};
 
#[tokio::main]
async fn main() {
    let sources = vec![
        stream::iter(vec!["msg1"]),
        stream::iter(vec!["msg2"]),
        stream::iter(vec!["msg3"]),
    ];
    
    // select_all yields items directly, no source identification
    let merged: SelectAll<_> = sources.into_iter().collect();
    
    // Just get the items, don't care which source
    let messages: Vec<_> = merged.collect().await;
    println!("All messages: {:?}", messages);
}

Like select, select_all yields the items themselves, with no record of which source produced them.

Fixed vs Dynamic Stream Count

use futures::stream::{self, StreamExt, SelectAll};
 
// select: Fixed two streams at compile time
async fn fixed_streams() {
    let stream1 = stream::iter(vec![1, 2, 3]);
    let stream2 = stream::iter(vec![4, 5, 6]);
    
    // Exactly two streams; the merged type is Select<Iter<..>, Iter<..>>
    let _merged = stream::select(stream1, stream2);
    
    // Adding a third stream means nesting another select, which changes the type
}
 
// select_all: Dynamic number of streams
async fn dynamic_streams() {
    let mut merged: SelectAll<_> = SelectAll::new();
    
    // Add streams dynamically
    merged.push(stream::iter(vec![1, 2, 3]));
    merged.push(stream::iter(vec![4, 5, 6]));
    
    // Can add more streams at runtime
    let count = get_stream_count();
    for i in 0..count {
        merged.push(stream::iter(vec![i]));
    }
}
 
fn get_stream_count() -> usize { 3 }

SelectAll allows adding streams dynamically with push, but every stream in the set must have the same concrete type.

Collecting vs Pushing Streams

use futures::stream::{self, StreamExt, SelectAll};
 
#[tokio::main]
async fn main() {
    // Method 1: Collect from iterator
    let streams = vec![
        stream::iter(vec![1]),
        stream::iter(vec![2]),
    ];
    let merged: SelectAll<_> = streams.into_iter().collect();
    
    // Method 2: Create empty and push
    let mut merged2: SelectAll<_> = SelectAll::new();
    merged2.push(stream::iter(vec![10]));
    merged2.push(stream::iter(vec![20]));
    
    // Both produce the same type
    let results: Vec<_> = merged.collect().await;
    let results2: Vec<_> = merged2.collect().await;
}

SelectAll::new() creates an empty merger that streams can be pushed to.

Completion Behavior

use futures::stream::{self, StreamExt, SelectAll};
 
#[tokio::main]
async fn main() {
    // select: when one stream ends, select keeps yielding from the other
    let stream1 = stream::iter(vec![1, 2, 3]);
    let stream2 = stream::iter(vec![4]);
    
    let merged = stream::select(stream1, stream2);
    
    // stream2 finishes after one item, but stream1's remaining items are
    // still yielded; the merged stream ends only when both are done
    let all: Vec<_> = merged.collect().await;
    println!("select: {:?}", all);
    
    // select_all: finished streams are dropped from the set, and the
    // merged stream ends when ALL streams end
    let streams = vec![
        stream::iter(vec![1]),
        stream::iter(vec![2, 3, 4]),
    ];
    let merged: SelectAll<_> = streams.into_iter().collect();
    
    // merged yields None only when all streams are exhausted
    let all: Vec<_> = merged.collect().await;
    println!("select_all: {:?}", all);
}

select and select_all handle stream completion similarly—both terminate when all sources are exhausted.

Error Handling Differences

use futures::stream::{self, StreamExt, SelectAll};
 
#[tokio::main]
async fn main() {
    // Fallible streams: the Item type is Result<T, E> for both sources
    let stream1 = stream::iter(vec![Ok(1), Ok(2)]);
    let stream2 = stream::iter(vec![Ok(3), Err("error")]);
    
    // select passes each Result through untouched; tag with map if you
    // need to know which stream produced an error
    let mut merged = stream::select(
        stream1.map(|r| (0, r)),
        stream2.map(|r| (1, r)),
    );
    
    while let Some((idx, result)) = merged.next().await {
        match result {
            Ok(value) => println!("Stream {} yielded {}", idx, value),
            Err(e) => {
                println!("Stream {} failed: {}", idx, e);
                break;
            }
        }
    }
    
    // select_all with errors
    let streams = vec![
        stream::iter(vec![Ok(1), Ok(2)]),
        stream::iter(vec![Ok(3), Err("error")]),
    ];
    
    let merged: SelectAll<_> = streams.into_iter().collect();
    
    // take_while ends the merged stream at the first Err
    let results: Vec<_> = merged
        .take_while(|item| futures::future::ready(item.is_ok()))
        .collect()
        .await;
    println!("Before first error: {:?}", results);
}

Neither combinator changes how errors flow: Err values are ordinary items. Tagging the streams, as in the select half above, is what tells you which source failed, and the same map trick works for select_all.

Chaining select for Multiple Streams

use futures::stream::{self, StreamExt};
 
#[tokio::main]
async fn main() {
    let stream1 = stream::iter(vec!["A"]);
    let stream2 = stream::iter(vec!["B"]);
    let stream3 = stream::iter(vec!["C"]);
    
    // Nest select calls to merge more than two streams
    let merged = stream::select(stream::select(stream1, stream2), stream3);
    
    // Note: the outer select alternates between the inner pair and stream3,
    // so stream3 gets as many turns as stream1 and stream2 combined
    
    let results: Vec<_> = merged.collect().await;
    println!("Nested: {:?}", results);
    
    // For 3+ streams, select_all is usually clearer
}

Nesting select works, but the type grows with every stream added and fairness skews toward the outermost stream.

Use Case: Priority Handling

use futures::stream::{self, StreamExt};
 
#[tokio::main]
async fn main() {
    let high_priority = stream::iter(vec!["urgent1", "urgent2"]);
    let low_priority = stream::iter(vec!["normal1", "normal2"]);
    
    // Tag each source so the handler knows where an item came from.
    // select still polls round-robin: the tag labels items, it does not
    // make high_priority items arrive first
    let merged = stream::select(
        high_priority.map(|msg| (0, msg)),
        low_priority.map(|msg| (1, msg)),
    );
    
    merged.for_each(|(idx, msg)| async move {
        let priority = if idx == 0 { "HIGH" } else { "LOW" };
        println!("[{}] {}", priority, msg);
    }).await;
}

select labels nothing and prioritizes nothing: the tags come from map, and the polling order stays round-robin. For genuine priority, the futures crate also offers select_with_strategy.

Use Case: Unordered Processing

use futures::stream::{self, StreamExt, SelectAll};
use std::time::Duration;
use tokio::time::sleep;
 
#[tokio::main]
async fn main() {
    // Simulating multiple workers producing results. Each `then` closure has
    // its own type and its async block is not Unpin, so box every worker to
    // give SelectAll one Unpin stream type
    let worker1 = stream::iter(vec![1, 2, 3])
        .then(|n| async move {
            sleep(Duration::from_millis(100)).await;
            n
        })
        .boxed();
    
    let worker2 = stream::iter(vec![4, 5, 6])
        .then(|n| async move {
            sleep(Duration::from_millis(50)).await;
            n
        })
        .boxed();
    
    let worker3 = stream::iter(vec![7, 8, 9])
        .then(|n| async move {
            sleep(Duration::from_millis(75)).await;
            n
        })
        .boxed();
    
    // select_all for unordered processing: we don't care which worker
    let merged: SelectAll<_> = vec![worker1, worker2, worker3]
        .into_iter()
        .collect();
    
    let results: Vec<_> = merged.collect().await;
    // Results arrive roughly in completion order, not source order
    println!("Results (completion order): {:?}", results);
}

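When the streams are homogeneous and you don't need source identity, select_all is simpler and more flexible. Boxing with StreamExt::boxed is the usual way to make differently typed or !Unpin streams fit into one SelectAll.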

Dynamic Stream Addition

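use futures::stream::{self, BoxStream, StreamExt, SelectAll};
use tokio::sync::mpsc;
// Requires the tokio-stream crate for a Stream wrapper around mpsc::Receiver
use tokio_stream::wrappers::ReceiverStream;
 
#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<i32>(10);
    // Boxing lets streams of different types share one SelectAll
    let mut merged: SelectAll<BoxStream<'static, i32>> = SelectAll::new();
    
    // Add initial stream
    merged.push(stream::iter(vec![1, 2, 3]).boxed());
    
    // Spawn a producer that feeds items through the channel
    let handle = tokio::spawn(async move {
        // Items could come from events, config reloads, etc.
        tx.send(4).await.unwrap();
        tx.send(5).await.unwrap();
        // tx is dropped here, which closes the channel
    });
    
    // Wrap the receiver as a Stream and merge it with the others
    merged.push(ReceiverStream::new(rx).boxed());
    
    // Process all items uniformly; ends once every stream has finished
    while let Some(item) = merged.next().await {
        println!("Received: {}", item);
    }
    handle.await.unwrap();
}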

SelectAll supports adding streams at runtime.

Performance Considerations

use futures::stream::{self, SelectAll};
 
// select: fixed overhead for two streams, no heap allocation
// Each poll tries at most two streams
 
// select_all: backed by an internal FuturesUnordered set
// Only streams that have signaled a wake-up are polled again,
// so idle streams are not re-polled on every call
 
async fn performance_example() {
    // Two streams: select is optimal
    let stream1 = stream::iter(vec![1]);
    let stream2 = stream::iter(vec![2]);
    let _merged = stream::select(stream1, stream2);
    // Minimal overhead, direct polling
    
    // Many streams: select_all is designed for this
    let streams: Vec<_> = (0..100)
        .map(|i| stream::iter(vec![i]))
        .collect();
    let _merged: SelectAll<_> = streams.into_iter().collect();
    // Internally tracks which streams are ready to make progress
}
 
fn main() {}

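select has fixed, allocation-free overhead for two streams; select_all pays for its internal bookkeeping but scales to many streams because it only re-polls the ones that were woken.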

Type Signature Comparison

use futures::stream;
 
// stream::select(a, b) produces Select<StreamA, StreamB>
// Yields Item directly (no index)
 
// stream::select_all(iter) produces SelectAll<S>
// Yields Item directly (no index)
 
// Type signature differences:
// select: StreamA::Item and StreamB::Item must be the same type,
//         but StreamA and StreamB may be different stream types
// select_all: every stream must be the same concrete type S, and S: Unpin
 
fn example() {
    // Both require compatible item types
    let stream1 = stream::iter(vec![1u32, 2, 3]);
    let stream2 = stream::iter(vec![4u32, 5, 6]);
    
    // Works: same item type
    let _merged = stream::select(stream1, stream2);
    
    // Won't compile: different item types (u32 vs &str)
    // let stream3 = stream::iter(vec!["string"]);
    // let bad_merge = stream::select(stream::iter(vec![1u32]), stream3);
}

Both require the same Item type; select_all additionally requires a single concrete stream type that is Unpin.

When to Use Each

use futures::stream::{self, StreamExt, SelectAll};
 
// Use select when:
// 1. You have exactly two streams
// 2. The two streams have different concrete types (no boxing needed)
// 3. You want the stream count fixed at compile time
// 4. You want round-robin fairness between two sources
 
async fn select_appropriate() {
    let user_events = stream::iter(vec!["click", "scroll"]);
    let system_events = stream::iter(vec!["timer", "network"]);
    
    // Tag each source so events can be handled differently
    stream::select(
        user_events.map(|e| (0, e)),
        system_events.map(|e| (1, e)),
    )
    .for_each(|(source, event)| async move {
        match source {
            0 => println!("User: {}", event),
            1 => println!("System: {}", event),
            _ => unreachable!(),
        }
    }).await;
}
 
// Use select_all when:
// 1. You have a dynamic number of streams
// 2. You need to add streams at runtime
// 3. You have a collection of homogeneous streams
// 4. Items are processed the same way regardless of source
 
async fn select_all_appropriate() {
    let workers: Vec<_> = (0..10)
        .map(|id| stream::iter(vec![format!("task-{}", id)]))
        .collect();
    
    let merged: SelectAll<_> = workers.into_iter().collect();
    
    merged.for_each(|result| async move {
        println!("Completed: {}", result);
    }).await;
}
 
fn main() {}

Choose based on stream count, whether the streams share one concrete type, and whether streams must be added at runtime; source identity, when needed, comes from tagging either way.

Synthesis

Quick reference:

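use futures::stream::{self, StreamExt, SelectAll};
 
// select: exactly two streams (types may differ, Item must match)
let mut merged = stream::select(stream1, stream2);
while let Some(item) = merged.next().await {
    println!("Item: {}", item);
}
 
// Need the source? Tag before merging (works with select_all too)
let mut tagged = stream::select(left.map(|x| (0, x)), right.map(|x| (1, x)));
while let Some((idx, item)) = tagged.next().await {
    println!("From stream {}: {}", idx, item);
}
 
// select_all: many same-typed, Unpin streams, anonymous merging
let mut merged: SelectAll<_> = streams.into_iter().collect();
while let Some(item) = merged.next().await {
    println!("Item: {}", item);
}
 
// Key differences:
// 1. Stream count: select = 2, select_all = arbitrary
// 2. Source ID: neither tags items; add tags with map when needed
// 3. Dynamic: only SelectAll supports push at runtime
// 4. Types: select accepts two different stream types; select_all needs
//    one Unpin stream type (box with .boxed() to mix)
// 5. Polling: select alternates round-robin; SelectAll polls woken streams
// 6. Creation: select(a, b); select_all(iter), collect(), or SelectAll::new() + push
 
// Performance:
// - select: fixed, allocation-free overhead for two streams
// - select_all: some bookkeeping cost, but scales to many streams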

Key insight: the fundamental trade-off is static versus dynamic composition, not source identity, since neither combinator records where an item came from. select fixes the stream count at two, accepts two different stream types with no Unpin requirement and no boxing, and alternates fairly between them (select_with_strategy is available when you need priority instead). select_all trades that static shape for flexibility: any number of streams, streams pushed at runtime, and readiness-driven polling, at the cost of requiring a single Unpin stream type, which often means boxing. When you need to know which source produced an item, map each stream to a tagged item (an index or an enum) before merging; this works the same way with either combinator. Use select when you have two sources with different semantics (user vs. system events, primary vs. fallback data). Use select_all when you're aggregating homogeneous sources (multiple workers, shards, or connections) where all items are processed identically. For three or more streams, select_all is almost always clearer than nesting select calls, which also skews fairness toward the outermost stream.