Loading page…
Rust walkthroughs
Loading page…
The `futures` crate is the foundational async utility crate for Rust. It re-exports the standard library's `Future` trait and provides essential combinators and utilities for working with asynchronous computations. While Tokio or async-std provide the runtime, the futures crate provides the tools to compose and manipulate futures.
Key concepts:
map, then, join, select
When to use Futures:
When NOT to use Futures:
use std::future::Future;
use std::pin::Pin;
// Simple async function
async fn say_hello() {
println!("Hello, async world!");
}
// Async function returning value
async fn compute() -> i32 {
42
}
#[tokio::main]
async fn main() {
say_hello().await;
let result = compute().await;
println!("Result: {}", result);
}use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
/// State shared between a [`Delay`] future and its background timer thread.
struct DelayState {
    /// Set to `true` by the timer thread once the sleep has actually finished.
    elapsed: bool,
    /// Waker of the task that most recently polled the future.
    waker: Option<std::task::Waker>,
}

/// A hand-written timer future that completes `millis` milliseconds after it
/// is first polled.
///
/// The timer thread is started lazily on the first poll. Completion is tracked
/// in state shared with that thread, so a poll that happens before the timer
/// fires (a spurious wake-up) correctly reports `Poll::Pending`.
struct Delay {
    /// Length of the delay in milliseconds.
    millis: u64,
    /// `None` until the first poll starts the timer thread.
    state: Option<std::sync::Arc<std::sync::Mutex<DelayState>>>,
}

impl Delay {
    /// Creates a delay of `millis` milliseconds; nothing runs until it is polled.
    fn new(millis: u64) -> Self {
        Delay { millis, state: None }
    }
}

impl Future for Delay {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Timer already running: report completion only if the thread says so.
        if let Some(shared) = self.state.as_ref() {
            let mut state = shared.lock().expect("delay timer thread panicked");
            if state.elapsed {
                return Poll::Ready(());
            }
            // The task may have moved to a different waker since the last poll.
            state.waker = Some(cx.waker().clone());
            return Poll::Pending;
        }

        // First poll: start the timer thread and hand it the current waker.
        let shared = std::sync::Arc::new(std::sync::Mutex::new(DelayState {
            elapsed: false,
            waker: Some(cx.waker().clone()),
        }));
        let timer_state = std::sync::Arc::clone(&shared);
        let millis = self.millis;
        std::thread::spawn(move || {
            std::thread::sleep(std::time::Duration::from_millis(millis));
            // Mark completion under the lock, but wake after releasing it.
            let waker = {
                let mut state = timer_state
                    .lock()
                    .expect("delay future panicked while polling");
                state.elapsed = true;
                state.waker.take()
            };
            if let Some(waker) = waker {
                waker.wake();
            }
        });
        self.state = Some(shared);
        Poll::Pending
    }
}

