Loading page…
Rust walkthroughs
Loading page…
tokio::select! and futures::select! for handling multiple concurrent operations?tokio::select! is tightly integrated with Tokio's async runtime, providing optimized branch selection, automatic cancellation of losing branches, and pattern matching on futures, while futures::select! is runtime-agnostic but requires more manual handling of cancellation and lacks some ergonomics like pattern matching. Tokio's macro evaluates all branches concurrently, cancels the non-winning futures automatically, and supports if conditions and pattern guards. The futures macro provides more explicit control but requires manual cancellation handling via Fuse and doesn't offer the same level of integration with runtime optimizations. Use tokio::select! when building Tokio applications; use futures::select! when writing runtime-independent code or when you need its specific semantics.
use tokio::select;
#[tokio::main]
async fn main() {
let mut task1 = Box::pin(async {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
"task1"
});
let mut task2 = Box::pin(async {
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
"task2"
});
let result = select! {
res = &mut task1 => res,
res = &mut task2 => res,
};
println!("First to complete: {}", result); // "task1"
}tokio::select! races multiple futures and returns when the first completes.
use futures::select;
use futures::FutureExt;
#[tokio::main]
async fn main() {
let mut task1 = Box::pin(async {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
"task1"
});
let mut task2 = Box::pin(async {
tokio::time::Duration::from_millis(200).await;
"task2"
});
// futures::select! requires futures to be Fused
let mut task1 = task1.fuse();
let mut task2 = task2.fuse();
let result = select! {
res = task1 => res,
res = task2 => res,
};
println!("First to complete: {}", result); // "task1"
}futures::select! requires futures to implement FusedFuture, typically via .fuse().
use tokio::select;
#[tokio::main]
async fn main() {
let result = select! {
_ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
println!("Timer completed");
"timeout"
}
_ = async {
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
println!("This never prints");
"slow"
} => "slow",
};
println!("Result: {}", result);
// The slow future is cancelled when timer wins
}tokio::select! automatically cancels all non-completed futures.
use futures::select;
use futures::FutureExt;
#[tokio::main]
async fn main() {
let mut fast = tokio::time::sleep(std::time::Duration::from_millis(100)).fuse();
let mut slow = async {
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
"slow"
}.fuse();
let result = select! {
_ = fast => {
println!("Timer completed");
"timeout"
}
res = slow => res,
};
// Note: The slow future is NOT cancelled!
// It continues to run unless explicitly dropped
println!("Result: {}", result);
}futures::select! does NOT cancel other futures by default.
use tokio::select;
#[tokio::main]
async fn main() {
// Tokio select! doesn't require FusedFuture
let future = async { "hello" };
let result = select! {
res = future => res,
};
println!("{}", result); // "hello"
// Can't use future again - it's moved/consumed
}tokio::select! takes futures by value and doesn't require FusedFuture.
use futures::select;
use futures::FutureExt;
#[tokio::main]
async fn main() {
// futures::select! requires FusedFuture
let future = async { "hello" }.fuse(); // Must call .fuse()
let result = select! {
res = future => res,
};
println!("{}", result); // "hello"
// The fused future can be checked for completion
}futures::select! requires all branches to implement FusedFuture.
use tokio::select;
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel::<Result<String, String>>(32);
// Close channel to demonstrate pattern matching
drop(tx);
let result = select! {
Some(Ok(msg)) = rx.recv() => {
println!("Received: {}", msg);
msg
}
Some(Err(e)) = rx.recv() => {
println!("Error: {}", e);
e
}
None = rx.recv() => {
println!("Channel closed");
"closed".to_string()
}
};
println!("Result: {}", result);
}tokio::select! supports pattern matching on the future's output.
use futures::select;
use futures::FutureExt;
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel::<Result<String, String>>(32);
drop(tx);
// futures::select! doesn't support pattern matching
// You receive the whole result and match afterwards
let receive = rx.recv().fuse();
let result = select! {
res = receive => {
match res {
Some(Ok(msg)) => {
println!("Received: {}", msg);
msg
}
Some(Err(e)) => {
println!("Error: {}", e);
e
}
None => {
println!("Channel closed");
"closed".to_string()
}
}
}
};
println!("Result: {}", result);
}futures::select! requires manual pattern matching after receiving the result.
use tokio::select;
#[tokio::main]
async fn main() {
let future = tokio::time::sleep(std::time::Duration::from_secs(10));
let result = select! {
_ = future => {
"completed"
}
else => {
"no futures ready"
}
};
println!("Result: {}", result); // "completed"
}tokio::select! uses else for the default case when no future is ready.
use tokio::select;
#[tokio::main]
async fn main() {
let result = select! {
_ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
"slept"
}
else => {
"no futures ready immediately"
}
};
// This would need to be used with something that might not be ready
// The 'else' branch handles case where all polled futures return Pending
println!("Result: {}", result);
}The else branch is for when no future completes immediately (though in practice all async code will yield Pending or complete).
use futures::select;
use futures::FutureExt;
#[tokio::main]
async fn main() {
let mut future = tokio::time::sleep(std::time::Duration::from_millis(100)).fuse();
// futures::select! uses 'default =>' for non-blocking case
let result = select! {
_ = future => {
"completed"
}
default => {
"no futures ready"
}
};
println!("Result: {}", result);
}futures::select! uses default => for the case when no future is ready.
use tokio::select;
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(32);
let should_receive = true;
// Only include this branch if condition is true
let result = select! {
Some(msg) = rx.recv(), if should_receive => {
format!("Received: {}", msg)
}
_ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
"timeout".to_string()
}
};
println!("Result: {}", result);
}tokio::select! supports if guards on branches.
use tokio::select;
#[tokio::main]
async fn main() {
let option_future = Some(async { "some value" });
// Conditional inclusion based on Option
let result = select! {
res = async { "always present" } => res,
// If option_future is None, this branch is skipped
res = option_future.unwrap(), if option_future.is_some() => res,
else => "nothing",
};
println!("Result: {}", result);
}Guards can enable/disable branches dynamically.
use futures::select;
use futures::FutureExt;
#[tokio::main]
async fn main() {
// futures::select! doesn't have built-in if guards
// You need to handle conditions differently
let should_check = true;
let mut check = async { "checked" }.fuse();
let mut skip = async { "skipped" }.fuse();
let result = if should_check {
select! {
res = check => res,
res = skip => res,
}
} else {
"condition was false".to_string()
};
println!("Result: {}", result);
}futures::select! requires manual conditional logic outside the macro.
use tokio::select;
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(32);
// Send some messages
tx.send("hello").await.unwrap();
tx.send("world").await.unwrap();
drop(tx);
loop {
select! {
Some(msg) = rx.recv() => {
println!("Got: {}", msg);
}
else => {
println!("Channel closed");
break;
}
}
}
}tokio::select! works naturally in loops; the else branch handles channel closure.
use futures::select;
use futures::FutureExt;
use futures::StreamExt;
#[tokio::main]
async fn main() {
use tokio_stream::wrappers::ReceiverStream;
let (tx, rx) = tokio::sync::mpsc::channel(32);
let mut stream = ReceiverStream::new(rx).fuse();
tx.send("hello").await.unwrap();
tx.send("world").await.unwrap();
drop(tx);
loop {
select! {
msg = stream.next() => {
match msg {
Some(m) => println!("Got: {}", m),
None => {
println!("Stream ended");
break;
}
}
}
}
}
}futures::select! in loops requires handling the fused stream correctly.
use tokio::select;
#[tokio::main]
async fn main() {
// By default, tokio::select! is fair
// It randomly picks among ready futures
let future1 = async { 1 };
let future2 = async { 2 };
// Both futures are ready immediately
// The winner is chosen pseudo-randomly
let result = select! {
res = future1 => res,
res = future2 => res,
};
println!("Result: {}", result); // Could be 1 or 2
}tokio::select! randomly selects among ready futures for fairness.
use tokio::select;
#[tokio::main]
async fn main() {
// biased mode checks branches in order
let mut count1 = 0;
let mut count2 = 0;
for _ in 0..100 {
let future1 = async { 1 };
let future2 = async { 2 };
select! {
biased;
res = future1 => { count1 += 1; res }
res = future2 => { count2 += 1; res }
}
}
println!("Future 1 won: {} times", count1); // Always 100
println!("Future 2 won: {} times", count2); // Always 0
}biased; mode checks branches in declaration order, prioritizing earlier branches.
use futures::select;
use futures::FutureExt;
#[tokio::main]
async fn main() {
// futures::select! checks branches in order by default
let future1 = async { 1 }.fuse();
let future2 = async { 2 }.fuse();
let result = select! {
res = future1 => res,
res = future2 => res,
};
// If both are ready, future1 wins
println!("Result: {}", result); // Always 1
}futures::select! checks branches in order (first branch wins if multiple ready).
use tokio::select;
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(32);
// Start receiving
tokio::spawn(async move {
loop {
select! {
Some(msg) = rx.recv() => {
println!("Received: {}", msg);
}
else => break,
}
}
});
// Cancel-safety: if select! is cancelled during recv(),
// the message is not lost - recv() is cancel-safe
// The message remains in the channel
}Tokio's primitives are designed for cancel-safety with select!.
use futures::select;
use futures::FutureExt;
#[tokio::main]
async fn main() {
use tokio::sync::mpsc;
let (tx, mut rx) = mpsc::channel(32);
let mut recv_future = rx.recv().fuse();
// Manual cancel-safety handling needed
let result = select! {
res = recv_future => res,
_ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
// If we cancel here, the recv is still pending
// The message might be in flight
None
}
};
}futures::select! requires more careful handling for cancel-safety.
use tokio::select;
#[tokio::main]
async fn main() {
let result = select! {
res = async { "first" } => format!("Got: {}", res),
res = async { "second" } => format!("Got: {}", res),
res = async { "third" } => format!("Got: {}", res),
res = async { "fourth" } => format!("Got: {}", res),
res = async { "fifth" } => format!("Got: {}", res),
};
println!("{}", result); // "Got: first" (or any other)
}tokio::select! supports any number of branches (practically).
use futures::select;
use futures::FutureExt;
#[tokio::main]
async fn main() {
let result = select! {
res = async { "first" }.fuse() => format!("Got: {}", res),
res = async { "second" }.fuse() => format!("Got: {}", res),
res = async { "third" }.fuse() => format!("Got: {}", res),
res = async { "fourth" }.fuse() => format!("Got: {}", res),
res = async { "fifth" }.fuse() => format!("Got: {}", res),
};
println!("{}", result);
}futures::select! also supports multiple branches, each needing .fuse().
use tokio::select;
#[tokio::main]
async fn main() {
let mut channel = async { 42 };
// Can borrow references
let result = select! {
res = &mut channel => res,
};
println!("Result: {}", result);
// Future is still available if borrowed
}tokio::select! can work with borrowed futures using &mut.
use futures::select;
use futures::FutureExt;
#[tokio::main]
async fn main() {
let mut future = async { 42 };
// futures::select! requires ownership or special handling
let mut future = future.fuse();
let result = select! {
res = future => res,
};
println!("Result: {}", result);
}futures::select! consumes futures; they must be fused first.
use tokio::select;
#[tokio::main]
async fn main() {
// Works seamlessly with Tokio types
let result = select! {
res = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
"timer"
}
res = tokio::fs::read_to_string("test.txt") => {
match res {
Ok(content) => content,
Err(_) => "error".to_string(),
}
}
res = tokio::net::TcpListener::bind("127.0.0.1:0").accept() => {
"connection"
}
};
println!("Result: {}", result);
}tokio::select! is designed for Tokio's async primitives.
use futures::select;
use futures::FutureExt;
#[tokio::main]
async fn main() {
// Works with any futures, but needs explicit fuse
let mut timer = tokio::time::sleep(std::time::Duration::from_millis(100)).fuse();
let mut reader = async {
tokio::fs::read_to_string("test.txt").await
}.fuse();
let result = select! {
_ = timer => "timer",
res = reader => {
match res {
Ok(content) => content,
Err(_) => "error".to_string(),
}
}
};
println!("Result: {}", result);
}futures::select! works with any future but requires .fuse() everywhere.
| Feature | tokio::select! | futures::select! |
|---------|-----------------|-------------------|
| FusedFuture requirement | Not required | Required (.fuse()) |
| Automatic cancellation | Yes (losing branches cancelled) | No (manual handling) |
| Pattern matching | Yes (on results) | No (receive whole result) |
| Default case | else => | default => |
| Conditional branches | if guards | Manual (outside macro) |
| Fairness | Random (biased mode available) | Declaration order |
| Runtime integration | Tokio-optimized | Runtime-agnostic |
| Cancel-safety | Built-in for Tokio types | Manual handling |
The two macros serve overlapping but distinct purposes:
tokio::select! for Tokio applications: It's deeply integrated with Tokio's runtime semantics, automatically cancelling losing branches, supporting pattern matching for ergonomics, and working seamlessly with Tokio's cancel-safe primitives. The random fairness among ready futures prevents starvation, while biased mode enables priority-based selection. Pattern matching on results like Some(msg) = rx.recv() reduces boilerplate significantly.
futures::select! for runtime independence: It's designed to work with any futures implementation, requiring FusedFuture to handle repeated polling safely. The lack of automatic cancellation means futures continue running unless you drop them, giving explicit control but requiring more careful reasoning. The declaration-order fairness is predictable but can lead to starvation if early branches are always ready.
Key insight: tokio::select! is about ergonomics and integration—it handles cancellation, provides pattern matching, and works with Tokio's cancel-safe design. futures::select! is about control and portability—it works with any future but requires you to understand and manage the semantics yourself. In Tokio applications, prefer tokio::select! for its ergonomics and cancel-safety guarantees. Use futures::select! when writing runtime-agnostic code or when its explicit control model matches your needs.
The cancellation behavior is the most critical difference: tokio::select! drops non-winning futures, triggering their cancellation. futures::select! leaves all futures in place, letting you decide whether to continue polling them. For cancel-safe code like message processing, Tokio's approach is usually what you want. For state machines where you need explicit control over future lifetimes, the futures approach provides that flexibility.