How does futures::stream::select handle multiple streams and what happens when one stream ends?

futures::stream::select combines two streams into a single stream that yields items from either source as they become available. It polls the two streams in round-robin fashion, so a stream that is always ready cannot starve the other. When one stream ends, select keeps yielding items from the remaining stream; the combined stream ends only when both inputs are exhausted. select_all extends the same behavior to any number of streams. Errors get no special treatment: a Result is yielded like any other item, and short-circuiting comes from the consumer, for example through TryStreamExt::try_next or try_collect (the futures crate has no try_select for streams; future::try_select operates on futures). Because select completes only when ALL input streams complete, it suits scenarios where you want to process all available work regardless of which source finishes first.

Basic Stream Selection

use futures::stream::{self, StreamExt};
 
#[tokio::main]
async fn main() {
    // Create two streams
    let stream1 = stream::iter(vec![1, 2, 3]);
    let stream2 = stream::iter(vec![4, 5, 6]);
    
    // Select combines them, yielding from whichever is ready
    let mut combined = stream::select(stream1, stream2);
    
    // Items arrive as they become available
    while let Some(item) = combined.next().await {
        println!("Got: {}", item);
    }
    
    // With two always-ready streams, round-robin polling typically
    // yields 1, 4, 2, 5, 3, 6; don't depend on the exact interleaving
}

select combines two streams into one that yields items from either source.

Fairness Between Streams

use futures::stream::{self, StreamExt};
use tokio::time::Duration;
 
#[tokio::main]
async fn main() {
    // Fast stream: yields immediately
    let fast_stream = stream::iter(vec!["fast1", "fast2", "fast3"]);
    
    // Slow stream: yields with delay
    let slow_stream = stream::iter(vec!["slow1", "slow2", "slow3"])
        .then(|item| async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            item
        });
    
    // then() with an async block is not Unpin, so pin before calling next()
    let mut combined = std::pin::pin!(stream::select(fast_stream, slow_stream));
    
    while let Some(item) = combined.next().await {
        println!("Got: {}", item);
    }
    
    // Output: fast items arrive first, then slow items
    // select doesn't wait for slow stream when fast stream has items
}

select yields from whichever stream has items ready, without waiting for slow streams.

Stream Ends Continue Processing Others

use futures::stream::{self, StreamExt};
 
#[tokio::main]
async fn main() {
    // Stream that ends after 3 items
    let stream1 = stream::iter(vec![1, 2, 3]);
    
    // Stream that continues after stream1 ends
    let stream2 = stream::iter(vec![4, 5, 6, 7, 8]);
    
    let mut combined = stream::select(stream1, stream2);
    
    let mut results = Vec::new();
    while let Some(item) = combined.next().await {
        results.push(item);
    }
    
    // All items from both streams are collected
    assert_eq!(results.len(), 8);  // 3 + 5 = 8 items
    
    // When stream1 ends, stream2 continues
    // Combined stream only ends when BOTH streams end
}

When one stream ends, select continues with remaining streams.

Select with Three or More Streams

use futures::stream::{self, StreamExt};
 
#[tokio::main]
async fn main() {
    let stream1 = stream::iter(vec![1, 2]);
    let stream2 = stream::iter(vec![3, 4]);
    let stream3 = stream::iter(vec![5, 6]);
    
    // select takes two streams, so chain select calls
    let combined = stream::select(stream1, stream2);
    let mut combined = stream::select(combined, stream3);
    
    let mut results = Vec::new();
    while let Some(item) = combined.next().await {
        results.push(item);
    }
    
    assert_eq!(results.len(), 6);
}

select takes exactly two streams; chain calls for more streams.

select_all for Multiple Streams

use futures::stream::{self, StreamExt, SelectAll};
 
#[tokio::main]
async fn main() {
    // select_all handles arbitrary number of streams
    let streams = vec![
        stream::iter(vec![1, 2]),
        stream::iter(vec![3, 4]),
        stream::iter(vec![5, 6]),
        stream::iter(vec![7, 8]),
    ];
    
    let mut combined: SelectAll<_> = streams.into_iter().collect();
    
    let mut results = Vec::new();
    while let Some(item) = combined.next().await {
        results.push(item);
    }
    
    assert_eq!(results.len(), 8);
}

select_all combines a vector of streams of the same type.

Selecting Different Stream Types

use futures::stream::{self, StreamExt};
 
#[tokio::main]
async fn main() {
    // Streams must produce the same item type
    let stream1 = stream::iter(vec![1, 2, 3]);
    let stream2 = stream::iter(vec![4, 5, 6]);
    
    // This works - both produce i32
    let mut combined = stream::select(stream1, stream2);
    
    // Different types would require mapping first
    let string_stream = stream::iter(vec!["a", "b"])
        .map(|s| s.to_string());
    let num_stream = stream::iter(vec![1, 2])
        .map(|n| n.to_string());
    
    // Now both produce String
    let mut combined = stream::select(string_stream, num_stream);
}

select requires both streams to produce the same item type; the concrete stream types may differ.

Priority with Stream Order

use futures::stream::{self, StreamExt};
 
#[tokio::main]
async fn main() {
    let stream1 = stream::iter(vec!["a", "b", "c"]);
    let stream2 = stream::iter(vec!["x", "y", "z"]);
    
    // select polls round-robin: it alternates which stream it polls first,
    // so argument order only decides which stream gets the first turn
    let mut combined = stream::select(stream1, stream2);
    
    // Note: the contract is fair alternation, not priority
    // Don't rely on strict ordering when both streams are always ready
    
    while let Some(item) = combined.next().await {
        println!("Got: {}", item);
    }
}

Argument order decides which stream gets the first turn; after that, select alternates between the two.

Handling Stream Errors

use futures::stream::{self, StreamExt};
 
#[tokio::main]
async fn main() {
    // Regular select does not treat errors specially
    let stream1 = stream::iter(vec![Ok(1), Ok(2)]);
    let stream2 = stream::iter(vec![Ok(3), Err("error")]);
    
    let mut combined = stream::select(stream1, stream2);
    
    // This stream yields Result<i32, &str>
    // Errors are yielded as-is, not short-circuiting
    
    while let Some(item) = combined.next().await {
        match item {
            Ok(n) => println!("Got: {}", n),
            Err(e) => println!("Error: {}", e),
        }
    }
}

select yields errors as ordinary items; to stop at the first error, consume the merged stream with TryStreamExt methods such as try_next or try_collect.

Short-Circuiting on Errors with TryStreamExt

use futures::stream::{self, TryStreamExt};
 
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let stream1 = stream::iter(vec![Ok(1), Ok(2)]);
    let stream2 = stream::iter(vec![Ok(3), Err("error")]);
    
    // futures has no try_select for streams (future::try_select is for futures),
    // so merge with the regular select
    let combined = stream::select(stream1, stream2);
    
    // try_collect (or try_next in a loop) stops at the first Err
    let result: Result<Vec<i32>, _> = combined.try_collect().await;
    
    match result {
        Ok(items) => println!("Got all: {:?}", items),
        Err(e) => println!("Error encountered: {:?}", e),
    }
    
    Ok(())
}

Consuming the merged stream with try_collect or try_next short-circuits on the first error from either stream.

Real-Time Event Processing

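use futures::stream::{self, Stream, StreamExt};
use tokio::sync::mpsc;
 
#[tokio::main]
async fn main() {
    let (tx1, mut rx1) = mpsc::channel::<String>(10);
    let (tx2, mut rx2) = mpsc::channel::<String>(10);
    
    // Send a few events, then drop the senders so both streams can end
    tx1.send("from source 1".to_string()).await.unwrap();
    tx2.send("from source 2".to_string()).await.unwrap();
    drop((tx1, tx2));
    
    // Convert receivers to streams
    let stream1 = recv_to_stream(&mut rx1);
    let stream2 = recv_to_stream(&mut rx2);
    
    // async_stream streams are not Unpin, so pin before calling next()
    let mut combined = std::pin::pin!(stream::select(stream1, stream2));
    
    // Process events from either source
    while let Some(event) = combined.next().await {
        println!("Event: {}", event);
    }
}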
 
fn recv_to_stream(rx: &mut mpsc::Receiver<String>) -> impl Stream<Item = String> + '_ {
    async_stream::stream! {
        while let Some(msg) = rx.recv().await {
            yield msg;
        }
    }
}

select is ideal for merging event sources like channels or network connections.

Timeout with Select

use futures::stream::{self, StreamExt};
use tokio::time::{timeout, Duration};
 
#[tokio::main]
async fn main() {
    let stream1 = stream::iter(vec![1, 2, 3])
        .then(|n| async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            n
        });
    
    let stream2 = stream::iter(vec![4, 5, 6]);
    
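    // then() with an async block is not Unpin, so pin before calling next()
    let mut combined = std::pin::pin!(stream::select(stream1, stream2));
    
    // Apply a timeout to each next() call; the loop stops on a timeout or at end of stream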
    while let Ok(Some(item)) = timeout(Duration::from_millis(100), combined.next()).await {
        println!("Got: {}", item);
    }
}

Combine select with timeout to handle slow streams.

Websocket and Heartbeat

use futures::stream::{self, StreamExt};
 
#[tokio::main]
async fn main() {
    // Simulated websocket stream
    let websocket = stream::iter(vec!["msg1", "msg2", "msg3"]);
    
    // Heartbeat stream
    let heartbeat = stream::unfold(0, |count| async move {
        tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
        Some(("ping", count + 1))
    });
    
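    // unfold with an async closure is not Unpin, so pin before calling next()
    let mut combined = std::pin::pin!(stream::select(websocket, heartbeat));
    
    // Process messages and heartbeats together; the heartbeat never ends,
    // so this loop keeps running after the websocket stream is exhausted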
    while let Some(item) = combined.next().await {
        match item {
            "ping" => println!("Sending pong"),
            msg => println!("Processing: {}", msg),
        }
    }
}

select merges application messages with system events like heartbeats.

Select with Async Channels

use futures::stream::{self, StreamExt};
use tokio::sync::mpsc;
 
#[tokio::main]
async fn main() {
    let (tx1, mut rx1) = mpsc::channel::<i32>(10);
    let (tx2, mut rx2) = mpsc::channel::<i32>(10);
    
    // Spawn producers
    tokio::spawn(async move {
        tx1.send(1).await.unwrap();
        tx1.send(2).await.unwrap();
    });
    
    tokio::spawn(async move {
        tx2.send(10).await.unwrap();
        tx2.send(20).await.unwrap();
    });
    
    // Create streams from channels
    let stream1 = async_stream::stream! {
        while let Some(item) = rx1.recv().await {
            yield item;
        }
    };
    
    let stream2 = async_stream::stream! {
        while let Some(item) = rx2.recv().await {
            yield item;
        }
    };
    
    // async_stream streams are not Unpin, so pin before calling next()
    let mut combined = std::pin::pin!(stream::select(stream1, stream2));
    
    let mut results = Vec::new();
    while let Some(item) = combined.next().await {
        results.push(item);
    }
    
    // Results contain items from both channels
    assert_eq!(results.len(), 4);
}

Combine channel receivers into a single stream.

Order of Stream Completion

use futures::stream::{self, StreamExt};
 
#[tokio::main]
async fn main() {
    // Short stream ends first
    let short = stream::iter(vec![1, 2]);
    
    // Long stream continues after short ends
    let long = stream::iter(vec![3, 4, 5, 6]);
    
    let mut combined = stream::select(short, long);
    
    let mut count = 0;
    let mut from_short = 0;
    let mut from_long = 0;
    
    while let Some(item) = combined.next().await {
        count += 1;
        if item <= 2 {
            from_short += 1;
        } else {
            from_long += 1;
        }
    }
    
    // Short stream: 2 items
    // Long stream: 4 items
    // Combined: 6 total
    assert_eq!(count, 6);
    assert_eq!(from_short, 2);
    assert_eq!(from_long, 4);
}

select completes only after all streams complete.

Canceling One Stream

use futures::stream::{self, StreamExt};
 
#[tokio::main]
async fn main() {
    // Infinite stream (same item type as the finite one, as select requires)
    let infinite = stream::repeat("tick");
    
    // Finite stream that will end
    let finite = stream::iter(vec!["a", "b", "c"]);
    
    // Take only 10 items from combined stream
    let combined = stream::select(infinite, finite)
        .take(10);
    
    let results: Vec<_> = combined.collect().await;
    
    // Even though one stream is infinite, take(10) limits us
    assert_eq!(results.len(), 10);
}

Use combinators like take to limit infinite streams.

Select and Backpressure

use futures::stream::{self, StreamExt};
 
#[tokio::main]
async fn main() {
    // Fast-producing stream
    let fast = stream::iter(0..1000);
    
    // Slow-consuming process
    let mut combined = stream::select(fast, stream::empty::<i32>());
    
    // Process items slowly - backpressure is automatic
    // select doesn't buffer; it yields one item at a time
    while let Some(item) = combined.next().await {
        println!("Processing: {}", item);
        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    }
}

select provides one item at a time with no implicit buffering.

Buffered Select

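use futures::stream::{self, StreamExt};
 
#[tokio::main]
async fn main() {
    let stream1 = stream::iter(vec![1, 2, 3, 4, 5]);
    let stream2 = stream::iter(vec![10, 20, 30, 40, 50]);
    
    // buffered() needs a stream of futures, so map each item to one first
    let mut combined = stream::select(stream1, stream2)
        .map(|n| async move {
            // Simulated per-item latency
            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
            n
        })
        .buffered(10);  // Run up to 10 futures at once, yielding results in order
    
    while let Some(item) = combined.next().await {
        println!("Got: {}", item);
    }
}

buffered runs up to N item futures concurrently and yields their results in order, which smooths processing when per-item work has latency.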

Unordered vs Ordered Results

use futures::stream::{self, StreamExt};
 
#[tokio::main]
async fn main() {
    let stream1 = stream::iter(vec![1, 2, 3]);
    let stream2 = stream::iter(vec![4, 5, 6]);
    
    // select guarantees nothing about how the two streams interleave;
    // items from the same stream still arrive in their original order
    let _combined = stream::select(stream1, stream2);
    
    // If you need to know which stream produced an item, tag it:
    let stream1 = stream::iter(vec![1, 2, 3]).map(|n| (1, n));
    let stream2 = stream::iter(vec![4, 5, 6]).map(|n| (2, n));
    
    let mut combined = stream::select(stream1, stream2);
    
    let mut results: Vec<(i32, i32)> = Vec::new();
    while let Some(item) = combined.next().await {
        results.push(item);
    }
    
    // The tag identifies the source stream of each item
    for (stream_id, value) in results {
        println!("From stream {}: {}", stream_id, value);
    }
}

select guarantees no particular interleaving between streams; tag items if the source matters.

Dynamic Stream Addition with select_all

use futures::stream::{self, StreamExt, SelectAll};
 
#[tokio::main]
async fn main() {
    let mut selector: SelectAll<_> = SelectAll::new();
    
    // Add streams dynamically
    selector.push(stream::iter(vec![1, 2, 3]));
    selector.push(stream::iter(vec![4, 5, 6]));
    
    // Can add more streams later
    selector.push(stream::iter(vec![7, 8, 9]));
    
    let results: Vec<_> = selector.collect().await;
    
    assert_eq!(results.len(), 9);
}

SelectAll allows dynamically adding streams with push.

Removing Streams Dynamically

use futures::stream::{self, StreamExt, SelectAll};
 
#[tokio::main]
async fn main() {
    // SelectAll doesn't support removal directly
    // Use a different pattern for dynamic addition/removal
    
    // One approach: use a channel to signal stream completion
    // and filter streams based on messages
    
    let mut selector: SelectAll<_> = SelectAll::new();
    selector.push(stream::iter(vec![1, 2, 3]));
    selector.push(stream::iter(vec![4, 5, 6]));
    
    // Streams complete naturally when exhausted
    // SelectAll removes completed streams automatically
    
    let results: Vec<_> = selector.collect().await;
    assert_eq!(results.len(), 6);
}

SelectAll automatically removes streams when they complete.

Comparison Table

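Aspect            | select        | select_all    | select + try_next / try_collect
------------------|---------------|---------------|--------------------------------
Number of streams | Exactly 2     | Any number    | Exactly 2
Error handling    | Pass through  | Pass through  | Short-circuit
Dynamic addition  | No            | Yes (push)    | No
Returns on error  | Error as item | Error as item | Early return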

Synthesis

futures::stream::select merges streams with important semantics:

Completion behavior: select completes only when ALL input streams complete. If one stream ends, the others continue. This makes select suitable for scenarios where you want all available data processed regardless of which source finishes first.

Fairness: select polls its inputs round-robin, so neither stream can starve the other. When both streams have items ready, select alternates between them. When only one stream has an item ready, select yields from it without waiting for the other.
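
The alternation can be pictured with a simplified, synchronous model. This is not the library's implementation: it uses plain iterators that are always ready, so it shows only the turn-taking and the "continue with the survivor" rule, not Pending handling or wakers. The function name round_robin_merge is hypothetical.

```rust
/// Simplified model of round-robin merging over two always-ready sources.
fn round_robin_merge<T>(
    left: impl IntoIterator<Item = T>,
    right: impl IntoIterator<Item = T>,
) -> Vec<T> {
    let (mut left, mut right) = (left.into_iter(), right.into_iter());
    let (mut left_done, mut right_done) = (false, false);
    let mut left_turn = true; // the first argument gets the first turn
    let mut out = Vec::new();

    while !(left_done && right_done) {
        // Poll the side whose turn it is, unless that side already ended
        let poll_left = if left_turn { !left_done } else { right_done };
        if poll_left {
            match left.next() {
                Some(item) => out.push(item),
                None => left_done = true,
            }
        } else {
            match right.next() {
                Some(item) => out.push(item),
                None => right_done = true,
            }
        }
        left_turn = !left_turn;
    }
    out
}

fn main() {
    // The shorter side ends first; the longer side keeps producing
    println!("{:?}", round_robin_merge(vec![1, 2, 3], vec![4, 5, 6, 7, 8]));
}
```

In this model the merged output ends only after both sides have reported the end, mirroring the completion behavior described above.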

Error handling: select treats errors as regular items. Errors from either stream are yielded and processing continues unless the consumer stops. For short-circuiting, drive the merged stream with TryStreamExt::try_next or try_collect, which return at the first Err.

Performance: select stores both input streams inline, performs no allocation of its own, and doesn't buffer items. It polls the streams in alternation, which keeps it cheap for combining I/O sources.

Key insight: select is the foundation for many async patterns, such as combining channel sources, merging event streams, and handling heartbeats alongside application messages. When you need to process items from multiple sources as they become available, select provides the stream-level primitive. For more than two streams, use SelectAll, which also supports adding streams dynamically.