How does rayon::iter::ParallelIterator::for_each_with share state across parallel iterations safely?
rayon::iter::ParallelIterator::for_each_with provides parallel workers with their own clones of an initial value, allowing parallel closures to mutate thread-local state efficiently without synchronization overhead. Rayon clones the initial value for each job it splits the work into — at least once per worker thread, but never once per item — and each worker has exclusive access to its copy, eliminating contention while still enabling stateful parallel processing.
Basic for_each_with Usage
use rayon::prelude::*;
/// Minimal for_each_with example: every worker receives its own clone of
/// `buffer` and mutates it lock-free.
fn basic_for_each_with() {
    let data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
    // Initial state; rayon clones it for the workers that pick up items.
    let buffer: Vec<i32> = Vec::new();
    data.par_iter().for_each_with(buffer, |local_buffer, &item| {
        // `local_buffer` is `&mut Vec<i32>` pointing at this thread's
        // private copy, so pushing needs no synchronization at all.
        local_buffer.push(item * 2);
    });
    // The per-thread buffers are dropped when the work finishes; use
    // for_each_with only for side effects that need no collection.
}
Each thread gets its own cloned buffer to mutate without synchronization.
Understanding the Clone Pattern
use rayon::prelude::*;
/// Each worker clones `initial_value` and then appends to its own private
/// String; no lock is ever taken.
fn clone_pattern() {
    let data = vec![1, 2, 3, 4, 5, 6, 7, 8];
    // The initial state must implement Clone.
    let initial_value = String::from("Items: ");
    data.par_iter().for_each_with(initial_value, |local_string, &item| {
        // `local_string` borrows this thread's clone of `initial_value`.
        local_string.push_str(&item.to_string());
        local_string.push_str(", ");
        // Thread-local mutation; no synchronization needed.
        println!("Thread local: {}", local_string);
    });
    // Every thread's string is independent and none of them are returned;
    // use fold/reduce instead when the final values must be collected.
}
The initial value is cloned once per worker thread, not per item.
Thread Pool and Worker Threads
use rayon::prelude::*;
use std::cell::RefCell;
/// Illustrates that for_each_with clones the initial state per worker,
/// not per iterated item.
fn worker_thread_behavior() {
let data: Vec<i32> = (0..100).collect();
// Rayon uses a thread pool with N worker threads
// for_each_with clones the value for each worker thread
// NOT for each item
// NOTE(review): strictly, rayon clones once per *job*; work-splitting can
// create more jobs than threads, so the clone count may exceed the thread
// count -- but it is never once per item.
// If there are 4 worker threads:
// - 4 clones of initial state created
// - Each worker processes ~25 items with its own state
let initial = Vec::new();
data.par_iter()
.for_each_with(initial, |local_vec, &item| {
// local_vec is the same across multiple items on same thread
local_vec.push(item);
// Thread 1 might process: 0, 1, 2, 3, ... with same local_vec
// Thread 2 might process: 25, 26, 27, ... with different local_vec
});
}Clones are per worker thread, not per iteration item.
Comparison: for_each vs for_each_with vs for_each_init
use rayon::prelude::*;
use std::sync::Mutex;
/// Side-by-side comparison of the four ways to handle state in a rayon
/// for_each-style loop: stateless, cloned-per-thread, initialized-per-thread,
/// and globally shared behind a Mutex.
fn comparison() {
let data: Vec<i32> = (0..1000).collect();
// 1. for_each: No shared state
data.par_iter()
.for_each(|&item| {
// Cannot accumulate state across iterations
// Each iteration is independent
});
// 2. for_each_with: Shared state cloned per thread
let initial = Vec::new();
data.par_iter()
.for_each_with(initial, |local_vec, &item| {
// Each thread gets initial.clone()
// local_vec is mutable, thread-local
// No synchronization needed
local_vec.push(item);
});
// 3. for_each_init: Fresh state per thread
data.par_iter()
.for_each_init(
// NOTE(review): rayon may invoke the initializer once per *job*, which
// can be more often than once per thread when work is split finely.
|| Vec::new(), // Called once per thread
|local_vec, &item| {
// Each thread gets fresh Vec from initializer
local_vec.push(item);
}
);
// 4. Mutex: Synchronized shared state (slower)
let shared = Mutex::new(Vec::new());
data.par_iter()
.for_each(|&item| {
// Every item pays a lock acquisition here.
let mut guard = shared.lock().unwrap();
guard.push(item);
// Lock contention! Performance penalty
});
}Each approach has different semantics and performance characteristics.
When to Use for_each_with
use rayon::prelude::*;
use std::fs::File;
use std::io::Write;
/// Shows when for_each_with is the right tool: per-thread stateful
/// processing where the state is cheap to clone.
fn when_to_use() {
    // `vec!["..."]` yields `Vec<&str>`; convert to owned Strings so the
    // `Vec<String>` annotation type-checks.
    let files: Vec<String> = vec!["file1.txt", "file2.txt", "file3.txt"]
        .into_iter()
        .map(String::from)
        .collect();
    // Use for_each_with when:
    // 1. You need stateful processing per thread
    // 2. State creation is expensive - clone once per thread
    // 3. State implements Clone efficiently
    // Example: Buffer for batched writes
    let buffer = Vec::new();
    files.par_iter().for_each_with(buffer, |local_buffer, filename| {
        // Accumulate in thread-local buffer
        local_buffer.push(format!("Processing: {}\n", filename));
        // Flush when buffer is full
        if local_buffer.len() >= 10 {
            // Write batch...
            local_buffer.clear();
        }
    });
    // Example: Thread-local counter.
    // NOTE: the original used AtomicUsize here, but AtomicUsize does NOT
    // implement Clone, so it cannot be passed to for_each_with (which
    // requires `T: Clone + Send`). A plain usize works, and no atomics are
    // needed because each thread owns its copy exclusively.
    let counter = 0usize;
    files.par_iter().for_each_with(counter, |local_counter, _filename| {
        *local_counter += 1;
    });
}
Use for_each_with when state creation is expensive but cloning is cheap.
for_each_init vs for_each_with
use rayon::prelude::*;
/// Contrasts for_each_with (clone an initial value) with for_each_init
/// (run an initializer closure per thread).
fn init_vs_with() {
let data: Vec<i32> = (0..100).collect();
// for_each_with: Clone initial value
// - Requires Clone
// - Same initial state for each thread (cloned)
// - Good when Clone is cheap
// NOTE(review): Vec::clone copies only the *elements*; clones of an empty
// Vec have capacity 0, so the with_capacity(100) reservation below is NOT
// propagated to the worker threads. The for_each_init form further down
// does preserve it, because each thread runs the initializer itself.
let initial_buffer = Vec::with_capacity(100);
data.par_iter()
.for_each_with(initial_buffer, |buf, &item| {
buf.push(item);
});
// for_each_init: Call initializer for each thread
// - Requires FnOnce -> T
// - Can create different state per thread
// - Good when initialization is complex
data.par_iter()
.for_each_init(
|| Vec::with_capacity(100), // Called once per thread
|buf, &item| {
buf.push(item);
}
);
}Use for_each_init when you need custom initialization per thread.
Mutable Thread-Local State
use rayon::prelude::*;
/// Demonstrates reusing one mutable per-thread String buffer across all the
/// items a worker processes.
fn mutable_state() {
    // `vec!["..."]` produces `Vec<&str>`; map to owned values so the
    // `Vec<String>` annotation type-checks.
    let data: Vec<String> = vec!["hello", "world", "rust", "rayon"]
        .into_iter()
        .map(String::from)
        .collect();
    // Thread-local buffer for string processing, cloned for each worker.
    let buffer = String::new();
    data.par_iter().for_each_with(buffer, |local_buffer, item| {
        // Reset the buffer for this item; its allocation is retained.
        local_buffer.clear();
        // Use the buffer for processing.
        local_buffer.push_str(item);
        local_buffer.make_ascii_uppercase();
        // Process with the thread-local buffer.
        println!("Processed: {}", local_buffer);
        // The same buffer is reused for the next item on this thread.
    });
}
Thread-local state is reused across items processed by the same thread.
Expensive Initialization Pattern
use rayon::prelude::*;
use std::collections::HashMap;
/// Pattern for expensive read-only state: clone it per thread with
/// for_each_with, or rebuild it per thread with for_each_init.
fn expensive_init() {
let data: Vec<&str> = vec
!["key1", "key2", "key3", "key4"]
;
// Expensive state that should be created once per thread
let lookup_table: HashMap<&str, &str> = [
("key1", "value1"),
("key2", "value2"),
("key3", "value3"),
("key4", "value4"),
].iter().cloned().collect();
data.par_iter()
.for_each_with(lookup_table, |table, &key| {
// Table is cloned once per thread, not per item
if let Some(value) = table.get(key) {
println!("{} -> {}", key, value);
}
});
// Alternative with for_each_init - no Clone needed
data.par_iter()
.for_each_init(
|| {
// Create table fresh for each thread
let mut table = HashMap::new();
table.insert("key1", "value1");
table.insert("key2", "value2");
table.insert("key3", "value3");
table.insert("key4", "value4");
table
},
|table, &key| {
if let Some(value) = table.get(key) {
println!("{} -> {}", key, value);
}
}
);
}Clone expensive state once per thread rather than recreating for each item.
Thread-Local Resources
use rayon::prelude::*;
use std::cell::RefCell;
// Thread-local resources that aren't Send/Sync
// Thread-local storage alternative: usable inside plain for_each even for
// types that are not Send/Sync (RefCell here).
thread_local! {
static LOCAL_BUFFER: RefCell<Vec<u8>> = RefCell::new(Vec::new());
}
/// Accumulates items in a `thread_local!` buffer instead of passing state
/// through for_each_with.
fn thread_local_resources() {
let data: Vec<i32> = (0..100).collect();
// Use thread-local for non-Send types
data.par_iter()
.for_each(|&item| {
LOCAL_BUFFER.with(|buffer| {
let mut buf = buffer.borrow_mut();
// NOTE(review): `item as u8` silently truncates values above 255;
// fine for this 0..100 demo, but not in general.
buf.push(item as u8);
if buf.len() >= 10 {
// Process buffer...
buf.clear();
}
});
});
}
// Alternative with for_each_init for truly thread-local mutable state
/// for_each_init builds fresh per-thread state through an initializer
/// closure, so the state type needs no Clone bound at all.
fn with_init() {
    let data: Vec<i32> = (0..100).collect();
    data.par_iter().for_each_init(
        Vec::new, // each worker obtains a fresh Vec from this initializer
        |local_vec, &item| {
            local_vec.push(item);
            // Drain the accumulated batch once it reaches ten items.
            if local_vec.len() >= 10 {
                // Process accumulated items
                local_vec.clear();
            }
        },
    );
}
Use for_each_init to create thread-local mutable state without Clone.
Collecting Results from Threads
use rayon::prelude::*;
/// Shows why for_each_with cannot return results, and the map/sum and
/// fold/flatten patterns that can.
fn collecting_results() {
let data: Vec<i32> = (0..100).collect();
// WRONG: for_each_with doesn't return results
// Changes to thread-local state are lost
let initial = Vec::new();
data.par_iter()
.for_each_with(initial, |local, &item| {
local.push(item);
});
// local is dropped when thread finishes - results lost
// RIGHT: Use map_reduce pattern for collecting
let sum: i32 = data.par_iter()
.map(|&x| x * 2)
.sum();
// RIGHT: Use fold for per-thread accumulation
let result: Vec<i32> = data.par_iter()
.fold(
|| Vec::new(), // Per-thread initializer
|mut acc, &item| { // Per-thread accumulation
acc.push(item * 2);
acc
}
)
.flatten() // Combine thread results
.collect();
}for_each_with is for side effects; use other patterns to collect results.
Thread-Safe vs Thread-Local
use rayon::prelude::*;
use std::sync::Mutex;
/// Contrasts synchronized shared state (Mutex) with thread-local
/// accumulation (fold/reduce and for_each_with).
fn thread_safe_vs_local() {
    let data: Vec<i32> = (0..1000).collect();
    // Thread-safe: one value shared by all threads.
    // Every update must acquire the lock.
    let shared_counter = Mutex::new(0);
    data.par_iter().for_each(|&item| {
        let mut guard = shared_counter.lock().unwrap();
        *guard += item;
    });
    // Thread-local: each thread folds into its own accumulator; the
    // partial sums are combined at the end with no locking anywhere.
    // NOTE: the accumulator and the items are both i32 so the final `sum`
    // type-checks (the original mixed an i32 fold with `sum::<usize>()`,
    // which does not compile).
    let total: i32 = data.par_iter()
        .fold(|| 0i32, |acc, &item| acc + item)
        .sum();
    let _ = total; // kept for illustration; silence the unused warning
    // for_each_with: thread-local state obtained by cloning `initial`.
    let initial = 0i32;
    data.par_iter().for_each_with(initial, |local_sum, &item| {
        *local_sum += item;
        // The per-thread sums are dropped here -- we can't get them back!
    });
}
Use thread-local state when you don't need to share across threads.
Practical Example: Batched Processing
use rayon::prelude::*;
/// Practical use of for_each_with: accumulate items into a per-thread batch
/// and flush it whenever it reaches `batch_size`.
fn batched_processing() {
let items: Vec<String> = (0..1000).map(|i| format!("item_{}", i)).collect();
// Batch items within each thread for batched writes
let batch_size = 10;
items.par_iter()
.for_each_with(Vec::new(), |batch, item| {
batch.push(item.clone());
if batch.len() >= batch_size {
// Process batch
println!("Processing batch of {} items", batch.len());
// In real code: write to database, file, etc.
batch.clear();
}
});
// Handle remaining items (they're lost in this simple example)
// In practice, use fold/reduce to collect and process remainders
}Batch processing benefits from thread-local buffers.
Error Handling with for_each_with
use rayon::prelude::*;
/// Demonstrates try_for_each_with: the fallible variant that short-circuits
/// on the first Err and returns a Result.
fn error_handling() {
let data: Vec<i32> = vec
![1, 2, 3, 4, 5]
;
// for_each_with doesn't propagate errors
// Use try_for_each_with for fallible operations
let result: Result<(), _> = data.par_iter()
.try_for_each_with(String::new(), |buffer, &item| {
if item > 3 {
return Err("Item too large");
}
buffer.push_str(&format!("{}, ", item));
Ok(())
});
match result {
Ok(()) => println!("All items processed"),
Err(e) => println!("Error: {}", e),
}
}Use try_for_each_with for operations that can fail.
Performance Considerations
use rayon::prelude::*;
use std::time::Instant;
/// Rough timing comparison of Mutex-based, for_each_with, for_each_init,
/// and sequential accumulation.
// NOTE(review): these Instant-based timings are only indicative -- without a
// release build and without preventing the optimizer from eliding work, the
// numbers should not be treated as a rigorous benchmark.
fn performance_comparison() {
let data: Vec<i32> = (0..1_000_000).collect();
// 1. Mutex-based (slowest - lock contention)
let start = Instant::now();
let shared = std::sync::Mutex::new(Vec::new());
data.par_iter()
.for_each(|&item| {
shared.lock().unwrap().push(item);
});
println!("Mutex: {:?}", start.elapsed());
// 2. for_each_with (fast - no contention)
let start = Instant::now();
let initial = Vec::new();
data.par_iter()
.for_each_with(initial, |local, &item| {
local.push(item);
});
println!("for_each_with: {:?}", start.elapsed());
// 3. for_each_init (similar to for_each_with)
let start = Instant::now();
data.par_iter()
.for_each_init(
|| Vec::new(),
|local, &item| {
local.push(item);
}
);
println!("for_each_init: {:?}", start.elapsed());
// 4. Sequential (baseline)
let start = Instant::now();
let mut vec = Vec::new();
data.iter().for_each(|&item| vec.push(item));
println!("Sequential: {:?}", start.elapsed());
}Thread-local state avoids lock contention and is much faster than Mutex.
Signature and Type Constraints
use rayon::prelude::*;
// Understanding the signature:
// fn for_each_with<C, F>(self, init: C, f: F)
// where
// C: Clone + Send, // init must be Clone (for each thread) and Send
// F: Fn(&mut C, T) + Sync + Send, // closure receives &mut C and item
// T: Send, // items must be Send
// Custom state type used with for_each_with below.
struct MyState {
buffer: Vec<String>,
}
// Must implement Clone for for_each_with
// NOTE(review): this Clone deliberately returns an *empty* buffer instead of
// copying `self.buffer`, so each worker thread starts fresh. That violates
// the usual expectation that a clone equals the original -- acceptable for
// this demo, but surprising if MyState were cloned anywhere else.
impl Clone for MyState {
fn clone(&self) -> Self {
MyState {
buffer: Vec::new(), // Fresh buffer for each thread
}
}
}
/// Drives for_each_with with the user-defined MyState type; the state must
/// satisfy `Clone + Send` for the call to compile.
fn signature_example() {
    let data = vec!["a", "b", "c"];
    let initial = MyState { buffer: Vec::new() };
    data.par_iter().for_each_with(initial, |state, &item| {
        // `state` is `&mut MyState`; `item` is `&str`.
        state.buffer.push(item.to_string());
    });
}
The initial state must be Clone + Send for for_each_with.
Synthesis
Quick reference:
| Method | State Creation | Use Case |
|---|---|---|
| for_each | None | Stateless operations |
| for_each_with | Clone per thread | Cheap-to-clone state |
| for_each_init | FnOnce per thread | Expensive initialization |
| Mutex&lt;T&gt; | Single shared | Must synchronize state |
Thread model:
use rayon::prelude::*;
/// Sketch of how a 4-thread pool would partition 100 items, each worker
/// reusing its own clone of `initial` across the items it processes.
fn thread_model() {
// Rayon thread pool: 4 worker threads
// Data: 100 items
let initial = Vec::new();
(0..100).into_par_iter()
.for_each_with(initial, |local, item| {
// Illustrative split (actual ranges depend on rayon's work-stealing):
// Thread 0: processes items 0-24 with initial.clone()
// Thread 1: processes items 25-49 with initial.clone()
// Thread 2: processes items 50-74 with initial.clone()
// Thread 3: processes items 75-99 with initial.clone()
// local is reused across items on same thread
// No synchronization between threads
});
}Common patterns:
use rayon::prelude::*;
fn patterns() {
// Pattern 1: Reusable buffer per thread
let buffer = String::new();
items.par_iter()
.for_each_with(buffer, |buf, item| {
buf.clear();
// Use buf for processing
});
// Pattern 2: Thread-local lookup table
let lookup = build_expensive_lookup();
items.par_iter()
.for_each_with(lookup, |table, item| {
if let Some(value) = table.get(item) {
// Process with lookup
}
});
// Pattern 3: Batch accumulation
let batch = Vec::with_capacity(100);
items.par_iter()
.for_each_with(batch, |local_batch, item| {
local_batch.push(item);
if local_batch.len() >= 100 {
// Process batch
local_batch.clear();
}
});
}Key insight: for_each_with enables stateful parallel processing by cloning an initial value for each worker thread in Rayon's thread pool, then allowing each thread to mutate its own copy through a mutable reference in the closure. This pattern eliminates synchronization overhead because each thread has exclusive access to its stateâthe clone happens once per worker thread (not per item), and the same mutable state is reused for all items processed by that thread. The key requirement is that the initial state implements Clone + Send: Clone to create per-thread copies, and Send so it can be transferred to worker threads. Use for_each_with when you need thread-local state that's cheap to clone; use for_each_init when initialization is expensive or you need fresh state per thread without Clone; use Mutex<T> when threads truly need to share mutable state (with performance penalty from lock contention). Remember that for_each_with is for side effectsâchanges to thread-local state are lost when threads finish. To collect results, use fold + reduce patterns or map + collect instead.
