How does rayon::iter::IntoParallelIterator convert sequential collections for parallel processing?
rayon::iter::IntoParallelIterator is a trait that converts sequential collections into parallel iterators, enabling data-parallel operations by splitting collections into chunks that can be processed concurrently across multiple threads. The trait provides the bridge between standard Rust collections and Rayon's parallel execution model, automatically determining how to partition data for optimal parallel processing.
Basic Comparison: Sequential vs Parallel Iteration
use rayon::prelude::*;
fn comparison() {
let data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
// Sequential iteration (standard library):
let sum: i32 = data.iter().sum();
// Parallel iteration (Rayon):
let sum: i32 = data.par_iter().sum();
// IntoParallelIterator enables .par_iter() conversion:
// par_iter() comes from IntoParallelIterator trait
}IntoParallelIterator provides the into_par_iter() method; IntoParallelRefIterator provides par_iter().
The IntoParallelIterator Trait
use rayon::iter::{IntoParallelIterator, ParallelIterator};
// The trait definition:
pub trait IntoParallelIterator {
type Iter: ParallelIterator<Item = Self::Item>;
type Item: Send;
fn into_par_iter(self) -> Self::Iter;
}
// IntoParallelRefIterator for references:
pub trait IntoParallelRefIterator<'data> {
type Iter: ParallelIterator<Item = Self::Item>;
type Item: Send + 'data;
fn par_iter(&'data self) -> Self::Iter;
}IntoParallelIterator consumes the collection; par_iter() iterates by reference.
Basic Usage with Vec
use rayon::prelude::*;
fn vec_usage() {
let data = vec![1, 2, 3, 4, 5];
// Reference iteration (doesn't consume):
let doubled: Vec<i32> = data.par_iter()
.map(|x| x * 2)
.collect();
// data still available
// Owned iteration (consumes):
let sum: i32 = data.into_par_iter()
.sum();
// data consumed
}par_iter() borrows; into_par_iter() takes ownership.
How Collections Split for Parallelism
use rayon::prelude::*;
fn splitting() {
let data: Vec<i32> = (0..1000).collect();
// Rayon splits the collection into chunks:
// Each thread processes a chunk
// Results are combined at the end
// The split is based on:
// - Number of available threads
// - Work-stealing dynamics
// - Minimum split threshold
// Example: 8 threads, 1000 items
// Might split into chunks of ~125 items each
// But actual split is dynamic and adaptive
data.par_iter().for_each(|x| {
// This runs in parallel across threads
// Order of execution is not guaranteed
});
}Rayon uses work-stealing to dynamically distribute work across threads.
Implementing for Custom Types
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use rayon::slice::Iter;
use std::slice;
struct MyCollection<T> {
data: Vec<T>,
}
impl<'a, T: Send + Sync + 'a> IntoParallelIterator for &'a MyCollection<T> {
type Iter = Iter<'a, T>;
type Item = &'a T;
fn into_par_iter(self) -> Self::Iter {
self.data.par_iter()
}
}
impl<T: Send> IntoParallelIterator for MyCollection<T> {
type Iter = rayon::vec::IntoIter<T>;
type Item = T;
fn into_par_iter(self) -> Self::Iter {
self.data.into_par_iter()
}
}Custom types implement IntoParallelIterator by delegating to their inner collection.
Work Splitting Algorithm
use rayon::prelude::*;
fn work_splitting() {
let data: Vec<i32> = (0..1_000_000).collect();
// Rayon's splitting works like this:
// 1. Initial: Split into ~num_threads chunks
// 2. Each thread processes its chunk
// 3. If thread finishes early, it "steals" work from others
// 4. Split continues until minimum granularity reached
// Minimum split is controlled by:
// - The split iterator's behavior
// - Adaptive algorithm based on workload
// You can see splits with fold:
let chunk_count = data.par_iter()
.fold(|| 0, |count, _| count + 1)
.reduce(|| 0, |a, b| a + b);
// This counts how many elements each thread processed
}Work-stealing enables load balancing across threads dynamically.
Send Bound Requirement
use rayon::prelude::*;
// Items must be Send for parallel iteration:
fn send_requirement() {
// This works: i32 is Send
let data: Vec<i32> = vec![1, 2, 3];
let sum: i32 = data.par_iter().sum();
// This would fail if T were not Send:
// use std::rc::Rc;
// let data: Vec<Rc<i32>> = vec![Rc::new(1)];
// data.par_iter().sum(); // Compile error: Rc is not Send
// Send is required because items move between threads
}IntoParallelIterator requires Item: Send because data moves across thread boundaries.
Sync Bound for References
use rayon::prelude::*;
use std::sync::Arc;
fn sync_requirement() {
// When iterating by reference, items must be Sync:
// Arc<T> is Send + Sync when T is Send + Sync:
let data: Vec<Arc<String>> = vec![Arc::new("hello".to_string())];
// Can iterate in parallel:
data.par_iter().for_each(|s| {
println!("{}", s);
});
// References to data must be Sync for par_iter:
// because multiple threads may read the same data
}par_iter() requires Item: Sync because multiple threads may reference the same data.
Split Iterators and Granularity
use rayon::prelude::*;
fn granularity() {
let data: Vec<i32> = (0..100).collect();
// Default splitting is adaptive:
// - Large collections: aggressive splitting
// - Small collections: minimal splitting
// For small workloads, overhead may exceed benefit:
let sum: i32 = data.par_iter().sum(); // Overhead may dominate
// For large workloads, parallelism shines:
let data: Vec<i32> = (0..1_000_000).collect();
let sum: i32 = data.par_iter().sum(); // Worth the overhead
// Work cost affects optimal split:
// Cheap operations: need larger chunks
// Expensive operations: can use smaller chunks
// Rayon adapts based on available parallelism:
// let pool = rayon::ThreadPoolBuilder::new()
// .num_threads(4)
// .build();
// pool.install(|| {
// data.par_iter().sum()
// });
}Rayon's split algorithm adapts to workload size and operation cost.
Parallel Iterator Adapters
use rayon::prelude::*;
fn adapters() {
let data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
// Map:
let doubled: Vec<i32> = data.par_iter()
.map(|x| x * 2)
.collect();
// Filter:
let evens: Vec<&i32> = data.par_iter()
.filter(|x| *x % 2 == 0)
.collect();
// Filter + Map:
let processed: Vec<i32> = data.par_iter()
.filter_map(|x| if *x % 2 == 0 { Some(*x * 2) } else { None })
.collect();
// Flat map:
let expanded: Vec<i32> = data.par_iter()
.flat_map(|x| vec![*x, *x * 10])
.collect();
// Fold (parallel reduce):
let sum: i32 = data.par_iter()
.fold(|| 0, |acc, x| acc + x)
.reduce(|| 0, |a, b| a + b);
}IntoParallelIterator enables all parallel iterator adapters after conversion.
Reduction Operations
use rayon::prelude::*;
fn reduction() {
let data: Vec<i32> = (0..1000).collect();
// Sum (built-in):
let sum: i32 = data.par_iter().sum();
// Reduce (custom reduction):
let product: i32 = data.par_iter()
.reduce(|| 1, |a, b| a * b);
// Reduce with identity:
let max: Option<i32> = data.par_iter()
.copied()
.reduce(|a, b| if a > b { a } else { b });
// Fold + reduce (two-phase reduction):
// fold: per-thread accumulator
// reduce: combine thread results
let sum: i32 = data.par_iter()
.fold(|| 0, |acc, &x| acc + x)
.reduce(|| 0, |a, b| a + b);
}Parallel reduction combines results from each thread.
Collection Types
use rayon::prelude::*;
use std::collections::{HashMap, BTreeMap, HashSet};
fn collection_types() {
// Vec:
let vec = vec![1, 2, 3, 4, 5];
let sum: i32 = vec.par_iter().sum();
// Array:
let arr = [1, 2, 3, 4, 5];
let sum: i32 = arr.par_iter().sum();
// Range:
let sum: i32 = (0..100).into_par_iter().sum();
// Slice:
let slice = &[1, 2, 3, 4, 5];
let sum: i32 = slice.par_iter().sum();
// HashMap (into parallel iterator):
let map: HashMap<u32, String> = HashMap::new();
map.into_par_iter().for_each(|(k, v)| {
// Process key-value pairs in parallel
});
// HashSet:
let set: HashSet<i32> = vec![1, 2, 3].into_iter().collect();
set.par_iter().for_each(|x| {
// Process elements in parallel
});
}IntoParallelIterator is implemented for standard collections.
Range Parallel Iteration
use rayon::prelude::*;
fn range_iteration() {
// Ranges support parallel iteration:
let sum: i64 = (0..1_000_000)
.into_par_iter()
.map(|x| x as i64)
.sum();
// Inclusive ranges:
let sum: i64 = (0..=100)
.into_par_iter()
.sum();
// Step by (not directly parallel):
// StepIterator doesn't implement IntoParallelIterator
// Instead, use map with filter:
let sum: i64 = (0..100)
.into_par_iter()
.filter(|x| x % 2 == 0) // Even numbers only
.sum();
}Ranges convert directly to parallel iterators via into_par_iter().
Sequential Fallback
use rayon::prelude::*;
fn sequential_fallback() {
// Small collections may not benefit from parallelism:
let small: Vec<i32> = vec![1, 2, 3];
// Rayon still processes in parallel, but overhead dominates
// For very small collections, sequential is often faster:
let sum: i32 = small.iter().sum(); // Might be faster
// For expensive operations, even small data benefits:
let hashes: Vec<String> = small.par_iter()
.map(|x| expensive_hash(*x))
.collect();
// Parallel is worth it if operation is expensive
}
fn expensive_hash(x: i32) -> String {
// Simulate expensive computation
format!("{:?}", x)
}Small collections or cheap operations may be faster sequentially.
Thread Pool Integration
use rayon::prelude::*;
use rayon::ThreadPool;
fn thread_pool() {
// Custom thread pool:
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(4)
.build()
.unwrap();
let data: Vec<i32> = (0..1000).collect();
// Run parallel work in custom pool:
let sum: i32 = pool.install(|| {
data.par_iter().sum()
});
// Global pool (default):
let sum: i32 = data.par_iter().sum(); // Uses global pool
}Parallel iteration uses Rayon's global thread pool by default.
Error Handling in Parallel
use rayon::prelude::*;
fn error_handling() -> Result<i32, String> {
let data: Vec<i32> = (0..100).collect();
// try_fold and try_reduce for fallible operations:
let result: Result<i32, String> = data.par_iter()
.try_fold(|| Ok(0), |acc, &x| {
if x < 50 {
Ok(acc + x)
} else {
Err(format!("Value {} too large", x))
}
})
.try_reduce(|| Ok(0), |a, b| Ok(a + b));
result
}
fn panic_handling() {
let data: Vec<i32> = vec![1, 2, 3, 4, 5];
// Panics propagate:
let result = std::panic::catch_unwind(|| {
data.par_iter()
.map(|x| {
if *x == 3 {
panic!("Found 3!");
}
*x * 2
})
.collect::<Vec<_>>()
});
// Result is Err due to panic
assert!(result.is_err());
}Parallel operations propagate errors and panics across threads.
Collecting Results
use rayon::prelude::*;
fn collecting() {
let data: Vec<i32> = (0..10).collect();
// Collect into Vec (order preserved):
let doubled: Vec<i32> = data.par_iter()
.map(|x| x * 2)
.collect();
// Collect into other collections:
use std::collections::HashSet;
let set: HashSet<i32> = data.par_iter()
.copied()
.collect();
// Collect into map:
use std::collections::HashMap;
let map: HashMap<i32, i32> = data.par_iter()
.map(|&x| (x, x * 2))
.collect();
// Collect with FromParallelIterator:
// Many collections implement FromParallelIterator
}collect() preserves order and works with standard collections.
Performance Characteristics
use rayon::prelude::*;
use std::time::Instant;
fn performance() {
let data: Vec<i64> = (0..10_000_000).collect();
// Sequential:
let start = Instant::now();
let sum: i64 = data.iter().sum();
let seq_time = start.elapsed();
// Parallel:
let start = Instant::now();
let sum: i64 = data.par_iter().sum();
let par_time = start.elapsed();
// Parallel is typically faster for:
// - Large collections
// - CPU-bound operations
// - Operations with similar cost per item
// Parallel may be slower for:
// - Small collections
// - I/O-bound operations
// - Operations with varying cost
// - Very cheap operations
println!("Sequential: {:?}", seq_time);
println!("Parallel: {:?}", par_time);
}Parallel iteration benefits increase with data size and operation cost.
Nested Parallel Iteration
use rayon::prelude::*;
fn nested() {
let matrix: Vec<Vec<i32>> = vec![
vec![1, 2, 3],
vec![4, 5, 6],
vec![7, 8, 9],
];
// Parallel iteration over rows:
let row_sums: Vec<i32> = matrix.par_iter()
.map(|row| row.iter().sum())
.collect();
// Nested parallel iteration:
let flat: Vec<i32> = matrix.par_iter()
.flat_map(|row| row.par_iter().copied())
.collect();
// Be careful with nesting depth:
// Too much parallelism can cause overhead
}Nested parallel operations can be effective but watch for overhead.
Real-World Example: Parallel Processing
use rayon::prelude::*;
struct DataProcessor;
impl DataProcessor {
fn process_batch(items: &[String]) -> Vec<ProcessedItem> {
items.iter()
.map(|s| ProcessedItem {
hash: Self::hash(s),
length: s.len(),
})
.collect()
}
fn hash(s: &str) -> u64 {
// Simulate expensive hashing
let mut h = 0;
for c in s.chars() {
h = h.wrapping_mul(31).wrapping_add(c as u64);
}
h
}
fn process_all(items: &[String]) -> Vec<ProcessedItem> {
// Process batches in parallel:
items.par_iter()
.map(|s| ProcessedItem {
hash: Self::hash(s),
length: s.len(),
})
.collect()
}
}
#[derive(Debug)]
struct ProcessedItem {
hash: u64,
length: usize,
}
fn real_world() {
let items: Vec<String> = (0..1000)
.map(|i| format!("item_{}", i))
.collect();
let processed = DataProcessor::process_all(&items);
println!("Processed {} items", processed.len());
}Parallel processing shines for CPU-bound operations on large datasets.
Real-World Example: Parallel Map
use rayon::prelude::*;
use std::collections::HashMap;
struct User {
id: u32,
name: String,
}
fn parallel_map() {
let users: Vec<User> = (0..100)
.map(|i| User {
id: i,
name: format!("User{}", i),
})
.collect();
// Transform to HashMap in parallel:
let user_map: HashMap<u32, String> = users.par_iter()
.map(|user| (user.id, user.name.clone()))
.collect();
// Process and filter:
let active_users: Vec<User> = users.into_par_iter()
.filter(|user| user.id % 2 == 0)
.collect();
println!("Active users: {}", active_users.len());
}IntoParallelIterator enables parallel transformations and filtering.
Key Points
fn key_points() {
// 1. IntoParallelIterator converts collections to parallel iterators
// 2. par_iter() borrows, into_par_iter() consumes
// 3. Items must be Send (or Send + Sync for references)
// 4. Work is split across threads using work-stealing
// 5. Split is adaptive based on collection size and thread count
// 6. Parallel overhead may exceed benefit for small collections
// 7. Implementation delegates to underlying collection's split
// 8. Enables map, filter, fold, reduce, and other adapters
// 9. collect() preserves order for Vec
// 10. Works with ranges, slices, Vec, HashMap, HashSet
// 11. Uses global thread pool by default
// 12. Custom thread pools via ThreadPoolBuilder
// 13. Error handling via try_fold/try_reduce
// 14. Panics propagate from parallel execution
// 15. Best for large collections and CPU-bound work
}Key insight: IntoParallelIterator is the foundational trait that bridges Rust's sequential collections to Rayon's parallel execution model. When you call par_iter() or into_par_iter(), the trait implementation determines how the collection splits into chunks for parallel processing. Rayon's work-stealing scheduler then distributes these chunks across the thread pool, stealing work from busy threads to keep all threads utilized. The Send bound on Item is essential because data moves between threads during this process. The conversion itself is essentially free—it just wraps the collection in a parallel iterator type—but the real magic happens when you chain operations like map, filter, and reduce, which are then executed in parallel. This design means you write the same iterator chain you would for sequential code, but with par_iter() instead of iter(), and Rayon handles the parallelism transparently.
