How do I work with Futures for Asynchronous Programming in Rust?
Walkthrough
The futures crate is a foundational library for asynchronous programming in Rust. It is not a full runtime: it re-exports the standard library's Future trait and adds combinators, the Stream and Sink traits, and utilities for composing asynchronous computations. Runtimes such as Tokio or async-std schedule and drive futures; the futures crate provides the tools to compose and manipulate them.
Key concepts:
- Future trait: Core abstraction for async computation
- Async/await: Syntax for writing async code
- Combinators: Methods like map, then, join, select
- Streams: Async iterators (async version of Iterator)
- Sinks: Async writers (async version of Write)
- Executors: Running futures to completion
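To make the "Executors" item concrete, here is a minimal sketch of what running a future to completion involves, using only the standard library. The names `ThreadWaker`, `block_on`, and `add` are made up for this illustration; real executors such as Tokio's are far more elaborate, so treat this as a teaching aid rather than a description of any particular runtime.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Waker that unparks the thread blocked in `block_on` (illustrative name).
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Hypothetical minimal executor: drives a single future to completion
/// on the current thread.
fn block_on<F: Future>(fut: F) -> F::Output {
    // Pin the future on the stack so it can be polled.
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            // Sleep until a waker calls `unpark`. Spurious wake-ups are
            // harmless because the loop simply polls again.
            Poll::Pending => thread::park(),
        }
    }
}

async fn add(a: i32, b: i32) -> i32 {
    a + b
}

fn main() {
    let sum = block_on(async { add(1, 2).await + add(3, 4).await });
    println!("Sum: {}", sum); // prints "Sum: 10"
}
```

The loop shows the contract between the three pieces: the executor polls, the future returns `Poll::Pending` when it cannot progress, and the waker tells the executor when polling again is worthwhile.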
When to use Futures:
- Building async abstractions
- Combining multiple async operations
- Working with streams and sinks
- Implementing custom futures
- Lower-level async programming
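As an illustration of "combining multiple async operations" and "implementing custom futures", the sketch below hand-writes a two-future join with `std::future::poll_fn`. The helpers `join2`, `YieldOnce`, `task`, and `run_to_completion` are hypothetical names for this example only; the futures crate's own `join!` is implemented differently, and the busy-polling driver here is a stand-in for a real executor.

```rust
use std::cell::RefCell;
use std::future::{poll_fn, Future};
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical helper: polls both futures whenever it is polled and
/// completes once both outputs are available.
async fn join2<A: Future, B: Future>(a: A, b: B) -> (A::Output, B::Output) {
    let mut a = pin!(a);
    let mut b = pin!(b);
    let mut a_out = None;
    let mut b_out = None;
    poll_fn(|cx| {
        // Only poll a future that has not finished yet.
        if a_out.is_none() {
            if let Poll::Ready(v) = a.as_mut().poll(cx) {
                a_out = Some(v);
            }
        }
        if b_out.is_none() {
            if let Poll::Ready(v) = b.as_mut().poll(cx) {
                b_out = Some(v);
            }
        }
        if a_out.is_some() && b_out.is_some() {
            Poll::Ready((a_out.take().unwrap(), b_out.take().unwrap()))
        } else {
            Poll::Pending
        }
    })
    .await
}

/// Custom future that returns `Pending` on its first poll, then `Ready`.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref(); // ask to be polled again
            Poll::Pending
        }
    }
}

/// Records when each task starts and ends so the interleaving is visible.
async fn task(name: &'static str, log: &RefCell<Vec<String>>) -> &'static str {
    log.borrow_mut().push(format!("{} start", name));
    YieldOnce(false).await;
    log.borrow_mut().push(format!("{} end", name));
    name
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Busy-polling driver, good enough for this demo only.
fn run_to_completion<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
            return v;
        }
    }
}

fn main() {
    let log = RefCell::new(Vec::new());
    let out = run_to_completion(join2(task("a", &log), task("b", &log)));
    // prints: ("a", "b") ["a start", "b start", "a end", "b end"]
    println!("{:?} {:?}", out, log.borrow());
}
```

The log shows that both tasks start before either finishes: joined futures make progress on the same task by being polled in turn, with no extra threads involved.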
When NOT to use Futures:
- Simple async I/O (use Tokio directly)
- When async/await syntax is sufficient
- When you need a runtime (use Tokio/async-std)
Code Examples
Basic Future with async/await
// Simple async function
async fn say_hello() {
    println!("Hello, async world!");
}
// Async function returning a value
async fn compute() -> i32 {
    42
}
#[tokio::main]
async fn main() {
    say_hello().await;
    let result = compute().await;
    println!("Result: {}", result);
}
Creating Futures Manually
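use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
struct Delay {
    millis: u64,
    // Set on the first poll; None means the timer has not started yet
    deadline: Option<Instant>,
}
impl Future for Delay {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Delay holds no self-references, so it is Unpin and get_mut is allowed
        let this = self.get_mut();
        let now = Instant::now();
        let deadline = *this
            .deadline
            .get_or_insert(now + Duration::from_millis(this.millis));
        // Check the clock rather than a flag, so an early re-poll cannot finish too soon
        if now >= deadline {
            return Poll::Ready(());
        }
        // Schedule a wake-up for the remaining time.
        // A thread per poll is wasteful; real timers use a shared timer driver.
        let waker = cx.waker().clone();
        std::thread::spawn(move || {
            std::thread::sleep(deadline - now);
            waker.wake();
        });
        Poll::Pending
    }
}
#[tokio::main]
async fn main() {
    println!("Starting delay...");
    Delay { millis: 1000, deadline: None }.await;
    println!("Delay complete!");
}
Joining Multiple Futures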
use futures::join;
async fn fetch_data(id: u32) -> String {
// Simulate async work
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
format!("Data {}", id)
}
#[tokio::main]
async fn main() {
// Run futures concurrently, wait for all
let (data1, data2, data3) = join!(
fetch_data(1),
fetch_data(2),
fetch_data(3)
);
println!("Results: {}, {}, {}", data1, data2, data3);
}
Try Join (Fallible Futures)
use futures::try_join;
async fn fetch_user(id: u32) -> Result<String, &'static str> {
if id == 0 {
Err("Invalid ID")
} else {
Ok(format!("User {}", id))
}
}
#[tokio::main]
async fn main() {
// All must succeed
match try_join!(fetch_user(1), fetch_user(2)) {
Ok((user1, user2)) => println!("Users: {}, {}", user1, user2),
Err(e) => println!("Error: {}", e),
}
}
Select (Race Multiple Futures)
use futures::select;
use futures::FutureExt;
async fn timeout() -> &'static str {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
"timeout"
}
async fn work() -> &'static str {
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
"work complete"
}
#[tokio::main]
async fn main() {
// Returns when first completes
let result = select! {
res = timeout().fuse() => res,
res = work().fuse() => res,
};
println!("Result: {}", result); // "timeout"
}
Working with Streams
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn main() {
// Create a stream
let stream = stream::iter(vec![1, 2, 3, 4, 5]);
// Process each item
stream
.map(|x| x * 2)
.for_each(|x| async move {
println!("Value: {}", x);
})
.await;
}
Stream with Buffer
use futures::stream::{self, StreamExt};
async fn process_item(item: i32) -> i32 {
    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
    item * 2
}
#[tokio::main]
async fn main() {
    let stream = stream::iter(1..=10);
    // Map each item to a future first, then run up to 3 of them concurrently
    let results: Vec<i32> = stream
        .map(process_item)
        .buffer_unordered(3)
        .collect()
        .await;
    // buffer_unordered yields results in completion order, not input order
    println!("Results: {:?}", results);
}
Fold Stream into Value
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn main() {
let stream = stream::iter(vec![1, 2, 3, 4, 5]);
let sum = stream
.fold(0, |acc, x| async move { acc + x })
.await;
println!("Sum: {}", sum); // 15
}
Timeout Future
use tokio::time::{timeout, Duration};
async fn slow_operation() -> i32 {
tokio::time::sleep(Duration::from_secs(5)).await;
42
}
#[tokio::main]
async fn main() {
match timeout(Duration::from_millis(100), slow_operation()).await {
Ok(result) => println!("Result: {}", result),
Err(_) => println!("Operation timed out"),
}
}
Future Combinators
use futures::FutureExt;
async fn fetch_data() -> i32 {
42
}
#[tokio::main]
async fn main() {
// Map the result
let mapped = fetch_data().map(|x| x * 2);
println!("Mapped: {}", mapped.await);
// Then chain another future
let chained = fetch_data().then(|x| async move {
format!("Value: {}", x)
});
println!("Chained: {}", chained.await);
// Inspect without modifying
let inspected = fetch_data().inspect(|&x| {
println!("Got: {}", x);
});
inspected.await;
}
Collect Stream
use futures::stream::{self, StreamExt};
use futures::TryStreamExt;
#[tokio::main]
async fn main() {
    // Spell out the error type; Ok(1) alone does not let the compiler infer it
    let items: Vec<Result<i32, &'static str>> = vec![Ok(1), Ok(2), Ok(3)];
    let stream = stream::iter(items);
    // Collect into Result<Vec<_>, _>, stopping at the first error
    let result: Result<Vec<i32>, _> = stream.try_collect().await;
    println!("Collected: {:?}", result);
    // Or with collect (non-fallible)
    let stream = stream::iter(vec![1, 2, 3]);
    let result: Vec<i32> = stream.collect().await;
    println!("Collected: {:?}", result);
}
Spawn Tasks
#[tokio::main]
async fn main() {
// Spawn concurrent task
let handle = tokio::spawn(async {
42
});
// Do other work...
println!("Working...");
// Wait for result
let result = handle.await.unwrap();
println!("Result: {}", result);
}
Async Channels
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(32);
// Spawn sender
tokio::spawn(async move {
for i in 0..5 {
tx.send(i).await.unwrap();
}
});
// Receive
while let Some(value) = rx.recv().await {
println!("Received: {}", value);
}
}
Zip Futures
use futures::future::try_join;
async fn fetch_name() -> Result<String, &'static str> {
Ok("Alice".to_string())
}
async fn fetch_age() -> Result<u32, &'static str> {
Ok(30)
}
#[tokio::main]
async fn main() {
let (name, age) = try_join(fetch_name(), fetch_age()).await.unwrap();
println!("{} is {} years old", name, age);
}
Race Futures
use futures::future::{select, Either};
use futures::FutureExt;
async fn fast() -> i32 {
    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
    1
}
async fn slow() -> i32 {
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    2
}
#[tokio::main]
async fn main() {
    // select needs Unpin futures, so box them; the loser is returned unfinished
    let result = select(fast().boxed(), slow().boxed()).await;
    match result {
        Either::Left((val, _remaining)) => println!("Fast won: {}", val),
        Either::Right((val, _remaining)) => println!("Slow won: {}", val),
    }
}
Flatten Nested Futures
use futures::FutureExt;
async fn get_future() -> impl std::future::Future<Output = i32> {
    async { 42 }
}
#[tokio::main]
async fn main() {
    // Without flatten: await the outer future, then the inner one
    let value = get_future().await.await;
    println!("Result: {}", value);
    // With flatten: a single future that resolves to the inner output
    let flattened = get_future().flatten();
    println!("Result: {}", flattened.await);
}
Shared Future (Cloneable)
use futures::FutureExt;
async fn expensive() -> i32 {
println!("Computing...");
42
}
#[tokio::main]
async fn main() {
// Make future cloneable (result is cached)
let shared = expensive().shared();
// Can be awaited multiple times
let a = shared.clone();
let b = shared.clone();
assert_eq!(a.await, 42);
assert_eq!(b.await, 42);
}
Inspect and Debug
use futures::FutureExt;
#[tokio::main]
async fn main() {
async fn compute() -> i32 { 42 }
compute()
.inspect(|result| println!("Got: {}", result))
.map(|x| x * 2)
.inspect(|result| println!("Mapped: {}", result))
.await;
}
Ready and Pending
use std::future::{ready, pending};
#[tokio::main]
async fn main() {
// Immediately ready
let r = ready(42);
println!("Ready: {}", r.await);
// Never completes
// let p = pending::<i32>();
// p.await; // Would hang forever
println!("Done");
}
Stream from Iterator
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn main() {
let stream = stream::iter(1..=5)
.then(|x| async move {
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
x * 2
});
let results: Vec<i32> = stream.collect().await;
println!("Results: {:?}", results);
}
Take and Skip
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn main() {
let stream = stream::iter(1..=10);
// Take first 3
let first_three: Vec<i32> = stream.take(3).collect().await;
println!("First 3: {:?}", first_three);
// Skip first 3
let stream = stream::iter(1..=10);
let after_three: Vec<i32> = stream.skip(3).collect().await;
println!("After 3: {:?}", after_three);
}
Filter and Filter Map
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn main() {
let stream = stream::iter(1..=10);
// Keep only even
let evens: Vec<i32> = stream
.filter(|&x| async move { x % 2 == 0 })
.collect()
.await;
println!("Evens: {:?}", evens);
// Transform and filter
let stream = stream::iter(1..=5);
let doubled_evens: Vec<i32> = stream
.filter_map(|x| async move {
if x % 2 == 0 { Some(x * 2) } else { None }
})
.collect()
.await;
println!("Doubled evens: {:?}", doubled_evens);
}
Timeout on Stream
use futures::stream::{self, StreamExt};
use tokio::time::{timeout, Duration};
#[tokio::main]
async fn main() {
    let stream = stream::iter(1..=5);
    // Apply a timeout to the processing of each item
    let results: Vec<_> = stream
        .then(|x| async move {
            timeout(Duration::from_millis(100), async move { x }).await
        })
        .collect()
        .await;
    println!("Results: {:?}", results);
}
Async Mutex
use tokio::sync::Mutex;
use std::sync::Arc;
#[tokio::main]
async fn main() {
let data = Arc::new(Mutex::new(vec![]));
let mut handles = vec![];
for i in 0..5 {
let data = data.clone();
handles.push(tokio::spawn(async move {
let mut guard = data.lock().await;
guard.push(i);
}));
}
for handle in handles {
handle.await.unwrap();
}
let final_data = data.lock().await.clone();
println!("Data: {:?}", final_data);
}
Async RwLock
use std::sync::Arc;
use tokio::sync::RwLock;
#[tokio::main]
async fn main() {
    let data = Arc::new(RwLock::new(0));
    // Multiple readers
    let read_handles: Vec<_> = (0..3)
        .map(|i| {
            let data = data.clone();
            tokio::spawn(async move {
                let guard = data.read().await;
                println!("Reader {}: {}", i, *guard);
            })
        })
        .collect();
    // One writer
    let write_handle = {
        let data = data.clone();
        tokio::spawn(async move {
            let mut guard = data.write().await;
            *guard = 42;
        })
    };
    for h in read_handles { h.await.unwrap(); }
    write_handle.await.unwrap();
}
Abort Handle
use tokio::time::Duration;
#[tokio::main]
async fn main() {
let handle = tokio::spawn(async {
loop {
tokio::time::sleep(Duration::from_millis(100)).await;
println!("Working...");
}
});
// Let it run briefly
tokio::time::sleep(Duration::from_millis(250)).await;
// Abort the task
handle.abort();
println!("Task aborted");
}
Select with Default
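use futures::select;
use futures::FutureExt;
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
    // Keep the sender alive but send nothing, so recv() is pending.
    // (Dropping the sender would make recv() resolve to None immediately.)
    let (_tx, mut rx) = mpsc::channel::<i32>(10);
    select! {
        // futures::select! needs an irrefutable pattern, so match on the Option inside the arm
        msg = rx.recv().fuse() => {
            match msg {
                Some(val) => println!("Got: {}", val),
                None => println!("Channel closed"),
            }
        }
        // Runs when no future is ready at the time of polling
        default => {
            println!("No message available");
        }
    }
}
Complete Pattern in Select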
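// Uses the futures crate's own channel: its Receiver is a fused stream,
// which is what lets the complete branch detect that nothing is left to poll.
use futures::channel::mpsc;
use futures::{select, SinkExt, StreamExt};
#[tokio::main]
async fn main() {
    let (mut tx, mut rx) = mpsc::channel::<i32>(10);
    // Send one message then close
    tx.send(42).await.unwrap();
    drop(tx); // Close sender
    loop {
        select! {
            msg = rx.next() => {
                if let Some(val) = msg {
                    println!("Received: {}", val);
                }
            }
            // Runs once every future in the select! has terminated
            complete => {
                println!("Channel closed");
                break;
            }
        }
    }
}
Summary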
Futures Key Imports:
use std::future::Future;
use futures::{FutureExt, StreamExt, TryStreamExt};
use futures::stream;
use futures::future::{join, try_join, select};
Core Combinators:
| Method | Description |
|---|---|
| .map(f) | Transform output |
| .then(f) | Chain another future |
| .inspect(f) | Side effect without modification |
| .flatten() | Flatten nested future |
| .shared() | Make cloneable (caches result) |
Join Macros:
join!(fut1, fut2); // Run concurrently, return all results
try_join!(fut1, fut2); // Same, but return early on error
Select Macro:
select! {
res = fut1.fuse() => res,
res = fut2.fuse() => res,
default => { /* no future ready */ },
complete => { /* all done */ },
}
Stream Methods:
| Method | Description |
|---|---|
| .map(f) | Transform items |
| .filter(f) | Keep matching items |
| .filter_map(f) | Filter and transform |
| .take(n) | Take first n items |
| .skip(n) | Skip first n items |
| .collect() | Collect into a collection such as Vec |
| .fold(init, f) | Reduce to single value |
| .for_each(f) | Execute for each item |
Key Points:
- Use async fn to create futures
- Use .await to wait for completion
- Use join! for concurrent execution
- Use select! for racing
- Use tokio::spawn for background tasks
- Streams are async iterators
- Always .fuse() futures passed to futures::select!
- Use tokio::sync::Mutex when a lock must be held across an .await
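To show why the key points recommend fusing futures before selecting over them, here is a rough standard-library sketch of what "fused" means. `Fused`, `NoopWaker`, and `poll_twice` are hypothetical names for this illustration, and boxing is used only to keep the sketch short; the futures crate's `Fuse` is implemented differently.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a fused future: it remembers that the inner
/// future finished and never polls it again.
struct Fused<F> {
    inner: Option<Pin<Box<F>>>,
}

impl<F: Future> Fused<F> {
    fn new(fut: F) -> Self {
        Fused { inner: Some(Box::pin(fut)) }
    }

    /// True once the inner future has produced its output.
    fn is_terminated(&self) -> bool {
        self.inner.is_none()
    }
}

impl<F: Future> Future for Fused<F> {
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // `Fused` only stores a boxed future, so it is `Unpin`.
        let this = self.get_mut();
        match this.inner.as_mut() {
            // Already finished: report Pending instead of polling again.
            None => Poll::Pending,
            Some(fut) => match fut.as_mut().poll(cx) {
                Poll::Ready(value) => {
                    this.inner = None; // drop the finished future
                    Poll::Ready(value)
                }
                Poll::Pending => Poll::Pending,
            },
        }
    }
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls a fused future twice and reports what happened.
fn poll_twice() -> (Poll<i32>, bool, Poll<i32>) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Fused::new(async { 7 });
    let first = Pin::new(&mut fut).poll(&mut cx);
    let terminated = fut.is_terminated();
    // Polling a plain async block again after completion would panic;
    // the fused wrapper returns Pending instead.
    let second = Pin::new(&mut fut).poll(&mut cx);
    (first, terminated, second)
}

fn main() {
    // prints: (Ready(7), true, Pending)
    println!("{:?}", poll_twice());
}
```

A select loop can rely on this behavior to skip branches that have already finished, and a check like `is_terminated` is what allows a `complete` branch to fire once nothing is left to poll.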
