What is the difference between futures::stream::SelectAll and select for concurrent stream handling?

futures::stream::SelectAll merges multiple streams into a single stream that yields items from any of its constituent streams as they become available, continuing until all streams are exhausted, while select races multiple futures or streams and returns the first one that completes, cancelling the others. The key distinction is persistence: SelectAll maintains an ongoing collection of streams and produces a combined stream that runs all of them to completion, whereas select is a one-time operation that determines which of several competing operations finishes first. For streams specifically, select can be used in a loop to process items as they arrive, but it requires manual management of the remaining streams and doesn't handle the case where multiple streams have items ready simultaneously. SelectAll handles this automatically, managing the collection of streams and yielding items in the order they become available across all streams.

Basic select Operation

use futures::future::{self, select};
use std::time::Duration;
 
#[tokio::main]
async fn main() {
    // select races futures, returns first to complete
    let fut1 = async {
        tokio::time::sleep(Duration::from_millis(100)).await;
        "fast"
    };
    
    let fut2 = async {
        tokio::time::sleep(Duration::from_millis(200)).await;
        "slow"
    };
    
    // Race: first to complete wins
    let result = select(fut1, fut2).await;
    
    match result {
        future::Either::Left((value, _remaining)) => {
            println!("First future won: {}", value);
        }
        future::Either::Right((value, _remaining)) => {
            println!("Second future won: {}", value);
        }
    }
}

select races futures and returns the winner, discarding the loser.

select with Streams

use futures::stream::{self, StreamExt};
use futures::future::select;
use std::time::Duration;
 
#[tokio::main]
async fn main() {
    let mut stream1 = stream::iter(vec
![1, 2, 3]).throttle(Duration::from_millis(100));
    let mut stream2 = stream::iter(vec
![4, 5, 6]).throttle(Duration::from_millis(150));
    
    // Manual loop with select on next() futures
    loop {
        let fut1 = stream1.next();
        let fut2 = stream2.next();
        
        match select(fut1, fut2).await {
            future::Either::Left((Some(val), _)) => {
                println!("From stream1: {}", val);
            }
            future::Either::Right((Some(val), _)) => {
                println!("From stream2: {}", val);
            }
            _ => break,  // Both streams exhausted
        }
    }
}

select with streams requires manual looping and doesn't handle stream management.

SelectAll Basics

use futures::stream::{self, StreamExt, SelectAll};
 
#[tokio::main]
async fn main() {
    let stream1 = stream::iter(vec
![1, 2, 3]);
    let stream2 = stream::iter(vec
![4, 5, 6]);
    let stream3 = stream::iter(vec
![7, 8, 9]);
    
    // Merge all streams into one
    let mut merged: SelectAll<_> = SelectAll::new();
    merged.push(stream1);
    merged.push(stream2);
    merged.push(stream3);
    
    // Items arrive as they become available
    while let Some(value) = merged.next().await {
        println!("Received: {}", value);
    }
    
    // All items from all streams are yielded
    // Order depends on which stream yields first
}

SelectAll merges streams and yields items from all of them.

SelectAll Construction Methods

use futures::stream::{self, StreamExt, SelectAll};
 
#[tokio::main]
async fn main() {
    // Method 1: new() + push()
    let mut merged1: SelectAll<_> = SelectAll::new();
    merged1.push(stream::iter(vec
![1, 2]));
    merged1.push(stream::iter(vec
![3, 4]));
    
    // Method 2: from iterator
    let streams = [
        stream::iter(vec
![1, 2]),
        stream::iter(vec
![3, 4]),
        stream::iter(vec
![5, 6]),
    ];
    let mut merged2: SelectAll<_> = streams.into_iter().collect();
    
    // Both methods produce equivalent behavior
    println!("Merged stream ready");
}

SelectAll can be constructed incrementally with push or from an iterator.

Key Difference: Cancellation Behavior

use futures::future::{self, select};
use futures::stream::{self, StreamExt, SelectAll};
use std::time::Duration;
 
#[tokio::main]
async fn main() {
    // select: Loser futures are DROPPED
    println!("--- select behavior ---");
    let fut1 = async {
        println!("Future 1 started");
        tokio::time::sleep(Duration::from_millis(50)).await;
        println!("Future 1 completed");
        1
    };
    
    let fut2 = async {
        println!("Future 2 started");
        tokio::time::sleep(Duration::from_millis(200)).await;
        println!("Future 2 completed");  // Never printed!
        2
    };
    
    let result = select(fut1, fut2).await;
    println!("select result: {:?}", result);
    
    // SelectAll: ALL streams run to completion
    println!("--- SelectAll behavior ---");
    let stream1 = stream::iter(vec
![10, 20, 30])
        .then(|x| async move {
            println!("Stream1 yielding {}", x);
            x
        });
    
    let stream2 = stream::iter(vec
![100, 200, 300])
        .then(|x| async move {
            println!("Stream2 yielding {}", x);
            x
        });
    
    let mut merged: SelectAll<_> = SelectAll::new();
    merged.push(stream1);
    merged.push(stream2);
    
    // All items from both streams will be yielded
    while let Some(val) = merged.next().await {
        println!("Got: {}", val);
    }
}