#[tokio::main]
async fn main() {
    println!("Starting delay...");
    Delay::new(1000).await;
    println!("Delay complete!");
}
use futures::join;
async fn fetch_data(id: u32) -> String {
// Simulate async work
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
format!("Data {}", id)
}
#[tokio::main]
async fn main() {
// Run futures concurrently, wait for all
let (data1, data2, data3) = join!(
fetch_data(1),
fetch_data(2),
fetch_data(3)
);
println!("Results: {}, {}, {}", data1, data2, data3);
}use futures::try_join;
/// Looks up a user by `id`.
///
/// Resolves to `Err("Invalid ID")` for id `0` and to `Ok("User <id>")` for
/// every other id.
async fn fetch_user(id: u32) -> Result<String, &'static str> {
    match id {
        0 => Err("Invalid ID"),
        valid => Ok(format!("User {}", valid)),
    }
}
#[tokio::main]
async fn main() {
// All must succeed
match try_join!(fetch_user(1), fetch_user(2)) {
Ok((user1, user2)) => println!("Users: {}, {}", user1, user2),
Err(e) => println!("Error: {}", e),
}
}use futures::select;
use futures::FutureExt;
async fn timeout() -> &'static str {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
"timeout"
}
async fn work() -> &'static str {
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
"work complete"
}
#[tokio::main]
async fn main() {
// Returns when first completes
let result = select! {
res = timeout().fuse() => res,
res = work().fuse() => res,
};
println!("Result: {}", result); // "timeout"
}use futures::stream::{self, StreamExt};
#[tokio::main]
async fn main() {
// Create a stream
let stream = stream::iter(vec![1, 2, 3, 4, 5]);
// Process each item
stream
.map(|x| x * 2)
.for_each(|x| async move {
println!("Value: {}", x);
})
.await;
}use futures::stream::{self, StreamExt};
async fn process_item(item: i32) -> i32 {
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
item * 2
}
#[tokio::main]
async fn main() {
let stream = stream::iter(1..=10);
// Process up to 3 items concurrently
let results: Vec<i32> = stream
.buffer_unordered(3)
.map(|x| process_item(x))
.collect()
.await;
println!("Results: {:?}", results);
}use futures::stream::{self, StreamExt};
#[tokio::main]
async fn main() {
let stream = stream::iter(vec![1, 2, 3, 4, 5]);
let sum = stream
.fold(0, |acc, x| async move { acc + x })
.await;
println!("Sum: {}", sum); // 15
}use tokio::time::{timeout, Duration};
async fn slow_operation() -> i32 {
tokio::time::sleep(Duration::from_secs(5)).await;
42
}
#[tokio::main]
async fn main() {
match timeout(Duration::from_millis(100), slow_operation()).await {
Ok(result) => println!("Result: {}", result),
Err(_) => println!("Operation timed out"),
}
}use futures::FutureExt;
async fn fetch_data() -> i32 {
42
}
#[tokio::main]
async fn main() {
// Map the result
let mapped = fetch_data().map(|x| x * 2);
println!("Mapped: {}", mapped.await);
// Then chain another future
let chained = fetch_data().then(|x| async move {
format!("Value: {}", x)
});
println!("Chained: {}", chained.await);
// Inspect without modifying
let inspected = fetch_data().inspect(|&x| {
println!("Got: {}", x);
});
inspected.await;
}use futures::stream::{self, StreamExt};
use futures::TryStreamExt;
/// Collects a fallible stream with `try_collect` and an infallible one with
/// `collect`.
#[tokio::main]
async fn main() {
    // The error type must be spelled out: nothing else in this example
    // constrains it, and leaving it open is a "type annotations needed" error.
    let items: Vec<Result<i32, &'static str>> = vec![Ok(1), Ok(2), Ok(3)];
    let stream = stream::iter(items);
    // Collect into Vec, stopping at the first Err
    let result: Result<Vec<i32>, _> = stream.try_collect().await;
    println!("Collected: {:?}", result);
    // Or with collect (non-fallible)
    let stream = stream::iter(vec![1, 2, 3]);
    let result: Vec<i32> = stream.collect().await;
    println!("Collected: {:?}", result);
}
#[tokio::main]
async fn main() {
// Spawn concurrent task
let handle = tokio::spawn(async {
42
});
// Do other work...
println!("Working...");
// Wait for result
let result = handle.await.unwrap();
println!("Result: {}", result);
}use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(32);
// Spawn sender
tokio::spawn(async move {
for i in 0..5 {
tx.send(i).await.unwrap();
}
});
// Receive
while let Some(value) = rx.recv().await {
println!("Received: {}", value);
}
}use futures::future::try_join;
async fn fetch_name() -> Result<String, &'static str> {
Ok("Alice".to_string())
}
async fn fetch_age() -> Result<u32, &'static str> {
Ok(30)
}
#[tokio::main]
async fn main() {
let (name, age) = try_join(fetch_name(), fetch_age()).await.unwrap();
println!("{} is {} years old", name, age);
}use futures::future::{select, Either};
use futures::FutureExt;
async fn fast() -> i32 {
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1
}
async fn slow() -> i32 {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2
}
#[tokio::main]
async fn main() {
let result = futures::future::select(fast().boxed(), slow().boxed()).await;
match result {
Either::Left((val, _remaining)) => println!("Fast won: {}", val),
Either::Right((val, _remaining)) => println!("Slow won: {}", val),
}
}use futures::FutureExt;
async fn get_future() -> impl std::future::Future<Output = i32> {
async { 42 }
}
/// Shows the two ways to get the value out of a future that resolves to
/// another future.
#[tokio::main]
async fn main() {
    // Without flatten: await the outer future, then await the inner one.
    let nested = get_future().await.await;
    println!("Nested result: {}", nested);
    // With flatten: a single await resolves both layers.
    let flattened = get_future().flatten();
    println!("Result: {}", flattened.await);
}
use futures::FutureExt;
async fn expensive() -> i32 {
println!("Computing...");
42
}
#[tokio::main]
async fn main() {
// Make future cloneable (result is cached)
let shared = expensive().shared();
// Can be awaited multiple times
let a = shared.clone();
let b = shared.clone();
assert_eq!(a.await, 42);
assert_eq!(b.await, 42);
}use futures::FutureExt;
#[tokio::main]
async fn main() {
async fn compute() -> i32 { 42 }
compute()
.inspect(|result| println!("Got: {}", result))
.map(|x| x * 2)
.inspect(|result| println!("Mapped: {}", result))
.await;
}use std::future::{ready, pending};
#[tokio::main]
async fn main() {
// Immediately ready
let r = ready(42);
println!("Ready: {}", r.await);
// Never completes
// let p = pending::<i32>();
// p.await; // Would hang forever
println!("Done");
}use futures::stream::{self, StreamExt};
#[tokio::main]
async fn main() {
let stream = stream::iter(1..=5)
.then(|x| async move {
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
x * 2
});
let results: Vec<i32> = stream.collect().await;
println!("Results: {:?}", results);
}use futures::stream::{self, StreamExt};
#[tokio::main]
async fn main() {
let stream = stream::iter(1..=10);
// Take first 3
let first_three: Vec<i32> = stream.take(3).collect().await;
println!("First 3: {:?}", first_three);
// Skip first 3
let stream = stream::iter(1..=10);
let after_three: Vec<i32> = stream.skip(3).collect().await;
println!("After 3: {:?}", after_three);
}use futures::stream::{self, StreamExt};
#[tokio::main]
async fn main() {
let stream = stream::iter(1..=10);
// Keep only even
let evens: Vec<i32> = stream
.filter(|&x| async move { x % 2 == 0 })
.collect()
.await;
println!("Evens: {:?}", evens);
// Transform and filter
let stream = stream::iter(1..=5);
let doubled_evens: Vec<i32> = stream
.filter_map(|x| async move {
if x % 2 == 0 { Some(x * 2) } else { None }
})
.collect()
.await;
println!("Doubled evens: {:?}", doubled_evens);
}use futures::stream::{self, StreamExt};
use futures::stream::Pending;
use tokio::time::{timeout, Duration};
#[tokio::main]
async fn main() {
let stream = stream::iter(1..=5);
// Timeout for each item
let results: Vec<_> = stream
.then(|x| async move {
timeout(Duration::from_millis(100), async move { x }).await
})
.collect()
.await;
println!("Results: {:?}", results);
}use tokio::sync::Mutex;
use std::sync::Arc;
#[tokio::main]
async fn main() {
let data = Arc::new(Mutex::new(vec![]));
let mut handles = vec![];
for i in 0..5 {
let data = data.clone();
handles.push(tokio::spawn(async move {
let mut guard = data.lock().await;
guard.push(i);
}));
}
for handle in handles {
handle.await.unwrap();
}
let final_data = data.lock().await.clone();
println!("Data: {:?}", final_data);
}use std::sync::Arc;
/// Shares an integer between three reader tasks and one writer task through
/// an async read-write lock.
#[tokio::main]
async fn main() {
    // `RwLock` is not imported at the top of this example, so name it by its
    // full path. The async lock is needed because guards are taken via `.await`.
    let data = Arc::new(tokio::sync::RwLock::new(0));
    // Multiple readers
    let read_handles: Vec<_> = (0..3)
        .map(|i| {
            let data = data.clone();
            tokio::spawn(async move {
                let guard = data.read().await;
                println!("Reader {}: {}", i, *guard);
            })
        })
        .collect();
    // One writer
    let write_handle = {
        let data = data.clone();
        tokio::spawn(async move {
            let mut guard = data.write().await;
            *guard = 42;
        })
    };
    // Readers and the writer run concurrently, so a reader may print either
    // the initial 0 or the written 42.
    for h in read_handles {
        h.await.unwrap();
    }
    write_handle.await.unwrap();
}
use tokio::time::Duration;
#[tokio::main]
async fn main() {
let handle = tokio::spawn(async {
loop {
tokio::time::sleep(Duration::from_millis(100)).await;
println!("Working...");
}
});
// Let it run briefly
tokio::time::sleep(Duration::from_millis(250)).await;
// Abort the task
handle.abort();
println!("Task aborted");
}use futures::select;
use futures::FutureExt;
use tokio::sync::mpsc;
/// Demonstrates the `default` arm of `futures::select!`, which runs when no
/// future is ready on the first poll.
#[tokio::main]
async fn main() {
    // Keep the sender alive: with every sender dropped, `recv()` resolves to
    // `None` immediately and the `default` arm would never be taken.
    let (_tx, mut rx) = mpsc::channel::<i32>(10);
    select! {
        // `futures::select!` only accepts irrefutable patterns, so bind the
        // whole `Option` and inspect it inside the arm.
        msg = rx.recv().fuse() => {
            if let Some(val) = msg {
                println!("Got: {}", val);
            }
        }
        default => {
            println!("No message available");
        }
    }
}
use futures::select;
use futures::FutureExt;
use tokio::sync::mpsc;
/// Demonstrates the `complete` arm of `futures::select!`, which runs once
/// every selected future or stream has terminated.
#[tokio::main]
async fn main() {
    use futures::stream::{self, StreamExt};

    let (tx, mut rx) = mpsc::channel::<i32>(10);
    // Send one message then close
    tx.send(42).await.unwrap();
    drop(tx); // Close sender

    // `complete` only fires when the selected futures report themselves as
    // terminated. A fresh `rx.recv().fuse()` per iteration never does, so
    // expose the receiver as one long-lived fused stream instead.
    let mut messages = stream::poll_fn(move |cx| rx.poll_recv(cx)).fuse();
    loop {
        select! {
            // `futures::select!` only accepts irrefutable patterns, so bind
            // the whole `Option`. `None` marks the end of the stream, after
            // which the next iteration takes the `complete` arm.
            msg = messages.next() => {
                if let Some(val) = msg {
                    println!("Received: {}", val);
                }
            }
            complete => {
                println!("Channel closed");
                break;
            }
        }
    }
}
Futures Key Imports:
use std::future::Future;
use futures::{FutureExt, StreamExt, TryStreamExt};
use futures::stream;
use futures::future::{join, try_join, select};
Core Combinators:
| Method | Description |
|--------|-------------|
| .map(f) | Transform output |
| .then(f) | Chain another future |
| .inspect(f) | Side effect without modification |
| .flatten() | Flatten nested future |
| .shared() | Make cloneable (caches result) |
Join Macros:
join!(fut1, fut2); // Run concurrently, return all results
try_join!(fut1, fut2); // Same, but return early on error
Select Macro:
select! {
res = fut1.fuse() => res,
res = fut2.fuse() => res,
default => { /* no future ready */ },
complete => { /* all done */ },
}
Stream Methods:
| Method | Description |
|--------|-------------|
| .map(f) | Transform items |
| .filter(f) | Keep matching items |
| .filter_map(f) | Filter and transform |
| .take(n) | Take first n items |
| .skip(n) | Skip first n items |
| .collect() | Collect into Vec |
| .fold(init, f) | Reduce to single value |
| .for_each(f) | Execute for each item |
Key Points:
- Use `async fn` to create futures
- Use `.await` to wait for completion
- Use `join!` for concurrent execution
- Use `select!` for racing
- Use `tokio::spawn` for background tasks
- Call `.fuse()` on futures in `select!`
- Use `tokio::sync::Mutex` in async code