Rust walkthroughs
How does futures::stream::FuturesUnordered differ from Vec<JoinHandle> for managing concurrent futures?

futures::stream::FuturesUnordered is a collection of futures that doubles as a stream: it yields each result as soon as its future completes, in completion order. Vec<JoinHandle> is a plain vector of handles to tasks already spawned on the runtime; you await the handles one at a time or all together with join_all. The fundamental difference is how results are observed. FuturesUnordered lets you process results incrementally as they arrive, while join_all over a Vec<JoinHandle> hands back every result at once, in the order the handles were stored. Use FuturesUnordered when you need to react to each result as it completes, for example when futures have very different execution times. Use Vec<JoinHandle> when the tasks are independent and you only need all the results eventually.
use tokio::task::JoinHandle;

#[tokio::main]
async fn main() {
    let handles: Vec<JoinHandle<u32>> = vec![
        tokio::spawn(async { 1 }),
        tokio::spawn(async { 2 }),
        tokio::spawn(async { 3 }),
    ];
    // Must await each handle to get results
    // Order is preserved: first handle, second handle, etc.
    let results: Vec<u32> = futures::future::join_all(handles)
        .await
        .into_iter()
        .map(|r| r.unwrap())
        .collect();
    println!("{:?}", results); // [1, 2, 3]
}

JoinHandle represents a task already spawned on the runtime. You await the handle to get the result.
use futures::stream::{FuturesUnordered, StreamExt};

#[tokio::main]
async fn main() {
    let mut futures = FuturesUnordered::new();
    // Every future in one FuturesUnordered must have the same type, and each
    // async block is its own type, so push the same block from a loop.
    for i in 1..=3 {
        futures.push(async move { i });
    }
    // Results come as they complete
    let mut results = vec![];
    while let Some(result) = futures.next().await {
        results.push(result);
    }
    println!("{:?}", results); // Order depends on completion order
}

FuturesUnordered holds futures and yields results as they complete.
use tokio::task::JoinHandle;

#[tokio::main]
async fn main() {
    // JoinHandle: results in collection order
    let handles: Vec<JoinHandle<u32>> = vec![
        tokio::spawn(async {
            tokio::time::sleep(std::time::Duration::from_millis(300)).await;
            1
        }),
        tokio::spawn(async {
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
            2
        }),
        tokio::spawn(async {
            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
            3
        }),
    ];
    let results: Vec<_> = futures::future::join_all(handles).await;
    // Results still in original order: [Ok(1), Ok(2), Ok(3)]
    // Even though task 2 completed first
    println!("JoinHandle results: {:?}", results);
}

join_all preserves insertion order regardless of completion order.
use futures::stream::{FuturesUnordered, StreamExt};
use std::time::Duration;

#[tokio::main]
async fn main() {
    // FuturesUnordered: results in completion order
    let mut futures = FuturesUnordered::new();
    // (delay in ms, value). Pushing one async block from a loop keeps a single
    // future type; three separate async blocks would not type-check.
    for (delay_ms, value) in [(300, 1), (100, 2), (200, 3)] {
        futures.push(async move {
            tokio::time::sleep(Duration::from_millis(delay_ms)).await;
            value
        });
    }
    let mut results = vec![];
    while let Some(result) = futures.next().await {
        println!("Got result: {}", result);
        results.push(result);
    }
    // Results in completion order: [2, 3, 1]
    println!("FuturesUnordered results: {:?}", results);
}

FuturesUnordered yields results as they complete.
use tokio::task::JoinHandle;

#[tokio::main]
async fn main() {
    // JoinHandle: tasks start immediately upon spawn
    let handles: Vec<JoinHandle<u32>> = (0..5)
        .map(|i| tokio::spawn(async move {
            println!("Task {} running", i);
            i
        }))
        .collect();
    // Tasks are already running, just collecting results
    for handle in handles {
        let result = handle.await.unwrap();
        println!("Result: {}", result);
    }
}

JoinHandle tasks start executing when spawned, before you await them.
use futures::stream::{FuturesUnordered, StreamExt};

#[tokio::main]
async fn main() {
    // FuturesUnordered: futures start when polled
    let mut futures = FuturesUnordered::new();
    for i in 0..5 {
        futures.push(async move {
            println!("Task {} running", i);
            i
        });
    }
    // Futures start executing when we poll the stream
    while let Some(result) = futures.next().await {
        println!("Result: {}", result);
    }
}

FuturesUnordered futures execute when the stream is polled.
use futures::stream::{FuturesUnordered, StreamExt};