select cancels losers; SelectAll runs all streams to completion.

Dynamic Stream Management

use futures::stream::{StreamExt, SelectAll};
use tokio::sync::mpsc;
 
#[tokio::main]
async fn main() {
    let (tx1, rx1) = mpsc::channel::<i32>(10);
    let (tx2, rx2) = mpsc::channel::<i32>(10);
    
    let mut merged: SelectAll<_> = SelectAll::new();
    merged.push(rx1);
    merged.push(rx2);
    
    // Spawn tasks that send to channels
    tokio::spawn(async move {
        tx1.send(1).await.unwrap();
        tx1.send(2).await.unwrap();
    });
    
    tokio::spawn(async move {
        tx2.send(10).await.unwrap();
        tx2.send(20).await.unwrap();
    });
    
    // Process items as they arrive from any stream
    let mut count = 0;
    while let Some(value) = merged.next().await {
        println!("Received: {}", value);
        count += 1;
        if count >= 4 {
            break;
        }
    }
}

SelectAll supports dynamic stream addition during its lifetime.

Handling Multiple Ready Items

use futures::stream::{self, StreamExt, SelectAll};
 
#[tokio::main]
async fn main() {
    let stream1 = stream::iter(vec
![1, 2, 3]);
    let stream2 = stream::iter(vec
![4, 5, 6]);
    
    let mut merged: SelectAll<_> = SelectAll::new();
    merged.push(stream1);
    merged.push(stream2);
    
    // When multiple streams have items ready,
    // SelectAll yields from one (implementation-defined order)
    // but processes all items eventually
    
    let mut items = Vec::new();
    while let Some(val) = merged.next().await {
        items.push(val);
    }
    
    println!("All items: {:?}", items);
    // Contains all 6 items, order may vary
}

SelectAll handles multiple ready streams automatically.

select! Macro

use tokio::select;
use std::time::Duration;
 
#[tokio::main]
async fn main() {
    // tokio::select! is similar to futures::future::select
    // but with ergonomic syntax for multiple branches
    
    let mut interval1 = tokio::time::interval(Duration::from_millis(100));
    let mut interval2 = tokio::time::interval(Duration::from_millis(150));
    
    let mut count = 0;
    loop {
        select! {
            _ = interval1.tick() => {
                println!("Tick from interval1");
            }
            _ = interval2.tick() => {
                println!("Tick from interval2");
            }
            _ = tokio::time::sleep(Duration::from_millis(500)) => {
                println!("Timeout, breaking");
                break;
            }
        }
        
        count += 1;
        if count >= 5 {
            break;
        }
    }
}

select! provides ergonomic syntax for racing multiple operations.

Comparison: Stream Processing

use futures::stream::{self, StreamExt, SelectAll};
use futures::future::select;
 
#[tokio::main]
async fn main() {
    // Approach 1: select with manual loop (complex)
    println!("--- Manual select loop ---");
    let mut s1 = stream::iter(vec
![1, 2, 3]);
    let mut s2 = stream::iter(vec
![4, 5, 6]);
    
    loop {
        let f1 = s1.next();
        let f2 = s2.next();
        
        match select(f1, f2).await {
            future::Either::Left((Some(v), _)) => println!("s1: {}", v),
            future::Either::Right((Some(v), _)) => println!("s2: {}", v),
            future::Either::Left((None, _)) | 
            future::Either::Right((None, _)) => break,
        }
    }
    
    // Approach 2: SelectAll (simpler)
    println!("--- SelectAll ---");
    let s1 = stream::iter(vec
![1, 2, 3]);
    let s2 = stream::iter(vec
![4, 5, 6]);
    
    let mut merged: SelectAll<_> = [s1, s2].into_iter().collect();
    
    while let Some(v) = merged.next().await {
        println!("merged: {}", v);
    }
}

SelectAll simplifies concurrent stream processing significantly.

Empty SelectAll Behavior

use futures::stream::{StreamExt, SelectAll};
 
#[tokio::main]
async fn main() {
    // Empty SelectAll yields nothing immediately
    let mut empty: SelectAll<i32> = SelectAll::new();
    
    // next() returns None immediately
    let result = empty.next().await;
    println!("Empty SelectAll: {:?}", result);  // None
    
    // Adding streams makes it productive
    let mut merged: SelectAll<_> = SelectAll::new();
    
    let items: Vec<i32> = merged.collect().await;
    println!("Collected from empty: {:?}", items);  // []
}

