How do I work with Rayon for Data Parallelism in Rust?
Walkthrough
Rayon is a data parallelism library that makes it easy to convert sequential computations into parallel ones. It provides a work-stealing thread pool and parallel iterators that automatically distribute work across multiple CPU cores. The key insight is that for deterministic operations (pure functions, associative reductions), Rayon produces the same result as sequential execution, just faster.
Key concepts:
- Parallel iterators — .par_iter() instead of .iter()
- Work stealing — threads steal tasks from each other for load balancing
- Data parallelism — Same operation on different data elements
- Join — Run two closures in parallel
- Scope — Create scoped parallel tasks
When to use Rayon:
- CPU-bound computations on collections
- Data processing pipelines
- Map/filter/reduce operations on large datasets
- Image processing, simulations, scientific computing
When NOT to use Rayon:
- I/O-bound operations (use async instead)
- Very small collections (overhead > benefit)
- Operations with side effects
- When side effects must run in a strict order (note that collect itself still preserves element order)
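All of the examples below assume Rayon has been declared as a dependency; a minimal Cargo.toml entry (the version shown is illustrative):

```toml
[dependencies]
rayon = "1"
```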
Code Examples
Basic Parallel Iteration
use rayon::prelude::*;
fn main() {
let data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
// Sequential
let sum: i32 = data.iter().sum();
println!("Sequential sum: {}", sum);
// Parallel (just add par_)
let parallel_sum: i32 = data.par_iter().sum();
println!("Parallel sum: {}", parallel_sum);
}
Parallel Map
use rayon::prelude::*;
fn main() {
let data = vec![1, 2, 3, 4, 5];
// Parallel map
let squares: Vec<i32> = data.par_iter()
.map(|x| x * x)
.collect();
println!("Squares: {:?}", squares);
// Parallel map with expensive computation
let results: Vec<i32> = data.par_iter()
.map(|x| {
// Simulate expensive work
(1..=*x).sum()
})
.collect();
println!("Results: {:?}", results);
}
Parallel Filter
use rayon::prelude::*;
fn main() {
let data: Vec<i32> = (1..=100).collect();
// Parallel filter
let evens: Vec<i32> = data.par_iter()
.filter(|x| *x % 2 == 0)
.cloned()
.collect();
println!("Even numbers: {} found", evens.len());
}
Parallel Filter Map
use rayon::prelude::*;
fn main() {
let data = vec!["1", "2", "three", "4", "five", "6"];
// Parse strings to numbers, skip failures
let numbers: Vec<i32> = data.par_iter()
.filter_map(|s| s.parse().ok())
.collect();
println!("Parsed numbers: {:?}", numbers);
}
Parallel Reduce
use rayon::prelude::*;
fn main() {
let data: Vec<i32> = (1..=100).collect();
// Parallel reduce (sum)
let sum = data.par_iter().copied().reduce(|| 0, |a, b| a + b);
println!("Sum: {}", sum);
// Parallel reduce (max)
let max = data.par_iter().copied().reduce(|| i32::MIN, |a, b| a.max(b));
println!("Max: {}", max);
// Parallel reduce (product)
let product: i32 = data.par_iter().copied().take(5).reduce(|| 1, |a, b| a * b);
println!("Product of first 5: {}", product);
}
Parallel Fold
use rayon::prelude::*;
fn main() {
let data: Vec<i32> = (1..=10).collect();
// Parallel fold produces one partial result per split; combine them afterwards
let sum: i32 = data.par_iter().fold(|| 0, |acc, x| acc + x).sum();
println!("Sum: {}", sum);
// Fold then reduce
let total = data.par_iter()
.fold(|| 0, |acc, x| acc + x)
.reduce(|| 0, |a, b| a + b);
println!("Total: {}", total);
}
Parallel For Each
use rayon::prelude::*;
use std::sync::atomic::{AtomicUsize, Ordering};
fn main() {
let data: Vec<i32> = (1..=100).collect();
let counter = AtomicUsize::new(0);
// Parallel for_each
data.par_iter().for_each(|_x| {
// Count each processed item
counter.fetch_add(1, Ordering::Relaxed);
});
println!("Processed {} items", counter.load(Ordering::Relaxed));
}
Parallel Find and Any
use rayon::prelude::*;
fn main() {
let data: Vec<i32> = (1..=1000).collect();
// Parallel find_any (returns whichever match is found first, not necessarily the first in order)
let found = data.par_iter().find_any(|x| **x > 500);
println!("Found: {:?}", found);
// Parallel find_first (the first match in the original order)
let first = data.par_iter().find_first(|x| **x > 500);
println!("First > 500: {:?}", first);
// Parallel any
let has_large = data.par_iter().any(|x| *x > 900);
println!("Has number > 900: {}", has_large);
// Parallel all
let all_positive = data.par_iter().all(|x| *x > 0);
println!("All positive: {}", all_positive);
}
Parallel Sort
use rayon::prelude::*;
fn main() {
let mut data = vec![5, 3, 8, 1, 9, 2, 7, 4, 6, 0];
// Parallel stable sort
data.par_sort();
println!("Sorted: {:?}", data);
// Parallel unstable sort (usually faster)
let mut data2 = vec![5, 3, 8, 1, 9, 2, 7, 4, 6, 0];
data2.par_sort_unstable();
println!("Sorted unstable: {:?}", data2);
// Parallel sort with a comparator
let mut pairs = vec![(2, 'b'), (1, 'a'), (3, 'c')];
pairs.par_sort_by(|a, b| a.0.cmp(&b.0));
println!("Sorted pairs: {:?}", pairs);
}
Parallel Partition
use rayon::prelude::*;
fn main() {
let data: Vec<i32> = (1..=10).collect();
// Partition into two vectors
let (evens, odds): (Vec<i32>, Vec<i32>) = data.par_iter()
.copied()
.partition(|x| x % 2 == 0);
println!("Evens: {:?}", evens);
println!("Odds: {:?}", odds);
}
Join for Fork-Join Parallelism
use rayon::prelude::*;
fn main() {
let data1 = vec![1, 2, 3, 4, 5];
let data2 = vec![6, 7, 8, 9, 10];
// Run two computations in parallel
let (sum1, sum2) = rayon::join(
|| data1.iter().sum::<i32>(),
|| data2.iter().sum::<i32>()
);
println!("Sum 1: {}, Sum 2: {}", sum1, sum2);
}
Recursive Parallelism
use rayon::prelude::*;
fn quick_sort<T: Ord + Send>(mut data: Vec<T>) -> Vec<T> {
if data.len() <= 1 {
return data;
}
let pivot = data.swap_remove(0);
let (less, greater): (Vec<T>, Vec<T>) = data
.into_iter()
.partition(|x| *x < pivot);
// Sort the two partitions in parallel
let (sorted_less, sorted_greater) = rayon::join(
|| quick_sort(less),
|| quick_sort(greater)
);
let mut result = sorted_less;
result.push(pivot);
result.extend(sorted_greater);
result
}
fn main() {
let data = vec![5, 3, 8, 1, 9, 2, 7, 4, 6, 0];
let sorted = quick_sort(data);
println!("Sorted: {:?}", sorted);
}
Scoped Threads
use rayon::prelude::*;
fn main() {
let mut data = vec![1, 2, 3, 4, 5];
// Scoped parallel mutation: split the slice so each task mutates a disjoint part
let (left, right) = data.split_at_mut(1);
rayon::scope(|s| {
s.spawn(|_| {
left[0] = 10;
});
s.spawn(|_| {
right[0] = 20;
});
});
println!("Modified data: {:?}", data);
}
Thread Pool Configuration
use rayon::prelude::*;
use rayon::{ThreadPool, ThreadPoolBuilder};
fn main() {
// Create a custom thread pool
let pool: ThreadPool = ThreadPoolBuilder::new()
.num_threads(4)
.build()
.unwrap();
let data: Vec<i32> = (1..=100).collect();
// Run the computation inside the pool
let sum: i32 = pool.install(|| {
data.par_iter().sum()
});
println!("Sum: {}", sum);
}
Current Thread Index
use rayon::prelude::*;
use std::collections::HashMap;
use std::sync::Mutex;
fn main() {
let data: Vec<i32> = (1..=1000).collect();
let per_thread: Mutex<HashMap<usize, usize>> = Mutex::new(HashMap::new());
data.par_iter().for_each(|_| {
let thread_idx = rayon::current_thread_index().unwrap();
*per_thread.lock().unwrap().entry(thread_idx).or_insert(0) += 1;
});
println!("Work per thread: {:?}", per_thread.into_inner().unwrap());
}
Flat Map Parallel
use rayon::prelude::*;
fn main() {
let data = vec![[1, 2], [3, 4], [5, 6]];
// Flatten nested structures in parallel; flat_map_iter takes a sequential inner iterator
let flat: Vec<i32> = data.par_iter()
.flat_map_iter(|arr| arr.iter().copied())
.collect();
println!("Flattened: {:?}", flat);
}
Parallel Zip
use rayon::prelude::*;
fn main() {
let a = vec![1, 2, 3, 4, 5];
let b = vec![10, 20, 30, 40, 50];
// Zip two parallel iterators elementwise
let sums: Vec<i32> = a.par_iter()
.zip(b.par_iter())
.map(|(x, y)| x + y)
.collect();
println!("Sums: {:?}", sums);
}
Parallel Chains
use rayon::prelude::*;
fn main() {
let a = vec![1, 2, 3];
let b = vec![4, 5, 6];
// Chain two parallel iterators
let combined: Vec<i32> = a.par_iter()
.chain(b.par_iter())
.cloned()
.collect();
println!("Combined: {:?}", combined);
}
Image Processing Example
use rayon::prelude::*;
struct Pixel {
r: u8,
g: u8,
b: u8,
}
impl Pixel {
fn grayscale(&self) -> u8 {
((self.r as u16 + self.g as u16 + self.b as u16) / 3) as u8
}
fn invert(&self) -> Self {
Pixel {
r: 255 - self.r,
g: 255 - self.g,
b: 255 - self.b,
}
}
}
fn main() {
let mut image: Vec<Pixel> = (0..100_000)
.map(|i| Pixel { r: i as u8, g: (i * 2) as u8, b: (i * 3) as u8 })
.collect();
// Convert every pixel to grayscale in parallel
image.par_iter_mut().for_each(|pixel| {
let gray = pixel.grayscale();
pixel.r = gray;
pixel.g = gray;
pixel.b = gray;
});
println!("Processed {} pixels", image.len());
}
Matrix Operations
use rayon::prelude::*;
type Matrix = Vec<Vec<f64>>;
fn matrix_multiply(a: &Matrix, b: &Matrix) -> Matrix {
let n = a.len();
let m = b[0].len();
let p = b.len();
(0..n).into_par_iter()
.map(|i| {
(0..m).map(|j| {
(0..p).map(|k| a[i][k] * b[k][j]).sum()
}).collect()
})
.collect()
}
fn main() {
let a = vec![
vec![1.0, 2.0],
vec![3.0, 4.0],
];
let b = vec![
vec![5.0, 6.0],
vec![7.0, 8.0],
];
let result = matrix_multiply(&a, &b);
println!("Result: {:?}", result);
}
Word Frequency Counter
use rayon::prelude::*;
use std::collections::HashMap;
fn main() {
let text = "the quick brown fox jumps over the lazy dog the fox was quick";
let words: Vec<&str> = text.split_whitespace().collect();
// Parallel word-frequency counting via fold + reduce
let freq: HashMap<&str, usize> = words.par_iter()
.fold(
|| HashMap::new(),
|mut map, word| {
*map.entry(*word).or_insert(0) += 1;
map
}
)
.reduce(
|| HashMap::new(),
|mut a, b| {
for (word, count) in b {
*a.entry(word).or_insert(0) += count;
}
a
}
);
println!("Word frequencies: {:?}", freq);
}
Benchmark Comparison
use rayon::prelude::*;
use std::time::Instant;
fn main() {
let data: Vec<i64> = (1..=10_000_000).collect();
// Sequential (i64 to avoid overflowing i32: the sum is about 5 * 10^13)
let start = Instant::now();
let seq_sum: i64 = data.iter().sum();
let seq_time = start.elapsed();
// Parallel
let start = Instant::now();
let par_sum: i64 = data.par_iter().sum();
let par_time = start.elapsed();
println!("Sequential sum: {} ({:?})", seq_sum, seq_time);
println!("Parallel sum: {} ({:?})", par_sum, par_time);
println!("Speedup: {:.2}x", seq_time.as_secs_f64() / par_time.as_secs_f64());
}
Collect into Different Types
use rayon::prelude::*;
use std::collections::{HashMap, HashSet};
fn main() {
let data = vec![1, 2, 3, 4, 5, 1, 2, 3];
// Collect into a Vec
let vec: Vec<i32> = data.par_iter().cloned().collect();
println!("Vec: {:?}", vec);
// Collect into a HashSet (deduplicates)
let set: HashSet<i32> = data.par_iter().cloned().collect();
println!("Set: {:?}", set);
// Collect into a HashMap
let map: HashMap<i32, i32> = data.par_iter()
.map(|x| (*x, x * x))
.collect();
println!("Map: {:?}", map);
}
Summary
Rayon Key Imports:
use rayon::prelude::*; // Parallel iterator traits
use rayon::join; // Fork-join parallelism
use rayon::scope; // Scoped tasks
Sequential to Parallel Conversion:
| Sequential | Parallel |
|---|---|
| .iter() | .par_iter() |
| .iter_mut() | .par_iter_mut() |
| .into_iter() | .into_par_iter() |
Parallel Iterator Methods:
| Method | Description |
|---|---|
| map | Transform each element |
| filter | Keep matching elements |
| filter_map | Filter and map combined |
| fold | Accumulate per-thread with an identity |
| reduce | Combine all elements |
| for_each | Execute an action on each element |
| collect | Collect into a container |
| find_any / find_first | Find a matching element |
| any | Check if any element matches |
| all | Check if all elements match |
| par_sort | Sort in parallel |
Key Functions:
| Function | Description |
|---|---|
| join(f1, f2) | Run two closures in parallel |
| scope(f) | Create scoped parallel tasks |
| s.spawn(f) | Spawn a task inside a scope |
Performance Considerations:
| Factor | Recommendation |
|---|---|
| Collection size | Large collections (> 1000 elements) |
| Work per item | Expensive operations benefit more |
| Overhead | Small collections may be slower |
| Memory | Parallel requires more memory |
Key Points:
- Just add par_ to convert iterators to parallel
- Work-stealing thread pool for load balancing
- join for fork-join parallelism
- scope for spawning tasks with borrowed data
- Perfect for CPU-bound computations
- Avoid I/O operations in parallel iterators
- Results are deterministic, matching sequential execution
- Custom thread pools for fine control