// A named async fn gives every pushed future the same type;
// separate async blocks would each have a distinct type.
async fn value(n: i32) -> i32 {
    n
}

#[tokio::main]
async fn main() {
    let mut futures = FuturesUnordered::new();
    // Start with initial futures
    futures.push(value(1));
    futures.push(value(2));
    // Can add more while processing
    let mut count = 0;
    while let Some(result) = futures.next().await {
        println!("Got: {}", result);
        count += 1;
        if count < 5 {
            // Add new futures dynamically
            futures.push(value(count + 10));
        }
    }
    // FuturesUnordered allows dynamic additions
}

FuturesUnordered supports adding futures while iterating.
use tokio::task::JoinHandle;

#[tokio::main]
async fn main() {
    // Vec<JoinHandle>: an ordinary Vec, so handles can be pushed before joining
    let mut handles: Vec<JoinHandle<u32>> = vec![
        tokio::spawn(async { 1 }),
        tokio::spawn(async { 2 }),
    ];
    // A task spawned later is simply pushed onto the same Vec
    handles.push(tokio::spawn(async { 3 }));
    // join_all takes ownership of the Vec, so the set is fixed from here on
    let results: Vec<_> = futures::future::join_all(handles).await;
    println!("{:?}", results);
}

You can push handles onto a Vec<JoinHandle> at any time before joining, but join_all consumes the Vec, so nothing can be added while you wait on it.
use futures::stream::{FuturesUnordered, StreamExt};

#[tokio::main]
async fn main() {
    let mut futures = FuturesUnordered::new();
    for i in 0..10 {
        futures.push(async move {
            tokio::time::sleep(std::time::Duration::from_millis(i * 50)).await;
            i
        });
    }
    // Process results as they complete
    let mut completed = 0;
    while let Some(result) = futures.next().await {
        // Count this completion before reporting the running total
        completed += 1;
        println!("Completed: {} (total done: {})", result, completed);
        // Can take action immediately upon completion
        if result == 5 {
            println!("Found target, continuing with remaining...");
        }
    }
}

FuturesUnordered enables immediate reaction to each completion.
use tokio::task::JoinHandle;

#[tokio::main]
async fn main() {
    let handles: Vec<JoinHandle<u32>> = (0..10u32)
        .map(|i| tokio::spawn(async move {
            // from_millis takes a u64, so widen the u32 index first
            tokio::time::sleep(std::time::Duration::from_millis(u64::from(i) * 50)).await;
            i
        }))
        .collect();
    // join_all waits for ALL to complete before returning
    let results = futures::future::join_all(handles).await;
    // All results available at once, but no incremental processing
    for result in results {
        println!("Result: {:?}", result);
    }
}

join_all returns all results at once and resolves only after every task has finished.
use tokio::task::JoinHandle;

#[tokio::main]
async fn main() {
    let handles: Vec<JoinHandle<u32>> = (0..5u32)
        .map(|i| tokio::spawn(async move {
            // from_secs takes a u64; the +1 keeps task 0 from finishing before abort()
            tokio::time::sleep(std::time::Duration::from_secs(u64::from(i) + 1)).await;
            i
        }))
        .collect();
    // Can abort individual handles
    for (i, handle) in handles.iter().enumerate() {
        if i % 2 == 0 {
            handle.abort();
        }
    }
    let results: Vec<_> = futures::future::join_all(handles).await;
    for (i, result) in results.iter().enumerate() {
        match result {
            Ok(v) => println!("Task {}: {}", i, v),
            Err(e) => println!("Task {} was cancelled: {}", i, e),
        }
    }
}

JoinHandle::abort() cancels individual tasks; an aborted task's handle resolves to a JoinError.
use futures::stream::FuturesUnordered;

#[tokio::main]
async fn main() {
    let mut futures = FuturesUnordered::new();
    for i in 0..5 {
        futures.push(async move {
            tokio::time::sleep(std::time::Duration::from_secs(i)).await;
            i
        });
    }
    // clear() drops every future still in the set.
    // These were never polled, so none of them ever started.
    futures.clear();
    // Dropping the collection has the same effect on whatever it still holds
    drop(futures);
    // All contained futures are cancelled
    println!("All futures cancelled");
}

Dropping or clearing a FuturesUnordered drops every future it still holds, which cancels them.
use tokio::task::JoinHandle;

