Rust walkthroughs

How do futures::stream::Stream and std::iter::Iterator differ in terms of their execution model?

Stream and Iterator both represent sequences of values, but they differ fundamentally in how values are produced. Iterators yield values synchronously on demand, while streams yield values asynchronously, with potential waiting between items.
The Iterator trait produces values synchronously:
// Simplified core of std::iter::Iterator: next() is synchronous and
// returns immediately — either Some(item) or None when exhausted.
trait Iterator {
type Item;
fn next(&mut self) -> Option<Self::Item>;
// Many provided methods...
}
// Simple usage
fn iterator_example() {
    // All data already lives in memory, so iteration is purely synchronous.
    let values = vec![1, 2, 3, 4, 5];
    let mut it = values.into_iter();
    // Each call to next() resolves immediately: Some(value) while items
    // remain, then None once the sequence is exhausted.
    assert_eq!(it.next(), Some(1));
    assert_eq!(it.next(), Some(2));
    assert_eq!(it.next(), Some(3));
    // ...
}When you call next() on an iterator, it either returns immediately with Some(item) or None if exhausted.
The Stream trait produces values asynchronously:
use futures::stream::{Stream, StreamExt};
// Simplified trait definition
// NOTE: the real trait also needs `std::pin::Pin` and
// `std::task::{Context, Poll}` in scope (imported later on this page).
trait Stream {
type Item;
// poll_next returns Poll: Ready(Some(item)) when an item is available,
// Ready(None) when the stream is finished, or Pending when no item is
// ready yet (the waker in `cx` is notified when progress can be made).
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<Self::Item>>;
}
// Stream usage requires async
async fn stream_example() {
use futures::stream::iter;
let mut stream = iter(vec![1, 2, 3, 4, 5]);
// next() returns a future that resolves to Option<Item>
// (StreamExt::next borrows the stream mutably; `.await` drives the poll)
while let Some(value) = stream.next().await {
println!("Got: {}", value);
}
}When you call next() on a stream, you get a future that may wait for data to become available.
use std::time::Duration;
use futures::stream::{self, StreamExt};
use tokio::time::interval;
// Iterator: Always returns immediately
fn iterator_blocking() {
    // Everything is already in memory, so next() can never block:
    // each call resolves synchronously, right now.
    let mut values = vec![1, 2, 3].into_iter();
    for _ in 0..3 {
        let _ = values.next(); // Immediate
    }
}
// Stream: Can wait between items
// Stream: Can wait between items
async fn stream_waiting() {
    // FIX: tokio's `Interval` is not itself a `Stream`, so the original
    // `interval(..).map(..)` / `.next().await` did not compile. To use
    // stream combinators on it, wrap it in
    // `tokio_stream::wrappers::IntervalStream`; here we await ticks directly.
    let mut ticker = interval(Duration::from_secs(1));
    // NOTE: the first tick of a tokio interval completes immediately.
    let _ = ticker.tick().await; // Immediate
    let _ = ticker.tick().await; // Waits ~1 second
    let _ = ticker.tick().await; // Waits another second
}The key difference: iterators are always ready; streams may need to wait.
use std::task::{Poll, Context};
use std::pin::Pin;
// Iterator's next is simple
fn iterator_next_example() {
    let mut numbers = vec![1, 2, 3].into_iter();
    // next() resolves synchronously — no polling, no waiting, the
    // answer is available the moment the call returns.
    if let Some(v) = numbers.next() {
        println!("Got: {}", v);
    } else {
        println!("Done");
    }
}
// Stream uses Poll to indicate readiness
// Poll::Ready(Some(item)) - item available
// Poll::Ready(None) - stream finished
// Poll::Pending - not ready yet, wake me later
Streams use Poll to communicate their state to the executor.
use std::fs::File;
use std::io::{BufRead, BufReader};
use futures::stream::{Stream, StreamExt};
use tokio::fs::File as AsyncFile;
use tokio::io::{AsyncBufReadExt, BufReader as AsyncBufReader};
// Iterator: Can block
// Iterator over the lines of a file, read with blocking I/O.
// Panics if the file can't be opened or a line isn't valid UTF-8 —
// acceptable for a demo, but use `expect` so the panic says what failed.
fn file_lines_iterator(path: &str) -> impl Iterator<Item = String> {
    let file = File::open(path).expect("failed to open file");
    let reader = BufReader::new(file);
    // BufRead::lines strips the trailing newline from each line.
    reader.lines().map(|l| l.expect("failed to read line"))
}
// This would block the current thread if reading from disk
fn use_iterator() {
    // Each line is fetched with blocking I/O: the thread stalls inside
    // the iterator until the read from disk completes.
    file_lines_iterator("large_file.txt").for_each(|line| println!("{}", line));
}
// Stream: Non-blocking in async context
// Stream over the lines of a file, read with async I/O — the task yields
// to the executor (rather than blocking the thread) while I/O is pending.
// FIX: removed the unused `use futures::stream::unfold;` (the code calls
// `try_unfold`), and strip the trailing newline so items match the
// synchronous BufRead::lines version above.
async fn file_lines_stream(path: &str) -> impl Stream<Item = String> {
    let file = AsyncFile::open(path).await.expect("failed to open file");
    let reader = AsyncBufReader::new(file);
    futures::stream::try_unfold(reader, |mut r| async move {
        let mut line = String::new();
        match r.read_line(&mut line).await {
            Ok(0) => Ok(None), // EOF
            Ok(_) => {
                // read_line keeps the line terminator; drop it.
                while line.ends_with('\n') || line.ends_with('\r') {
                    line.pop();
                }
                Ok(Some((line, r)))
            }
            Err(e) => Err(e),
        }
    })
    // Drop I/O errors, keeping only successfully-read lines.
    .filter_map(|r| async move { r.ok() })
}Iterators can block; streams integrate with async runtimes.
use futures::stream::{self, StreamExt};
// Iterator combinators are synchronous
fn iterator_combinators() {
    // Every adaptor here runs synchronously, one item at a time.
    let evens_doubled: Vec<i32> = (1..=10)
        .filter(|n| n % 2 == 0) // keep evens: 2, 4, 6, 8, 10
        .map(|n| n * 2)         // double them: 4, 8, 12, 16, 20
        .take(3)                // first three only
        .collect();
    println!("{:?}", evens_doubled); // [4, 8, 12]
}
// Stream combinators are async
// Stream combinators are async
async fn stream_combinators() {
    let result: Vec<i32> = stream::iter(1..=10)
        // FIX: destructure `&x` to copy the item — an `async move` block
        // must not capture the `&i32` the filter closure receives.
        .filter(|&x| async move { x % 2 == 0 }) // Async filter
        // FIX: `.then` awaits each future; the original `.map` produced a
        // stream of futures, so collecting into Vec<i32> did not compile.
        .then(|x| async move { x * 2 }) // Async map
        .take(3) // Async take
        .collect()
        .await;
    println!("{:?}", result); // [4, 8, 12]
}Stream combinators return futures or new streams, requiring .await for execution.
use futures::stream::{self, StreamExt};
// Iterator: Can't do async in combinators
fn iterator_no_async() {
// This won't work:
// (1..=10).map(|x| async_operation(x).await)
// Would need to collect futures and then await them
let futures: Vec<_> = (1..=10)
.map(|x| async_operation(x))
.collect();
// But then you're not iterating, you're batching
}
// Stream: Async operations in combinators
async fn stream_with_async() {
    let stream = stream::iter(1..=10)
        .then(|x| async move {
            // Each item can have async work
            simulate_network_request(x).await
        })
        // FIX: compute the bool before the async block — an `async move`
        // future must not capture the `&Result` the filter closure receives.
        .filter(|result| {
            let keep = result.is_ok();
            async move { keep }
        });
    // Pin the combinator chain on the stack so `.next()` can be called.
    futures::pin_mut!(stream);
    while let Some(value) = stream.next().await {
        println!("Got: {:?}", value);
    }
}
// Stand-in for a real network call; resolves immediately in this demo.
async fn simulate_network_request(id: i32) -> Result<String, &'static str> {
// Simulated async I/O
Ok(format!("Response for {}", id))
}Streams allow async operations per-item; iterators require batching async work.
use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration};
// Iterator: No backpressure mechanism
fn iterator_no_backpressure() {
    let data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
    // The consumer pulls as fast as it likes; there is no way to tell
    // the producer to slow down — everything is already in memory.
    for _item in data {
        // (loop body intentionally empty)
    }
}
// Stream: Built-in backpressure
// Stream: Built-in backpressure
async fn stream_backpressure() {
    // FIX: `throttle` lives in `tokio_stream::StreamExt`, not
    // `futures::StreamExt`, so the original did not compile. The consumer
    // below paces the stream itself — a stream only produces when polled.
    let mut stream = stream::iter(1..=100);
    while let Some(item) = stream.next().await {
        println!("Processing: {}", item);
        // We control the pace by when we call next()
        if item % 10 == 0 {
            sleep(Duration::from_secs(1)).await; // Slow down
        }
    }
}
// Buffer with limits
async fn stream_buffering() {
// map produces a stream of (not-yet-started) futures; buffer_unordered
// drives up to 5 of them at once and yields results as they complete.
let stream = stream::iter(1..=100)
.map(|x| async move { expensive_async_work(x).await })
.buffer_unordered(5); // At most 5 concurrent futures
// Pin on the stack so .next() can be called on the combinator chain.
// NOTE: results arrive in completion order, not input order.
futures::pin_mut!(stream);
while let Some(result) = stream.next().await {
println!("Result: {}", result);
}
}
// Simulates a slow async computation (~100 ms) that doubles its input.
async fn expensive_async_work(x: i32) -> i32 {
sleep(Duration::from_millis(100)).await;
x * 2
}Streams naturally support backpressure through the poll model.
use futures::stream::{self, StreamExt};
// Iterator: Errors crash or need immediate handling
// Iterator: Errors crash or need immediate handling
fn iterator_errors() {
    let data: Vec<Result<i32, &str>> = vec![Ok(1), Err("error"), Ok(3)];
    // FIX: iterate by reference — the original `for result in data` moved
    // `data`, so the later `data.into_iter()` was a use-after-move and
    // did not compile.
    for result in &data {
        match result {
            Ok(v) => println!("Value: {}", v),
            Err(e) => eprintln!("Error: {}", e), // Handle or propagate
        }
    }
    // Or use filter_map to skip errors
    let ok_values: Vec<_> = data.into_iter()
        .filter_map(|r| r.ok())
        .collect();
    let _ = ok_values; // [1, 3]
}
// Stream: Errors can be handled asynchronously
async fn stream_errors() {
let stream = stream::iter(vec![
Ok(1),
Err("error"),
Ok(3)
]);
// Handle errors in stream pipeline
// filter_map receives each Result by value: errors are logged and
// dropped (mapped to None); successes pass through as Some(v).
let ok_stream = stream
.filter_map(|result| async move {
match result {
Ok(v) => Some(v),
Err(e) => {
eprintln!("Async error handling: {}", e);
None
}
}
});
// Pin on the stack so .next() can be called on the combinator chain.
futures::pin_mut!(ok_stream);
while let Some(value) = ok_stream.next().await {
println!("Got: {}", value);
}
}Both handle errors, but streams allow async error recovery.
use futures::stream::{self, StreamExt};
// Iterator: Sequential processing
// NOTE: `items` is intentionally unused — the point of this demo is the
// timing, not the result ([2, 4, 6, 8, 10]).
fn iterator_sequential() {
let items: Vec<i32> = (1..=5)
.map(|x| {
// This runs sequentially: each item blocks the thread for ~100 ms
// before the next one starts.
std::thread::sleep(std::time::Duration::from_millis(100));
x * 2
})
.collect();
// Total time: ~500ms (5 items × 100ms)
}
// Stream: Concurrent processing
// NOTE: relies on `tokio::time::sleep` imported earlier on this page.
async fn stream_concurrent() {
let items: Vec<i32> = stream::iter(1..=5)
.map(|x| async move {
sleep(std::time::Duration::from_millis(100)).await;
x * 2
})
// buffer_unordered drives all 5 futures at once; results are yielded
// in completion order, so `items` may not be ordered [2, 4, 6, 8, 10].
.buffer_unordered(5) // Process all concurrently
.collect()
.await;
// Total time: ~100ms (all run in parallel)
}Streams can process items concurrently; iterators are inherently sequential.
use futures::stream::{self, StreamExt};
// Iterator: Often loads all data
// Iterator: Often loads all data
fn iterator_memory() {
    // Collecting into Vec loads everything (~4 MB here)
    let data: Vec<i32> = (1..=1_000_000).collect();
    let _ = data;
    // But iteration can be lazy
    // FIX: sum as i64 — the true total (1_000_001_000_000) overflows i32,
    // which panics in debug builds and silently wraps in release.
    let sum: i64 = (1..=1_000_000i64)
        .map(|x| x * 2) // Lazy, no allocation
        .sum(); // Processes one at a time
    let _ = sum;
}
// Stream: Naturally lazy
// Stream: Naturally lazy
async fn stream_memory() {
    // Stream is inherently lazy: items are produced one poll at a time.
    // FIX: fold over i64 — the true total (1_000_001_000_000) overflows
    // i32, which panics in debug builds.
    let sum: i64 = stream::iter(1..=1_000_000i64)
        .map(|x| async move { x * 2 })
        .then(|fut| fut) // Flatten the async
        .fold(0i64, |acc, x| async move { acc + x })
        .await;
    let _ = sum;
    // Never loads all items at once
    // With external data source
    async fn infinite_stream() {
        // Stream from network - no end known; can process indefinitely
        // without memory growth.
        // NOTE(review): `network_event_stream` and `process_event` are not
        // defined on this page — assumed to exist elsewhere.
        let mut stream = network_event_stream();
        while let Some(event) = stream.next().await {
            process_event(event).await;
        }
    }
}Both can be lazy, but streams are designed for potentially infinite or external data.
use futures::stream::{self, StreamExt};
use tokio::time::{timeout, Duration};
// Iterator: Hard to cancel mid-iteration
fn iterator_cancellation() {
    // There is no external cancellation hook for an iterator: the
    // consumer has to decide to stop on its own, via `break`.
    for item in 1..=1_000_000 {
        if item > 10 {
            break; // Manual cancellation
        }
        println!("{}", item);
    }
}
// Stream: Natural cancellation support
async fn stream_cancellation() {
let stream = stream::iter(1..=1_000_000);
// Timeout cancels the stream
// (when the timeout fires, the inner future is dropped mid-poll and
// all remaining work is abandoned — no cooperation needed)
let result = timeout(Duration::from_millis(10), async {
stream.for_each(|item| async move {
println!("{}", item);
}).await
}).await;
match result {
Ok(_) => println!("Stream completed"),
Err(_) => println!("Stream timed out and was cancelled"),
}
}
// Take limits items
async fn stream_take_cancellation() {
stream::iter(1..=1_000_000)
.take(10) // Only the first 10 items are ever produced; the rest never run
.for_each(|item| async move {
println!("{}", item);
})
.await;
}Streams integrate with async cancellation mechanisms.
use futures::stream::{self, StreamExt};
// Iterator to Stream
async fn iterator_to_stream() {
let iterator = 1..=10;
// stream::iter wraps any IntoIterator as an always-ready Stream.
let stream = stream::iter(iterator);
// Now can use async combinators
// NOTE: `doubled` is intentionally unused — this just shows the shape.
let doubled: Vec<i32> = stream
.then(|x| async move { x * 2 })
.collect()
.await;
}
// Stream to Iterator (when possible)
// Stream to Iterator (when possible)
async fn stream_to_iterator() {
    // FIX: removed the unused `use futures::stream::TryStreamExt;` —
    // `filter_map`/`collect` below come from `StreamExt`. Also annotate
    // the error type: `vec![Ok(1), ...]` alone leaves it unconstrained,
    // so type inference failed in the original.
    let stream = stream::iter(vec![Ok::<i32, &str>(1), Ok(2), Ok(3)]);
    // Collect first, then iterate
    let items: Vec<i32> = stream
        .filter_map(|r| async move { r.ok() })
        .collect()
        .await;
    for item in items {
        println!("{}", item);
    }
    // Note: Can't easily convert stream to sync iterator
    // because each next() needs to await
}Iterators easily become streams; streams are harder to convert back due to async.
// Use Iterator when:
// - Data is already in memory
// - Processing is CPU-bound
// - No async I/O needed
// - Simple transformations
fn iterator_use_cases() {
    // In-memory collections
    let sum: i32 = [1, 2, 3, 4, 5].iter().sum();
    let _ = sum; // 15
    // CPU-bound transformations
    let squares: Vec<i32> = (1..=100)
        .map(|x| x * x)
        .collect();
    let _ = squares;
    // File processing (blocking is acceptable)
    // FIX: don't unwrap I/O — a missing file.txt panicked the original.
    // Treat an unreadable file as empty for this demo.
    let lines: Vec<String> = std::fs::read_to_string("file.txt")
        .unwrap_or_default()
        .lines()
        .map(String::from)
        .collect();
    let _ = lines;
}
// Use Stream when:
// - Data comes from async I/O
// - Processing involves async operations per item
// - Need concurrency or backpressure
// - Data is potentially infinite
// - Cancellation is important
async fn stream_use_cases() {
// Network data
// NOTE(review): `network_event_stream`, `process_event`, `urls` and
// `fetch_url` are not defined on this page — assumed from context.
let events = network_event_stream();
events.for_each(|e| async move {
process_event(e).await;
}).await;
// Async processing per item
let results = stream::iter(urls)
.then(|url| fetch_url(url)) // Async fetch
.buffer_unordered(10); // Concurrent
// Infinite sources
// NOTE(review): tokio's `Interval` is not a `Stream`; as written this
// needs `tokio_stream::wrappers::IntervalStream` to compile.
let timer = tokio::time::interval(Duration::from_secs(1));
timer.for_each(|_| async move {
println!("Tick");
}).await;
}The fundamental difference between Iterator and Stream is the execution model:
Iterator:
next() returns Option<Item> synchronously — the caller always gets an answer immediately.

Stream:
poll_next() returns Poll<Option<Item>> — it may return Poll::Pending and wake the task later when data is ready.

Key practical differences:
Async operations: Streams can perform async work per-item; iterators cannot.
Backpressure: Streams naturally signal readiness via Poll::Pending; iterators always pull immediately.
Concurrency: Streams support concurrent processing via .buffer_unordered(); iterators are sequential.
Cancellation: Streams integrate with async cancellation; iterators require explicit break.
I/O integration: Streams work with async I/O; iterators work with blocking I/O.
Choose iterators for in-memory, CPU-bound processing. Choose streams for I/O-bound work, concurrent processing, infinite data sources, or when you need backpressure and cancellation support.