What are the differences between futures::stream::Stream and std::iter::Iterator in terms of execution model?

Stream and Iterator both represent sequences of values, but they differ fundamentally in how values are produced. Iterators yield values synchronously on demand, while streams yield values asynchronously with potential waiting between items.

Iterator: Synchronous Pull-Based Sequences

The Iterator trait produces values synchronously:

trait Iterator {
    type Item;
    
    fn next(&mut self) -> Option<Self::Item>;
    
    // Many provided methods...
}
 
// Simple usage
fn iterator_example() {
    let numbers = vec![1, 2, 3, 4, 5];
    let mut iter = numbers.into_iter();
    
    // next() returns immediately with a value or None
    assert_eq!(iter.next(), Some(1));
    assert_eq!(iter.next(), Some(2));
    assert_eq!(iter.next(), Some(3));
    // ...
}

When you call next() on an iterator, it either returns immediately with Some(item) or None if exhausted.

Stream: Asynchronous Pull-Based Sequences

The Stream trait produces values asynchronously:

use futures::stream::{Stream, StreamExt};
 
// Simplified trait definition
trait Stream {
    type Item;
    
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) 
        -> Poll<Option<Self::Item>>;
}
 
// Stream usage requires async
async fn stream_example() {
    use futures::stream::iter;
    
    let mut stream = iter(vec![1, 2, 3, 4, 5]);
    
    // next() returns a future that resolves to Option<Item>
    while let Some(value) = stream.next().await {
        println!("Got: {}", value);
    }
}

When you call next() on a stream, you get a future that may wait for data to become available.

The Execution Model Difference

use std::time::Duration;
use futures::stream::{self, StreamExt};
use tokio::time::interval;
 
// Iterator: Always returns immediately
fn iterator_blocking() {
    let data = vec![1, 2, 3];
    let mut iter = data.into_iter();
    
    // These calls never block
    let _ = iter.next();  // Immediate
    let _ = iter.next();  // Immediate
    let _ = iter.next();  // Immediate
}
 
// Stream: Can wait between items
async fn stream_waiting() {
    // Create a stream that yields every second
    let mut interval_stream = interval(Duration::from_secs(1))
        .map(|_| "tick");
    
    // Each next() may wait
    let _ = interval_stream.next().await;  // Waits ~1 second
    let _ = interval_stream.next().await;  // Waits another second
    let _ = interval_stream.next().await;  // Waits another second
}

The key difference: iterators are always ready; streams may need to wait.

The Poll Model

use std::task::{Poll, Context};
use std::pin::Pin;
 
// Iterator's next is simple
fn iterator_next_example() {
    let mut iter = vec![1, 2, 3].into_iter();
    
    // Direct return, always immediate
    match iter.next() {
        Some(v) => println!("Got: {}", v),
        None => println!("Done"),
    }
}
 
// Stream uses Poll to indicate readiness
// Poll::Ready(Some(item)) - item available
// Poll::Ready(None) - stream finished
// Poll::Pending - not ready yet, wake me later

Streams use Poll to communicate their state to the executor.

Blocking vs Non-Blocking Contexts

use std::fs::File;
use std::io::{BufRead, BufReader};
use futures::stream::{Stream, StreamExt};
use tokio::fs::File as AsyncFile;
use tokio::io::{AsyncBufReadExt, BufReader as AsyncBufReader};
 
// Iterator: Can block
fn file_lines_iterator(path: &str) -> impl Iterator<Item = String> {
    let file = File::open(path).unwrap();
    let reader = BufReader::new(file);
    reader.lines().map(|l| l.unwrap())
}
 
// This would block the current thread if reading from disk
fn use_iterator() {
    for line in file_lines_iterator("large_file.txt") {
        // Thread is blocked during I/O
        println!("{}", line);
    }
}
 
// Stream: Non-blocking in async context
async fn file_lines_stream(path: &str) -> impl Stream<Item = String> {
    use futures::stream::unfold;
    let file = AsyncFile::open(path).await.unwrap();
    let reader = AsyncBufReader::new(file);
    
    // This stream won't block - it yields to the executor during I/O
    futures::stream::try_unfold(reader, |mut r| async move {
        let mut line = String::new();
        match r.read_line(&mut line).await {
            Ok(0) => Ok(None),  // EOF
            Ok(_) => Ok(Some((line, r))),
            Err(e) => Err(e),
        }
    })
    .filter_map(|r| async move { r.ok() })
}

Iterators can block; streams integrate with async runtimes.

Combinators: Similar API, Different Execution

use futures::stream::{self, StreamExt};
 
// Iterator combinators are synchronous
fn iterator_combinators() {
    let result: Vec<i32> = (1..=10)
        .filter(|x| x % 2 == 0)  // Synchronous filter
        .map(|x| x * 2)          // Synchronous map
        .take(3)                  // Synchronous take
        .collect();
    
    println!("{:?}", result);  // [4, 8, 12]
}
 
// Stream combinators are async
async fn stream_combinators() {
    let result: Vec<i32> = stream::iter(1..=10)
        .filter(|x| async move { x % 2 == 0 })  // Async filter
        .map(|x| async move { x * 2 })           // Async map
        .take(3)                                  // Async take
        .collect()
        .await;
    
    println!("{:?}", result);  // [4, 8, 12]
}