#[tokio::main]
async fn main() {
    let handles: Vec<JoinHandle<Result<u32, String>>> = vec![
        tokio::spawn(async { Ok(1) }),
        tokio::spawn(async { Err("failed".to_string()) }),
        tokio::spawn(async { Ok(3) }),
    ];
    let results: Vec<_> = futures::future::join_all(handles).await;
    // Each result is Result<Result<u32, String>, JoinError>
    for result in results {
        match result {
            Ok(Ok(v)) => println!("Success: {}", v),
            Ok(Err(e)) => println!("Task error: {}", e),
            Err(e) => println!("Join error: {}", e),
        }
    }
}

JoinHandle results wrap task results in Result<Result<T, E>, JoinError>.
use futures::stream::{FuturesUnordered, StreamExt};

// A named async fn gives every pushed future the same type;
// separate async blocks would each have a distinct type.
async fn check(n: u32) -> Result<u32, String> {
    if n == 2 {
        Err("failed".to_string())
    } else {
        Ok(n)
    }
}

#[tokio::main]
async fn main() {
    let mut futures = FuturesUnordered::new();
    futures.push(check(1));
    futures.push(check(2));
    futures.push(check(3));
    while let Some(result) = futures.next().await {
        match result {
            Ok(v) => println!("Success: {}", v),
            Err(e) => println!("Error: {}", e),
        }
    }
}

FuturesUnordered yields the future's output directly, with no JoinError layer.
use tokio::task::JoinHandle;
use std::mem;

#[tokio::main]
async fn main() {
    // JoinHandle: small overhead per task
    // Handle is just a reference to the spawned task
    let handles: Vec<JoinHandle<u32>> = (0..10000)
        .map(|i| tokio::spawn(async move { i }))
        .collect();
    println!("JoinHandle size: {}", mem::size_of::<JoinHandle<u32>>());
    // Tasks run on the runtime, not in the Vec
    let _results: Vec<_> = futures::future::join_all(handles).await;
}

JoinHandle is a small handle; the task runs on the runtime.
use futures::stream::{FuturesUnordered, StreamExt};

#[tokio::main]
async fn main() {
    // FuturesUnordered: owns the futures
    let mut futures = FuturesUnordered::new();
    for i in 0..10000 {
        futures.push(async move { i });
    }
    // Futures are stored in the collection
    // Memory includes all future states
    // But can process and release incrementally
    let mut count = 0;
    while let Some(_) = futures.next().await {
        count += 1;
    }
}

FuturesUnordered stores future states directly.
use futures::stream::{FuturesUnordered, StreamExt};

#[tokio::main]
async fn main() {
    let mut futures = FuturesUnordered::new();
    let urls = vec!["url1", "url2", "url3", "url4", "url5", "url6", "url7"];
    let mut remaining = urls.into_iter();
    // Start at most 5 concurrent operations
    for url in remaining.by_ref().take(5) {
        futures.push(fetch(url));
    }
    let mut completed = 0;
    while let Some(result) = futures.next().await {
        completed += 1;
        println!("Completed {} requests: {}", completed, result);
        // Refill the window: each completion frees a slot for the next URL
        if let Some(url) = remaining.next() {
            futures.push(fetch(url));
        }
    }
}

async fn fetch(url: &str) -> String {
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    format!("Response from {}", url)
}

FuturesUnordered can implement sliding window concurrency.
use futures::stream::{FuturesUnordered, StreamExt};
use std::time::Duration;

// A named async fn gives both futures the same type;
// two separate async blocks would not type-check in one collection.
async fn respond(delay_ms: u64, label: &str) -> String {
    tokio::time::sleep(Duration::from_millis(delay_ms)).await;
    label.to_string()
}

#[tokio::main]
async fn main() {
    let mut futures = FuturesUnordered::new();
    futures.push(respond(100, "fast"));
    futures.push(respond(500, "slow"));
    // Get first result (race semantics)
    if let Some(first) = futures.next().await {
        println!("First to complete: {}", first);
        // Cancel remaining futures
        futures.clear();
        println!("Cancelled remaining futures");
    }
}

FuturesUnordered naturally implements "first wins" patterns.
#[tokio::main]
async fn main() {
    let mut fast = tokio::spawn(async {
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        "fast"
    });
    let mut slow = tokio::spawn(async {
        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
        "slow"
    });
    // JoinHandle is not Clone, so race the handles by mutable reference
    tokio::select! {
        result = &mut fast => {
            println!("Fast handle: {:?}", result);
        }
        result = &mut slow => {
            println!("Slow handle: {:?}", result);
        }
    }
    // The losing task keeps running unless it is aborted explicitly;
    // abort() has no effect on a task that has already finished
    fast.abort();
    slow.abort();
}

