What is the purpose of futures::stream::TryStreamExt::map_err for stream-level error transformation?
TryStreamExt::map_err transforms the error type of a fallible stream (Stream<Item = Result<T, E>>) into a different error type, returning a new stream that applies the transformation to each error while passing through successful values unchanged. This enables error type adaptation at the stream levelāconverting domain-specific errors into unified error types, enriching errors with context, or wrapping errors for compatibility with APIs that expect specific error types. Unlike collecting into a Result first, map_err operates on streams lazily, applying transformations element-by-element as the stream is polled.
Basic Error Transformation
use futures::stream::{self, TryStreamExt};
#[derive(Debug)]
enum DbError {
ConnectionFailed,
QueryFailed(String),
}
#[derive(Debug)]
enum AppError {
Database(DbError),
Timeout,
}
impl From<DbError> for AppError {
fn from(err: DbError) -> Self {
AppError::Database(err)
}
}
async fn example() -> Result<(), AppError> {
let db_results: Vec<Result<i32, DbError>> = vec![
Ok(1),
Ok(2),
Err(DbError::QueryFailed("timeout".to_string())),
Ok(3),
];
// Convert stream of Result<i32, DbError> to Result<i32, AppError>
let mut stream = stream::iter(db_results)
.map_err(|e| AppError::Database(e));
while let Some(result) = stream.next().await {
match result {
Ok(val) => println!("Got value: {}", val),
Err(e) => println!("Got error: {:?}", e),
}
}
Ok(())
}map_err wraps each error while preserving successful values unchanged.
Stream-Level vs Single-Value Error Handling
use futures::stream::{self, TryStreamExt};
// Single Result: use Result::map_err
fn transform_single() -> Result<i32, AppError> {
let result: Result<i32, DbError> = Ok(42);
result.map_err(|e| AppError::Database(e))
}
// Stream of Results: use TryStreamExt::map_err
fn transform_stream() -> impl futures::Stream<Item = Result<i32, AppError>> {
let results: Vec<Result<i32, DbError>> = vec![Ok(1), Ok(2), Ok(3)];
stream::iter(results)
.map_err(|e| AppError::Database(e))
}
// Key difference:
// - Result::map_err transforms a single Result
// - TryStreamExt::map_err transforms every error in a streamTryStreamExt::map_err operates on each element in the stream, not just one value.
Lazy Evaluation and Stream Composition
use futures::stream::{self, TryStreamExt};
async fn example() {
let results: Vec<Result<i32, DbError>> = vec![
Ok(1),
Err(DbError::ConnectionFailed),
Ok(2),
];
// map_err is lazy - transformation doesn't happen yet
let stream = stream::iter(results)
.map_err(|e| {
println!("Transforming error: {:?}", e);
AppError::Database(e)
});
println!("Stream created, but nothing transformed yet");
// Transformation happens only when polling
futures::pin_mut!(stream);
// First poll - processes first element
let item = stream.next().await;
// Output: Got value: 1 (no transformation, was Ok)
// Second poll - processes second element
let item = stream.next().await;
// Output: Transforming error: ConnectionFailed
// Got error: Database(ConnectionFailed)
}map_err is lazyāthe transformation function runs only when errors are polled from the stream.
Chaining Error Transformations
use futures::stream::{self, TryStreamExt};
#[derive(Debug)]
enum Layer1Error {
NotFound,
Invalid(String),
}
#[derive(Debug)]
enum Layer2Error {
Source(Layer1Error),
Timeout,
}
#[derive(Debug)]
enum AppError {
Inner(Layer2Error),
Config(String),
}
async fn example() {
let results: Vec<Result<i32, Layer1Error>> = vec![
Ok(1),
Err(Layer1Error::NotFound),
Err(Layer1Error::Invalid("bad data".to_string())),
];
// Chain multiple map_err calls
let stream = stream::iter(results)
.map_err(|e| Layer2Error::Source(e))
.map_err(|e| AppError::Inner(e));
// Now have Stream<Item = Result<i32, AppError>>
// Each error is transformed twice through the chain
}Multiple map_err calls compose naturally, transforming errors through layers.
Combining with Other TryStreamExt Methods
use futures::stream::{self, TryStreamExt};
async fn example() -> Result<Vec<String>, AppError> {
let results: Vec<Result<i32, DbError>> = vec![
Ok(1),
Ok(2),
Err(DbError::ConnectionFailed),
Ok(3),
];
// map_err works with other TryStreamExt combinators
let processed: Result<Vec<String>, AppError> = stream::iter(results)
.map_err(|e| AppError::Database(e))
.map_ok(|n| n.to_string()) // Transform success values
.try_filter(|s| async move { s.len() > 0 }) // Filter success values
.try_collect() // Collect into Result<Vec<_>, AppError>
.await;
processed
}
// The stream pipeline:
// 1. Stream of Result<i32, DbError>
// 2. map_err -> Stream of Result<i32, AppError>
// 3. map_ok -> Stream of Result<String, AppError>
// 4. try_filter -> Stream of Result<String, AppError> (filtered)
// 5. try_collect -> Result<Vec<String>, AppError>map_err integrates seamlessly with the TryStreamExt combinator ecosystem.
Error Context Enrichment
use futures::stream::{self, TryStreamExt};
#[derive(Debug)]
enum DataError {
ParseError(String),
InvalidFormat,
}
#[derive(Debug)]
struct ContextualError {
context: String,
source: DataError,
}
impl std::fmt::Display for ContextualError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}: {:?}", self.context, self.source)
}
}
impl std::error::Error for ContextualError {}
async fn process_stream() -> Result<Vec<i32>, ContextualError> {
let data: Vec<Result<i32, DataError>> = vec![
Ok(1),
Err(DataError::ParseError("invalid number".to_string())),
Ok(2),
];
// Add context to each error as it occurs
stream::iter(data)
.map_err(|e| ContextualError {
context: "Processing data stream".to_string(),
source: e,
})
.try_collect()
.await
}map_err enriches errors with contextual information at the stream level.
Converting Between Crate Error Types
use futures::stream::{self, TryStreamExt};
// Common pattern: converting library errors to application errors
// Database crate has its own error type
mod database {
#[derive(Debug)]
pub enum Error {
ConnectionFailed,
QueryError(String),
}
pub fn query_results() -> Vec<Result<i32, Error>> {
vec![Ok(1), Err(Error::QueryError("syntax error".into()))]
}
}
// HTTP crate has its own error type
mod http {
#[derive(Debug)]
pub enum Error {
RequestFailed(String),
Timeout,
}
}
// Application error unifies them
#[derive(Debug)]
enum AppError {
Database(database::Error),
Http(http::Error),
Internal(String),
}
// Convert database errors
fn convert_db_stream() -> impl futures::Stream<Item = Result<i32, AppError>> {
stream::iter(database::query_results())
.map_err(|e| AppError::Database(e))
}
// Convert HTTP errors
fn convert_http_stream() -> impl futures::Stream<Item = Result<String, AppError>> {
let http_results: Vec<Result<String, http::Error>> = vec![Ok("response".into())];
stream::iter(http_results)
.map_err(|e| AppError::Http(e))
}map_err converts domain-specific error types into unified application error types.
Handling Partial Stream Failures
use futures::stream::{self, TryStreamExt};
async fn process_with_continue_on_error() {
let results: Vec<Result<i32, DbError>> = vec![
Ok(1),
Err(DbError::ConnectionFailed),
Ok(2),
Err(DbError::QueryFailed("error".into())),
Ok(3),
];
// Collect successes, but transform errors for logging
let mut stream = stream::iter(results)
.map_err(|e| {
eprintln!("Stream error: {:?}", e);
e
});
// Process stream, continuing through errors
while let Some(result) = stream.next().await {
match result {
Ok(value) => println!("Got value: {}", value),
Err(e) => println!("Skipping error: {:?}", e),
}
}
// Output:
// Got value: 1
// Stream error: ConnectionFailed
// Skipping error: ConnectionFailed
// Got value: 2
// Stream error: QueryFailed("error")
// Skipping error: QueryFailed("error")
// Got value: 3
}map_err can log or track errors while allowing the stream to continue.
map_err with try_fold
use futures::stream::{self, TryStreamExt};
async fn fold_with_error_transformation() -> Result<i32, AppError> {
let results: Vec<Result<i32, DbError>> = vec![
Ok(1),
Ok(2),
Ok(3),
];
// try_fold with transformed errors
stream::iter(results)
.map_err(|e| AppError::Database(e))
.try_fold(0, |acc, val| async move { Ok(acc + val) })
.await
}
// If any error occurs, try_fold short-circuits with the transformed errortry_fold and other try_ methods work with the transformed error type.
map_err vs inspect_err
use futures::stream::{self, TryStreamExt, StreamExt};
async fn compare_methods() {
let results: Vec<Result<i32, DbError>> = vec![
Ok(1),
Err(DbError::ConnectionFailed),
Ok(2),
];
// map_err: TRANSFORMS the error type
// Returns Stream<Item = Result<T, NewError>>
let _mapped = stream::iter(results.clone())
.map_err(|e| AppError::Database(e));
// inspect_err: OBSERVES without transforming
// Returns Stream<Item = Result<T, E>> (same error type)
let _inspected = stream::iter(results)
.inspect_err(|e| println!("Saw error: {:?}", e));
// Use map_err when you need to change error types
// Use inspect_err when you need to log/track errors but keep the type
}map_err transforms error types; inspect_err observes errors without transformation.
Error Recovery After map_err
use futures::stream::{self, TryStreamExt};
async fn recover_from_errors() {
let results: Vec<Result<i32, DbError>> = vec![
Ok(1),
Err(DbError::ConnectionFailed),
Ok(2),
Err(DbError::QueryFailed("error".into())),
];
// Transform errors, then recover with try_filter_map
let recovered: Vec<i32> = stream::iter(results)
.map_err(|e| AppError::Database(e))
.try_filter_map(|val| async move {
// Return Some to keep, None to filter, Err to fail
Ok(Some(val))
})
.try_collect::<Vec<_>>()
.await
.unwrap_or_else(|e| {
println!("Final error: {:?}", e);
vec![]
});
}Combine map_err with error recovery combinators for resilient stream processing.
Type Signature Analysis
use futures::stream::TryStreamExt;
use futures::Stream;
// The signature of map_err (simplified):
// fn map_err<F, E2>(self, f: F) -> MapErr<Self, F>
// where
// F: FnMut(Self::Error) -> E2,
// Self: TryStream,
// Self::Ok: Unpin,
// Self::Error: Unpin,
// Input: Stream<Item = Result<T, E1>>
// Output: Stream<Item = Result<T, E2>>
// The transformation only affects the error variant:
// - Ok(T) -> Ok(T) (unchanged)
// - Err(E1) -> Err(E2) (transformed by f)
// Example showing type flow:
fn type_flow_example() -> impl Stream<Item = Result<String, AppError>> {
let results: Vec<Result<i32, DbError>> = vec![];
// Type: Stream<Item = Result<i32, DbError>>
let stream1 = futures::stream::iter(results);
// Type: Stream<Item = Result<i32, AppError>>
let stream2 = stream1.map_err(|e: DbError| AppError::Database(e));
// Type: Stream<Item = Result<String, AppError>>
let stream3 = stream2.map_ok(|n: i32| n.to_string());
stream3
}The type signature shows that map_err changes only the error type parameter.
Async Error Transformation
use futures::stream::{self, TryStreamExt};
#[derive(Debug)]
struct DetailedError {
code: i32,
message: String,
}
impl From<DbError> for DetailedError {
fn from(e: DbError) -> Self {
match e {
DbError::ConnectionFailed => DetailedError {
code: 1001,
message: "Database connection failed".to_string(),
},
DbError::QueryFailed(s) => DetailedError {
code: 1002,
message: format!("Query failed: {}", s),
},
}
}
}
// map_err takes FnMut, not async closure
// For async transformation, use and_then with error handling:
async fn async_error_transform() -> Result<Vec<i32>, DetailedError> {
let results: Vec<Result<i32, DbError>> = vec![Ok(1), Err(DbError::ConnectionFailed)];
// map_err is synchronous (FnMut)
stream::iter(results)
.map_err(|e| DetailedError::from(e)) // Synchronous transformation
.try_collect()
.await
}
// If you need async error transformation:
async fn complex_transform() {
let results: Vec<Result<i32, DbError>> = vec![];
stream::iter(results)
.map_err(|e| {
// This closure must be synchronous
// For async, you'd need a different pattern
AppError::Database(e)
});
}map_err uses synchronous closures; for async transformations, consider and_then patterns.
Integration with try_collect
use futures::stream::{self, TryStreamExt};
#[derive(Debug)]
enum ParseError {
InvalidChar(char),
TooLong,
}
#[derive(Debug)]
enum AppError {
Parse(ParseError),
Empty,
}
async fn parse_all(input: Vec<&str>) -> Result<Vec<i32>, AppError> {
// Parse strings into numbers with error transformation
stream::iter(input)
.map(|s| s.parse::<i32>().map_err(|_| ParseError::InvalidChar(s.chars().next().unwrap())))
.map_err(|e| AppError::Parse(e))
.try_collect()
.await
.map(|v| if v.is_empty() { Err(AppError::Empty) } else { Ok(v) })
.unwrap_or(Ok(vec![]))
}
async fn parse_pipeline() {
let inputs = vec!["1", "2", "three", "4"];
let result: Result<Vec<i32>, AppError> = stream::iter(inputs)
.map(|s| s.parse::<i32>())
.map_err(|_| AppError::Parse(ParseError::InvalidChar('t')))
.try_collect()
.await;
match result {
Ok(nums) => println!("Parsed: {:?}", nums),
Err(e) => println!("Error: {:?}", e),
}
}try_collect stops at the first error; map_err ensures it's the right error type.
Real-World Pattern: Database Stream
use futures::stream::{self, TryStreamExt};
// Real pattern: database query results to application errors
struct User {
id: i32,
name: String,
}
#[derive(Debug)]
enum RepositoryError {
ConnectionPool(String),
QueryFailed(String),
NotFound,
}
#[derive(Debug)]
enum ServiceError {
Repository(RepositoryError),
InvalidState(String),
}
impl From<RepositoryError> for ServiceError {
fn from(e: RepositoryError) -> Self {
ServiceError::Repository(e)
}
}
async fn fetch_users_stream() -> impl futures::Stream<Item = Result<User, ServiceError>> {
// Simulated database results
let results: Vec<Result<User, RepositoryError>> = vec![
Ok(User { id: 1, name: "Alice".into() }),
Ok(User { id: 2, name: "Bob".into() }),
Err(RepositoryError::QueryFailed("connection lost".into())),
];
stream::iter(results)
.map_err(|e| ServiceError::Repository(e))
}
async fn process_users() -> Result<Vec<String>, ServiceError> {
fetch_users_stream()
.await
.map_ok(|user| user.name)
.try_collect()
.await
}Database streaming APIs commonly use map_err to lift repository errors into service errors.
Synthesis
TryStreamExt::map_err purpose:
| Aspect | Behavior |
|---|---|
| Input | Stream<Item = Result<T, E1>> |
| Output | Stream<Item = Result<T, E2>> |
| Success values | Passed through unchanged |
| Errors | Transformed by closure |
| Evaluation | Lazy (on poll) |
Common use cases:
- Converting domain-specific errors to unified application errors
- Adding context to errors in stream pipelines
- Logging/tracking errors while allowing stream continuation
- Chaining with other
TryStreamExtcombinators
Combinator ecosystem:
| Combinator | Purpose |
|---|---|
map_err |
Transform error type |
map_ok |
Transform success type |
inspect_err |
Observe errors without transforming |
try_filter |
Filter based on success values |
try_fold |
Accumulate with early termination on error |
try_collect |
Collect into Result<Collection, Error> |
Key insight: TryStreamExt::map_err brings the familiar Result::map_err pattern to streams, enabling error transformation at the stream level. This is essential for composing streams across API boundaries where error types differ, and for enriching errors with contextual information as they flow through processing pipelines. The lazy evaluation ensures transformations only happen when needed, making it efficient for stream processing where not all elements may be consumed.
