What is the purpose of futures::stream::TryStreamExt::map_err for transforming stream errors?

TryStreamExt::map_err transforms the error type of a fallible stream while preserving the stream's item type, allowing error transformation at the stream level before downstream consumers handle errors. This enables centralized error conversion, consistent error handling, and better error context enrichment without affecting the stream's success values.

The TryStream Trait and TryStreamExt

use futures::stream::{Stream, TryStream};
use std::task::{Context, Poll};
 
// TryStream is a stream that can fail
// Items are wrapped in Result<T, E>
pub trait TryStream: Stream<Item = Result<T, E>> {
    // Associated types and methods for fallible streams
}
 
// TryStreamExt provides combinators for TryStream
pub trait TryStreamExt: TryStream {
    fn map_err<E2, F>(self, f: F) -> MapErr<Self, F>
    where
        F: FnMut(Self::Error) -> E2,
        Self: Sized;
    
    // ... other methods
}

TryStreamExt extends TryStream with combinators that work with fallible streams.

Basic map_err Usage

use futures::stream::{self, TryStreamExt};
 
async fn basic_usage() {
    // Stream that can fail
    let stream = stream::iter(vec
![Ok(1), Ok(2), Err("failed"), Ok(3)]);
    
    // Transform errors to a different type
    let mapped = stream.map_err(|e| format!("error: {}", e));
    
    // Now errors are String instead of &str
    // Items remain i32
    futures::pin_mut!(mapped);
    
    while let Some(result) = mapped.next().await {
        match result {
            Ok(n) => println!("item: {}", n),
            Err(e) => println!("error: {}", e),
        }
    }
}

map_err transforms the error type while leaving items unchanged.

Error Type Conversion

use futures::stream::{self, TryStreamExt};
use std::io;
 
async fn convert_error_types() {
    // Stream with io::Error
    let stream: stream::Iter<std::vec::IntoIter<Result<i32, io::Error>>> = 
        stream::iter(vec
![Ok(1), Err(io::Error::new(io::ErrorKind::Other, "io error"))]);
    
    // Convert to String errors
    let string_errors = stream.map_err(|e| e.to_string());
    
    // Convert to custom error type
    let custom_errors = string_errors.map_err(|e| MyError::from_message(e));
    
    // Convert to anyhow::Error
    let anyhow_errors = custom_errors.map_err(anyhow::Error::from);
}
 
#[derive(Debug)]
struct MyError {
    message: String,
}
 
impl MyError {
    fn from_message(msg: String) -> Self {
        MyError { message: msg }
    }
}

map_err enables converting between error types to match downstream expectations.

Preserving Success Values

use futures::stream::{self, TryStreamExt};
 
async fn preserve_items() {
    let stream = stream::iter(vec
![Ok(1), Ok(2), Ok(3), Err("error")]);
    
    // map_err only touches errors
    let mapped = stream.map_err(|e| format!("wrapped: {}", e));
    
    // Success values pass through unchanged
    // 1, 2, 3 remain as i32
    // Only "error" becomes "wrapped: error"
    
    let results: Vec<_> = mapped.collect::<Vec<_>>().await;
    // [Ok(1), Ok(2), Ok(3), Err("wrapped: error")]
}

The item type remains unchanged; only the error type is transformed.

Chaining with Other TryStreamExt Methods

use futures::stream::{self, TryStreamExt};
 
async fn chaining_methods() {
    let stream = stream::iter(vec
![Ok(1), Ok(2), Err("error"), Ok(3)]);
    
    let processed = stream
        .map_err(|e| format!("context: {}", e))
        .map_ok(|n| n * 2)
        .and_then(|n| async move { Ok(n + 10) })
        .map_err(|e| format!("processed: {}", e));
    
    // Error transformations compose naturally
    // Multiple map_err calls layer transformations
}

map_err integrates seamlessly with other TryStreamExt combinators.

Adding Error Context

use futures::stream::{self, TryStreamExt};
use std::io;
 
async fn add_context() {
    let stream = stream::iter(vec
![
        Ok(1),
        Err(io::Error::new(io::ErrorKind::NotFound, "file missing"))
    ]);
    
    // Add context to errors
    let contextualized = stream.map_err(|e| {
        format!("Database operation failed: {}", e)
    });
    
    // Use anyhow for context
    let anyhow_context = stream
        .map_err(|e| anyhow::Error::from(e))
        .map_err(|e| e.context("during database stream processing"));
}