With JoinHandle, use select! for racing; the losing tasks must be aborted manually.
use futures::stream::{FuturesUnordered, StreamExt};

#[tokio::main]
async fn main() {
    let futures = FuturesUnordered::from_iter((0..5).map(|i| async move { i * 2 }));
    // Collect all results
    let results: Vec<i32> = futures.collect().await;
    println!("{:?}", results);
}

FuturesUnordered can be collected into containers.
use tokio::task::JoinHandle;

#[tokio::main]
async fn main() {
    let handles: Vec<JoinHandle<i32>> = (0..5)
        .map(|i| tokio::spawn(async move { i * 2 }))
        .collect();
    let results: Vec<i32> = futures::future::join_all(handles)
        .await
        .into_iter()
        .map(|r| r.unwrap())
        .collect();
    println!("{:?}", results);
}

JoinHandle requires join_all and unwrapping.
use futures::stream::{FuturesUnordered, StreamExt};

#[tokio::main]
async fn main() {
    // FuturesUnordered has no with_capacity constructor; each pushed future
    // gets its own allocation, so there is nothing to reserve up front
    let mut futures = FuturesUnordered::new();
    for i in 0..100 {
        futures.push(async move { i });
    }
    // Iteration is efficient as completed futures are removed
    while let Some(_) = futures.next().await {}
}

FuturesUnordered does not support capacity pre-allocation; its storage grows one future at a time as you push.
use tokio::task::JoinHandle;

#[tokio::main]
async fn main() {
    // Vec pre-allocation
    let mut handles: Vec<JoinHandle<u32>> = Vec::with_capacity(100);
    for i in 0..100 {
        handles.push(tokio::spawn(async move { i }));
    }
    let _results: Vec<_> = futures::future::join_all(handles).await;
}

Vec<JoinHandle> uses standard Vec capacity.
use futures::stream::{FuturesUnordered, StreamExt};
use std::collections::VecDeque;

#[tokio::main]
async fn main() {
    let mut pending = VecDeque::new();
    pending.push_back("https://example.com".to_string());
    let mut in_progress = FuturesUnordered::new();
    let max_concurrent = 3;
    let mut visited = 0;
    while !pending.is_empty() || !in_progress.is_empty() {
        // Fill up to max_concurrent
        while in_progress.len() < max_concurrent {
            if let Some(url) = pending.pop_front() {
                in_progress.push(async move {
                    crawl(&url).await
                });
            } else {
                break;
            }
        }
        // Wait for any to complete
        if let Some(result) = in_progress.next().await {
            visited += 1;
            println!("Visited {} pages", visited);
            println!("Result: {:?}", result);
            // Could add more URLs to pending here
        }
    }
}

async fn crawl(url: &str) -> String {
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    format!("Crawled: {}", url)
}

FuturesUnordered enables dynamic concurrency control.
| Aspect | FuturesUnordered | Vec<JoinHandle> |
|--------|-----------------|-----------------|
| Execution model | Polled on demand | Spawned immediately |
| Result order | Completion order | Collection order |
| Dynamic addition | Yes, while iterating | Push before joining; join_all takes a fixed set |
| Cancellation | clear() or drop | Individual abort() |
| Use case | Stream processing, racing | Batch operations, fire-and-forget |
| Result access | Stream (next().await) | join_all() or individual await |
| Memory | Owns future states | Small handles to runtime tasks |
FuturesUnordered and Vec<JoinHandle> represent two different approaches to concurrent execution:
FuturesUnordered is a stream of results: It holds futures and yields their outputs as they complete. The collection is the driver: you poll it to make progress. This makes it a good fit for stream processing and racing.

Vec<JoinHandle> is a collection of running tasks: Tasks start executing immediately when spawned, and the handles are references to runtime-managed tasks. This makes it a good fit for batch operations and fire-and-forget work, with join_all to wait for everything at once.

Key insight: FuturesUnordered is active: you're driving execution through polling. JoinHandle is passive: tasks run on the runtime independently, and you just await results. This fundamental difference determines which to use. If you need fine-grained control over when futures start and how results flow, FuturesUnordered gives you a stream. If you just need to spawn tasks and collect results eventually, JoinHandle with join_all is simpler.
The choice is about control flow: do you want to be notified as each future completes (stream model), or do you just need all results at the end (batch model)?