Rust walkthroughs
How does futures::stream::StreamExt::filter differ from Iterator::filter in terms of ownership and async? Iterator::filter performs synchronous filtering: its predicate closure receives a reference to each item (`&Self::Item`) and returns a `bool`, and the resulting iterator yields only the items that match. futures::stream::StreamExt::filter performs asynchronous filtering: its predicate also receives a reference to each item (`&Self::Item`), but it returns a Future that resolves to `bool`, and the stream yields an item only after that future completes with `true`. The key ownership difference is that the returned future's type cannot depend on the lifetime of the borrow passed to the predicate, so the future must not hold that reference. Anything the async predicate needs from the item must be copied or cloned out before the `async move` block is created (for `Copy` items, `|&n| async move { n % 2 == 0 }`). Streams are often built from owned values (`stream::iter(vec)`), which consumes the source collection. That is a property of how the stream is constructed, not of `filter` itself: `stream::iter(&vec)` borrows just like `vec.iter()` does.
/// Keeps the even numbers of a vector using the synchronous `Iterator::filter`.
fn iterator_filter_example() {
    let numbers = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
    // `iter()` lends out `&i32`s and `filter` hands its predicate a reference
    // to each of them (`&&i32`); the `&&value` pattern unwraps both layers.
    // Nothing is moved out of `numbers`.
    let evens = numbers
        .iter()
        .filter(|&&value| value % 2 == 0)
        .copied()
        .collect::<Vec<i32>>();
    println!("Evens: {:?}", evens);
}
// `numbers.iter()` borrows the Vec and Iterator::filter's predicate only sees
// `&&i32`, so the original collection remains valid.
use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn stream_filter_example() {
let numbers = vec![1, 2, 3, 4, 5];
// StreamExt::filter with async predicate
let evens: Vec<i32> = stream::iter(numbers)
.filter(|n| async move {
// Async operations possible here
sleep(Duration::from_millis(10)).await;
n % 2 == 0
})
.collect()
.await;
println!("Evens: {:?}", evens);
}StreamExt::filter accepts an async closure that can perform async operations during filtering.
use futures::stream::{self, StreamExt};
/// Demonstrates that filtering `numbers.iter()` leaves `numbers` intact.
fn iterator_ownership() {
    let numbers = vec![1, 2, 3, 4, 5];
    // Lazily filter the borrowed elements; no work happens until `next`.
    let mut above_two = numbers.iter().filter(|&&x| x > 2);
    // The Vec is only borrowed, so it can still be printed here.
    println!("Original: {:?}", numbers);
    // Items come out of the borrowed iterator as `&i32`.
    let first: Option<&i32> = above_two.next();
    println!("First filtered: {:?}", first); // Some(&3)
}
#[tokio::main]
async fn stream_ownership() {
let numbers = vec![1, 2, 3, 4, 5];
// Stream filter takes ownership
let filtered: Vec<i32> = stream::iter(numbers)
.filter(|n| async move { *n > 2 })
.collect()
.await;
// numbers is consumed by stream::iter
// println!("{:?}", numbers); // Error: borrow of moved value
// Elements are owned values
println!("Filtered: {:?}", filtered); // [3, 4, 5]
}Iterator::filter works with references; StreamExt::filter consumes the stream items.
use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration};
/// Filters URLs with an async validation step.
///
/// The predicate receives `&&str`; `|&url|` copies the `&'static str` out so
/// the `async move` block does not capture the per-call borrow.
#[tokio::main]
async fn async_predicate_benefits() {
    let urls = vec!["url1", "url2", "url3"];
    // With StreamExt::filter, we can do async checks
    let valid_urls: Vec<&str> = stream::iter(urls)
        .filter(|&url| async move {
            // Simulate async validation
            sleep(Duration::from_millis(10)).await;
            !url.is_empty()
        })
        .collect()
        .await;
    println!("Valid URLs: {:?}", valid_urls);
    // This would be impossible with Iterator::filter
    // Iterator filter closures must be synchronous
}
// Iterator::filter cannot do async
/// Shows that `Iterator::filter` only accepts a synchronous `bool` predicate.
fn iterator_sync_only() {
    let urls = vec!["url1", "url2", "url3"];
    // A plain bool-returning predicate is exactly what Iterator::filter wants.
    let _valid: Vec<&&str> = urls.iter().filter(|u| !u.is_empty()).collect();
    // Returning a future instead would NOT compile: the predicate must return
    // `bool`, but an `async` block evaluates to `impl Future<Output = bool>`.
    // let invalid = urls.iter().filter(|url| async { true });
}
// StreamExt::filter enables async operations within the predicate;
// Iterator::filter is synchronous only.
use futures::stream::{self, StreamExt};
#[derive(Debug, Clone)]
struct Item {
id: u32,
data: String,
}
#[tokio::main]
async fn borrow_issue() {
let items = vec![
Item { id: 1, data: "first".to_string() },
Item { id: 2, data: "second".to_string() },
];
// This fails to compile:
// stream::iter(&items)
// .filter(|item| async move { item.id > 0 })
// .collect::<Vec<_>>()
// .await;
// Error: async closure captures references that outlive the closure
// The future must own its data
// Solutions:
// 1. Move ownership
let filtered: Vec<Item> = stream::iter(items)
.filter(|item| async move { item.id > 0 })
.collect()
.await;
// 2. Clone if you need the original
let items = vec![
Item { id: 1, data: "first".to_string() },
Item { id: 2, data: "second".to_string() },
];
let filtered: Vec<Item> = stream::iter(items.clone())
.filter(|item| async move { item.id > 0 })
.collect()
.await;
println!("Original: {:?}", items);
println!("Filtered: {:?}", filtered);
}Async closures require owned data; borrowed references create lifetime issues.
use futures::stream::{self, StreamExt};
#[derive(Debug)]
struct Record {
id: u32,
value: i32,
}
fn iterator_can_borrow() {
let records = vec![
Record { id: 1, value: 10 },
Record { id: 2, value: -5 },
Record { id: 3, value: 15 },
];
// Iterator::filter borrows freely
let positive: Vec<&Record> = records
.iter()
.filter(|r| r.value > 0)
.collect();
// Can still use original
println!("All records: {:?}", records);
println!("Positive: {:?}", positive);
// Can have multiple filtered views
let negative: Vec<&Record> = records
.iter()
.filter(|r| r.value < 0)
.collect();
println!("Negative: {:?}", negative);
}
#[tokio::main]
async fn stream_must_own() {
let records = vec![
Record { id: 1, value: 10 },
Record { id: 2, value: -5 },
Record { id: 3, value: 15 },
];
// StreamExt::filter consumes
let positive: Vec<Record> = stream::iter(records)
.filter(|r| async move { r.value > 0 })
.collect()
.await;
// records is consumed, cannot use here
// To have multiple views, need to clone upfront
let records = vec![
Record { id: 1, value: 10 },
Record { id: 2, value: -5 },
Record { id: 3, value: 15 },
];
let positive: Vec<Record> = stream::iter(records.clone())
.filter(|r| async move { r.value > 0 })
.collect()
.await;
let negative: Vec<Record> = stream::iter(records)
.filter(|r| async move { r.value < 0 })
.collect()
.await;
println!("Positive: {:?}", positive);
println!("Negative: {:?}", negative);
}Iterator::filter enables multiple filtered views; StreamExt::filter consumes the source.
use futures::stream::{self, StreamExt};
/// Parses strings asynchronously, keeping only those that are valid `i32`s.
///
/// Unlike `filter`, `StreamExt::filter_map` hands its closure each item by
/// value, so the `async move` block may own `text` directly.
#[tokio::main]
async fn filter_map_example() {
    let inputs = vec!["123", "abc", "456", "def"];
    let parsed = stream::iter(inputs).filter_map(|text| async move {
        // `Some` keeps (and transforms) the item, `None` drops it
        text.parse::<i32>().ok()
    });
    let numbers: Vec<i32> = parsed.collect().await;
    println!("Numbers: {:?}", numbers); // [123, 456]
}
/// Synchronous counterpart: `Iterator::filter_map` with a plain closure.
fn iterator_filter_map() {
    let inputs = vec!["123", "abc", "456", "def"];
    // Parse each borrowed string; failed parses become `None` and are dropped.
    let numbers = inputs
        .iter()
        .filter_map(|text| text.parse::<i32>().ok())
        .collect::<Vec<i32>>();
    println!("Numbers: {:?}", numbers); // [123, 456]
}
// Both provide filter_map for combined filtering and transformation
// (StreamExt::filter_map's closure receives each item by value).
use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration};
/// Compares sequential async filtering with concurrent predicate evaluation.
///
/// `StreamExt::filter` polls one predicate future at a time. To run several
/// predicates at once, map each item to a future that yields `(item, keep)`,
/// drive those futures with `buffer_unordered`, and drop rejected items with
/// `filter_map`. `buffer_unordered` cannot follow `filter` directly: it needs
/// a stream whose items are themselves futures.
#[tokio::main]
async fn concurrent_filter() {
    let items = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
    // Sequential filtering: each predicate completes before the next starts
    let start = std::time::Instant::now();
    let sequential: Vec<i32> = stream::iter(items.clone())
        .filter(|&n| async move {
            sleep(Duration::from_millis(50)).await;
            n % 2 == 0
        })
        .collect()
        .await;
    println!("Sequential: {:?} ({:?})", sequential, start.elapsed());
    // Concurrent filtering with buffer_unordered
    let start = std::time::Instant::now();
    let concurrent: Vec<i32> = stream::iter(items)
        .map(|n| async move {
            sleep(Duration::from_millis(50)).await;
            (n, n % 2 == 0)
        })
        .buffer_unordered(5) // Evaluate up to 5 predicates concurrently
        .filter_map(|(n, keep)| async move { keep.then_some(n) })
        .collect()
        .await;
    println!("Concurrent: {:?} ({:?})", concurrent, start.elapsed());
    // Note: buffer_unordered yields results as they complete, so the output
    // order may differ from the input order (`buffered` preserves it)
}
// Iterator::filter has no concurrency - it's synchronous
// StreamExt::filter itself evaluates one predicate at a time, but async
// predicates can be run concurrently via map + buffer_unordered;
// Iterator::filter is always sequential.
use futures::stream::{Stream, StreamExt};
use std::future::Future;
// Iterator::filter signature (simplified)
// fn filter<P>(self, predicate: P) -> Filter<Self, P>
// where
//     P: FnMut(&Self::Item) -> bool,
//
// Key points:
// - predicate receives &Self::Item (a borrow of each item)
// - returns bool (synchronous)
// - Filter yields Self::Item unchanged; `numbers.iter()` yields &i32, so the
//   filtered iterator also yields &i32
// StreamExt::filter signature (simplified)
// fn filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F>
// where
//     F: FnMut(&Self::Item) -> Fut,
//     Fut: Future<Output = bool>,
//
// Key points:
// - predicate also receives &Self::Item (a borrow)
// - returns a Future<Output = bool> (async); `Fut` is one type for every
//   call, so the future cannot keep that borrow - copy/clone what it needs
// - Filter yields Self::Item unchanged; `stream::iter(numbers)` yields owned
//   i32 values
/// Contrasts the item types produced by the two filters.
fn demonstrate_types() {
    // Iterator filter yields whatever its source yields: here &i32
    let numbers = vec![1, 2, 3, 4, 5];
    let mut iter = numbers.iter().filter(|n| **n > 2);
    let item: Option<&i32> = iter.next();
    assert_eq!(item, Some(&3));
    // Stream filter over stream::iter(numbers) yields owned values.
    // (Inside an async context; the Filter stream is !Unpin because it stores
    // the predicate's future, so pin it before calling `next`.)
    // let mut stream = Box::pin(
    //     futures::stream::iter(numbers).filter(|&n| async move { n > 2 }),
    // );
    // let item: Option<i32> = stream.next().await;
}
// The signatures show that both predicates receive `&Self::Item`; the
// difference is the return type (`bool` vs. a future), and that the future
// cannot keep the borrow it was given.
use futures::stream::{self, StreamExt};
#[derive(Debug)]
struct User {
id: u32,
name: String,
active: bool,
}
// Sync: Can filter without consuming
fn sync_workflow() {
let users = vec![
User { id: 1, name: "Alice".into(), active: true },
User { id: 2, name: "Bob".into(), active: false },
User { id: 3, name: "Charlie".into(), active: true },
];
// Multiple operations on same data
let active_count = users.iter().filter(|u| u.active).count();
let active_names: Vec<&str> = users
.iter()
.filter(|u| u.active)
.map(|u| u.name.as_str())
.collect();
println!("Active count: {}", active_count);
println!("Active names: {:?}", active_names);
println!("Original users: {:?}", users);
}
// Async: Must plan ownership
#[tokio::main]
async fn async_workflow() {
let users = vec![
User { id: 1, name: "Alice".into(), active: true },
User { id: 2, name: "Bob".into(), active: false },
User { id: 3, name: "Charlie".into(), active: true },
];
// Need to clone for multiple operations
let active_count = stream::iter(users.clone())
.filter(|u| async move { u.active })
.count()
.await;
let active_names: Vec<String> = stream::iter(users)
.filter(|u| async move { u.active })
.map(|u| u.name)
.collect()
.await;
println!("Active count: {}", active_count);
println!("Active names: {:?}", active_names);
// users is consumed
}Plan for ownership transfer when using async streams.
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn ownership_workarounds() {
let data = vec!["apple", "banana", "cherry"];
// Approach 1: Clone in the predicate
let filtered: Vec<&str> = stream::iter(data.clone())
.filter(|s| async move { s.len() > 5 })
.collect()
.await;
println!("Original preserved: {:?}", data);
// Approach 2: Use references with explicit lifetime
// This requires the stream to be consumed in the same scope
let filtered: Vec<&&str> = stream::iter(&data)
.then(|s| async move {
// Can't move reference into async block in some cases
// Need to use then + filter combo
s
})
.filter(|s| async { s.len() > 5 })
.collect()
.await;
// Approach 3: Process and collect early, then sync filter
let collected: Vec<&str> = data.iter().copied().collect();
let filtered: Vec<&str> = collected.into_iter()
.filter(|s| s.len() > 5)
.collect();
}Several patterns help manage ownership in async streaming contexts.
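| Aspect | Iterator::filter | StreamExt::filter |
|--------|-------------------|---------------------|
| Execution | Synchronous | Asynchronous |
| Predicate receives | &Item (borrow) | &Item (borrow; the returned future cannot keep it) |
| Returns | bool | Future<Output = bool> |
| Yields | Item (the source's item type, e.g. &T from .iter()) | Item (the source's item type, e.g. T from stream::iter(vec)) |
| Original collection | Preserved with .iter(); consumed with .into_iter() | Preserved with stream::iter(&vec); consumed with stream::iter(vec) |
| Async operations | Not possible | Supported |
| Concurrency | Sequential only | filter itself is sequential; use map + buffer_unordered for concurrency |
| Multiple passes | Easy (borrow) | Stream over references, or clone |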
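Iterator::filter and StreamExt::filter serve similar purposes but operate in fundamentally different execution contexts:
Iterator::filter (synchronous):
- Predicate receives &Item and returns bool
- The borrow ends as soon as the predicate returns
StreamExt::filter (asynchronous):
- Predicate receives &Item and returns a Future<Output = bool>
- The future must not hold that borrow, so the needed data is copied or cloned out first

Key insight: the ownership difference stems from the predicate's signature, `F: FnMut(&Self::Item) -> Fut`. `Fut` is a single type shared by every call, so it cannot depend on the lifetime of the borrow handed to any one call. An `async move` block that uses the argument directly would capture that borrow, which the compiler rejects ("lifetime may not live long enough"). The predicate must therefore copy or clone what it needs out of the item before building the future. Alternatively, the stream can yield references that outlive the call, e.g. `stream::iter(&data)` with a `|&item|` pattern. Iterator::filter has no such constraint: its predicate runs to completion within a single synchronous call, so the borrow never escapes.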
Choose Iterator::filter for pure in-memory filtering where sync is sufficient. Choose StreamExt::filter when the predicate requires async operations like database lookups, HTTP requests, or other I/O. When using async streams, copy the data each predicate needs out of the borrowed item before creating its future. Design your data flow to accommodate ownership transfer, stream over references (`stream::iter(&data)`), or use cloning strategically where multiple passes are needed.