Loading pageā¦
Rust walkthroughs
Loading pageā¦
futures::stream::SelectAll and select for concurrent stream handling?futures::stream::SelectAll merges multiple streams into a single stream that yields items from any of its constituent streams as they become available, continuing until all streams are exhausted, while select races multiple futures or streams and returns the first one that completes, cancelling the others. The key distinction is persistence: SelectAll maintains an ongoing collection of streams and produces a combined stream that runs all of them to completion, whereas select is a one-time operation that determines which of several competing operations finishes first. For streams specifically, select can be used in a loop to process items as they arrive, but it requires manual management of the remaining streams and doesn't handle the case where multiple streams have items ready simultaneously. SelectAll handles this automatically, managing the collection of streams and yielding items in the order they become available across all streams.
use futures::future::{self, select};
use std::time::Duration;
#[tokio::main]
async fn main() {
// select races futures, returns first to complete
let fut1 = async {
tokio::time::sleep(Duration::from_millis(100)).await;
"fast"
};
let fut2 = async {
tokio::time::sleep(Duration::from_millis(200)).await;
"slow"
};
// Race: first to complete wins
let result = select(fut1, fut2).await;
match result {
future::Either::Left((value, _remaining)) => {
println!("First future won: {}", value);
}
future::Either::Right((value, _remaining)) => {
println!("Second future won: {}", value);
}
}
}select races futures and returns the winner, discarding the loser.
use futures::stream::{self, StreamExt};
use futures::future::select;
use std::time::Duration;
#[tokio::main]
async fn main() {
let mut stream1 = stream::iter(vec
![1, 2, 3]).throttle(Duration::from_millis(100));
let mut stream2 = stream::iter(vec
![4, 5, 6]).throttle(Duration::from_millis(150));
// Manual loop with select on next() futures
loop {
let fut1 = stream1.next();
let fut2 = stream2.next();
match select(fut1, fut2).await {
future::Either::Left((Some(val), _)) => {
println!("From stream1: {}", val);
}
future::Either::Right((Some(val), _)) => {
println!("From stream2: {}", val);
}
_ => break, // Both streams exhausted
}
}
}select with streams requires manual looping and doesn't handle stream management.
use futures::stream::{self, StreamExt, SelectAll};
#[tokio::main]
async fn main() {
let stream1 = stream::iter(vec
![1, 2, 3]);
let stream2 = stream::iter(vec
![4, 5, 6]);
let stream3 = stream::iter(vec
![7, 8, 9]);
// Merge all streams into one
let mut merged: SelectAll<_> = SelectAll::new();
merged.push(stream1);
merged.push(stream2);
merged.push(stream3);
// Items arrive as they become available
while let Some(value) = merged.next().await {
println!("Received: {}", value);
}
// All items from all streams are yielded
// Order depends on which stream yields first
}SelectAll merges streams and yields items from all of them.
use futures::stream::{self, StreamExt, SelectAll};
#[tokio::main]
async fn main() {
// Method 1: new() + push()
let mut merged1: SelectAll<_> = SelectAll::new();
merged1.push(stream::iter(vec
![1, 2]));
merged1.push(stream::iter(vec
![3, 4]));
// Method 2: from iterator
let streams = [
stream::iter(vec
![1, 2]),
stream::iter(vec
![3, 4]),
stream::iter(vec
![5, 6]),
];
let mut merged2: SelectAll<_> = streams.into_iter().collect();
// Both methods produce equivalent behavior
println!("Merged stream ready");
}SelectAll can be constructed incrementally with push or from an iterator.
use futures::future::{self, select};
use futures::stream::{self, StreamExt, SelectAll};
use std::time::Duration;
#[tokio::main]
async fn main() {
// select: Loser futures are DROPPED
println!("--- select behavior ---");
let fut1 = async {
println!("Future 1 started");
tokio::time::sleep(Duration::from_millis(50)).await;
println!("Future 1 completed");
1
};
let fut2 = async {
println!("Future 2 started");
tokio::time::sleep(Duration::from_millis(200)).await;
println!("Future 2 completed"); // Never printed!
2
};
let result = select(fut1, fut2).await;
println!("select result: {:?}", result);
// SelectAll: ALL streams run to completion
println!("--- SelectAll behavior ---");
let stream1 = stream::iter(vec
![10, 20, 30])
.then(|x| async move {
println!("Stream1 yielding {}", x);
x
});
let stream2 = stream::iter(vec
![100, 200, 300])
.then(|x| async move {
println!("Stream2 yielding {}", x);
x
});
let mut merged: SelectAll<_> = SelectAll::new();
merged.push(stream1);
merged.push(stream2);
// All items from both streams will be yielded
while let Some(val) = merged.next().await {
println!("Got: {}", val);
}
}select cancels losers; SelectAll runs all streams to completion.
use futures::stream::{StreamExt, SelectAll};
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx1, rx1) = mpsc::channel::<i32>(10);
let (tx2, rx2) = mpsc::channel::<i32>(10);
let mut merged: SelectAll<_> = SelectAll::new();
merged.push(rx1);
merged.push(rx2);
// Spawn tasks that send to channels
tokio::spawn(async move {
tx1.send(1).await.unwrap();
tx1.send(2).await.unwrap();
});
tokio::spawn(async move {
tx2.send(10).await.unwrap();
tx2.send(20).await.unwrap();
});
// Process items as they arrive from any stream
let mut count = 0;
while let Some(value) = merged.next().await {
println!("Received: {}", value);
count += 1;
if count >= 4 {
break;
}
}
}SelectAll supports dynamic stream addition during its lifetime.
use futures::stream::{self, StreamExt, SelectAll};
#[tokio::main]
async fn main() {
let stream1 = stream::iter(vec
![1, 2, 3]);
let stream2 = stream::iter(vec
![4, 5, 6]);
let mut merged: SelectAll<_> = SelectAll::new();
merged.push(stream1);
merged.push(stream2);
// When multiple streams have items ready,
// SelectAll yields from one (implementation-defined order)
// but processes all items eventually
let mut items = Vec::new();
while let Some(val) = merged.next().await {
items.push(val);
}
println!("All items: {:?}", items);
// Contains all 6 items, order may vary
}SelectAll handles multiple ready streams automatically.
use tokio::select;
use std::time::Duration;
#[tokio::main]
async fn main() {
// tokio::select! is similar to futures::future::select
// but with ergonomic syntax for multiple branches
let mut interval1 = tokio::time::interval(Duration::from_millis(100));
let mut interval2 = tokio::time::interval(Duration::from_millis(150));
let mut count = 0;
loop {
select! {
_ = interval1.tick() => {
println!("Tick from interval1");
}
_ = interval2.tick() => {
println!("Tick from interval2");
}
_ = tokio::time::sleep(Duration::from_millis(500)) => {
println!("Timeout, breaking");
break;
}
}
count += 1;
if count >= 5 {
break;
}
}
}select! provides ergonomic syntax for racing multiple operations.
use futures::stream::{self, StreamExt, SelectAll};
use futures::future::select;
#[tokio::main]
async fn main() {
// Approach 1: select with manual loop (complex)
println!("--- Manual select loop ---");
let mut s1 = stream::iter(vec
![1, 2, 3]);
let mut s2 = stream::iter(vec
![4, 5, 6]);
loop {
let f1 = s1.next();
let f2 = s2.next();
match select(f1, f2).await {
future::Either::Left((Some(v), _)) => println!("s1: {}", v),
future::Either::Right((Some(v), _)) => println!("s2: {}", v),
future::Either::Left((None, _)) |
future::Either::Right((None, _)) => break,
}
}
// Approach 2: SelectAll (simpler)
println!("--- SelectAll ---");
let s1 = stream::iter(vec
![1, 2, 3]);
let s2 = stream::iter(vec
![4, 5, 6]);
let mut merged: SelectAll<_> = [s1, s2].into_iter().collect();
while let Some(v) = merged.next().await {
println!("merged: {}", v);
}
}SelectAll simplifies concurrent stream processing significantly.
use futures::stream::{StreamExt, SelectAll};
#[tokio::main]
async fn main() {
// Empty SelectAll yields nothing immediately
let mut empty: SelectAll<i32> = SelectAll::new();
// next() returns None immediately
let result = empty.next().await;
println!("Empty SelectAll: {:?}", result); // None
// Adding streams makes it productive
let mut merged: SelectAll<_> = SelectAll::new();
let items: Vec<i32> = merged.collect().await;
println!("Collected from empty: {:?}", items); // []
}SelectAll with no streams completes immediately with no items.
use futures::stream::{self, StreamExt, SelectAll};
#[tokio::main]
async fn main() {
let stream1 = stream::iter(vec
![1, 2, 3]);
let stream2 = stream::iter(vec
![4, 5]);
let stream3 = stream::iter(vec
![6, 7, 8, 9]);
let mut merged: SelectAll<_> = SelectAll::new();
merged.push(stream1); // 3 items
merged.push(stream2); // 2 items
merged.push(stream3); // 4 items
// Process until all streams exhausted
let mut count = 0;
while let Some(_) = merged.next().await {
count += 1;
}
println!("Total items: {}", count); // 9
}SelectAll yields None only when all constituent streams are exhausted.
use futures::stream::{StreamExt, SelectAll};
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx1, rx1) = mpsc::channel::<i32>(2); // Buffer size 2
let (tx2, rx2) = mpsc::channel::<i32>(2);
let mut merged: SelectAll<_> = SelectAll::new();
merged.push(rx1);
merged.push(rx2);
// Producers
let producer1 = tokio::spawn(async move {
for i in 0..5 {
if tx1.send(i).await.is_err() {
println!("Channel 1 closed");
break;
}
println!("Sent {} to channel 1", i);
}
});
let producer2 = tokio::spawn(async move {
for i in 10..15 {
if tx2.send(i).await.is_err() {
println!("Channel 2 closed");
break;
}
println!("Sent {} to channel 2", i);
}
});
// Consumer
while let Some(val) = merged.next().await {
println!("Received: {}", val);
}
}SelectAll respects backpressure from constituent streams.
use futures::stream::{self, StreamExt, SelectAll};
#[tokio::main]
async fn main() {
type Result<T> = std::result::Result<T, &'static str>;
let stream1 = stream::iter(vec
![Ok(1), Ok(2)]);
let stream2 = stream::iter(vec
![Ok(10), Err("error in stream2"), Ok(30)]);
let mut merged: SelectAll<_> = SelectAll::new();
merged.push(stream1);
merged.push(stream2);
// Errors are propagated as values
while let Some(result) = merged.next().await {
match result {
Ok(val) => println!("Value: {}", val),
Err(e) => println!("Error: {}", e),
}
}
}SelectAll yields values as-is, including errors.
use futures::stream::{self, StreamExt};
use std::time::Duration;
#[tokio::main]
async fn main() {
// futures::stream::futures_unordered is similar to SelectAll
// but for futures (not streams)
let futures = vec
![
async {
tokio::time::sleep(Duration::from_millis(100)).await;
1
},
async {
tokio::time::sleep(Duration::from_millis(50)).await;
2
},
async {
tokio::time::sleep(Duration::from_millis(150)).await;
3
},
];
// Collect results as futures complete
let mut stream = stream::iter(futures).buffer_unordered(3);
while let Some(result) = stream.next().await {
println!("Future completed: {}", result);
}
}futures_unordered and buffer_unordered apply similar concepts to futures.
Comparison table:
| Feature | select | SelectAll |
|---------|----------|-------------|
| Input | 2 futures/streams | Multiple streams |
| Output | First to complete | All items from all streams |
| Cancellation | Drops losers | Runs all to completion |
| Reusability | One-shot | Ongoing stream |
| Stream support | Manual loop needed | Built-in |
| Dynamic addition | No | Yes (with push) |
When to use each:
| Scenario | Recommendation |
|----------|----------------|
| Race multiple futures, take first winner | select |
| Process items from multiple streams | SelectAll |
| Timeout pattern (cancel after duration) | select |
| Merge concurrent data sources | SelectAll |
| One-shot decision between operations | select |
| Event handling from multiple sources | SelectAll |
Key insight: select and SelectAll serve fundamentally different purposes. select is a racing operation: given multiple futures, determine which completes first and cancel the rest. This is ideal for timeouts (race an operation against a timer), cancellation (race an operation against a cancellation signal), or choosing between alternatives (race multiple lookup strategies and use whichever finishes first). SelectAll is a merging operation: given multiple streams, yield items from all of them as they become available. This is ideal for event aggregation (combine events from multiple sources), request fan-in (merge responses from multiple services), or any scenario where you want to process items from multiple concurrent producers. The "select" in SelectAll refers to the internal mechanismāselecting which stream to poll next based on readinessānot to the cancellation semantics of future::select. When choosing between them, ask: should the slower operation be cancelled (select), or should all operations run to completion (SelectAll)?