Stream combinators return futures or new streams, requiring .await for execution.

Async Operations in Combinators

use futures::stream::{self, StreamExt};
 
// Iterator: Can't do async in combinators
fn iterator_no_async() {
    // This won't work:
    // (1..=10).map(|x| async_operation(x).await)
    
    // Would need to collect futures and then await them
    let futures: Vec<_> = (1..=10)
        .map(|x| async_operation(x))
        .collect();
    
    // But then you're not iterating, you're batching
}
 
// Stream: Async operations in combinators
async fn stream_with_async() {
    let stream = stream::iter(1..=10)
        .then(|x| async move {
            // Each item can have async work
            simulate_network_request(x).await
        })
        .filter(|result| async move {
            // Async filtering too
            result.is_ok()
        });
    
    futures::pin_mut!(stream);
    while let Some(value) = stream.next().await {
        println!("Got: {:?}", value);
    }
}
 
async fn simulate_network_request(id: i32) -> Result<String, &'static str> {
    // Simulated async I/O
    Ok(format!("Response for {}", id))
}

Streams allow async operations per-item; iterators require batching async work.

Backpressure and Flow Control

use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration};
 
// Iterator: No backpressure mechanism
fn iterator_no_backpressure() {
    let data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
    
    // Consumer pulls as fast as it can
    for item in data {
        // No way to signal "slow down" to producer
        // Producer already has all data in memory
    }
}
 
// Stream: Built-in backpressure
async fn stream_backpressure() {
    let mut stream = stream::iter(1..=100)
        .throttle(Duration::from_millis(100));  // Rate limit
    
    // Stream only produces when polled
    while let Some(item) = stream.next().await {
        println!("Processing: {}", item);
        
        // We control the pace by when we call next()
        if item % 10 == 0 {
            sleep(Duration::from_secs(1)).await;  // Slow down
        }
    }
}
 
// Buffer with limits
async fn stream_buffering() {
    let stream = stream::iter(1..=100)
        .map(|x| async move { expensive_async_work(x).await })
        .buffer_unordered(5);  // At most 5 concurrent futures
    
    futures::pin_mut!(stream);
    while let Some(result) = stream.next().await {
        println!("Result: {}", result);
    }
}
 
async fn expensive_async_work(x: i32) -> i32 {
    sleep(Duration::from_millis(100)).await;
    x * 2
}

Streams naturally support backpressure through the poll model.

Error Handling Differences

use futures::stream::{self, StreamExt};
 
// Iterator: Errors crash or need immediate handling
fn iterator_errors() {
    let data: Vec<Result<i32, &str>> = vec![Ok(1), Err("error"), Ok(3)];
    
    // Must handle errors immediately
    for result in data {
        match result {
            Ok(v) => println!("Value: {}", v),
            Err(e) => eprintln!("Error: {}", e),  // Handle or propagate
        }
    }
    
    // Or use filter_map to skip errors
    let ok_values: Vec<_> = data.into_iter()
        .filter_map(|r| r.ok())
        .collect();
}
 
// Stream: Errors can be handled asynchronously
async fn stream_errors() {
    let stream = stream::iter(vec![
        Ok(1), 
        Err("error"), 
        Ok(3)
    ]);
    
    // Handle errors in stream pipeline
    let ok_stream = stream
        .filter_map(|result| async move {
            match result {
                Ok(v) => Some(v),
                Err(e) => {
                    eprintln!("Async error handling: {}", e);
                    None
                }
            }
        });
    
    futures::pin_mut!(ok_stream);
    while let Some(value) = ok_stream.next().await {
        println!("Got: {}", value);
    }
}

Both handle errors, but streams allow async error recovery.

Concurrency Models

use futures::stream::{self, StreamExt};
 
// Iterator: Sequential processing
fn iterator_sequential() {
    let items: Vec<i32> = (1..=5)
        .map(|x| {
            // This runs sequentially
            std::thread::sleep(std::time::Duration::from_millis(100));
            x * 2
        })
        .collect();
    
    // Total time: ~500ms (5 items × 100ms)
}
 
// Stream: Concurrent processing
async fn stream_concurrent() {
    let items: Vec<i32> = stream::iter(1..=5)
        .map(|x| async move {
            sleep(std::time::Duration::from_millis(100)).await;
            x * 2
        })
        .buffer_unordered(5)  // Process all concurrently
        .collect()
        .await;
    
    // Total time: ~100ms (all run in parallel)
}

Streams can process items concurrently; iterators are inherently sequential.

Memory Usage Patterns

use futures::stream::{self, StreamExt};
 
// Iterator: Often loads all data
fn iterator_memory() {
    // Collecting into Vec loads everything
    let data: Vec<i32> = (1..=1_000_000).collect();
    
    // But iteration can be lazy
    let sum: i32 = (1..=1_000_000)
        .map(|x| x * 2)  // Lazy, no allocation
        .sum();          // Processes one at a time
}
 
