Rust walkthroughs
How does futures::try_join! handle error propagation across multiple concurrent futures?
try_join! polls multiple futures concurrently and short-circuits on the first error, returning either a tuple of all successful results or the first error encountered. Unlike join!, which waits for all futures regardless of outcome, try_join! returns as soon as any future fails, making it ideal for operations where partial success is meaningless. The macro requires every future to return the same error type, and it polls the futures on the current task rather than spawning them, so any futures still pending when it short-circuits are dropped. This error-propagation behavior mirrors the ? operator but across concurrent operations, enabling ergonomic composition of fallible asynchronous work.
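// The try_join! macro is exported at the crate root;
// futures::future::try_join is a separate two-argument function.
use futures::try_join;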
use std::error::Error;
async fn fetch_user(id: u32) -> Result<String, Box<dyn Error>> {
Ok(format!("User {}", id))
}
async fn fetch_permissions(id: u32) -> Result<Vec<String>, Box<dyn Error>> {
Ok(vec!["read".to_string(), "write".to_string()])
}
async fn example() -> Result<(), Box<dyn Error>> {
// try_join! requires all futures to succeed
let (user, permissions) = try_join!(
fetch_user(1),
fetch_permissions(1),
)?;
println!("User: {}", user);
println!("Permissions: {:?}", permissions);
Ok(())
}
// If fetch_user fails: returns Err immediately
// If fetch_permissions fails: returns Err immediately
// Only returns Ok if both succeed
try_join! returns Result<(T1, T2, ...), E>, where every future must share the same error type E.
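To make that return shape concrete, here is a minimal std-only sketch of the two-future case. `try_join2`, `block_on`, `NoopWake`, `both_ok`, and `second_fails` are hypothetical helpers written for this illustration. They are not how the futures crate implements the macro, and the busy-polling `block_on` is only reasonable for futures that are ready immediately.

```rust
use std::future::{poll_fn, Future};
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical model of try_join! for two futures: poll both, return the
/// first Err that is observed, or Ok((a, b)) once both have succeeded.
async fn try_join2<A, B, T, U, E>(a: A, b: B) -> Result<(T, U), E>
where
    A: Future<Output = Result<T, E>>,
    B: Future<Output = Result<U, E>>,
{
    let mut a = pin!(a);
    let mut b = pin!(b);
    let mut a_out: Option<T> = None;
    let mut b_out: Option<U> = None;
    poll_fn(|cx| {
        // Only poll a future that has not produced its value yet.
        if a_out.is_none() {
            match a.as_mut().poll(cx) {
                Poll::Ready(Ok(v)) => a_out = Some(v),
                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
                Poll::Pending => {}
            }
        }
        if b_out.is_none() {
            match b.as_mut().poll(cx) {
                Poll::Ready(Ok(v)) => b_out = Some(v),
                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
                Poll::Pending => {}
            }
        }
        if a_out.is_some() && b_out.is_some() {
            Poll::Ready(Ok((a_out.take().unwrap(), b_out.take().unwrap())))
        } else {
            Poll::Pending
        }
    })
    .await
}

/// Waker that does nothing; enough for the busy-polling executor below.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor (assumption: the future never waits on real I/O or timers).
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn both_ok() -> Result<(u32, &'static str), String> {
    block_on(try_join2(
        async { Ok::<u32, String>(1) },
        async { Ok::<&'static str, String>("read") },
    ))
}

fn second_fails() -> Result<(u32, &'static str), String> {
    block_on(try_join2(
        async { Ok::<u32, String>(1) },
        async { Err::<&'static str, String>("permissions failed".to_string()) },
    ))
}
```

Under these assumptions, `both_ok()` yields the tuple of values and `second_fails()` yields only the error; the successful `1` from the first future is discarded.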
use futures::try_join;
#[derive(Debug)]
struct ApiError(String);
async fn operation_a() -> Result<String, ApiError> {
println!("Operation A starting");
// Simulate work
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
println!("Operation A completed");
Ok("A".to_string())
}
async fn operation_b() -> Result<String, ApiError> {
println!("Operation B starting");
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
println!("Operation B failing");
Err(ApiError("B failed".to_string()))
}
async fn example() -> Result<(), ApiError> {
let result = try_join!(
operation_a(),
operation_b(),
);
match result {
Ok((a, b)) => println!("Both succeeded: {}, {}", a, b),
Err(e) => println!("Error: {:?}", e),
}
Ok(())
}
// Output:
// Operation A starting
// Operation B starting
// Operation B failing
// Error: ApiError("B failed")
// Note: "Operation A completed" is never printed, because try_join!
// drops operation_a's future when it returns B's error
// The error is returned as soon as it's known
When any future returns Err, try_join! returns that error immediately without waiting for the other futures.
use futures::try_join;
use std::error::Error;
// All futures must have compatible error types
async fn example_same_error_type() -> Result<(), std::io::Error> {
async fn read_file() -> Result<String, std::io::Error> {
Ok("file contents".to_string())
}
async fn read_config() -> Result<Vec<String>, std::io::Error> {
Ok(vec!["config".to_string()])
}
// Both return std::io::Error - compatible
let (file, config) = try_join!(read_file(), read_config())?;
Ok(())
}
// Different error types require conversion
async fn example_different_error_types() -> Result<(), Box<dyn Error>> {
async fn read_file() -> Result<String, std::io::Error> {
Ok("file".to_string())
}
// std::num::ParseIntError implements std::error::Error, so it can be boxed below
async fn parse_data() -> Result<i32, std::num::ParseIntError> {
"42".parse::<i32>()
}
// This won't compile - different error types
// let (file, data) = try_join!(read_file(), parse_data())?;
// Solution: Convert errors to common type
let (file, data) = try_join!(
async { read_file().await.map_err(|e| Box::new(e) as Box<dyn Error>) },
async { parse_data().await.map_err(|e| Box::new(e) as Box<dyn Error>) },
)?;
Ok(())
}
All futures must return Result<_, E> with the same error type E. When the error types differ, convert them explicitly (for example with map_err) before passing the futures to try_join!.
use futures::{join, try_join};
use std::error::Error;
async fn failing_operation() -> Result<String, Box<dyn Error>> {
Err("failed".into())
}
async fn slow_operation() -> Result<i32, Box<dyn Error>> {
println!("Slow operation starting");
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
println!("Slow operation completed");
Ok(42)
}
// try_join! - short-circuits on error
async fn with_try_join() -> Result<(), Box<dyn Error>> {
let result = try_join!(
failing_operation(),
slow_operation(),
);
// If failing_operation returns Err immediately,
// try_join! returns Err without waiting for slow_operation
result?;
Ok(())
}
// join! - waits for all futures
async fn with_join() -> Result<(), Box<dyn Error>> {
let (a_result, b_result) = join!(
failing_operation(),
slow_operation(),
);
// join! waits for BOTH futures, then returns both Results
// slow_operation will complete even if failing_operation already errored
println!("Both futures completed");
let a = a_result?;
let b = b_result?;
Ok(())
}
// join! returns: (Result<T1, E1>, Result<T2, E2>)
// try_join! returns: Result<(T1, T2), E>
join! collects all results; try_join! short-circuits on first error.
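The relationship between the two return shapes can be sketched with a small helper that collapses a join!-style pair of Results into a try_join!-style Result. `collapse` is a hypothetical function for illustration and assumes both errors already share one type. One difference is worth noting: after join! you pick the error by position, whereas try_join! returns whichever error it observes first.

```rust
/// Hypothetical helper: turn (Result<T, E>, Result<U, E>) into Result<(T, U), E>.
/// When both sides failed, the error from the first position wins.
fn collapse<T, U, E>(pair: (Result<T, E>, Result<U, E>)) -> Result<(T, U), E> {
    let (a, b) = pair;
    Ok((a?, b?))
}
```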
use futures::try_join;
use std::error::Error;
async fn fetch_all() -> Result<(String, String), Box<dyn Error>> {
async fn fetch_data(source: &str) -> Result<String, Box<dyn Error>> {
if source == "fail" {
Err(format!("Failed to fetch from {}", source).into())
} else {
Ok(format!("Data from {}", source))
}
}
// try_join! integrates with ? for clean error propagation
let (data1, data2) = try_join!(
fetch_data("source1"),
fetch_data("source2"),
)?;
Ok((data1, data2))
}
async fn caller() -> Result<(), Box<dyn Error>> {
// The ? propagates errors from try_join! to caller
let (d1, d2) = fetch_all().await?;
println!("Got: {}, {}", d1, d2);
Ok(())
}
// If either fetch_data returns Err:
// 1. try_join! returns Err immediately
// 2. The ? operator propagates it to caller()
// 3. caller() returns Err
try_join! combined with ? provides clean error propagation through async call stacks.
use futures::try_join;
use std::error::Error;
// Using Box<dyn Error> for flexibility
async fn multiple_error_types() -> Result<(), Box<dyn Error>> {
async fn read_file() -> Result<String, std::io::Error> {
Ok("file contents".to_string())
}
async fn fetch_url() -> Result<String, reqwest::Error> {
// Simulated
Ok("url contents".to_string())
}
async fn parse_config() -> Result<Config, toml::de::Error> {
Ok(Config {})
}
struct Config {}
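// Convert all to Box<dyn Error>. The explicit cast names the target type;
// map_err(Into::into) alone leaves it ambiguous because ? also converts.
let (file, url, config) = try_join!(
async { read_file().await.map_err(|e| Box::new(e) as Box<dyn Error>) },
async { fetch_url().await.map_err(|e| Box::new(e) as Box<dyn Error>) },
async { parse_config().await.map_err(|e| Box::new(e) as Box<dyn Error>) },
)?;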
Ok(())
}
// Using a custom error enum
#[derive(Debug)]
enum AppError {
Io(std::io::Error),
Http(String),
Config(String),
}
impl From<std::io::Error> for AppError {
fn from(e: std::io::Error) -> Self {
AppError::Io(e)
}
}
impl From<String> for AppError {
fn from(e: String) -> Self {
AppError::Http(e)
}
}
async fn with_enum() -> Result<(), AppError> {
async fn read_file() -> Result<String, std::io::Error> {
Ok("file".to_string())
}
async fn fetch() -> Result<String, String> {
Ok("data".to_string())
}
let (file, data) = try_join!(
async { read_file().await.map_err(AppError::from) },
async { fetch().await.map_err(AppError::from) },
)?;
Ok(())
}
Convert different error types to a common type for use with try_join!.
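A custom error enum only converts into Box<dyn Error> through ? if it implements std::error::Error, which in turn requires Display. The following is a minimal synchronous sketch of that plumbing. The `AppError` variants and the `parse_port`, `check_status`, and `describe` functions are hypothetical names chosen for illustration.

```rust
use std::error::Error;
use std::fmt;
use std::num::ParseIntError;

#[derive(Debug)]
enum AppError {
    Parse(ParseIntError),
    Http(String),
}

// Display is required by the Error trait.
impl fmt::Display for AppError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            AppError::Parse(e) => write!(f, "parse error: {e}"),
            AppError::Http(msg) => write!(f, "http error: {msg}"),
        }
    }
}

impl Error for AppError {}

// Lets ? convert a ParseIntError into AppError automatically.
impl From<ParseIntError> for AppError {
    fn from(e: ParseIntError) -> Self {
        AppError::Parse(e)
    }
}

fn parse_port(raw: &str) -> Result<u16, AppError> {
    Ok(raw.trim().parse::<u16>()?)
}

fn check_status(code: u16) -> Result<u16, AppError> {
    if code < 400 {
        Ok(code)
    } else {
        Err(AppError::Http(format!("status {code}")))
    }
}

// Because AppError implements Error, ? can also box it into Box<dyn Error>.
fn describe(raw_port: &str, status: u16) -> Result<String, Box<dyn Error>> {
    let port = parse_port(raw_port)?;
    let status = check_status(status)?;
    Ok(format!("port {port}, status {status}"))
}
```

With both impls in place, futures returning `Result<_, AppError>` can be passed to try_join! directly, and the joined result can be propagated with ? from a function returning either `AppError` or `Box<dyn Error>`.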
use futures::try_join;
use std::time::Instant;
async fn concurrent_timing() -> Result<(), Box<dyn std::error::Error>> {
let start = Instant::now();
async fn task_a() -> Result<String, Box<dyn std::error::Error>> {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
Ok("A".to_string())
}
async fn task_b() -> Result<String, Box<dyn std::error::Error>> {
tokio::time::sleep(std::time::Duration::from_millis(150)).await;
Ok("B".to_string())
}
async fn task_c() -> Result<String, Box<dyn std::error::Error>> {
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
Ok("C".to_string())
}
// All tasks run concurrently
let (a, b, c) = try_join!(task_a(), task_b(), task_c())?;
let elapsed = start.elapsed();
println!("Elapsed: {:?}", elapsed);
// Elapsed ~150ms (max of concurrent tasks), not 300ms (sum)
Ok(())
}
// try_join! executes futures concurrently, not sequentially
// Total time is roughly max(task times), not sum(task times)
All futures passed to try_join! execute concurrently; when all of them succeed, completion time is determined by the slowest future.
use futures::try_join;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
async fn cancellation_example() -> Result<(), Box<dyn std::error::Error>> {
let counter = Arc::new(AtomicUsize::new(0));
async fn slow_task(counter: Arc<AtomicUsize>) -> Result<String, &'static str> {
for _ in 0..10 {
counter.fetch_add(1, Ordering::SeqCst);
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
Ok("slow completed".to_string())
}
async fn fast_failure() -> Result<String, &'static str> {
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
Err("fast failed")
}
let result = try_join!(
slow_task(Arc::clone(&counter)),
fast_failure(),
);
// fast_failure returns Err after ~20ms
// try_join! returns Err immediately and drops slow_task,
// so slow_task stops at its current .await and the counter stops advancing
// The counter value shows how far slow_task progressed before it was dropped
println!("Result: {:?}", result);
println!("Counter: {}", counter.load(Ordering::SeqCst));
Ok(())
}
// Important: try_join! polls its futures in place; it does not spawn them
// Returning early drops the futures that are still pending, which cancels them
// Only work handed off elsewhere (e.g. with tokio::spawn) keeps running
When one future errors, try_join! returns immediately and drops the remaining futures, cancelling them at their current await point.
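The underlying mechanism is that a Rust future only makes progress while it is polled, so dropping it stops it at its current await point. Here is a std-only sketch of that behavior, polling a future by hand. `YieldOnce`, `NoopWake`, `slow_task`, and `steps_before_drop` are hypothetical names for illustration, and the manual polling stands in for what an executor would do.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; we poll manually, so no wake-ups are needed.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Future that is Pending on its first poll and Ready on the second.
struct YieldOnce(bool);
impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            Poll::Pending
        }
    }
}

/// Counts one step per loop iteration, suspending after each step.
async fn slow_task(steps: Rc<Cell<u32>>) {
    for _ in 0..10 {
        steps.set(steps.get() + 1);
        YieldOnce(false).await;
    }
}

/// Polls slow_task twice, drops it, and reports how many steps ran.
fn steps_before_drop() -> u32 {
    let steps = Rc::new(Cell::new(0));
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    let mut task = Box::pin(slow_task(Rc::clone(&steps)));
    // First poll runs step 1 and suspends; second poll runs step 2 and suspends.
    assert!(task.as_mut().poll(&mut cx).is_pending());
    assert!(task.as_mut().poll(&mut cx).is_pending());
    // Dropping the future is the cancellation: the remaining 8 steps never run.
    drop(task);

    steps.get()
}
```

This is the same thing that happens to the still-pending futures when try_join! returns an error. A task started with something like tokio::spawn is different, because the runtime owns and keeps polling it.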
use futures::try_join;
// try_join! works with Result, not Option
// For Option, use try_join with conversion:
async fn with_options() -> Result<(), &'static str> {
async fn get_optional() -> Option<String> {
Some("data".to_string())
}
// Convert Option to Result
let (a, b) = try_join!(
async { get_optional().await.ok_or("a was None") },
async { get_optional().await.ok_or("b was None") },
)?;
println!("Got: {}, {}", a, b);
Ok(())
}
// Option::ok_or converts None into Err
try_join! expects Result; convert Option to Result with ok_or.
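When the error value is expensive to build or depends on context, `Option::ok_or_else` constructs it only in the `None` case. A small sketch follows; `require` is a hypothetical helper name.

```rust
/// Hypothetical helper: turn a missing value into a descriptive error.
/// The closure runs only when `value` is None.
fn require(name: &str, value: Option<String>) -> Result<String, String> {
    value.ok_or_else(|| format!("{name} was None"))
}
```

Inside try_join!, this could be used as `async { require("a", get_optional().await) }`, assuming every branch uses `String` as its error type.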
use futures::future::try_join_all;
use std::error::Error;
async fn dynamic_collection() -> Result<Vec<String>, Box<dyn Error>> {
// try_join! is a macro for fixed number of futures
// try_join_all handles collections of futures
let futures: Vec<_> = (0..5)
.map(|i| async move {
if i == 3 {
// Name the error type here; the ? below would otherwise leave it ambiguous
Err::<String, Box<dyn Error>>(format!("Index {} failed", i).into())
} else {
Ok(format!("Item {}", i))
}
})
.collect();
// try_join_all returns Result<Vec<T>, E>
let results: Vec<String> = try_join_all(futures).await?;
println!("All results: {:?}", results);
Ok(results)
}
// If any future fails:
// - try_join_all returns the first error
// - All other results are discarded
For a dynamic number of futures, use try_join_all instead of the try_join! macro.
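The `Result<Vec<T>, E>` shape has a familiar synchronous analogue: collecting an iterator of Results into a single Result, which also stops at the first error. This sketch is only that analogue, not try_join_all itself, and `load_all` is a hypothetical function.

```rust
/// Synchronous analogue of try_join_all: Ok(all items) or the first Err.
fn load_all(ids: &[u32]) -> Result<Vec<String>, String> {
    ids.iter()
        .map(|&i| {
            if i == 3 {
                Err(format!("Index {i} failed"))
            } else {
                Ok(format!("Item {i}"))
            }
        })
        .collect()
}
```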
use futures::try_join;
use std::error::Error;
async fn with_closures() -> Result<(), Box<dyn Error>> {
let config = "config_value";
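// The async blocks borrow config from the enclosing scope
// Ok::<_, Box<dyn Error>> names the error type, which ? inside an
// async block cannot infer on its own
let (result1, result2) = try_join!(
async {
let data = process_with(config).await?;
Ok::<_, Box<dyn Error>>(data)
},
async {
let other = other_process().await?;
Ok::<_, Box<dyn Error>>(other)
},
)?;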
Ok(())
}
async fn process_with(config: &str) -> Result<String, Box<dyn Error>> {
Ok(format!("Processed with {}", config))
}
async fn other_process() -> Result<String, Box<dyn Error>> {
Ok("Other result".to_string())
}
// Async blocks can capture environment
// Ensure proper lifetime and ownership semantics
Use async blocks to capture environment and create futures inline.
use futures::try_join;
use std::error::Error;
async fn nested_try_join() -> Result<(), Box<dyn Error>> {
async fn fetch_a() -> Result<String, Box<dyn Error>> {
Ok("A".to_string())
}
async fn fetch_b() -> Result<String, Box<dyn Error>> {
Ok("B".to_string())
}
async fn fetch_c() -> Result<String, Box<dyn Error>> {
Ok("C".to_string())
}
async fn fetch_d() -> Result<String, Box<dyn Error>> {
Ok("D".to_string())
}
// Nest try_join! calls for hierarchical composition
let ((a, b), (c, d)) = try_join!(
async { try_join!(fetch_a(), fetch_b()) },
async { try_join!(fetch_c(), fetch_d()) },
)?;
println!("Results: {}, {}, {}, {}", a, b, c, d);
Ok(())
}
// Nested try_join! compositions work naturally
// Error propagation still short-circuits at any level
try_join! can be nested for hierarchical composition of concurrent operations.
use futures::try_join;
use std::error::Error;
struct User {
id: u32,
name: String,
}
struct Account {
id: u32,
balance: f64,
}
struct Permissions {
user_id: u32,
roles: Vec<String>,
}
async fn create_user_transaction(
name: String,
initial_balance: f64,
) -> Result<(User, Account, Permissions), Box<dyn Error>> {
async fn insert_user(name: String) -> Result<User, Box<dyn Error>> {
// Simulate DB insert
Ok(User { id: 1, name })
}
async fn create_account(user_id: u32, balance: f64) -> Result<Account, Box<dyn Error>> {
// Simulate DB insert
Ok(Account { id: user_id, balance })
}
async fn assign_default_permissions(user_id: u32) -> Result<Permissions, Box<dyn Error>> {
// Simulate DB insert
Ok(Permissions {
user_id,
roles: vec!["user".to_string()],
})
}
// insert_user must finish first because the other operations need user.id
// Note: This is NOT a transaction - see below
let user = insert_user(name).await?;
// Now run the two dependent operations concurrently
// If either fails, the error is returned immediately
let (account, permissions) = try_join!(
create_account(user.id, initial_balance),
assign_default_permissions(user.id),
)?;
Ok((user, account, permissions))
}
// Important: try_join! doesn't provide transaction semantics
// If create_account succeeds but assign_default_permissions fails,
// you have partial state. Use proper transactions for atomic operations.
try_join! provides concurrent execution, not atomicity. Use database transactions for atomic operations.
use futures::try_join;
use std::error::Error;
struct UserProfile {
user: UserData,
posts: Vec<Post>,
followers: Vec<Follower>,
}
struct UserData {
id: u32,
name: String,
}
struct Post {
id: u32,
title: String,
}
struct Follower {
id: u32,
name: String,
}
async fn fetch_user_profile(user_id: u32) -> Result<UserProfile, Box<dyn Error>> {
async fn get_user(id: u32) -> Result<UserData, Box<dyn Error>> {
Ok(UserData { id, name: "Alice".to_string() })
}
async fn get_posts(id: u32) -> Result<Vec<Post>, Box<dyn Error>> {
Ok(vec![
Post { id: 1, title: "First".to_string() },
Post { id: 2, title: "Second".to_string() },
])
}
async fn get_followers(id: u32) -> Result<Vec<Follower>, Box<dyn Error>> {
Ok(vec![
Follower { id: 1, name: "Bob".to_string() },
])
}
// Fetch all profile components concurrently
// If any API call fails, the whole profile fetch fails
let (user, posts, followers) = try_join!(
get_user(user_id),
get_posts(user_id),
get_followers(user_id),
)?;
Ok(UserProfile { user, posts, followers })
}
// All three API calls run concurrently
// Total latency is max(latencies), not sum(latencies)
// If any fails, the error propagates immediately
try_join! is ideal for aggregating multiple independent API calls where all must succeed.
use futures::try_join;
use std::error::Error;
async fn with_fallback() -> Result<(String, String), Box<dyn Error>> {
async fn primary_source() -> Result<String, Box<dyn Error>> {
// Might fail
Err("primary failed".into())
}
async fn secondary_source() -> Result<String, Box<dyn Error>> {
// Fallback
Ok("secondary data".to_string())
}
async fn required_data() -> Result<String, Box<dyn Error>> {
Ok("required data".to_string())
}
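// Handle failure for one future but require another
let (primary_or_fallback, required) = try_join!(
async {
// .await is not allowed inside the closure passed to or_else,
// so fall back with a match instead
match primary_source().await {
Ok(data) => Ok(data),
Err(_) => secondary_source().await,
}
},
required_data(),
)?;
println!("Got: {}, {}", primary_or_fallback, required);
Ok((primary_or_fallback, required))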
}
// Pattern: recover inside the individual future before try_join! sees the error
// This allows graceful degradation while still using try_join!
Handle the error inside an individual future (for example with a match that awaits a fallback) before it reaches try_join! to get per-future error recovery.
use futures::try_join;
use tokio::time::{timeout, Duration};
use std::error::Error;
async fn with_timeout() -> Result<(String, String), Box<dyn Error>> {
async fn slow_operation() -> Result<String, Box<dyn Error>> {
tokio::time::sleep(Duration::from_secs(10)).await;
Ok("slow result".to_string())
}
async fn fast_operation() -> Result<String, Box<dyn Error>> {
Ok("fast result".to_string())
}
// Apply timeout to individual futures
let (slow, fast) = try_join!(
async {
timeout(Duration::from_secs(1), slow_operation())
.await
.map_err(|_| "timeout".into())
.and_then(|r| r)
},
fast_operation(),
)?;
Ok((slow, fast))
}
// Or apply timeout to the whole try_join!:
async fn with_global_timeout() -> Result<(String, String), Box<dyn Error>> {
async fn op_a() -> Result<String, Box<dyn Error>> {
tokio::time::sleep(Duration::from_secs(10)).await;
Ok("a".to_string())
}
async fn op_b() -> Result<String, Box<dyn Error>> {
Ok("b".to_string())
}
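// try_join! awaits internally and evaluates to a Result, not a future,
// so wrap it in an async block that timeout can drive
let result = timeout(
Duration::from_secs(1),
async { try_join!(op_a(), op_b()) },
).await;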
match result {
Ok(Ok((a, b))) => Ok((a, b)),
Ok(Err(e)) => Err(e),
Err(_) => Err("global timeout".into()),
}
}
Wrap futures with timeout to add time constraints to try_join! operations.
Error propagation model:
// try_join! behavior:
// 1. Polls all futures concurrently on the current task
// 2. Polls each unfinished future whenever the task is woken
// 3. If any returns Err, returns Err immediately and drops the rest
// 4. If all return Ok, returns Ok((T1, T2, ...))
// Comparison:
// join! - waits for ALL futures, returns (Result<T1, E1>, Result<T2, E2>, ...)
// try_join! - short-circuits on first Err, returns Result<(T1, T2, ...), E>
Key characteristics:
| Aspect | Behavior |
|--------|----------|
| Concurrency | All futures run concurrently |
| Short-circuit | Returns immediately on first error |
| Error type | All futures must share the same error type |
| Cancellation | Futures still pending on error are dropped (cancelled) |
| Ordering | Polls futures in macro argument order |
| Result type | Result<(T1, T2, ...), E> |
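When to use try_join!:
- Independent operations that must all succeed
- Aggregating several concurrent calls where partial success is meaningless
When not to use:
- Operations that depend on each other's results (use sequential await)
- When you need every result even if some fail (use join!)
- When you only need the first future to complete (use tokio::select!)
Key insight: try_join! brings the elegance of ? error propagation to concurrent async operations. It polls all futures concurrently and returns as soon as any fails, making it ideal for aggregating independent operations where partial success is meaningless. The macro requires a shared error type. Because it drops the futures that are still pending when one fails, those futures are cancelled at their current await point, but side effects that already happened, and work handed to a spawned task or an external system, are not undone. For atomic operations, use proper transaction handling; when you need explicit control over which operations are cancelled, use tokio::select! with cancellation tokens instead.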