map_err can enrich errors with additional context for debugging.

Error Normalization

use futures::stream::{self, TryStreamExt};
 
#[derive(Debug)]
enum AppError {
    Io(std::io::Error),
    Parse(String),
    Database(String),
}
 
async fn normalize_errors() {
    // Different error types from different sources
    let io_stream = stream::iter(vec
![Ok::<_, std::io::Error>(1)]);
    let parse_stream = stream::iter(vec
![Ok::<_, String>(2)]);
    
    // Normalize to AppError
    let normalized_io = io_stream.map_err(AppError::Io);
    let normalized_parse = parse_stream.map_err(AppError::Parse);
    
    // Both streams now have Result<T, AppError>
    // Can be combined, collected, etc.
}

map_err enables normalizing heterogeneous error types to a common type.

Interaction with try_collect

use futures::stream::{self, TryStreamExt};
 
async fn try_collect_example() {
    let stream = stream::iter(vec
![Ok(1), Ok(2), Err("failure")]);
    
    // try_collect stops on first error
    // map_err transforms error before collection
    let result: Result<Vec<i32>, String> = stream
        .map_err(|e| format!("error: {}", e))
        .try_collect()
        .await;
    
    // Returns Err("error: failure")
}

map_err transforms errors before try_collect encounters them.

Comparison with map_ok

use futures::stream::{self, TryStreamExt};
 
async fn map_ok_vs_map_err() {
    let stream = stream::iter(vec
![Ok(1), Ok(2), Err("error")]);
    
    // map_ok transforms success values
    let transformed_ok = stream.clone().map_ok(|n| n * 2);
    // Items: Ok(2), Ok(4), Err("error")
    
    // map_err transforms error values
    let transformed_err = stream.clone().map_err(|e| format!("wrapped: {}", e));
    // Items: Ok(1), Ok(2), Err("wrapped: error")
    
    // Both can be chained
    let both = stream
        .map_ok(|n| n * 2)
        .map_err(|e| format!("wrapped: {}", e));
    // Items: Ok(2), Ok(4), Err("wrapped: error")
}

map_ok and map_err are complementary; one transforms items, the other errors.

Error Recovery Patterns

use futures::stream::{self, TryStreamExt};
 
async fn error_recovery() {
    let stream = stream::iter(vec
![Ok(1), Err("transient"), Ok(2), Err("fatal"), Ok(3)]);
    
    // Convert some errors to recoverable states
    // This is more sophisticated than map_err alone
    
    // Simple pattern: mark errors as recoverable
    let marked = stream.map_err(|e| {
        if e == "transient" {
            RecoverableError::Transient(e.to_string())
        } else {
            RecoverableError::Fatal(e.to_string())
        }
    });
    
    // Downstream can decide whether to continue
}
 
#[derive(Debug)]
enum RecoverableError {
    Transient(String),
    Fatal(String),
}

map_err can classify errors for downstream recovery strategies.

Handling Multiple Error Sources

use futures::stream::{self, TryStreamExt, TryStream};
 
async fn multiple_sources() {
    // Stream of Results with different error types
    // (Note: this requires same error type for combination)
    
    // Pattern: Union type for multiple error sources
    let stream1 = stream::iter(vec
![Ok::<_, io::Error>(1)])
        .map_err(StreamError::Io);
    
    let stream2 = stream::iter(vec
![Ok::<_, String>(2)])
        .map_err(StreamError::Parse);
    
    // Both now have Result<T, StreamError>
    // Can be combined with select or chain
}
 
use std::io;
#[derive(Debug)]
enum StreamError {
    Io(io::Error),
    Parse(String),
}

map_err unifies error types from multiple stream sources.

Early Error Transformation

use futures::stream::{self, TryStreamExt};
 
async fn early_transformation() {
    // Transform errors early in the pipeline
    let stream = stream::iter(vec
![Ok(1), Err("raw error")]);
    
    let early_transform = stream
        .map_err(|e| AppError::with_context(e, "early in pipeline"))
        .map_ok(|n| n * 2)
        .and_then(|n| async move {
            if n > 10 {
                Err(AppError::with_context("too large", "validation"))
            } else {
                Ok(n)
            }
        });
    
    // All errors are AppError, consistent handling
}
 
#[derive(Debug)]
struct AppError {
    message: String,
    context: String,
}
 
