Rust walkthroughs
futures::stream::TryStreamExt::map_err for stream-level error transformation?

TryStreamExt::map_err transforms the error type of a fallible stream (Stream<Item = Result<T, E>>) into a different error type. It returns a new stream that applies the transformation to each error and passes successful values through unchanged. This enables error type adaptation at the stream level: converting domain-specific errors into unified error types, enriching errors with context, or wrapping errors for compatibility with APIs that expect specific error types. Unlike collecting into a Result first, map_err operates lazily, applying the transformation element by element as the stream is polled.
use futures::stream::{self, StreamExt, TryStreamExt};

#[derive(Debug)]
enum DbError {
    ConnectionFailed,
    QueryFailed(String),
}

#[derive(Debug)]
enum AppError {
    Database(DbError),
    Timeout,
}

impl From<DbError> for AppError {
    fn from(err: DbError) -> Self {
        AppError::Database(err)
    }
}

async fn example() -> Result<(), AppError> {
    let db_results: Vec<Result<i32, DbError>> = vec![
        Ok(1),
        Ok(2),
        Err(DbError::QueryFailed("timeout".to_string())),
        Ok(3),
    ];

    // Convert stream of Result<i32, DbError> to Result<i32, AppError>
    let mut stream = stream::iter(db_results)
        .map_err(|e| AppError::Database(e));

    // `next` comes from StreamExt, so that trait must be in scope as well
    while let Some(result) = stream.next().await {
        match result {
            Ok(val) => println!("Got value: {}", val),
            Err(e) => println!("Got error: {:?}", e),
        }
    }
    Ok(())
}

map_err wraps each error while preserving successful values unchanged.
use futures::stream::{self, TryStreamExt};

// Single Result: use Result::map_err
fn transform_single() -> Result<i32, AppError> {
    let result: Result<i32, DbError> = Ok(42);
    result.map_err(|e| AppError::Database(e))
}

// Stream of Results: use TryStreamExt::map_err
fn transform_stream() -> impl futures::Stream<Item = Result<i32, AppError>> {
    let results: Vec<Result<i32, DbError>> = vec![Ok(1), Ok(2), Ok(3)];
    stream::iter(results)
        .map_err(|e| AppError::Database(e))
}

// Key difference:
// - Result::map_err transforms a single Result
// - TryStreamExt::map_err transforms every error in a stream

TryStreamExt::map_err operates on every element of the stream, not just one value.
use futures::stream::{self, StreamExt, TryStreamExt};

async fn example() {
    let results: Vec<Result<i32, DbError>> = vec![
        Ok(1),
        Err(DbError::ConnectionFailed),
        Ok(2),
    ];

    // map_err is lazy - transformation doesn't happen yet
    let stream = stream::iter(results)
        .map_err(|e| {
            println!("Transforming error: {:?}", e);
            AppError::Database(e)
        });
    println!("Stream created, but nothing transformed yet");

    // Transformation happens only when polling
    futures::pin_mut!(stream);

    // First poll - yields Ok(1); the closure is not called for Ok values
    let item = stream.next().await;
    println!("First item: {:?}", item);
    // Prints: First item: Some(Ok(1))

    // Second poll - yields the error, so the closure runs now
    let item = stream.next().await;
    println!("Second item: {:?}", item);
    // Prints: Transforming error: ConnectionFailed
    //         Second item: Some(Err(Database(ConnectionFailed)))
}

map_err is lazy: the transformation function runs only when an error is polled from the stream.
use futures::stream::{self, TryStreamExt};

#[derive(Debug)]
enum Layer1Error {
    NotFound,
    Invalid(String),
}

#[derive(Debug)]
enum Layer2Error {
    Source(Layer1Error),
    Timeout,
}

#[derive(Debug)]
enum AppError {
    Inner(Layer2Error),
    Config(String),
}

async fn example() {
    let results: Vec<Result<i32, Layer1Error>> = vec![
        Ok(1),
        Err(Layer1Error::NotFound),
        Err(Layer1Error::Invalid("bad data".to_string())),
    ];

    // Chain multiple map_err calls
    let stream = stream::iter(results)
        .map_err(|e| Layer2Error::Source(e))
        .map_err(|e| AppError::Inner(e));
    // Now have Stream<Item = Result<i32, AppError>>
    // Each error is transformed twice through the chain
}

Multiple map_err calls compose naturally, transforming errors through layers.
use futures::future;
use futures::stream::{self, TryStreamExt};

async fn example() -> Result<Vec<String>, AppError> {
    let results: Vec<Result<i32, DbError>> = vec![
        Ok(1),
        Ok(2),
        Err(DbError::ConnectionFailed),
        Ok(3),
    ];

    // map_err works with other TryStreamExt combinators
    let processed: Result<Vec<String>, AppError> = stream::iter(results)
        .map_err(|e| AppError::Database(e))
        .map_ok(|n| n.to_string()) // Transform success values
        // try_filter passes a reference, so return a ready future rather than
        // an async block that would borrow from that reference
        .try_filter(|s| future::ready(!s.is_empty())) // Filter success values
        .try_collect() // Collect into Result<Vec<_>, AppError>
        .await;
    processed
}

// The stream pipeline:
// 1. Stream of Result<i32, DbError>
// 2. map_err -> Stream of Result<i32, AppError>
// 3. map_ok -> Stream of Result<String, AppError>
// 4. try_filter -> Stream of Result<String, AppError> (filtered)
// 5. try_collect -> Result<Vec<String>, AppError>

map_err integrates seamlessly with the TryStreamExt combinator ecosystem.
use futures::stream::{self, TryStreamExt};

#[derive(Debug)]
enum DataError {
    ParseError(String),
    InvalidFormat,
}

#[derive(Debug)]
struct ContextualError {
    context: String,
    source: DataError,
}

impl std::fmt::Display for ContextualError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}: {:?}", self.context, self.source)
    }
}

impl std::error::Error for ContextualError {}

async fn process_stream() -> Result<Vec<i32>, ContextualError> {
    let data: Vec<Result<i32, DataError>> = vec![
        Ok(1),
        Err(DataError::ParseError("invalid number".to_string())),
        Ok(2),
    ];

    // Add context to each error as it occurs
    stream::iter(data)
        .map_err(|e| ContextualError {
            context: "Processing data stream".to_string(),
            source: e,
        })
        .try_collect()
        .await
}

map_err enriches errors with contextual information at the stream level.
use futures::stream::{self, TryStreamExt};

// Common pattern: converting library errors to application errors

// Database crate has its own error type
mod database {
    #[derive(Debug)]
    pub enum Error {
        ConnectionFailed,
        QueryError(String),
    }

    pub fn query_results() -> Vec<Result<i32, Error>> {
        vec![Ok(1), Err(Error::QueryError("syntax error".into()))]
    }
}

// HTTP crate has its own error type
mod http {
    #[derive(Debug)]
    pub enum Error {
        RequestFailed(String),
        Timeout,
    }
}

// Application error unifies them
#[derive(Debug)]
enum AppError {
    Database(database::Error),
    Http(http::Error),
    Internal(String),
}

// Convert database errors
fn convert_db_stream() -> impl futures::Stream<Item = Result<i32, AppError>> {
    stream::iter(database::query_results())
        .map_err(|e| AppError::Database(e))
}

// Convert HTTP errors
fn convert_http_stream() -> impl futures::Stream<Item = Result<String, AppError>> {
    let http_results: Vec<Result<String, http::Error>> = vec![Ok("response".into())];
    stream::iter(http_results)
        .map_err(|e| AppError::Http(e))
}

map_err converts domain-specific error types into unified application error types.
use futures::stream::{self, StreamExt, TryStreamExt};

async fn process_with_continue_on_error() {
    let results: Vec<Result<i32, DbError>> = vec![
        Ok(1),
        Err(DbError::ConnectionFailed),
        Ok(2),
        Err(DbError::QueryFailed("error".into())),
        Ok(3),
    ];

    // Log each error as it passes; the error type stays the same
    let mut stream = stream::iter(results)
        .map_err(|e| {
            eprintln!("Stream error: {:?}", e);
            e
        });

    // Process stream, continuing through errors
    while let Some(result) = stream.next().await {
        match result {
            Ok(value) => println!("Got value: {}", value),
            Err(e) => println!("Skipping error: {:?}", e),
        }
    }
    // Output (the "Stream error" lines go to stderr):
    // Got value: 1
    // Stream error: ConnectionFailed
    // Skipping error: ConnectionFailed
    // Got value: 2
    // Stream error: QueryFailed("error")
    // Skipping error: QueryFailed("error")
    // Got value: 3
}

map_err can log or track errors as they pass. It never ends the stream itself, so a consumer that keeps polling with next() still receives the items that follow an error.
use futures::stream::{self, TryStreamExt};

async fn fold_with_error_transformation() -> Result<i32, AppError> {
    let results: Vec<Result<i32, DbError>> = vec![
        Ok(1),
        Ok(2),
        Ok(3),
    ];

    // try_fold with transformed errors
    stream::iter(results)
        .map_err(|e| AppError::Database(e))
        .try_fold(0, |acc, val| async move { Ok(acc + val) })
        .await
}
// If any error occurs, try_fold short-circuits with the transformed error

try_fold and other try_ methods work with the transformed error type.
use futures::stream::{self, TryStreamExt};

async fn compare_methods() {
    // DbError does not derive Clone, so build the input twice instead of cloning it
    let make_results = || -> Vec<Result<i32, DbError>> {
        vec![Ok(1), Err(DbError::ConnectionFailed), Ok(2)]
    };

    // map_err: TRANSFORMS the error type
    // Returns Stream<Item = Result<T, NewError>>
    let _mapped = stream::iter(make_results())
        .map_err(|e| AppError::Database(e));

    // inspect_err: OBSERVES without transforming
    // Returns Stream<Item = Result<T, E>> (same error type)
    let _inspected = stream::iter(make_results())
        .inspect_err(|e| println!("Saw error: {:?}", e));

    // Use map_err when you need to change error types
    // Use inspect_err when you need to log/track errors but keep the type
}

map_err transforms error types; inspect_err observes errors without transformation.
use futures::stream::{self, TryStreamExt};

async fn recover_from_errors() {
    let results: Vec<Result<i32, DbError>> = vec![
        Ok(1),
        Err(DbError::ConnectionFailed),
        Ok(2),
        Err(DbError::QueryFailed("error".into())),
    ];

    // Transform errors, then post-process successes with try_filter_map
    let recovered: Vec<i32> = stream::iter(results)
        .map_err(|e| AppError::Database(e))
        .try_filter_map(|val| async move {
            // Only Ok values reach this closure:
            // return Some to keep, None to filter, Err to fail
            Ok(Some(val))
        })
        .try_collect::<Vec<_>>()
        .await
        // try_collect stops at the first error, so fall back to an empty Vec
        .unwrap_or_else(|e| {
            println!("Final error: {:?}", e);
            vec![]
        });
    println!("Recovered: {:?}", recovered);
}

Combine map_err with a fallback at the end of the pipeline for resilient stream processing. Note that try_collect still stops at the first error, so the fallback replaces the whole result rather than skipping individual failures.
use futures::stream::TryStreamExt;
use futures::Stream;

// The signature of map_err (simplified):
// fn map_err<F, E2>(self, f: F) -> MapErr<Self, F>
// where
//     Self: TryStream + Sized,
//     F: FnMut(Self::Error) -> E2,

// Input: Stream<Item = Result<T, E1>>
// Output: Stream<Item = Result<T, E2>>

// The transformation only affects the error variant:
// - Ok(T) -> Ok(T) (unchanged)
// - Err(E1) -> Err(E2) (transformed by f)

// Example showing type flow:
fn type_flow_example() -> impl Stream<Item = Result<String, AppError>> {
    let results: Vec<Result<i32, DbError>> = vec![];

    // Type: Stream<Item = Result<i32, DbError>>
    let stream1 = futures::stream::iter(results);

    // Type: Stream<Item = Result<i32, AppError>>
    let stream2 = stream1.map_err(|e: DbError| AppError::Database(e));

    // Type: Stream<Item = Result<String, AppError>>
    let stream3 = stream2.map_ok(|n: i32| n.to_string());
    stream3
}

The type signature shows that map_err changes only the error type parameter.
use futures::stream::{self, TryStreamExt};

#[derive(Debug)]
struct DetailedError {
    code: i32,
    message: String,
}

impl From<DbError> for DetailedError {
    fn from(e: DbError) -> Self {
        match e {
            DbError::ConnectionFailed => DetailedError {
                code: 1001,
                message: "Database connection failed".to_string(),
            },
            DbError::QueryFailed(s) => DetailedError {
                code: 1002,
                message: format!("Query failed: {}", s),
            },
        }
    }
}

// map_err takes FnMut, not async closure
// For async transformation of errors, use or_else, whose closure returns a future
async fn async_error_transform() -> Result<Vec<i32>, DetailedError> {
    let results: Vec<Result<i32, DbError>> = vec![Ok(1), Err(DbError::ConnectionFailed)];

    // map_err is synchronous (FnMut)
    stream::iter(results)
        .map_err(|e| DetailedError::from(e)) // Synchronous transformation
        .try_collect()
        .await
}

// If you need async error transformation:
async fn complex_transform() {
    let results: Vec<Result<i32, DbError>> = vec![];
    let _stream = stream::iter(results)
        .map_err(|e| {
            // This closure must be synchronous
            // For async work on errors, TryStreamExt::or_else is the closer fit
            AppError::Database(e)
        });
}

map_err uses synchronous closures. and_then only sees successful values, so for async transformations of errors consider or_else instead.
use futures::stream::{self, StreamExt, TryStreamExt};

#[derive(Debug)]
enum ParseError {
    InvalidChar(char),
    TooLong,
}

#[derive(Debug)]
enum AppError {
    Parse(ParseError),
    Empty,
}

async fn parse_all(input: Vec<&str>) -> Result<Vec<i32>, AppError> {
    // Parse strings into numbers with error transformation
    let values: Vec<i32> = stream::iter(input)
        // StreamExt::map produces the fallible items; '?' stands in for an empty string
        .map(|s| {
            s.parse::<i32>()
                .map_err(|_| ParseError::InvalidChar(s.chars().next().unwrap_or('?')))
        })
        .map_err(|e| AppError::Parse(e))
        .try_collect()
        .await?; // Propagate the first error instead of discarding it

    if values.is_empty() {
        Err(AppError::Empty)
    } else {
        Ok(values)
    }
}

async fn parse_pipeline() {
    let inputs = vec!["1", "2", "three", "4"];
    let result: Result<Vec<i32>, AppError> = stream::iter(inputs)
        .map(|s| s.parse::<i32>())
        .map_err(|_| AppError::Parse(ParseError::InvalidChar('t')))
        .try_collect()
        .await;

    match result {
        Ok(nums) => println!("Parsed: {:?}", nums),
        Err(e) => println!("Error: {:?}", e),
    }
}

try_collect stops at the first error; map_err ensures it's the right error type.
use futures::stream::{self, TryStreamExt};

// Real pattern: database query results to application errors
struct User {
    id: i32,
    name: String,
}

#[derive(Debug)]
enum RepositoryError {
    ConnectionPool(String),
    QueryFailed(String),
    NotFound,
}

#[derive(Debug)]
enum ServiceError {
    Repository(RepositoryError),
    InvalidState(String),
}

impl From<RepositoryError> for ServiceError {
    fn from(e: RepositoryError) -> Self {
        ServiceError::Repository(e)
    }
}

async fn fetch_users_stream() -> impl futures::Stream<Item = Result<User, ServiceError>> {
    // Simulated database results
    let results: Vec<Result<User, RepositoryError>> = vec![
        Ok(User { id: 1, name: "Alice".into() }),
        Ok(User { id: 2, name: "Bob".into() }),
        Err(RepositoryError::QueryFailed("connection lost".into())),
    ];
    stream::iter(results)
        .map_err(|e| ServiceError::Repository(e))
}

async fn process_users() -> Result<Vec<String>, ServiceError> {
    fetch_users_stream()
        .await
        .map_ok(|user| user.name)
        .try_collect()
        .await
}

Database streaming APIs commonly use map_err to lift repository errors into service errors.
TryStreamExt::map_err purpose:
| Aspect | Behavior |
|--------|----------|
| Input | Stream<Item = Result<T, E1>> |
| Output | Stream<Item = Result<T, E2>> |
| Success values | Passed through unchanged |
| Errors | Transformed by closure |
| Evaluation | Lazy (on poll) |
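Common use cases: converting domain-specific errors into a unified application error type, enriching errors with context, and wrapping errors for APIs that expect a specific error type.
Combinator ecosystem: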
| Combinator | Purpose |
|------------|---------|
| map_err | Transform error type |
| map_ok | Transform success type |
| inspect_err | Observe errors without transforming |
| try_filter | Filter based on success values |
| try_fold | Accumulate with early termination on error |
| try_collect | Collect into Result<Collection, Error> |
Key insight: TryStreamExt::map_err brings the familiar Result::map_err pattern to streams, enabling error transformation at the stream level. This is essential for composing streams across API boundaries where error types differ, and for enriching errors with contextual information as they flow through processing pipelines. The lazy evaluation ensures transformations only happen when needed, making it efficient for stream processing where not all elements may be consumed.