What is the difference between futures::stream::StreamExt::filter and Iterator::filter in terms of ownership and async?

Iterator::filter filters synchronously: the predicate is called with a reference to each item (&Self::Item) and returns a bool, and the adapter yields the items that pass, unchanged. futures::stream::StreamExt::filter is the asynchronous counterpart: its predicate is also called with &Self::Item, but it returns a Future<Output = bool>, and an item is yielded only after that future resolves to true.

The ownership difference lies in that future. It is a single type that cannot depend on the lifetime of the reference passed to the predicate, so it cannot keep borrowing the item across an .await. Anything the async block needs must be copied, cloned, or computed before the block is created, whereas the sync predicate can simply use the borrow for the duration of the call. Neither adapter decides whether the source collection is consumed; that depends on how the iterator or stream was built (numbers.iter() versus numbers.into_iter(), stream::iter(&numbers) versus stream::iter(numbers)).

Iterator::filter: Synchronous Borrowing

fn iterator_filter_example() {
    let numbers = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
    
    // .iter() yields &i32; filter hands the predicate a reference to each item
    let evens: Vec<i32> = numbers
        .iter()
        .filter(|n| *n % 2 == 0)
        .copied()
        .collect();
    
    println!("Evens: {:?}", evens);
    
    // The closure receives &&i32 (a reference to the &i32 item)
    // .iter() only borrows the Vec, so no element is moved
}

Iterator::filter passes each item to the predicate by reference; because .iter() only borrows the Vec, the original collection remains valid.
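To separate what filter does from what .iter() does, here is a minimal sketch using only the standard library. The function names long_words and long_words_borrowed are made up for illustration. In both cases the predicate sees a borrow of the item, and the adapter yields whatever the underlying iterator yields.

```rust
/// Keeps the words longer than `min_len`, taking the Vec by value.
fn long_words(words: Vec<String>, min_len: usize) -> Vec<String> {
    words
        .into_iter()
        // The predicate is still handed a borrow (&String)...
        .filter(|w| w.len() > min_len)
        // ...but the adapter yields the owned Strings themselves.
        .collect()
}

/// Same filter over borrowed items: iter() yields &String, so the
/// predicate sees &&String and the result holds references.
fn long_words_borrowed(words: &[String], min_len: usize) -> Vec<&String> {
    words.iter().filter(|w| w.len() > min_len).collect()
}
```

Calling long_words_borrowed(&words, 3) leaves words usable afterwards, while long_words(words, 3) moves the Vec into the iterator.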

StreamExt::filter: Asynchronous Filtering

use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration};
 
#[tokio::main]
async fn stream_filter_example() {
    let numbers = vec![1, 2, 3, 4, 5];
    
    // StreamExt::filter with async predicate. The predicate receives &i32;
    // |&n| copies the value so the future does not borrow the argument
    let evens: Vec<i32> = stream::iter(numbers)
        .filter(|&n| async move {
            // Async operations possible here
            sleep(Duration::from_millis(10)).await;
            n % 2 == 0
        })
        .collect()
        .await;
    
    println!("Evens: {:?}", evens);
}

StreamExt::filter accepts a closure that returns a future, so the predicate can await other async operations while filtering.

Ownership: Borrowing vs Moving

use futures::stream::{self, StreamExt};
 
fn iterator_ownership() {
    let numbers = vec![1, 2, 3, 4, 5];
    
    // .iter() borrows the Vec, so the items are &i32
    let mut iter = numbers.iter().filter(|n| **n > 2);
    
    // numbers is still valid - we only borrowed
    println!("Original: {:?}", numbers);
    
    // Elements are references
    let first = iter.next();
    println!("First filtered: {:?}", first); // Some(&3)
}
 
#[tokio::main]
async fn stream_ownership() {
    let numbers = vec![1, 2, 3, 4, 5];
    
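    // stream::iter(numbers) takes ownership of the Vec, so the items are i32;
    // |&n| copies each value so the future does not borrow the argument
    let filtered: Vec<i32> = stream::iter(numbers)
        .filter(|&n| async move { n > 2 })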
        .collect()
        .await;
    
    // numbers is consumed by stream::iter
    // println!("{:?}", numbers); // Error: borrow of moved value
    
    // Elements are owned values
    println!("Filtered: {:?}", filtered); // [3, 4, 5]
}

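Whether the source survives is decided by how the iterator or stream is built, not by filter: numbers.iter() and stream::iter(&numbers) borrow, while numbers.into_iter() and stream::iter(numbers) consume. What StreamExt::filter adds is that the predicate's future must not borrow its argument.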

Async Closure Semantics

use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration};
 
#[tokio::main]
async fn async_predicate_benefits() {
    let urls = vec!["url1", "url2", "url3"];
    
    // With StreamExt::filter, we can do async checks;
    // |&url| copies the &str so the future does not borrow the argument
    let valid_urls: Vec<&str> = stream::iter(urls)
        .filter(|&url| async move {
            // Simulate async validation
            sleep(Duration::from_millis(10)).await;
            !url.is_empty()
        })
        .collect()
        .await;
    
    println!("Valid URLs: {:?}", valid_urls);
    
    // This would be impossible with Iterator::filter
    // Iterator filter closures must be synchronous
}
 
// Iterator::filter cannot do async
fn iterator_sync_only() {
    let urls = vec!["url1", "url2", "url3"];
    
    // This works - sync predicate
    let valid: Vec<&&str> = urls
        .iter()
        .filter(|url| !url.is_empty())
        .collect();
    
    // This would NOT compile - the predicate must return bool, not a future
    // let invalid = urls.iter().filter(|url| async { true });
}

StreamExt::filter enables async operations within the predicate; Iterator::filter is synchronous only.

The Borrowing Problem in Async Contexts

use futures::stream::{self, StreamExt};
 
#[derive(Debug, Clone)]
struct Item {
    id: u32,
    data: String,
}
 
#[tokio::main]
async fn borrow_issue() {
    let items = vec![
        Item { id: 1, data: "first".to_string() },
        Item { id: 2, data: "second".to_string() },
    ];
    
    // This fails to compile:
    // stream::iter(&items)
    //     .filter(|item| async move { item.id > 0 })
    //     .collect::<Vec<_>>()
    //     .await;

    // Error: the predicate's argument is a reference that is only valid
    // during the call, but the returned future captures it and is polled later

    // Solutions:

    // 1. Read what you need before creating the future
    let filtered: Vec<Item> = stream::iter(items.clone())
        .filter(|item| {
            let keep = item.id > 0;
            async move { keep }
        })
        .collect()
        .await;

    // 2. Clone what the async block needs so the future owns it
    let non_empty: Vec<Item> = stream::iter(items.clone())
        .filter(|item| {
            let data = item.data.clone();
            async move { !data.is_empty() }
        })
        .collect()
        .await;

    // 3. Copy the long-lived &Item out of the argument with |&item|
    let borrowed: Vec<&Item> = stream::iter(&items)
        .filter(|&item| async move { item.id > 0 })
        .collect()
        .await;

    println!("Original: {:?}", items);
    println!("Filtered: {:?}", filtered);
    println!("Non-empty: {:?}", non_empty);
    println!("Borrowed: {:?}", borrowed);
}

The future returned by the predicate cannot borrow from the predicate's argument; give it owned data or a reference that outlives the call.

Comparison with Sync Borrowing

use futures::stream::{self, StreamExt};
 
#[derive(Debug)]
struct Record {
    id: u32,
    value: i32,
}
 
fn iterator_can_borrow() {
    let records = vec![
        Record { id: 1, value: 10 },
        Record { id: 2, value: -5 },
        Record { id: 3, value: 15 },
    ];
    
    // Iterator::filter borrows freely
    let positive: Vec<&Record> = records
        .iter()
        .filter(|r| r.value > 0)
        .collect();
    
    // Can still use original
    println!("All records: {:?}", records);
    println!("Positive: {:?}", positive);
    
    // Can have multiple filtered views
    let negative: Vec<&Record> = records
        .iter()
        .filter(|r| r.value < 0)
        .collect();
    
    println!("Negative: {:?}", negative);
}
 
#[tokio::main]
async fn stream_filtered_views() {
    let records = vec![
        Record { id: 1, value: 10 },
        Record { id: 2, value: -5 },
        Record { id: 3, value: 15 },
    ];

    // Streaming the Vec by value moves it into the stream
    let positive: Vec<Record> = stream::iter(records)
        .filter(|r| {
            let keep = r.value > 0; // read the field before creating the future
            async move { keep }
        })
        .collect()
        .await;
    println!("Positive (owned): {:?}", positive);

    // records has been moved and cannot be used here

    // For several views, stream references instead; no Clone is needed
    let records = vec![
        Record { id: 1, value: 10 },
        Record { id: 2, value: -5 },
        Record { id: 3, value: 15 },
    ];

    let positive: Vec<&Record> = stream::iter(&records)
        .filter(|&r| async move { r.value > 0 })
        .collect()
        .await;

    let negative: Vec<&Record> = stream::iter(&records)
        .filter(|&r| async move { r.value < 0 })
        .collect()
        .await;

    println!("Positive: {:?}", positive);
    println!("Negative: {:?}", negative);
}

Both adapters allow multiple filtered views over a borrowed source; streaming a collection by value consumes it, just as into_iter() does.

FilterMap: Combining Filter and Map

use futures::stream::{self, StreamExt};
 
#[tokio::main]
async fn filter_map_example() {
    let inputs = vec!["123", "abc", "456", "def"];
    
    // StreamExt::filter_map - filter and transform in one step
    let numbers: Vec<i32> = stream::iter(inputs)
        .filter_map(|s| async move {
            // Return Some to keep, None to filter out
            s.parse::<i32>().ok()
        })
        .collect()
        .await;
    
    println!("Numbers: {:?}", numbers); // [123, 456]
}
 
fn iterator_filter_map() {
    let inputs = vec!["123", "abc", "456", "def"];
    
    // Iterator::filter_map - same pattern, synchronous
    let numbers: Vec<i32> = inputs
        .iter()
        .filter_map(|s| s.parse::<i32>().ok())
        .collect();
    
    println!("Numbers: {:?}", numbers); // [123, 456]
}

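Both provide filter_map for combined filtering and transformation. Unlike StreamExt::filter, StreamExt::filter_map passes each item to the closure by value, so the async block can own the item outright.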

Concurrent Filtering

use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration};
 
#[tokio::main]
async fn concurrent_filter() {
    let items = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
    
    // Sequential filtering: filter awaits one predicate future at a time
    let start = std::time::Instant::now();
    let sequential: Vec<i32> = stream::iter(items.clone())
        .filter(|&n| async move {
            sleep(Duration::from_millis(50)).await;
            n % 2 == 0
        })
        .collect()
        .await;
    println!("Sequential: {:?} ({:?})", sequential, start.elapsed());

    // Concurrent filtering: map each item to a future, run up to 5 of them
    // at once with buffer_unordered, then drop the rejected items
    let start = std::time::Instant::now();
    let concurrent: Vec<i32> = stream::iter(items)
        .map(|n| async move {
            sleep(Duration::from_millis(50)).await;
            (n, n % 2 == 0)
        })
        .buffer_unordered(5) // Process up to 5 checks concurrently
        .filter_map(|(n, keep)| async move { keep.then_some(n) })
        .collect()
        .await;
    println!("Concurrent: {:?} ({:?})", concurrent, start.elapsed());

    // Note: buffer_unordered yields results in completion order,
    // which may differ from the input order
}
 
// Iterator::filter has no concurrency - it's synchronous

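StreamExt::filter itself awaits one predicate future at a time. Concurrency comes from mapping items to futures and running them through buffered or buffer_unordered before filtering; Iterator::filter is always sequential.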

Type Signatures Compared

use futures::stream::{self, StreamExt};

// Iterator::filter signature (simplified)
// fn filter<P>(self, predicate: P) -> Filter<Self, P>
// where
//     P: FnMut(&Self::Item) -> bool,
//
// Key points:
// - predicate receives &Self::Item (a borrow of the item)
// - returns bool (synchronous)
// - Filter yields Self::Item unchanged

// StreamExt::filter signature (simplified)
// fn filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F>
// where
//     F: FnMut(&Self::Item) -> Fut,
//     Fut: Future<Output = bool>,
//
// Key points:
// - predicate also receives &Self::Item
// - returns Future<Output = bool> (async)
// - Fut is one type for every call, so it cannot borrow from the argument
// - Filter yields Self::Item unchanged

#[tokio::main]
async fn demonstrate_types() {
    let numbers = vec![1, 2, 3, 4, 5];

    // Item is &i32 because of .iter(), so the predicate sees &&i32
    let mut iter = numbers.iter().filter(|n| **n > 2);
    let item: Option<&i32> = iter.next();
    println!("Iterator item: {:?}", item);

    // Item is i32 because stream::iter takes the Vec by value.
    // next() requires an Unpin stream, so pin it on the heap first
    let mut filtered = Box::pin(stream::iter(numbers).filter(|&n| async move { n > 2 }));
    let item: Option<i32> = filtered.next().await;
    println!("Stream item: {:?}", item);
}

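The type signatures show that both predicates receive &Self::Item and both adapters yield Self::Item; the difference is the return type: bool versus a future that cannot borrow from the argument.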
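The same bound shape can be reproduced with the standard library alone. The sketch below is not how the futures crate implements filter; filter_async, keep_long, and the busy-polling block_on are hypothetical helpers written only to show that a predicate typed FnMut(&T) -> Fut has to hand back a future that owns its data.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for futures that are always ready.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Tiny executor that polls in a loop until the future completes.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

/// Hypothetical async filter with the same predicate bound as
/// StreamExt::filter: one `Fut` type for references of every lifetime.
async fn filter_async<T, F, Fut>(items: Vec<T>, mut pred: F) -> Vec<T>
where
    F: FnMut(&T) -> Fut,
    Fut: Future<Output = bool>,
{
    let mut kept = Vec::new();
    for item in items {
        if pred(&item).await {
            kept.push(item);
        }
    }
    kept
}

/// Keeps the words longer than three bytes.
fn keep_long(words: Vec<String>) -> Vec<String> {
    block_on(filter_async(words, |w| {
        let long = w.len() > 3; // read through the borrow now
        async move { long } // the future owns a bool and borrows nothing
    }))
}
```

Writing the predicate as |w| async move { w.len() > 3 } is rejected by the compiler here for the same reason as with StreamExt::filter: the future would capture the short-lived &String.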

Practical Implications

use futures::stream::{self, StreamExt};
 
#[derive(Debug)]
struct User {
    id: u32,
    name: String,
    active: bool,
}
 
// Sync: Can filter without consuming
fn sync_workflow() {
    let users = vec![
        User { id: 1, name: "Alice".into(), active: true },
        User { id: 2, name: "Bob".into(), active: false },
        User { id: 3, name: "Charlie".into(), active: true },
    ];
    
    // Multiple operations on same data
    let active_count = users.iter().filter(|u| u.active).count();
    let active_names: Vec<&str> = users
        .iter()
        .filter(|u| u.active)
        .map(|u| u.name.as_str())
        .collect();
    
    println!("Active count: {}", active_count);
    println!("Active names: {:?}", active_names);
    println!("Original users: {:?}", users);
}
 
// Async: Must plan ownership
#[tokio::main]
async fn async_workflow() {
    let users = vec![
        User { id: 1, name: "Alice".into(), active: true },
        User { id: 2, name: "Bob".into(), active: false },
        User { id: 3, name: "Charlie".into(), active: true },
    ];
    
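    // Stream references for the first pass; copy the flag out so the
    // predicate's future does not borrow its argument
    let active_count = stream::iter(&users)
        .filter(|u| {
            let active = u.active;
            async move { active }
        })
        .count()
        .await;

    // Stream by value for the last pass so map can move each name out
    let active_names: Vec<String> = stream::iter(users)
        .filter(|u| {
            let active = u.active;
            async move { active }
        })
        .map(|u| u.name)
        .collect()
        .await;

    println!("Active count: {}", active_count);
    println!("Active names: {:?}", active_names);
    // users was moved into the second stream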
}

Decide up front whether each stream should own its items or borrow them, and give every predicate future its own copy of what it needs.

Working Around Ownership Constraints

use futures::future;
use futures::stream::{self, StreamExt};

#[tokio::main]
async fn ownership_workarounds() {
    let data = vec!["apple", "banana", "cherry"];

    // Approach 1: Copy the item out of the argument with |&s|
    // (&str is Copy, so the future owns its own &str)
    let filtered: Vec<&str> = stream::iter(data.clone())
        .filter(|&s| async move { s.len() > 5 })
        .collect()
        .await;
    println!("Original preserved: {:?}, filtered: {:?}", data, filtered);

    // Approach 2: Stream references and evaluate the predicate eagerly;
    // future::ready wraps the finished bool, so nothing is borrowed
    let filtered: Vec<&&str> = stream::iter(&data)
        .filter(|s| future::ready(s.len() > 5))
        .collect()
        .await;
    println!("Borrowed: {:?}", filtered);

    // Approach 3: If the predicate never awaits, filter synchronously
    let filtered: Vec<&str> = data
        .iter()
        .copied()
        .filter(|s| s.len() > 5)
        .collect();
    println!("Sync: {:?}", filtered);
}

Several patterns help manage ownership in async streaming contexts.

Summary Comparison

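Aspect | Iterator::filter | StreamExt::filter
--- | --- | ---
Execution | Synchronous | Asynchronous
Predicate receives | &Self::Item | &Self::Item
Predicate returns | bool | Future<Output = bool>
Borrowing the argument | Allowed for the duration of the call | Not allowed inside the returned future
Yields | Self::Item | Self::Item
Source collection | Preserved with .iter(), consumed with .into_iter() | Preserved with stream::iter(&v), consumed with stream::iter(v)
Async operations | Not possible | Supported
Concurrency | Sequential only | Sequential; use map plus buffered or buffer_unordered for concurrency
Multiple passes | Borrow with .iter() | Borrow with stream::iter(&v)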

Synthesis

Iterator::filter and StreamExt::filter serve similar purposes but operate in fundamentally different execution contexts:

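Iterator::filter (synchronous):

  • Predicate receives &Self::Item and returns bool
  • Predicate may use the borrow freely for the duration of the call
  • Yields Self::Item unchanged; with .iter() the collection remains accessible
  • Predicate must complete synchronously
  • Zero allocation for the filtering itself

StreamExt::filter (asynchronous):

  • Predicate receives &Self::Item and returns Future<Output = bool>
  • The future cannot borrow from that argument; copy, clone, or precompute what it needs
  • Can perform async operations (I/O, timers, etc.)
  • Predicate futures are awaited one at a time
  • The source is consumed only when the stream owns it (stream::iter(v) rather than stream::iter(&v))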

Key insight: The ownership difference stems from how the predicate's future is typed. The closure must accept a reference of any lifetime (for<'a> FnMut(&'a Item) -> Fut), yet Fut is a single type chosen once, so it cannot mention that lifetime. The adapter holds the pending item and polls the future after the closure has returned, so a future that borrowed the argument would point at a borrow that has already ended. Iterator::filter has no such problem because its predicate finishes within a single synchronous call, where the borrow's lifetime is naturally contained.

Choose Iterator::filter for pure in-memory filtering where sync is sufficient. Choose StreamExt::filter when the predicate requires async operations like database lookups, HTTP requests, or other I/O. When using async streams, make each predicate future own what it needs (copy, clone, or precompute), stream references when the source must stay usable, and reach for filter_map when the async block should own the whole item.