What is the purpose of futures::stream::TryStreamExt::map_err for transforming stream errors?
TryStreamExt::map_err transforms the error type of a fallible stream while preserving the stream's item type, allowing error transformation at the stream level before downstream consumers handle errors. This enables centralized error conversion, consistent error handling, and better error context enrichment without affecting the stream's success values.
The TryStream Trait and TryStreamExt
use futures::stream::{Stream, TryStream};
use std::task::{Context, Poll};
// TryStream is a stream that can fail
// Items are wrapped in Result<T, E>
pub trait TryStream: Stream<Item = Result<T, E>> {
// Associated types and methods for fallible streams
}
// TryStreamExt provides combinators for TryStream
pub trait TryStreamExt: TryStream {
fn map_err<E2, F>(self, f: F) -> MapErr<Self, F>
where
F: FnMut(Self::Error) -> E2,
Self: Sized;
// ... other methods
}TryStreamExt extends TryStream with combinators that work with fallible streams.
Basic map_err Usage
use futures::stream::{self, TryStreamExt};
async fn basic_usage() {
// Stream that can fail
let stream = stream::iter(vec
![Ok(1), Ok(2), Err("failed"), Ok(3)]);
// Transform errors to a different type
let mapped = stream.map_err(|e| format!("error: {}", e));
// Now errors are String instead of &str
// Items remain i32
futures::pin_mut!(mapped);
while let Some(result) = mapped.next().await {
match result {
Ok(n) => println!("item: {}", n),
Err(e) => println!("error: {}", e),
}
}
}map_err transforms the error type while leaving items unchanged.
Error Type Conversion
use futures::stream::{self, TryStreamExt};
use std::io;
async fn convert_error_types() {
// Stream with io::Error
let stream: stream::Iter<std::vec::IntoIter<Result<i32, io::Error>>> =
stream::iter(vec
![Ok(1), Err(io::Error::new(io::ErrorKind::Other, "io error"))]);
// Convert to String errors
let string_errors = stream.map_err(|e| e.to_string());
// Convert to custom error type
let custom_errors = string_errors.map_err(|e| MyError::from_message(e));
// Convert to anyhow::Error
let anyhow_errors = custom_errors.map_err(anyhow::Error::from);
}
#[derive(Debug)]
struct MyError {
message: String,
}
impl MyError {
fn from_message(msg: String) -> Self {
MyError { message: msg }
}
}map_err enables converting between error types to match downstream expectations.
Preserving Success Values
use futures::stream::{self, TryStreamExt};
async fn preserve_items() {
let stream = stream::iter(vec
![Ok(1), Ok(2), Ok(3), Err("error")]);
// map_err only touches errors
let mapped = stream.map_err(|e| format!("wrapped: {}", e));
// Success values pass through unchanged
// 1, 2, 3 remain as i32
// Only "error" becomes "wrapped: error"
let results: Vec<_> = mapped.collect::<Vec<_>>().await;
// [Ok(1), Ok(2), Ok(3), Err("wrapped: error")]
}The item type remains unchanged; only the error type is transformed.
Chaining with Other TryStreamExt Methods
use futures::stream::{self, TryStreamExt};
async fn chaining_methods() {
let stream = stream::iter(vec
![Ok(1), Ok(2), Err("error"), Ok(3)]);
let processed = stream
.map_err(|e| format!("context: {}", e))
.map_ok(|n| n * 2)
.and_then(|n| async move { Ok(n + 10) })
.map_err(|e| format!("processed: {}", e));
// Error transformations compose naturally
// Multiple map_err calls layer transformations
}map_err integrates seamlessly with other TryStreamExt combinators.
Adding Error Context
use futures::stream::{self, TryStreamExt};
use std::io;
async fn add_context() {
let stream = stream::iter(vec
![
Ok(1),
Err(io::Error::new(io::ErrorKind::NotFound, "file missing"))
]);
// Add context to errors
let contextualized = stream.map_err(|e| {
format!("Database operation failed: {}", e)
});
// Use anyhow for context
let anyhow_context = stream
.map_err(|e| anyhow::Error::from(e))
.map_err(|e| e.context("during database stream processing"));
}map_err can enrich errors with additional context for debugging.
Error Normalization
use futures::stream::{self, TryStreamExt};
#[derive(Debug)]
enum AppError {
Io(std::io::Error),
Parse(String),
Database(String),
}
async fn normalize_errors() {
// Different error types from different sources
let io_stream = stream::iter(vec
![Ok::<_, std::io::Error>(1)]);
let parse_stream = stream::iter(vec
![Ok::<_, String>(2)]);
// Normalize to AppError
let normalized_io = io_stream.map_err(AppError::Io);
let normalized_parse = parse_stream.map_err(AppError::Parse);
// Both streams now have Result<T, AppError>
// Can be combined, collected, etc.
}map_err enables normalizing heterogeneous error types to a common type.
Interaction with try_collect
use futures::stream::{self, TryStreamExt};
async fn try_collect_example() {
let stream = stream::iter(vec
![Ok(1), Ok(2), Err("failure")]);
// try_collect stops on first error
// map_err transforms error before collection
let result: Result<Vec<i32>, String> = stream
.map_err(|e| format!("error: {}", e))
.try_collect()
.await;
// Returns Err("error: failure")
}map_err transforms errors before try_collect encounters them.
Comparison with map_ok
use futures::stream::{self, TryStreamExt};
async fn map_ok_vs_map_err() {
let stream = stream::iter(vec
![Ok(1), Ok(2), Err("error")]);
// map_ok transforms success values
let transformed_ok = stream.clone().map_ok(|n| n * 2);
// Items: Ok(2), Ok(4), Err("error")
// map_err transforms error values
let transformed_err = stream.clone().map_err(|e| format!("wrapped: {}", e));
// Items: Ok(1), Ok(2), Err("wrapped: error")
// Both can be chained
let both = stream
.map_ok(|n| n * 2)
.map_err(|e| format!("wrapped: {}", e));
// Items: Ok(2), Ok(4), Err("wrapped: error")
}map_ok and map_err are complementary; one transforms items, the other errors.
Error Recovery Patterns
use futures::stream::{self, TryStreamExt};
async fn error_recovery() {
let stream = stream::iter(vec
![Ok(1), Err("transient"), Ok(2), Err("fatal"), Ok(3)]);
// Convert some errors to recoverable states
// This is more sophisticated than map_err alone
// Simple pattern: mark errors as recoverable
let marked = stream.map_err(|e| {
if e == "transient" {
RecoverableError::Transient(e.to_string())
} else {
RecoverableError::Fatal(e.to_string())
}
});
// Downstream can decide whether to continue
}
#[derive(Debug)]
enum RecoverableError {
Transient(String),
Fatal(String),
}map_err can classify errors for downstream recovery strategies.
Handling Multiple Error Sources
use futures::stream::{self, TryStreamExt, TryStream};
async fn multiple_sources() {
// Stream of Results with different error types
// (Note: this requires same error type for combination)
// Pattern: Union type for multiple error sources
let stream1 = stream::iter(vec
![Ok::<_, io::Error>(1)])
.map_err(StreamError::Io);
let stream2 = stream::iter(vec
![Ok::<_, String>(2)])
.map_err(StreamError::Parse);
// Both now have Result<T, StreamError>
// Can be combined with select or chain
}
use std::io;
#[derive(Debug)]
enum StreamError {
Io(io::Error),
Parse(String),
}map_err unifies error types from multiple stream sources.
Early Error Transformation
use futures::stream::{self, TryStreamExt};
async fn early_transformation() {
// Transform errors early in the pipeline
let stream = stream::iter(vec
![Ok(1), Err("raw error")]);
let early_transform = stream
.map_err(|e| AppError::with_context(e, "early in pipeline"))
.map_ok(|n| n * 2)
.and_then(|n| async move {
if n > 10 {
Err(AppError::with_context("too large", "validation"))
} else {
Ok(n)
}
});
// All errors are AppError, consistent handling
}
#[derive(Debug)]
struct AppError {
message: String,
context: String,
}
impl AppError {
fn with_context(msg: impl Into<String>, ctx: &str) -> Self {
AppError {
message: msg.into(),
context: ctx.to_string(),
}
}
}Transform early to establish consistent error types throughout the pipeline.
Integration with try_for_each
use futures::stream::{self, TryStreamExt};
async fn try_for_each_example() {
let stream = stream::iter(vec
![Ok(1), Ok(2), Err("process error")]);
// try_for_each stops on error
let result = stream
.map_err(|e| format!("stream error: {}", e))
.try_for_each(|n| async move {
println!("processing: {}", n);
Ok::<_, String>(())
})
.await;
// Result includes transformed error
}map_err ensures errors are transformed before try_for_each processes them.
Performance Considerations
use futures::stream::{self, TryStreamExt};
async fn performance() {
// map_err is lazy - closure only called on error
let stream = stream::iter(vec
![Ok(1), Ok(2), Ok(3)]);
let transformed = stream.map_err(|e| {
// This closure is never called if no errors
expensive_error_transformation(e)
});
// No overhead for successful items
// Only incurs cost when errors occur
}
fn expensive_error_transformation(e: &str) -> String {
// Could be expensive, but only runs on error
format!("detailed error with context: {}", e)
}map_err has zero overhead for successful items; the closure only runs on errors.
Error Logging Side Effects
use futures::stream::{self, TryStreamExt};
async fn logging_side_effects() {
let stream = stream::iter(vec
![Ok(1), Err("error1"), Ok(2), Err("error2")]);
let logged = stream.map_err(|e| {
// Log error before transforming
eprintln!("Stream error occurred: {}", e);
e.to_string()
});
// Errors are both logged and transformed
// Use for observability and debugging
}map_err can perform side effects like logging while transforming errors.
Type Signature Analysis
use futures::stream::TryStreamExt;
use futures::stream::{Stream, TryStream};
// map_err type signature:
fn map_err<Self, E2, F>(self, f: F) -> MapErr<Self, F>
where
F: FnMut(Self::Error) -> E2,
Self: TryStream + Sized,
{
// Implementation returns MapErr wrapper
}
// MapErr is a stream wrapper:
pub struct MapErr<St, F> {
stream: St,
f: F,
}
// MapErr implements Stream:
impl<St, F, E2> Stream for MapErr<St, F>
where
St: TryStream,
F: FnMut(St::Error) -> E2,
{
type Item = Result<St::Ok, E2>;
fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>)
-> std::task::Poll<Option<Self::Item>>
{
// Poll inner stream, apply f to errors
}
}MapErr wraps the stream and transforms errors on each poll.
Comparison with inspect_err
use futures::stream::{self, TryStreamExt};
async fn inspect_err_comparison() {
let stream = stream::iter(vec
![Ok(1), Err("error")]);
// map_err: Transform error type
let transformed = stream.clone()
.map_err(|e| format!("wrapped: {}", e));
// Type changes from Result<T, &str> to Result<T, String>
// inspect_err: Look at errors without transforming
let inspected = stream.clone()
.inspect_err(|e| println!("saw error: {}", e));
// Type stays Result<T, &str>
// Use map_err to transform, inspect_err to observe
}inspect_err observes errors without changing types; map_err transforms them.
Handling Fatal vs Non-Fatal Errors
use futures::stream::{self, TryStreamExt};
#[derive(Debug)]
enum ErrorClassification {
Fatal(String),
Recoverable(String),
}
async fn error_classification() {
let stream = stream::iter(vec
![
Ok(1),
Err("connection timeout"),
Ok(2),
Err("out of memory"),
Ok(3),
]);
// Classify errors at the stream level
let classified = stream.map_err(|e| {
if e.contains("timeout") || e.contains("retry") {
ErrorClassification::Recoverable(e.to_string())
} else {
ErrorClassification::Fatal(e.to_string())
}
});
// Downstream can decide how to handle each classification
}map_err can classify errors for downstream decision-making.
Complete Example: Database Stream Processing
use futures::stream::{self, TryStreamExt};
use std::io;
#[derive(Debug)]
enum DbError {
Connection(String),
Query(String),
Timeout(String),
}
async fn database_stream_example() {
// Simulated database query stream
let query_results: Vec<Result<i32, io::Error>> = vec
![
Ok(1),
Ok(2),
Err(io::Error::new(io::ErrorKind::TimedOut, "timeout")),
Ok(3),
Err(io::Error::new(io::ErrorKind::ConnectionReset, "connection lost")),
];
let stream = stream::iter(query_results);
let processed = stream
// Convert io::Error to domain-specific DbError
.map_err(|e| {
match e.kind() {
io::ErrorKind::TimedOut => DbError::Timeout(e.to_string()),
io::ErrorKind::ConnectionReset | io::ErrorKind::ConnectionAborted => {
DbError::Connection(e.to_string())
}
_ => DbError::Query(e.to_string()),
}
})
// Process successful rows
.map_ok(|row| {
println!("Processing row: {}", row);
row * 10
})
// Add processing context to errors
.map_err(|e| {
format!("Database error during batch processing: {:?}", e)
});
// Collect results
let results: Vec<_> = processed.collect().await;
for result in results {
match result {
Ok(value) => println!("Result: {}", value),
Err(e) => eprintln!("Error: {}", e),
}
}
}
fn main() {
futures::executor::block_on(database_stream_example());
}Comparison Table
fn comparison() {
// βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
// β Method β Purpose β Type Change β Side Effects β
// βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
// β map_err β Transform error type β Yes (E β E2) β Yes, allowedβ
// β map_ok β Transform item type β Yes (T β T2) β No β
// β inspect_err β Observe errors β No β Yes β
// β inspect_ok β Observe items β No β Yes β
// β and_then β Chain fallible async β Item + error β No β
// β or_else β Recover from errors β Yes β No β
// βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
}Summary
fn summary() {
// βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
// β Use Case β map_err Benefit β
// βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
// β Error type conversion β Convert to downstream-compatible types β
// β Error normalization β Unify multiple error sources β
// β Context enrichment β Add context for debugging β
// β Error classification β Mark recoverable vs fatal β
// β Logging/observability β Log errors as they pass through β
// β Early transformation β Establish consistent types early β
// βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
// Key points:
// 1. map_err transforms the error type while preserving items
// 2. The closure is only called for errors, no overhead for success
// 3. Works seamlessly with other TryStreamExt combinators
// 4. Can perform side effects like logging
// 5. Enables error type normalization across stream sources
// 6. Chain multiple map_err calls to layer transformations
// 7. Transform errors early for consistent downstream handling
// 8. map_err vs inspect_err: transform vs observe
// 9. Integrates with try_collect, try_for_each, etc.
// 10. Part of the TryStreamExt trait for fallible streams
}Key insight: TryStreamExt::map_err is the stream-level equivalent of Result::map_err, enabling error transformation in asynchronous stream pipelines. While map_ok transforms successful values, map_err transforms failures without affecting the stream's item type. This is essential for normalizing errors from heterogeneous sources, enriching errors with context for debugging, and converting between error types to match downstream expectations. The transformation happens lazilyβonly when an error actually occursβso there's zero overhead for successful stream elements. map_err integrates naturally with other TryStreamExt methods like map_ok, and_then, try_collect, and try_for_each, allowing you to build comprehensive stream processing pipelines where error handling is a first-class concern alongside item processing. Use map_err when you need to transform error types or add context; use inspect_err when you only need to observe errors without changing their type.
