Loading pageā¦
Rust walkthroughs
Loading pageā¦
futures::stream::select handle multiple streams and what happens when one stream ends?futures::stream::select combines multiple streams into a single stream that yields items from any source as they become available, with fairness guarantees that prevent one fast stream from starving others. When one stream ends, select continues yielding items from the remaining streams until all are exhaustedāonly then does the combined stream end. This is different from select_all which has slightly different semantics, and from try_select which short-circuits on errors. The key behavior is that select completes only when ALL input streams complete, making it suitable for scenarios where you want to process all available work regardless of which source finishes first.
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn main() {
// Create two streams
let stream1 = stream::iter(vec![1, 2, 3]);
let stream2 = stream::iter(vec![4, 5, 6]);
// Select combines them, yielding from whichever is ready
let mut combined = stream::select(stream1, stream2);
// Items arrive as they become available
while let Some(item) = combined.next().await {
println!("Got: {}", item);
}
// Output could be: 1, 4, 2, 5, 3, 6 (or any interleaving)
// The exact order depends on which stream is polled first
}select combines two streams into one that yields items from either source.
use futures::stream::{self, StreamExt};
use tokio::time::{interval, Duration};
#[tokio::main]
async fn main() {
// Fast stream: yields immediately
let fast_stream = stream::iter(vec!["fast1", "fast2", "fast3"]);
// Slow stream: yields with delay
let slow_stream = stream::iter(vec!["slow1", "slow2", "slow3"])
.then(|item| async move {
tokio::time::sleep(Duration::from_millis(10)).await;
item
});
let mut combined = stream::select(fast_stream, slow_stream);
while let Some(item) = combined.next().await {
println!("Got: {}", item);
}
// Output: fast items arrive first, then slow items
// select doesn't wait for slow stream when fast stream has items
}select yields from whichever stream has items ready, without waiting for slow streams.
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn main() {
// Stream that ends after 3 items
let stream1 = stream::iter(vec![1, 2, 3]);
// Stream that continues after stream1 ends
let stream2 = stream::iter(vec![4, 5, 6, 7, 8]);
let mut combined = stream::select(stream1, stream2);
let mut results = Vec::new();
while let Some(item) = combined.next().await {
results.push(item);
}
// All items from both streams are collected
assert_eq!(results.len(), 8); // 3 + 5 = 8 items
// When stream1 ends, stream2 continues
// Combined stream only ends when BOTH streams end
}When one stream ends, select continues with remaining streams.
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn main() {
let stream1 = stream::iter(vec![1, 2]);
let stream2 = stream::iter(vec![3, 4]);
let stream3 = stream::iter(vec![5, 6]);
// select takes two streams, so chain select calls
let combined = stream::select(stream1, stream2);
let mut combined = stream::select(combined, stream3);
let mut results = Vec::new();
while let Some(item) = combined.next().await {
results.push(item);
}
assert_eq!(results.len(), 6);
}select takes exactly two streams; chain calls for more streams.
use futures::stream::{self, StreamExt, SelectAll};
#[tokio::main]
async fn main() {
// select_all handles arbitrary number of streams
let streams = vec![
stream::iter(vec![1, 2]),
stream::iter(vec![3, 4]),
stream::iter(vec![5, 6]),
stream::iter(vec![7, 8]),
];
let mut combined: SelectAll<_> = streams.into_iter().collect();
let mut results = Vec::new();
while let Some(item) = combined.next().await {
results.push(item);
}
assert_eq!(results.len(), 8);
}select_all combines a vector of streams of the same type.
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn main() {
// Streams must produce the same item type
let stream1 = stream::iter(vec![1, 2, 3]);
let stream2 = stream::iter(vec![4, 5, 6]);
// This works - both produce i32
let mut combined = stream::select(stream1, stream2);
// Different types would require mapping first
let string_stream = stream::iter(vec!["a", "b"])
.map(|s| s.to_string());
let num_stream = stream::iter(vec![1, 2])
.map(|n| n.to_string());
// Now both produce String
let mut combined = stream::select(string_stream, num_stream);
}select requires both streams to produce the same item type.
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn main() {
let stream1 = stream::iter(vec!["a", "b", "c"]);
let stream2 = stream::iter(vec!["x", "y", "z"]);
// stream1 is first argument - when both are ready,
// stream1's items tend to come first
let mut combined = stream::select(stream1, stream2);
// Note: The exact priority behavior is implementation-defined
// Don't rely on strict ordering when both streams are always ready
while let Some(item) = combined.next().await {
println!("Got: {}", item);
}
}Argument order can influence which stream is polled first.
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn main() {
// Regular select propagates errors
let stream1 = stream::iter(vec![Ok(1), Ok(2)]);
let stream2 = stream::iter(vec![Ok(3), Err("error")]);
let mut combined = stream::select(stream1, stream2);
// This stream yields Result<i32, &str>
// Errors are yielded as-is, not short-circuiting
while let Some(item) = combined.next().await {
match item {
Ok(n) => println!("Got: {}", n),
Err(e) => println!("Error: {}", e),
}
}
}select yields errors as items; use try_select for error short-circuiting.
use futures::stream::{self, TryStreamExt};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let stream1 = stream::iter(vec![Ok(1), Ok(2)]);
let stream2 = stream::iter(vec![Ok(3), Err("error")]);
// try_select returns Result
let combined = stream::try_select(stream1, stream2);
// Use try_collect or try_next to handle errors
let result: Result<Vec<i32>, _> = combined.try_collect().await;
match result {
Ok(items) => println!("Got all: {:?}", items),
Err(e) => println!("Error encountered: {:?}", e),
}
Ok(())
}try_select short-circuits on the first error from any stream.
use futures::stream::{self, StreamExt};
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx1, mut rx1) = mpsc::channel::<String>(10);
let (tx2, mut rx2) = mpsc::channel::<String>(10);
// Convert receivers to streams
let stream1 = recv_to_stream(&mut rx1);
let stream2 = recv_to_stream(&mut rx2);
let mut combined = stream::select(stream1, stream2);
// Process events from either source
while let Some(event) = combined.next().await {
println!("Event: {}", event);
}
}
fn recv_to_stream(rx: &mut mpsc::Receiver<String>) -> impl Stream<Item = String> + '_ {
async_stream::stream! {
while let Some(msg) = rx.recv().await {
yield msg;
}
}
}select is ideal for merging event sources like channels or network connections.
use futures::stream::{self, StreamExt};
use tokio::time::{timeout, Duration};
#[tokio::main]
async fn main() {
let stream1 = stream::iter(vec![1, 2, 3])
.then(|n| async move {
tokio::time::sleep(Duration::from_millis(10)).await;
n
});
let stream2 = stream::iter(vec![4, 5, 6]);
let mut combined = stream::select(stream1, stream2);
// Apply timeout to combined stream
while let Ok(Some(item)) = timeout(Duration::from_millis(100), combined.next()).await {
println!("Got: {}", item);
}
}Combine select with timeout to handle slow streams.
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn main() {
// Simulated websocket stream
let websocket = stream::iter(vec!["msg1", "msg2", "msg3"]);
// Heartbeat stream
let heartbeat = stream::unfold(0, |count| async move {
tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
Some(("ping", count + 1))
});
let mut combined = stream::select(websocket, heartbeat);
// Process messages and heartbeats together
while let Some(item) = combined.next().await {
match item {
"ping" => println!("Sending pong"),
msg => println!("Processing: {}", msg),
}
}
}select merges application messages with system events like heartbeats.
use futures::stream::{self, StreamExt};
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx1, mut rx1) = mpsc::channel::<i32>(10);
let (tx2, mut rx2) = mpsc::channel::<i32>(10);
// Spawn producers
tokio::spawn(async move {
tx1.send(1).await.unwrap();
tx1.send(2).await.unwrap();
});
tokio::spawn(async move {
tx2.send(10).await.unwrap();
tx2.send(20).await.unwrap();
});
// Create streams from channels
let stream1 = async_stream::stream! {
while let Some(item) = rx1.recv().await {
yield item;
}
};
let stream2 = async_stream::stream! {
while let Some(item) = rx2.recv().await {
yield item;
}
};
let mut combined = stream::select(stream1, stream2);
let mut results = Vec::new();
while let Some(item) = combined.next().await {
results.push(item);
}
// Results contain items from both channels
assert_eq!(results.len(), 4);
}Combine channel receivers into a single stream.
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn main() {
// Short stream ends first
let short = stream::iter(vec![1, 2]);
// Long stream continues after short ends
let long = stream::iter(vec![3, 4, 5, 6]);
let mut combined = stream::select(short, long);
let mut count = 0;
let mut from_short = 0;
let mut from_long = 0;
while let Some(item) = combined.next().await {
count += 1;
if item <= 2 {
from_short += 1;
} else {
from_long += 1;
}
}
// Short stream: 2 items
// Long stream: 4 items
// Combined: 6 total
assert_eq!(count, 6);
assert_eq!(from_short, 2);
assert_eq!(from_long, 4);
}select completes only after all streams complete.
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn main() {
// Infinite stream
let infinite = stream::repeat(1);
// Finite stream that will end
let finite = stream::iter(vec!["a", "b", "c"]);
// Take only 10 items from combined stream
let mut combined = stream::select(infinite, finite)
.take(10);
let results: Vec<_> = combined.collect().await;
// Even though one stream is infinite, take(10) limits us
assert_eq!(results.len(), 10);
}Use combinators like take to limit infinite streams.
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn main() {
// Fast-producing stream
let fast = stream::iter(0..1000);
// Slow-consuming process
let mut combined = stream::select(fast, stream::empty::<i32>());
// Process items slowly - backpressure is automatic
// select doesn't buffer; it yields one item at a time
while let Some(item) = combined.next().await {
println!("Processing: {}", item);
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
}
}select provides one item at a time with no implicit buffering.
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn main() {
let stream1 = stream::iter(vec![1, 2, 3, 4, 5]);
let stream2 = stream::iter(vec![10, 20, 30, 40, 50]);
// Use .buffered() to prefetch from streams
let mut combined = stream::select(stream1, stream2)
.buffered(10); // Buffer up to 10 items
while let Some(item) = combined.next().await {
println!("Got: {}", item);
}
}Add buffering for smoother processing when streams have latency.
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn main() {
let stream1 = stream::iter(vec![1, 2, 3]);
let stream2 = stream::iter(vec![4, 5, 6]);
// select provides no ordering guarantees
let combined = stream::select(stream1, stream2);
// Items arrive as streams produce them
// No guarantee about which stream's items come first
// If you need ordered results from each stream:
let stream1 = stream::iter(vec![1, 2, 3]).map(|n| (1, n));
let stream2 = stream::iter(vec![4, 5, 6]).map(|n| (2, n));
let mut combined = stream::select(stream1, stream2);
let mut results: Vec<(i32, i32)> = Vec::new();
while let Some(item) = combined.next().await {
results.push(item);
}
// Tag items with source stream if order matters
for (stream_id, value) in results {
println!("From stream {}: {}", stream_id, value);
}
}select provides no ordering guarantees; tag items if source matters.
use futures::stream::{self, StreamExt, SelectAll};
#[tokio::main]
async fn main() {
let mut selector: SelectAll<_> = SelectAll::new();
// Add streams dynamically
selector.push(stream::iter(vec![1, 2, 3]));
selector.push(stream::iter(vec![4, 5, 6]));
// Can add more streams later
selector.push(stream::iter(vec![7, 8, 9]));
let results: Vec<_> = selector.collect().await;
assert_eq!(results.len(), 9);
}SelectAll allows dynamically adding streams with push.
use futures::stream::{self, StreamExt, SelectAll};
#[tokio::main]
async fn main() {
// SelectAll doesn't support removal directly
// Use a different pattern for dynamic addition/removal
// One approach: use a channel to signal stream completion
// and filter streams based on messages
let mut selector: SelectAll<_> = SelectAll::new();
selector.push(stream::iter(vec![1, 2, 3]));
selector.push(stream::iter(vec![4, 5, 6]));
// Streams complete naturally when exhausted
// SelectAll removes completed streams automatically
let results: Vec<_> = selector.collect().await;
assert_eq!(results.len(), 6);
}SelectAll automatically removes streams when they complete.
| Aspect | select | select_all | try_select |
|--------|----------|---------------|--------------|
| Number of streams | Exactly 2 | Any number | Exactly 2 |
| Error handling | Pass through | Pass through | Short-circuit |
| Dynamic addition | No | Yes (push) | No |
| Returns on error | Error as item | Error as item | Early return |
futures::stream::select merges streams with important semantics:
Completion behavior: select completes only when ALL input streams complete. If one stream ends, the others continue. This makes select suitable for scenarios where you want all available data processed regardless of which source finishes first.
Fairness: select doesn't starve slow streams. When multiple streams have items ready, select yields from each fairly. However, when one stream has items and another doesn't, select yields from the ready stream without waiting.
Error handling: Regular select treats errors as regular itemsāuse try_select when you need error short-circuiting. Errors from any stream are yielded, and processing continues.
Performance: select is zero-allocation when streams are ready and doesn't buffer. It polls streams in an interleaved fashion, making it efficient for combining I/O sources.
Key insight: select is the foundation for many async patternsācombining channel sources, merging event streams, handling heartbeat alongside application messages. When you need to process items from multiple sources as they become available, select provides the stream-level primitive. For more than two streams, use SelectAll which also supports dynamic stream addition.