How do I work with Futures for Asynchronous Programming in Rust?

Walkthrough

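The futures crate is the foundational toolkit for asynchronous programming in Rust. The Future trait itself is defined in the standard library (std::future::Future); the futures crate re-exports it and adds the essential combinators, the Stream and Sink traits, and utilities for working with asynchronous computations. While Tokio or async-std provide the runtime, the futures crate provides the tools to compose and manipulate futures.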

Key concepts:

  • Future trait — Core abstraction for async computation
  • Async/await — Syntax for writing async code
  • Combinators — Methods like map, then, join, select
  • Streams — Async iterators (async version of Iterator)
  • Sinks — Async consumers of values (the sending counterpart of Stream; byte-oriented I/O uses AsyncWrite)
  • Executors — Running futures to completion

When to use Futures:

  • Building async abstractions
  • Combining multiple async operations
  • Working with streams and sinks
  • Implementing custom futures
  • Lower-level async programming

When NOT to use Futures:

  • Simple async I/O (use Tokio directly)
  • When async/await syntax is sufficient
  • When you need a runtime (use Tokio/async-std)

Code Examples

Basic Future with async/await

use std::future::Future;
use std::pin::Pin;
 
// Simple async function
/// Prints a greeting to standard output.
///
/// Calling this function only builds a future; the body runs when that
/// future is awaited.
async fn say_hello() {
    println!("Hello, async world!");
}
 
// Async function returning value
/// Resolves to the constant `42`; the body contains no `.await` point.
async fn compute() -> i32 {
    42
}
 
/// Entry point: runs the greeting, then awaits and prints the computed value.
#[tokio::main]
async fn main() {
    // Creating the future does nothing by itself; awaiting it runs the body.
    let greeting = say_hello();
    greeting.await;

    let answer = compute().await;
    println!("Result: {}", answer);
}

Creating Futures Manually

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
 
/// State shared between a [`Delay`] and its background timer thread.
struct TimerState {
    /// Set by the timer thread once the sleep has finished.
    fired: bool,
    /// Waker of the task that polled the delay most recently.
    waker: Option<std::task::Waker>,
}

/// A future that completes once `millis` milliseconds have passed since it
/// was first polled.
///
/// Completion is decided by a flag that only the timer thread sets, so a
/// re-poll that happens before the timer fires (for example from `join!` or
/// `select!`) correctly returns `Pending` instead of finishing early.
struct Delay {
    millis: u64,
    /// `None` until the first poll starts the timer thread.
    state: Option<std::sync::Arc<std::sync::Mutex<TimerState>>>,
}

impl Delay {
    /// Creates a delay of `millis` milliseconds. The timer starts on the
    /// first poll, not here.
    fn new(millis: u64) -> Self {
        Self { millis, state: None }
    }
}

impl Future for Delay {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Timer already running: report its status and refresh the waker.
        if let Some(state) = &self.state {
            let mut state = state.lock().expect("timer state lock poisoned");
            if state.fired {
                return Poll::Ready(());
            }
            // The future may be polled from a different task than last time,
            // so always store the most recent waker.
            state.waker = Some(cx.waker().clone());
            return Poll::Pending;
        }

        // First poll: start the timer thread.
        let state = std::sync::Arc::new(std::sync::Mutex::new(TimerState {
            fired: false,
            waker: Some(cx.waker().clone()),
        }));
        let timer_state = std::sync::Arc::clone(&state);
        let millis = self.millis;
        std::thread::spawn(move || {
            std::thread::sleep(std::time::Duration::from_millis(millis));
            // Mark completion under the lock, but wake after releasing it.
            let waker = {
                let mut state = timer_state.lock().expect("timer state lock poisoned");
                state.fired = true;
                state.waker.take()
            };
            if let Some(waker) = waker {
                waker.wake();
            }
        });
        self.state = Some(state);
        Poll::Pending
    }
}
 
/// Awaits a one-second [`Delay`] and reports when it starts and finishes.
#[tokio::main]
async fn main() {
    println!("Starting delay...");
    Delay::new(1000).await;
    println!("Delay complete!");
}

Joining Multiple Futures

use futures::join;
 
/// Simulates an asynchronous fetch for `id`.
///
/// Sleeps for 100 ms on the Tokio timer, then resolves to `"Data {id}"`.
async fn fetch_data(id: u32) -> String {
    // Stand-in for real async work
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    format!("Data {}", id)
}
 
/// Runs three `fetch_data` calls concurrently and prints all three results.
#[tokio::main]
async fn main() {
    // `join!` resolves once every future has finished and yields a tuple of
    // their outputs in argument order.
    let (first, second, third) = join!(fetch_data(1), fetch_data(2), fetch_data(3));

    println!("Results: {}, {}, {}", first, second, third);
}

Try Join (Fallible Futures)

use futures::try_join;
 
/// Looks up a user by `id`.
///
/// Resolves to `Err("Invalid ID")` for id `0` and to `Ok("User {id}")` for
/// every other id.
async fn fetch_user(id: u32) -> Result<String, &'static str> {
    match id {
        0 => Err("Invalid ID"),
        valid => Ok(format!("User {}", valid)),
    }
}
 
/// Fetches two users together and prints either both names or the error.
#[tokio::main]
async fn main() {
    // `try_join!` yields `Ok` only when every future succeeds.
    let outcome = try_join!(fetch_user(1), fetch_user(2));

    match outcome {
        Err(e) => println!("Error: {}", e),
        Ok((user1, user2)) => println!("Users: {}, {}", user1, user2),
    }
}

Select (Race Multiple Futures)

use futures::select;
use futures::FutureExt;
 
/// Sleeps for 100 ms, then resolves to the string `"timeout"`.
async fn timeout() -> &'static str {
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    "timeout"
}
 
/// Sleeps for 200 ms, then resolves to the string `"work complete"`.
async fn work() -> &'static str {
    tokio::time::sleep(std::time::Duration::from_millis(200)).await;
    "work complete"
}
 
/// Races `timeout` against `work` and prints whichever finishes first.
#[tokio::main]
async fn main() {
    // The arm whose future completes first supplies the value of `select!`.
    let winner = select! {
        early = timeout().fuse() => early,
        late = work().fuse() => late,
    };

    println!("Result: {}", winner);  // "timeout"
}

Working with Streams

use futures::stream::{self, StreamExt};
 
/// Doubles each number of a five-element stream and prints it.
#[tokio::main]
async fn main() {
    // Source stream backed by an in-memory vector
    let numbers = stream::iter(vec![1, 2, 3, 4, 5]);

    // Transform lazily, then drive the stream to completion
    let doubled = numbers.map(|n| n * 2);
    doubled
        .for_each(|value| async move {
            println!("Value: {}", value);
        })
        .await;
}

Stream with Buffer

use futures::stream::{self, StreamExt};
 
/// Sleeps for 10 ms, then resolves to `item * 2`.
async fn process_item(item: i32) -> i32 {
    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
    item * 2
}
 
#[tokio::main]
async fn main() {
    let stream = stream::iter(1..=10);
    
    // Process up to 3 items concurrently
    let results: Vec<i32> = stream
        .buffer_unordered(3)
        .map(|x| process_item(x))
        .collect()
        .await;
    
    println!("Results: {:?}", results);
}

Fold Stream into Value

use futures::stream::{self, StreamExt};
 
/// Sums a five-element stream with `fold` and prints the total.
#[tokio::main]
async fn main() {
    // The closure returns a future for each step; the accumulator starts at 0.
    let total = stream::iter(vec![1, 2, 3, 4, 5])
        .fold(0, |running, item| async move { running + item })
        .await;

    println!("Sum: {}", total);  // 15
}

Timeout Future

use tokio::time::{timeout, Duration};
 
/// Sleeps for five seconds, then resolves to `42`.
async fn slow_operation() -> i32 {
    tokio::time::sleep(Duration::from_secs(5)).await;
    42
}
 
/// Gives `slow_operation` 100 ms to finish and reports the outcome.
#[tokio::main]
async fn main() {
    let budget = Duration::from_millis(100);
    let outcome = timeout(budget, slow_operation()).await;

    // `Err` means the time budget ran out before the operation finished.
    if let Ok(result) = outcome {
        println!("Result: {}", result);
    } else {
        println!("Operation timed out");
    }
}

Future Combinators

use futures::FutureExt;
 
/// Resolves to the constant `42`; used as the input for the combinator demos.
async fn fetch_data() -> i32 {
    42
}
 
/// Demonstrates the `map`, `then` and `inspect` combinators on `fetch_data`.
#[tokio::main]
async fn main() {
    // `map`: transform the output with a synchronous closure
    let doubled = fetch_data().map(|value| value * 2).await;
    println!("Mapped: {}", doubled);

    // `then`: feed the output into a second asynchronous step
    let labelled = fetch_data()
        .then(|value| async move { format!("Value: {}", value) })
        .await;
    println!("Chained: {}", labelled);

    // `inspect`: observe the output by reference; it passes through unchanged
    fetch_data()
        .inspect(|&value| {
            println!("Got: {}", value);
        })
        .await;
}

Collect Stream

use futures::stream::{self, StreamExt};
use futures::TryStreamExt;
 
/// Collects a fallible stream with `try_collect` and an infallible one with
/// `collect`, printing both results.
#[tokio::main]
async fn main() {
    // The error type is spelled out because a vector holding only `Ok`
    // values gives the compiler nothing to infer it from.
    let stream = stream::iter(vec![Ok::<i32, &'static str>(1), Ok(2), Ok(3)]);

    // Collect into Vec, stopping at the first `Err`
    let result: Result<Vec<i32>, &'static str> = stream.try_collect().await;
    println!("Collected: {:?}", result);

    // Or with collect (non-fallible)
    let stream = stream::iter(vec![1, 2, 3]);
    let result: Vec<i32> = stream.collect().await;
    println!("Collected: {:?}", result);
}

Spawn Tasks

/// Spawns a task, does other work, then waits for the task's result.
#[tokio::main]
async fn main() {
    // The spawned task starts running in the background immediately.
    let task = tokio::spawn(async { 42 });

    // Meanwhile, the current task keeps going.
    println!("Working...");

    // Awaiting the handle yields `Err` only if the task panicked or was cancelled.
    let value = task.await.unwrap();
    println!("Result: {}", value);
}

Async Channels

use tokio::sync::mpsc;
 
/// Sends five numbers from a spawned task over a bounded channel and prints
/// each one as it is received.
#[tokio::main]
async fn main() {
    let (sender, mut receiver) = mpsc::channel(32);

    // Producer: the sender is moved into the task and dropped when it ends,
    // which closes the channel.
    tokio::spawn(async move {
        for n in 0..5 {
            sender.send(n).await.unwrap();
        }
    });

    // Consumer: `recv` yields `None` once the channel is closed and drained.
    while let Some(item) = receiver.recv().await {
        println!("Received: {}", item);
    }
}

Zip Futures

use futures::future::try_join;
 
/// Resolves to `Ok("Alice")`; this example never produces the error variant.
async fn fetch_name() -> Result<String, &'static str> {
    Ok("Alice".to_string())
}
 
/// Resolves to `Ok(30)`; this example never produces the error variant.
async fn fetch_age() -> Result<u32, &'static str> {
    Ok(30)
}
 
/// Fetches name and age together and prints them as one sentence.
#[tokio::main]
async fn main() {
    // The `try_join` function pairs two fallible futures into one.
    let pair = try_join(fetch_name(), fetch_age()).await;
    let (name, age) = pair.unwrap();

    println!("{} is {} years old", name, age);
}

Race Futures

use futures::future::{select, Either};
use futures::FutureExt;
 
/// Sleeps for 10 ms, then resolves to `1`.
async fn fast() -> i32 {
    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
    1
}
 
/// Sleeps for 100 ms, then resolves to `2`.
async fn slow() -> i32 {
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    2
}
 
#[tokio::main]
async fn main() {
    let result = futures::future::select(fast().boxed(), slow().boxed()).await;
    
    match result {
        Either::Left((val, _remaining)) => println!("Fast won: {}", val),
        Either::Right((val, _remaining)) => println!("Slow won: {}", val),
    }
}

Flatten Nested Futures

use futures::FutureExt;
 
/// Resolves to another future, which in turn resolves to `42`.
///
/// Callers therefore hold a future nested inside a future.
async fn get_future() -> impl std::future::Future<Output = i32> {
    async { 42 }
}
 
/// Resolves the nested future from `get_future` twice: once by awaiting each
/// layer by hand, and once through `flatten`.
#[tokio::main]
async fn main() {
    // Without flatten: one `.await` per layer of nesting
    let manual = get_future().await.await;
    println!("Result: {}", manual);

    // With flatten: both layers collapse into a single future
    let flattened = get_future().flatten();
    println!("Result: {}", flattened.await);
}

Shared Future (Cloneable)

use futures::FutureExt;
 
/// Prints `"Computing..."` and resolves to `42`.
///
/// The print makes it visible how many times the body actually runs.
async fn expensive() -> i32 {
    println!("Computing...");
    42
}
 
/// Awaits two clones of one shared future and checks both see the same value.
#[tokio::main]
async fn main() {
    // `shared()` turns the future into a cloneable handle.
    let shared = expensive().shared();

    // Every clone can be awaited independently.
    let first = shared.clone();
    let second = shared.clone();

    assert_eq!(first.await, 42);
    assert_eq!(second.await, 42);
}

Inspect and Debug

use futures::FutureExt;
 
/// Traces a value through a combinator chain by inspecting it before and
/// after the `map` step.
#[tokio::main]
async fn main() {
    async fn compute() -> i32 { 42 }

    // Build the pipeline first; nothing runs until it is awaited.
    let pipeline = compute()
        .inspect(|raw| println!("Got: {}", raw))
        .map(|value| value * 2)
        .inspect(|doubled| println!("Mapped: {}", doubled));

    pipeline.await;
}

Ready and Pending

use std::future::{ready, pending};
 
/// Shows `ready`, which resolves immediately, and describes `pending`,
/// which never resolves.
#[tokio::main]
async fn main() {
    // `ready(v)` is a future that is complete from the start.
    let value = ready(42).await;
    println!("Ready: {}", value);

    // `pending::<T>()` is the opposite: awaiting it never finishes.
    // let p = pending::<i32>();
    // p.await;  // Would hang forever

    println!("Done");
}

Stream from Iterator

use futures::stream::{self, StreamExt};
 
/// Turns the range 1..=5 into a stream, doubles each item after a short
/// async pause, and prints the collected results.
#[tokio::main]
async fn main() {
    // `then` runs an async closure per item, one item at a time.
    let doubled = stream::iter(1..=5).then(|n| async move {
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        n * 2
    });

    let results: Vec<i32> = doubled.collect().await;
    println!("Results: {:?}", results);
}

Take and Skip

use futures::stream::{self, StreamExt};
 
/// Prints the first three items of a ten-item stream, then every item after
/// the first three.
#[tokio::main]
async fn main() {
    // `take(3)` ends the stream after three items.
    let first_three: Vec<i32> = stream::iter(1..=10).take(3).collect().await;
    println!("First 3: {:?}", first_three);

    // `skip(3)` discards three items and yields the rest; a fresh stream is
    // needed because the first one was consumed.
    let after_three: Vec<i32> = stream::iter(1..=10).skip(3).collect().await;
    println!("After 3: {:?}", after_three);
}

Filter and Filter Map

use futures::stream::{self, StreamExt};
 
/// Filters a stream down to its even numbers, then filters and doubles in a
/// single `filter_map` pass.
#[tokio::main]
async fn main() {
    // `filter` takes an async predicate that receives each item by reference.
    let evens: Vec<i32> = stream::iter(1..=10)
        .filter(|&n| async move { n % 2 == 0 })
        .collect()
        .await;
    println!("Evens: {:?}", evens);

    // `filter_map` keeps the `Some` results and drops the `None` ones.
    let doubled_evens: Vec<i32> = stream::iter(1..=5)
        .filter_map(|n| async move {
            let is_even = n % 2 == 0;
            if is_even { Some(n * 2) } else { None }
        })
        .collect()
        .await;
    println!("Doubled evens: {:?}", doubled_evens);
}

Timeout on Stream

use futures::stream::{self, StreamExt};
use futures::stream::Pending;
use tokio::time::{timeout, Duration};
 
/// Applies a 100 ms timeout to the processing of each stream item and prints
/// the per-item results.
#[tokio::main]
async fn main() {
    // Every item gets its own time budget; the collected vector therefore
    // holds one `Result` per item.
    let results: Vec<_> = stream::iter(1..=5)
        .then(|item| async move {
            let per_item_budget = Duration::from_millis(100);
            timeout(per_item_budget, async move { item }).await
        })
        .collect()
        .await;

    println!("Results: {:?}", results);
}

Async Mutex

use tokio::sync::Mutex;
use std::sync::Arc;
 
#[tokio::main]
async fn main() {
    let data = Arc::new(Mutex::new(vec![]));
    
    let mut handles = vec![];
    
    for i in 0..5 {
        let data = data.clone();
        handles.push(tokio::spawn(async move {
            let mut guard = data.lock().await;
            guard.push(i);
        }));
    }
    
    for handle in handles {
        handle.await.unwrap();
    }
    
    let final_data = data.lock().await.clone();
    println!("Data: {:?}", final_data);
}

Async RwLock

use std::sync::Arc;
 
/// Spawns three reader tasks and one writer task that share a single
/// `tokio::sync::RwLock`, then waits for all of them.
#[tokio::main]
async fn main() {
    // Fully qualified because this snippet only imports `Arc`; the async
    // lock is needed since the guards are obtained with `.await`.
    let data = Arc::new(tokio::sync::RwLock::new(0));

    // Multiple readers
    let read_handles: Vec<_> = (0..3)
        .map(|i| {
            let data = data.clone();
            tokio::spawn(async move {
                let guard = data.read().await;
                println!("Reader {}: {}", i, *guard);
            })
        })
        .collect();

    // One writer; readers may observe the value before or after this write.
    let write_handle = {
        let data = data.clone();
        tokio::spawn(async move {
            let mut guard = data.write().await;
            *guard = 42;
        })
    };

    for h in read_handles { h.await.unwrap(); }
    write_handle.await.unwrap();
}

Abort Handle

use tokio::time::Duration;
 
/// Starts an endless background task, lets it run for 250 ms, then aborts it.
#[tokio::main]
async fn main() {
    let worker = tokio::spawn(async {
        loop {
            tokio::time::sleep(Duration::from_millis(100)).await;
            println!("Working...");
        }
    });

    // Give the worker time for a couple of iterations.
    let head_start = Duration::from_millis(250);
    tokio::time::sleep(head_start).await;

    // Request cancellation through the join handle.
    worker.abort();
    println!("Task aborted");
}

Select with Default

use futures::select;
use futures::FutureExt;
use tokio::sync::mpsc;
 
/// Polls an empty channel once and falls through to the `default` arm
/// because no message is ready.
#[tokio::main]
async fn main() {
    // The sender stays alive but sends nothing, so `recv()` is still pending
    // when `select!` polls it. Dropping the sender instead would make
    // `recv()` resolve to `None` immediately and `default` would never run.
    let (_tx, mut rx) = mpsc::channel::<i32>(10);

    select! {
        // Bind the whole `Option` and match inside the arm; a bare
        // `Some(val)` pattern would leave the `None` case unhandled.
        msg = rx.recv().fuse() => {
            match msg {
                Some(val) => println!("Got: {}", val),
                None => println!("Channel closed"),
            }
        }
        // Runs when none of the futures is ready on the first poll.
        default => {
            println!("No message available");
        }
    }
}

Complete Pattern in Select

use futures::select;
use futures::FutureExt;
use tokio::sync::mpsc;
 
#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<i32>(10);
    
    // Send one message then close
    tx.send(42).await.unwrap();
    drop(tx); // Close sender
    
    loop {
        select! {
            Some(val) = rx.recv().fuse() => {
                println!("Received: {}", val);
            }
            complete => {
                println!("Channel closed");
                break;
            }
        }
    }
}

Summary

Futures Key Imports:

use std::future::Future;
use futures::{FutureExt, StreamExt, TryStreamExt};
use futures::stream;
use futures::future::{join, try_join, select};

Core Combinators:

Method Description
.map(f) Transform output
.then(f) Chain another future
.inspect(f) Side effect without modification
.flatten() Flatten nested future
.shared() Make cloneable (caches result)

Join Macros:

join!(fut1, fut2);      // Run concurrently, return all results
try_join!(fut1, fut2);  // Same, but return early on error

Select Macro:

select! {
    res = fut1.fuse() => res,
    res = fut2.fuse() => res,
    default => { /* no future ready */ },
    complete => { /* all done */ },
}

Stream Methods:

Method Description
.map(f) Transform items
.filter(f) Keep matching items
.filter_map(f) Filter and transform
.take(n) Take first n items
.skip(n) Skip first n items
.collect() Collect into Vec
.fold(init, f) Reduce to single value
.for_each(f) Execute for each item

Key Points:

  • Use async fn to create futures
  • Use .await to wait for completion
  • Use join! for concurrent execution
  • Use select! for racing
  • Use tokio::spawn for background tasks
  • Streams are async iterators
  • Use .fuse() on futures passed to futures::select!; when selecting in a loop, fuse once outside the loop
  • Use tokio::sync::Mutex in async code