How does rayon::iter::ParallelIterator::for_each_init support thread-local state during parallel iteration?
for_each_init creates per-thread state by initializing a separate copy of state for each worker thread that participates in the parallel iteration, allowing each thread to accumulate results or maintain context without synchronization overhead. The method takes an init function that creates thread-local state and a clone function to create additional copies when threads steal work, ensuring each thread has its own isolated state instance.
The Problem: Shared State in Parallel Iteration
use rayon::prelude::*;
fn shared_state_problem() {
// Naive approach: shared mutable state
let mut counter = std::sync::atomic::AtomicUsize::new(0);
(0..1_000_000).into_par_iter().for_each(|_| {
counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
});
// Works, but every increment requires:
// 1. Atomic operation
// 2. Cache coherence traffic
// 3. Potential contention between threads
// This is slow for high-frequency updates
}Sharing state across threads requires synchronization (atomics, locks), causing contention and cache coherence overhead.
The Solution: Thread-Local State
use rayon::prelude::*;
fn thread_local_state() {
// Each thread gets its own counter
let total = std::sync::atomic::AtomicUsize::new(0);
(0..1_000_000).into_par_iter().for_each_init(
// init: Create initial state for each thread
|| 0usize,
// op: Process element with thread-local state
|counter, _| {
*counter += 1;
},
);
// But wait - how do we get the result?
// for_each_init doesn't return the state
}Each thread accumulates into its own state, avoiding synchronization for each update.
for_each_init Signature
use rayon::prelude::*;
fn signature_explanation() {
// for_each_init has this signature:
// fn for_each_init<OP, INIT, T>(
// self,
// init: INIT, // Creates thread-local state
// op: OP, // Processes each element
// ) where
// INIT: Fn() -> T + Sync, // Returns initial state
// OP: Fn(&mut T, Self::Item) + Sync, // Uses state and element
// T: Send, // State can be sent between threads
// Key points:
// - init is called for each thread that needs state
// - op receives mutable reference to state and the element
// - State is NOT returned - for_each_init returns ()
}for_each_init is for side-effect operations where you don't need the accumulated state back.
Initialization Per Thread
use rayon::prelude::*;
use std::cell::Cell;
fn per_thread_init() {
let thread_count = std::sync::atomic::AtomicUsize::new(0);
(0..100).into_par_iter().for_each_init(
|| {
// This is called ONCE per worker thread
// (or more if work is stolen)
thread_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
Vec::new() // Each thread gets its own Vec
},
|vec, item| {
// Each thread accumulates into its own Vec
vec.push(item);
},
);
// Number of Vecs created == number of threads that did work
// (Plus potential additional ones from work stealing)
}The init function is called once per thread, creating isolated state instances.
Work Stealing and State Cloning
use rayon::prelude::*;
fn work_stealing() {
// Rayon uses work stealing: threads can steal work from each other
// When work is split between threads:
// 1. Original thread keeps its state
// 2. New thread calls init() to get its own state
// If state needs to be "inherited" (rare), you'd use fold instead
(0..1000).into_par_iter().for_each_init(
|| {
println!("Creating new state");
Vec::new()
},
|vec, item| {
vec.push(item);
},
);
// You might see "Creating new state" multiple times
// Each worker thread creates its own state
// Work stealing causes init to be called again
}When work is stolen, the stealing thread calls init() to create its own state copy.
Collecting Thread-Local Results
use rayon::prelude::*;
fn collecting_results() {
// for_each_init doesn't return state
// Use fold() for collecting per-thread results
let result: Vec<i32> = (0..100)
.into_par_iter()
.fold(
|| Vec::new(), // Per-thread state
|mut vec, item| { // Accumulate
vec.push(item);
vec
},
|mut vec1, vec2| { // Combine thread results
vec1.extend(vec2);
vec1
},
)
.collect();
// fold() combines thread results
// for_each_init() is for side effects only
}For collecting results, use fold() instead of for_each_init().
Use Case: Thread-Local Buffer
use rayon::prelude::*;
use std::sync::Mutex;
fn thread_local_buffer() {
let output = Mutex::new(Vec::new());
// Each thread batches into its own Vec
// Then flushes to shared output periodically
(0..10000).into_par_iter().for_each_init(
|| Vec::with_capacity(100),
|local_buffer, item| {
local_buffer.push(item);
// Flush when buffer is full
if local_buffer.len() >= 100 {
let mut output = output.lock().unwrap();
output.append(local_buffer);
local_buffer.clear();
}
},
);
// Benefit: Fewer mutex acquisitions
// Each thread buffers 100 items before locking
// Instead of locking per item
}Thread-local buffers reduce synchronization by batching operations.
Use Case: Thread-Local Random State
use rayon::prelude::*;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
fn thread_local_rng() {
// Random number generators should NOT be shared
// Each thread needs its own RNG
let mut results = Vec::new();
(0..100).into_par_iter().for_each_init(
|| StdRng::from_entropy(), // Each thread gets its own RNG
|rng, _| {
let value: i32 = rng.gen();
// Use value...
},
);
// Each thread has independent RNG state
// No contention for random number generation
}RNGs should be per-thread to avoid contention and maintain statistical properties.
Use Case: Thread-Local Counters
use rayon::prelude::*;
use std::sync::atomic::{AtomicUsize, Ordering};
fn thread_local_counter() {
let total_count = AtomicUsize::new(0);
(0..1_000_000).into_par_iter().for_each_init(
|| 0usize, // Local counter per thread
|local_count, _| {
*local_count += 1; // Fast: no atomic, no contention
// Periodically flush to global
if *local_count >= 10000 {
total_count.fetch_add(*local_count, Ordering::Relaxed);
*local_count = 0;
}
},
);
// Instead of 1M atomic increments
// We do ~100 atomic increments (flush every 10K)
// Much faster due to reduced contention
}Thread-local counters with periodic flushing reduce atomic operation overhead dramatically.
Comparing Approaches
use rayon::prelude::*;
use std::sync::atomic::{AtomicUsize, Ordering};
fn comparison() {
let data: Vec<i32> = (0..1_000_000).collect();
// Approach 1: Shared atomic counter (slow)
let counter1 = AtomicUsize::new(0);
data.par_iter().for_each(|_| {
counter1.fetch_add(1, Ordering::Relaxed);
});
// Cost: 1M atomic operations
// Approach 2: Thread-local counter (fast)
let counter2 = AtomicUsize::new(0);
data.par_iter().for_each_init(
|| 0usize,
|local, _| {
*local += 1;
},
);
// Issue: Local counter is discarded!
// Approach 3: fold() to collect (correct)
let total: usize = data.par_iter()
.fold(
|| 0usize,
|local, _| local + 1,
|a, b| a + b,
)
.sum();
// Cost: ~thread_count additions + final combine
}Use fold() when you need the accumulated result; for_each_init is for side effects.
Thread-Local HashMap
use rayon::prelude::*;
use std::collections::HashMap;
use std::sync::Mutex;
fn thread_local_hashmap() {
// Counting occurrences with thread-local HashMaps
let final_counts = Mutex::new(HashMap::new());
(0..100_000).into_par_iter().for_each_init(
|| HashMap::new(), // Each thread has its own HashMap
|local_counts, item| {
*local_counts.entry(item % 100).or_insert(0) += 1;
// Periodically flush to global
if local_counts.len() >= 50 {
let mut final_counts = final_counts.lock().unwrap();
for (k, v) in local_counts.drain() {
*final_counts.entry(k).or_insert(0) += v;
}
}
},
);
// Flush remaining
// ... (would need additional handling)
// Benefit: Most operations are lock-free
// Each thread works on its own HashMap
// Only periodic merges need synchronization
}Thread-local HashMaps avoid lock contention for frequent insertions.
Thread-Local Resource Pools
use rayon::prelude::*;
fn thread_local_pool() {
// Expensive resources (database connections, file handles)
// Should be per-thread to avoid contention
(items).into_par_iter().for_each_init(
|| {
// Create one expensive resource per thread
// e.g., database connection
create_expensive_resource()
},
|resource, item| {
// Use resource without synchronization
resource.process(item);
},
);
// Each thread gets its own resource
// No contention for resource access
}
fn create_expensive_resource() -> Resource {
// Simulated expensive resource creation
Resource
}
struct Resource;
impl Resource {
fn process(&mut self, _item: i32) {}
}Per-thread resources (connections, handles) avoid synchronization and can be reused within a thread.
Nested Parallel Iteration
use rayon::prelude::*;
fn nested_parallel() {
let matrix: Vec<Vec<i32>> = (0..100).map(|_| (0..100).collect()).collect();
// Nested parallel iteration with thread-local state
matrix.par_iter().for_each_init(
|| Vec::new(), // Thread-local accumulator for inner sums
|local_acc, row| {
let row_sum: i32 = row.iter().sum();
local_acc.push(row_sum);
},
);
// Each outer thread has its own accumulator
// But local_acc is discarded at end!
// For nested collection, use fold:
let row_sums: Vec<i32> = matrix.par_iter()
.fold(
|| Vec::new(),
|mut acc, row| {
acc.push(row.iter().sum());
acc
},
|mut a, b| {
a.extend(b);
a
},
)
.flatten()
.collect();
}Nested parallelism works naturally with thread-local state, but use fold() to collect results.
Interaction with Rayon Thread Pool
use rayon::prelude::*;
use rayon::ThreadPool;
fn thread_pool_interaction() {
// Rayon has a global thread pool
// Default: number of CPU cores
// for_each_init creates one state per worker thread in pool
// If pool has 8 threads, up to 8 init() calls
let pool = ThreadPool::new(rayon::ThreadPoolBuilder::new().num_threads(4)).unwrap();
pool.install(|| {
(0..100).into_par_iter().for_each_init(
|| {
println!("Thread created state");
0usize
},
|counter, _| {
*counter += 1;
},
);
});
// You'll see "Thread created state" ~4 times
// (Plus additional for work stealing splits)
}State is created per worker thread in the active thread pool.
Thread-Local Timing
use rayon::prelude::*;
use std::time::Instant;
fn thread_local_timing() {
let total_time = std::sync::Mutex::new(std::time::Duration::ZERO);
(0..1000).into_par_iter().for_each_init(
|| Instant::now(), // Thread-local start time
|start, _| {
// Do work
std::thread::sleep(std::time::Duration::from_millis(1));
// Record time for this batch
let elapsed = start.elapsed();
*total_time.lock().unwrap() += elapsed;
*start = Instant::now(); // Reset for next batch
},
);
// Each thread tracks its own timing locally
// No contention for Instant creation
}Per-thread timing avoids contention on shared timing state.
Debugging Thread-Local State
use rayon::prelude::*;
use std::sync::atomic::{AtomicUsize, Ordering};
fn debug_thread_ids() {
let thread_count = AtomicUsize::new(0);
(0..100).into_par_iter().for_each_init(
|| {
let id = thread_count.fetch_add(1, Ordering::Relaxed);
println!("Init called for thread local {}", id);
id
},
|id, item| {
if item < 5 {
println!("Thread local {} processing item {}", id, item);
}
},
);
// You'll see:
// "Init called for thread local 0"
// "Init called for thread local 1"
// etc.
// Showing how many thread-local states were created
}Debug output reveals how many init calls occur and which thread processes which items.
Memory and Performance
use rayon::prelude::*;
fn memory_performance() {
// Thread-local state trades memory for reduced contention
// Each thread has its own state copy
// Memory: sizeof(State) * num_threads
// But:
// - No lock contention
// - No cache coherence traffic per operation
// - Better cache locality (state is thread-local)
// Trade-off:
// Small state (counters, small buffers): Great trade-off
// Large state (big HashMaps): Consider carefully
(0..1_000_000).into_par_iter().for_each_init(
|| Vec::with_capacity(1000), // ~8KB per thread
|vec, item| {
vec.push(item);
},
);
// For 8 threads: ~64KB total
// For atomic approach: ~0KB, but heavy contention
}Thread-local state trades memory for performanceāevaluate the trade-off for your use case.
Comparison with Other Patterns
use rayon::prelude::*;
fn pattern_comparison() {
// Pattern 1: for_each (no state)
(0..100).into_par_iter().for_each(|x| {
// Stateless operation
println!("{}", x);
});
// Pattern 2: for_each_init (thread-local state, side effects)
(0..100).into_par_iter().for_each_init(
|| Vec::new(),
|buf, x| {
buf.push(x); // Thread-local accumulation
},
);
// Pattern 3: fold (thread-local state, returns result)
let sum: i32 = (0..100).into_par_iter()
.fold(
|| 0,
|acc, x| acc + x,
|a, b| a + b,
);
// Pattern 4: for_each with Mutex (shared state)
let shared = std::sync::Mutex::new(Vec::new());
(0..100).into_par_iter().for_each(|x| {
shared.lock().unwrap().push(x); // Contention!
});
// Choose based on:
// - Need result? Use fold
// - Side effects with state? Use for_each_init
// - Shared state unavoidable? Use for_each + Mutex
// - No state? Use for_each
}| Pattern | State Type | Returns Result | Contention |
|---|---|---|---|
| for_each | None | No | N/A |
| for_each_init | Thread-local | No | Low |
| fold | Thread-local | Yes | Low |
| for_each + Mutex | Shared | No | High |
Summary Table
fn summary_table() {
// | Method | State | Returns | Use Case |
// |--------|-------|---------|----------|
// | for_each | None | () | Stateless ops |
// | for_each_init | Thread-local | () | Stateful side effects |
// | fold | Thread-local | T | Stateful with result |
// | for_each + Mutex | Shared | () | Unavoidable sharing |
// | Scenario | Recommended Pattern |
// |----------|---------------------|
// | Count with global sum | fold() |
// | Batch to shared output | for_each_init + periodic flush |
// | Per-thread RNG | for_each_init |
// | Per-thread buffer | for_each_init |
// | Resource per thread | for_each_init |
}Synthesis
Quick reference:
use rayon::prelude::*;
fn quick_reference() {
// for_each_init: Thread-local state for side effects
(0..1000).into_par_iter().for_each_init(
|| Vec::new(), // Create state per thread
|local_state, item| { // Process with state
local_state.push(item);
},
);
// fold: Thread-local state with result collection
let total: usize = (0..1000).into_par_iter()
.fold(
|| 0usize,
|acc, item| acc + 1,
|a, b| a + b,
);
// Choose for_each_init when:
// - Need thread-local state
// - Don't need accumulated result
// - Performing side effects (writing to output)
}Key insight: for_each_init solves the problem of efficient thread-local state in parallel iteration by calling the init function once per worker thread (or when work is stolen), creating isolated state instances that avoid synchronization overhead. Each thread accumulates into its own state without contending with other threads for locks or atomic operations. The pattern is ideal for batching operations (buffering before writing to shared output), maintaining per-thread resources (RNGs, connections), and aggregating with periodic flushes to shared state. For cases where you need the accumulated results back, use fold() instead, which combines thread-local results into a final value. The trade-off is memory: each thread has its own state copy, but this is usually worth it for the elimination of contention in tight loops where atomic operations or lock acquisitions would dominate the runtime.