// Stream: Naturally lazy
async fn stream_memory() {
    // Stream is inherently lazy
    let sum: i32 = stream::iter(1..=1_000_000)
        .map(|x| async move { x * 2 })
        .then(|fut| fut)  // Flatten the async
        .fold(0, |acc, x| async move { acc + x })
        .await;
    
    // Never loads all items at once
    
    // With external data source
    async fn infinite_stream() {
        // Stream from network - no end known
        // Can process indefinitely without memory issues
        let mut stream = network_event_stream();
        
        while let Some(event) = stream.next().await {
            process_event(event).await;
        }
    }
}

Both can be lazy, but streams are designed for potentially infinite or external data.

Cancellation

use futures::stream::{self, StreamExt};
use tokio::time::{timeout, Duration};
 
// Iterator: Hard to cancel mid-iteration
fn iterator_cancellation() {
    for item in 1..=1_000_000 {
        // Can break, but can't cancel externally
        if item > 10 {
            break;  // Manual cancellation
        }
        println!("{}", item);
    }
}
 
// Stream: Natural cancellation support
async fn stream_cancellation() {
    let stream = stream::iter(1..=1_000_000);
    
    // Timeout cancels the stream
    let result = timeout(Duration::from_millis(10), async {
        stream.for_each(|item| async move {
            println!("{}", item);
        }).await
    }).await;
    
    match result {
        Ok(_) => println!("Stream completed"),
        Err(_) => println!("Stream timed out and was cancelled"),
    }
}
 
// Take limits items
async fn stream_take_cancellation() {
    stream::iter(1..=1_000_000)
        .take(10)  // Only process 10 items, then cancel
        .for_each(|item| async move {
            println!("{}", item);
        })
        .await;
}

Streams integrate with async cancellation mechanisms.

Conversion Between Iterator and Stream

use futures::stream::{self, StreamExt};
 
// Iterator to Stream
async fn iterator_to_stream() {
    let iterator = 1..=10;
    let stream = stream::iter(iterator);
    
    // Now can use async combinators
    let doubled: Vec<i32> = stream
        .then(|x| async move { x * 2 })
        .collect()
        .await;
}
 
// Stream to Iterator (when possible)
async fn stream_to_iterator() {
    use futures::stream::TryStreamExt;
    
    let stream = stream::iter(vec![Ok(1), Ok(2), Ok(3)]);
    
    // Collect first, then iterate
    let items: Vec<i32> = stream
        .filter_map(|r| async move { r.ok() })
        .collect()
        .await;
    
    for item in items {
        println!("{}", item);
    }
    
    // Note: Can't easily convert stream to sync iterator
    // because each next() needs to await
}

Iterators easily become streams; streams are harder to convert back due to async.

When to Use Each

// Use Iterator when:
// - Data is already in memory
// - Processing is CPU-bound
// - No async I/O needed
// - Simple transformations
 
fn iterator_use_cases() {
    // In-memory collections
    let sum: i32 = [1, 2, 3, 4, 5].iter().sum();
    
    // CPU-bound transformations
    let squares: Vec<i32> = (1..=100)
        .map(|x| x * x)
        .collect();
    
    // File processing (blocking is acceptable)
    let lines: Vec<String> = std::fs::read_to_string("file.txt")
        .unwrap()
        .lines()
        .map(String::from)
        .collect();
}
 
// Use Stream when:
// - Data comes from async I/O
// - Processing involves async operations per item
// - Need concurrency or backpressure
// - Data is potentially infinite
// - Cancellation is important
 
async fn stream_use_cases() {
    // Network data
    let events = network_event_stream();
    events.for_each(|e| async move {
        process_event(e).await;
    }).await;
    
    // Async processing per item
    let results = stream::iter(urls)
        .then(|url| fetch_url(url))  // Async fetch
        .buffer_unordered(10);        // Concurrent
    
    // Infinite sources
    let timer = tokio::time::interval(Duration::from_secs(1));
    timer.for_each(|_| async move {
        println!("Tick");
    }).await;
}

Synthesis

The fundamental difference between Iterator and Stream is the execution model:

Iterator:

  • next() returns Option<Item> synchronously
  • Always returns immediately, never waits
  • Combinators are synchronous functions
  • Natural for in-memory data and CPU-bound work
  • Sequential by nature
  • Thread can block during iteration

Stream:

  • poll_next() returns Poll<Option<Item>>
  • May return Pending and wake later when data is ready
  • Combinators are async and return futures
  • Natural for I/O-bound work and external data sources
  • Supports concurrency, backpressure, and cancellation
  • Never blocks; yields to async runtime

Key practical differences:

  1. Async operations: Streams can perform async work per-item; iterators cannot.

  2. Backpressure: Streams naturally signal readiness via Poll::Pending; iterators always pull immediately.

  3. Concurrency: Streams support concurrent processing via .buffer_unordered(); iterators are sequential.

  4. Cancellation: Streams integrate with async cancellation; iterators require explicit break.

  5. I/O integration: Streams work with async I/O; iterators work with blocking I/O.

Choose iterators for in-memory, CPU-bound processing. Choose streams for I/O-bound work, concurrent processing, infinite data sources, or when you need backpressure and cancellation support.