impl AppError {
    fn with_context(msg: impl Into<String>, ctx: &str) -> Self {
        AppError {
            message: msg.into(),
            context: ctx.to_string(),
        }
    }
}

Transform early to establish consistent error types throughout the pipeline.

Integration with try_for_each

use futures::stream::{self, TryStreamExt};
 
async fn try_for_each_example() {
    let stream = stream::iter(vec
![Ok(1), Ok(2), Err("process error")]);
    
    // try_for_each stops on error
    let result = stream
        .map_err(|e| format!("stream error: {}", e))
        .try_for_each(|n| async move {
            println!("processing: {}", n);
            Ok::<_, String>(())
        })
        .await;
    
    // Result includes transformed error
}

map_err ensures errors are transformed before try_for_each processes them.

Performance Considerations

use futures::stream::{self, TryStreamExt};
 
async fn performance() {
    // map_err is lazy - closure only called on error
    let stream = stream::iter(vec
![Ok(1), Ok(2), Ok(3)]);
    
    let transformed = stream.map_err(|e| {
        // This closure is never called if no errors
        expensive_error_transformation(e)
    });
    
    // No overhead for successful items
    // Only incurs cost when errors occur
}
 
fn expensive_error_transformation(e: &str) -> String {
    // Could be expensive, but only runs on error
    format!("detailed error with context: {}", e)
}

map_err has zero overhead for successful items; the closure only runs on errors.

Error Logging Side Effects

use futures::stream::{self, TryStreamExt};
 
async fn logging_side_effects() {
    let stream = stream::iter(vec
![Ok(1), Err("error1"), Ok(2), Err("error2")]);
    
    let logged = stream.map_err(|e| {
        // Log error before transforming
        eprintln!("Stream error occurred: {}", e);
        e.to_string()
    });
    
    // Errors are both logged and transformed
    // Use for observability and debugging
}

map_err can perform side effects like logging while transforming errors.

Type Signature Analysis

use futures::stream::TryStreamExt;
use futures::stream::{Stream, TryStream};
 
// map_err type signature:
fn map_err<Self, E2, F>(self, f: F) -> MapErr<Self, F>
where
    F: FnMut(Self::Error) -> E2,
    Self: TryStream + Sized,
{
    // Implementation returns MapErr wrapper
}
 
// MapErr is a stream wrapper:
pub struct MapErr<St, F> {
    stream: St,
    f: F,
}
 
// MapErr implements Stream:
impl<St, F, E2> Stream for MapErr<St, F>
where
    St: TryStream,
    F: FnMut(St::Error) -> E2,
{
    type Item = Result<St::Ok, E2>;
    
    fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>)
        -> std::task::Poll<Option<Self::Item>>
    {
        // Poll inner stream, apply f to errors
    }
}

MapErr wraps the stream and transforms errors on each poll.

Comparison with inspect_err

use futures::stream::{self, TryStreamExt};
 
async fn inspect_err_comparison() {
    let stream = stream::iter(vec
![Ok(1), Err("error")]);
    
    // map_err: Transform error type
    let transformed = stream.clone()
        .map_err(|e| format!("wrapped: {}", e));
    // Type changes from Result<T, &str> to Result<T, String>
    
    // inspect_err: Look at errors without transforming
    let inspected = stream.clone()
        .inspect_err(|e| println!("saw error: {}", e));
    // Type stays Result<T, &str>
    
    // Use map_err to transform, inspect_err to observe
}

inspect_err observes errors without changing types; map_err transforms them.

Handling Fatal vs Non-Fatal Errors

use futures::stream::{self, TryStreamExt};
 
#[derive(Debug)]
enum ErrorClassification {
    Fatal(String),
    Recoverable(String),
}
 
async fn error_classification() {
    let stream = stream::iter(vec
![
        Ok(1),
        Err("connection timeout"),
        Ok(2),
        Err("out of memory"),
        Ok(3),
    ]);
    
    // Classify errors at the stream level
    let classified = stream.map_err(|e| {
        if e.contains("timeout") || e.contains("retry") {
            ErrorClassification::Recoverable(e.to_string())
        } else {
            ErrorClassification::Fatal(e.to_string())
        }
    });
    
    // Downstream can decide how to handle each classification
}

map_err can classify errors for downstream decision-making.

Complete Example: Database Stream Processing

use futures::stream::{self, TryStreamExt};
use std::io;
 