SelectAll with no streams completes immediately with no items.

Stream Exhaustion

use futures::stream::{self, StreamExt, SelectAll};
 
#[tokio::main]
async fn main() {
    let stream1 = stream::iter(vec
![1, 2, 3]);
    let stream2 = stream::iter(vec
![4, 5]);
    let stream3 = stream::iter(vec
![6, 7, 8, 9]);
    
    let mut merged: SelectAll<_> = SelectAll::new();
    merged.push(stream1);  // 3 items
    merged.push(stream2);  // 2 items
    merged.push(stream3);  // 4 items
    
    // Process until all streams exhausted
    let mut count = 0;
    while let Some(_) = merged.next().await {
        count += 1;
    }
    
    println!("Total items: {}", count);  // 9
}

SelectAll yields None only when all constituent streams are exhausted.

Backpressure and Buffering

use futures::stream::{StreamExt, SelectAll};
use tokio::sync::mpsc;
 
#[tokio::main]
async fn main() {
    let (tx1, rx1) = mpsc::channel::<i32>(2);  // Buffer size 2
    let (tx2, rx2) = mpsc::channel::<i32>(2);
    
    let mut merged: SelectAll<_> = SelectAll::new();
    merged.push(rx1);
    merged.push(rx2);
    
    // Producers
    let producer1 = tokio::spawn(async move {
        for i in 0..5 {
            if tx1.send(i).await.is_err() {
                println!("Channel 1 closed");
                break;
            }
            println!("Sent {} to channel 1", i);
        }
    });
    
    let producer2 = tokio::spawn(async move {
        for i in 10..15 {
            if tx2.send(i).await.is_err() {
                println!("Channel 2 closed");
                break;
            }
            println!("Sent {} to channel 2", i);
        }
    });
    
    // Consumer
    while let Some(val) = merged.next().await {
        println!("Received: {}", val);
    }
}

SelectAll respects backpressure from constituent streams.

Error Handling

use futures::stream::{self, StreamExt, SelectAll};
 
#[tokio::main]
async fn main() {
    type Result<T> = std::result::Result<T, &'static str>;
    
    let stream1 = stream::iter(vec
![Ok(1), Ok(2)]);
    let stream2 = stream::iter(vec
![Ok(10), Err("error in stream2"), Ok(30)]);
    
    let mut merged: SelectAll<_> = SelectAll::new();
    merged.push(stream1);
    merged.push(stream2);
    
    // Errors are propagated as values
    while let Some(result) = merged.next().await {
        match result {
            Ok(val) => println!("Value: {}", val),
            Err(e) => println!("Error: {}", e),
        }
    }
}

SelectAll yields values as-is, including errors.

Futures Unordered: Alternative to SelectAll

use futures::stream::{self, StreamExt};
use std::time::Duration;
 
#[tokio::main]
async fn main() {
    // futures::stream::futures_unordered is similar to SelectAll
    // but for futures (not streams)
    
    let futures = vec
![
        async {
            tokio::time::sleep(Duration::from_millis(100)).await;
            1
        },
        async {
            tokio::time::sleep(Duration::from_millis(50)).await;
            2
        },
        async {
            tokio::time::sleep(Duration::from_millis(150)).await;
            3
        },
    ];
    
    // Collect results as futures complete
    let mut stream = stream::iter(futures).buffer_unordered(3);
    
    while let Some(result) = stream.next().await {
        println!("Future completed: {}", result);
    }
}

futures_unordered and buffer_unordered apply similar concepts to futures.

Synthesis

Comparison table:

Feature select SelectAll
Input 2 futures/streams Multiple streams
Output First to complete All items from all streams
Cancellation Drops losers Runs all to completion
Reusability One-shot Ongoing stream
Stream support Manual loop needed Built-in
Dynamic addition No Yes (with push)

When to use each:

Scenario Recommendation
Race multiple futures, take first winner select
Process items from multiple streams SelectAll
Timeout pattern (cancel after duration) select
Merge concurrent data sources SelectAll
One-shot decision between operations select
Event handling from multiple sources SelectAll

Key insight: select and SelectAll serve fundamentally different purposes. select is a racing operation: given multiple futures, determine which completes first and cancel the rest. This is ideal for timeouts (race an operation against a timer), cancellation (race an operation against a cancellation signal), or choosing between alternatives (race multiple lookup strategies and use whichever finishes first). SelectAll is a merging operation: given multiple streams, yield items from all of them as they become available. This is ideal for event aggregation (combine events from multiple sources), request fan-in (merge responses from multiple services), or any scenario where you want to process items from multiple concurrent producers. The "select" in SelectAll refers to the internal mechanism—selecting which stream to poll next based on readiness—not to the cancellation semantics of future::select. When choosing between them, ask: should the slower operation be cancelled (select), or should all operations run to completion (SelectAll)?