#[derive(Debug)]
enum DbError {
    Connection(String),
    Query(String),
    Timeout(String),
}
 
async fn database_stream_example() {
    // Simulated database query stream
    let query_results: Vec<Result<i32, io::Error>> = vec
![
        Ok(1),
        Ok(2),
        Err(io::Error::new(io::ErrorKind::TimedOut, "timeout")),
        Ok(3),
        Err(io::Error::new(io::ErrorKind::ConnectionReset, "connection lost")),
    ];
    
    let stream = stream::iter(query_results);
    
    let processed = stream
        // Convert io::Error to domain-specific DbError
        .map_err(|e| {
            match e.kind() {
                io::ErrorKind::TimedOut => DbError::Timeout(e.to_string()),
                io::ErrorKind::ConnectionReset | io::ErrorKind::ConnectionAborted => {
                    DbError::Connection(e.to_string())
                }
                _ => DbError::Query(e.to_string()),
            }
        })
        // Process successful rows
        .map_ok(|row| {
            println!("Processing row: {}", row);
            row * 10
        })
        // Add processing context to errors
        .map_err(|e| {
            format!("Database error during batch processing: {:?}", e)
        });
    
    // Collect results
    let results: Vec<_> = processed.collect().await;
    for result in results {
        match result {
            Ok(value) => println!("Result: {}", value),
            Err(e) => eprintln!("Error: {}", e),
        }
    }
}
 
fn main() {
    futures::executor::block_on(database_stream_example());
}

Comparison Table

fn comparison() {
    // β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
    // β”‚ Method              β”‚ Purpose              β”‚ Type Change      β”‚ Side Effects β”‚
    // β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
    // β”‚ map_err              β”‚ Transform error type β”‚ Yes (E β†’ E2)    β”‚ Yes, allowedβ”‚
    // β”‚ map_ok               β”‚ Transform item type  β”‚ Yes (T β†’ T2)    β”‚ No          β”‚
    // β”‚ inspect_err          β”‚ Observe errors       β”‚ No              β”‚ Yes         β”‚
    // β”‚ inspect_ok           β”‚ Observe items        β”‚ No              β”‚ Yes         β”‚
    // β”‚ and_then             β”‚ Chain fallible async β”‚ Item + error    β”‚ No          β”‚
    // β”‚ or_else              β”‚ Recover from errors  β”‚ Yes             β”‚ No          β”‚
    // β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
}

Summary

fn summary() {
    // β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
    // β”‚ Use Case                  β”‚ map_err Benefit                             β”‚
    // β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
    // β”‚ Error type conversion     β”‚ Convert to downstream-compatible types      β”‚
    // β”‚ Error normalization       β”‚ Unify multiple error sources                β”‚
    // β”‚ Context enrichment       β”‚ Add context for debugging                   β”‚
    // β”‚ Error classification      β”‚ Mark recoverable vs fatal                   β”‚
    // β”‚ Logging/observability     β”‚ Log errors as they pass through             β”‚
    // β”‚ Early transformation      β”‚ Establish consistent types early            β”‚
    // β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
    
    // Key points:
    // 1. map_err transforms the error type while preserving items
    // 2. The closure is only called for errors, no overhead for success
    // 3. Works seamlessly with other TryStreamExt combinators
    // 4. Can perform side effects like logging
    // 5. Enables error type normalization across stream sources
    // 6. Chain multiple map_err calls to layer transformations
    // 7. Transform errors early for consistent downstream handling
    // 8. map_err vs inspect_err: transform vs observe
    // 9. Integrates with try_collect, try_for_each, etc.
    // 10. Part of the TryStreamExt trait for fallible streams
}

Key insight: TryStreamExt::map_err is the stream-level equivalent of Result::map_err, enabling error transformation in asynchronous stream pipelines. While map_ok transforms successful values, map_err transforms failures without affecting the stream's item type. This is essential for normalizing errors from heterogeneous sources, enriching errors with context for debugging, and converting between error types to match downstream expectations. The transformation happens lazilyβ€”only when an error actually occursβ€”so there's zero overhead for successful stream elements. map_err integrates naturally with other TryStreamExt methods like map_ok, and_then, try_collect, and try_for_each, allowing you to build comprehensive stream processing pipelines where error handling is a first-class concern alongside item processing. Use map_err when you need to transform error types or add context; use inspect_err when you only need to observe errors without changing their type.