Loading page…
Rust walkthroughs
Loading page…
How does rayon::iter::ParallelBridge convert sequential iterators to parallel ones, and what are the performance implications?

rayon::iter::ParallelBridge is a trait that converts a sequential Iterator into a ParallelIterator by pulling items from the iterator on multiple threads. The bridge creates work from the sequential iterator dynamically: threads pull items one at a time from a shared source, converting a non-parallel source into parallel execution. This is useful when you have an existing sequential iterator that cannot implement IntoParallelIterator directly, or when the iterator's source doesn't support parallel iteration natively. The key trade-off is that pulling from a shared iterator introduces synchronization overhead compared to natively parallel data structures, making ParallelBridge best suited for cases where the per-item work is substantial enough to outweigh the synchronization cost.
use rayon::iter::ParallelBridge;
use rayon::prelude::*;
fn main() {
// Sequential iterator converted to parallel
let results: Vec<i32> = (0..10)
.iter()
.par_bridge()
.map(|x| x * x)
.collect();
println!("Results: {:?}", results);
// Works with any iterator
let sum: i32 = [1, 2, 3, 4, 5]
.iter()
.par_bridge()
.sum();
println!("Sum: {}", sum);
}.par_bridge() converts any Iterator into a ParallelIterator.
use rayon::iter::ParallelBridge;
use rayon::prelude::*;
use std::sync::atomic::{AtomicUsize, Ordering};
fn main() {
let counter = AtomicUsize::new(0);
// par_bridge pulls from a shared iterator
// Each thread pulls items one at a time
(0..100)
.into_iter()
.inspect(|_| {
// This shows sequential access to iterator
counter.fetch_add(1, Ordering::SeqCst);
})
.par_bridge()
.map(|x| {
// This runs in parallel
x * x
})
.for_each(|_| {});
println!("Iterator calls: {}", counter.load(Ordering::SeqCst));
// All 100 items pulled through iterator
}The iterator is shared across threads; each thread pulls items sequentially.
use rayon::iter::ParallelBridge;
use rayon::prelude::*;
use std::time::Instant;
fn main() {
let data: Vec<i32> = (0..1_000_000).collect();
// Native parallel iteration (no synchronization)
let start = Instant::now();
let sum1: i32 = data.par_iter().map(|&x| x).sum();
let native_time = start.elapsed();
// ParBridge with synchronization overhead
let start = Instant::now();
let sum2: i32 = data.iter().par_bridge().map(|&x| x).sum();
let bridge_time = start.elapsed();
println!("Native parallel: {:?}", native_time);
println!("par_bridge: {:?}", bridge_time);
println!("Both sums equal: {}", sum1 == sum2);
// par_bridge is typically slower due to synchronization
// The difference depends on work per item
}Native parallel iterators avoid synchronization; par_bridge requires a shared lock.
use rayon::iter::ParallelBridge;
use rayon::prelude::*;
use std::io::{BufRead, BufReader};
fn main() {
    // USE CASE 1: counting lines from stdin — an inherently sequential
    // source that cannot offer a native parallel iterator.
    let input = std::io::stdin();
    let buffered = BufReader::new(input.lock());

    // `.lines()` is a plain Iterator, so par_bridge is the only way to
    // fan the items out across rayon's thread pool.
    let line_count = buffered
        .lines()
        .filter_map(|l| l.ok())
        .par_bridge()
        .count();
    println!("Lines: {}", line_count);
}
fn read_network_stream() {
    // USE CASE 2: network streams — bytes arrive in order over a socket,
    // so the source cannot be split into independent chunks for rayon.
    // USE CASE 3: external iterators you don't control — when a library
    // hands back `impl Iterator` rather than a type implementing
    // IntoParallelIterator, par_bridge is the only option.
}
fn expensive_processing_example() {
// USE CASE 4: When per-item work is expensive
// Overhead becomes negligible compared to computation
let results: Vec<_> = (0..100)
.into_iter()
.par_bridge()
.map(|x| {
// Expensive computation makes overhead worthwhile
(0..100_000).fold(x, |acc, i| acc.wrapping_add(i))
})
.collect();
}par_bridge is essential when the data source is inherently sequential.
use rayon::prelude::*;
use rayon::iter::ParallelBridge;
use std::time::Instant;
fn main() {
    let data: Vec<i32> = (0..100_000).collect();

    // FIX: summing 100_000 arbitrary i32 values overflows i32 (debug
    // panic) — accumulate in i64. Also print the results so the sums
    // are not dead bindings.

    // Native parallel: the slice splits into chunks and each thread
    // processes its chunk independently, with no synchronized source.
    let start = Instant::now();
    let native_sum: i64 = data.par_iter()
        .map(|&x| expensive_computation(x) as i64)
        .sum();
    let native_time = start.elapsed();

    // par_bridge: threads pull from the shared iterator, paying a
    // synchronization cost on each pull.
    let start = Instant::now();
    let bridge_sum: i64 = data.iter()
        .par_bridge()
        .map(|&x| expensive_computation(x) as i64)
        .sum();
    let bridge_time = start.elapsed();

    // With expensive computation the overhead is minimal; with cheap
    // computation it dominates.
    println!("native: {:?} (sum {})", native_time, native_sum);
    println!("par_bridge: {:?} (sum {})", bridge_time, bridge_sum);
}
/// Simulates expensive per-item work: 100 rounds of a wrapping affine
/// transform applied to `x`.
fn expensive_computation(x: i32) -> i32 {
    let mut acc = x;
    for _ in 0..100 {
        acc = acc.wrapping_mul(7).wrapping_add(1);
    }
    acc
}
/// Trivial per-item work: with computation this cheap, the synchronized
/// pull inside par_bridge would dominate the runtime.
fn cheap_computation(x: i32) -> i32 {
    2 * x
}
// Native parallel is faster when available; par_bridge fills the gap for sequential sources.
use rayon::iter::ParallelBridge;
use rayon::prelude::*;
fn main() {
// par_bridge uses work stealing
// Threads that finish early can steal work from others
let results: Vec<_> = (0..100)
.into_iter()
.par_bridge()
.map(|i| {
// Variable work per item
let work = if i % 10 == 0 { 1000 } else { 10 };
(0..work).fold(i, |acc, _| acc + 1)
})
.collect();
// Work stealing helps balance load
// Threads with lighter work help with heavier items
}Work stealing from the shared iterator provides natural load balancing.
use rayon::iter::ParallelBridge;
use rayon::prelude::*;
fn main() {
// Fast, predictable iterator: good for par_bridge
let fast: Vec<i32> = (0..1000)
.into_iter()
.par_bridge()
.map(|x| x * 2)
.collect();
// Slow iterator: limits parallelism
// If iterator is slow, threads wait for items
fn slow_iterator() -> impl Iterator<Item = i32> {
(0..100).map(|x| {
std::thread::sleep(std::time::Duration::from_millis(1));
x
})
}
// If pulling from iterator is slow, parallelism is bottlenecked
// Threads spend time waiting for next item
// Cloneable iterators can be split for better parallelism
// Non-cloneable iterators must use par_bridge
}The speed of the sequential iterator affects parallelism potential.
use rayon::iter::ParallelBridge;
use rayon::prelude::*;
// A toy streaming source: yields its buffered items front-to-back, one
// at a time, mimicking a feed (network, sensor, …) that cannot be split
// into independent chunks for native parallel iteration.
struct StreamingSource {
    // Remaining items; drained from the front as the stream is consumed.
    items: Vec<i32>,
}
impl Iterator for StreamingSource {
    type Item = i32;

    /// Yields the front element, simulating data arriving in order from
    /// a stream; returns `None` once the buffer is drained.
    fn next(&mut self) -> Option<Self::Item> {
        if self.items.is_empty() {
            return None;
        }
        Some(self.items.remove(0))
    }
}
fn main() {
let source = StreamingSource { items: (0..100).collect() };
// Can't implement IntoParallelIterator for StreamingSource
// (it doesn't own all data splitably)
// But par_bridge works
let results: Vec<i32> = source
.par_bridge()
.map(|x| x * 2)
.collect();
println!("Processed {} items", results.len());
}Any Iterator can use par_bridge without additional implementation.
use rayon::iter::ParallelBridge;
use rayon::prelude::*;
fn main() {
// Bounded iterator: par_bridge knows size hint
let bounded: Vec<i32> = (0..100)
.into_iter()
.par_bridge()
.collect();
// Unbounded iterator: par_bridge pulls until None
let mut counter = 0;
let unbounded = std::iter::from_fn(move || {
counter += 1;
if counter <= 50 { Some(counter) } else { None }
});
let results: Vec<i32> = unbounded
.par_bridge()
.map(|x| x * 2)
.collect();
println!("Unbounded results: {:?}", results.len());
}par_bridge handles both bounded and unbounded iterators.
use rayon::iter::ParallelBridge;
use rayon::prelude::*;
fn main() {
// par_bridge doesn't buffer items
// Each thread pulls one at a time
// This is different from collecting to Vec first:
let data: Vec<i32> = (0..100_000).collect();
// Option 1: Native parallel (fastest for owned data)
let sum1: i32 = data.par_iter().sum();
// Option 2: Collect then parallel (extra memory)
let sum2: i32 = data.iter().collect::<Vec<_>>().par_iter().sum();
// Option 3: par_bridge (no extra memory, but sync overhead)
let sum3: i32 = data.iter().par_bridge().sum();
// For owned data, prefer native parallel
// For streaming/unowned data, use par_bridge
}par_bridge doesn't require pre-collecting data, avoiding memory overhead.
use rayon::iter::ParallelBridge;
use rayon::prelude::*;
fn main() {
// Errors propagate through par_bridge
let results: Vec<Result<i32, String>> = (0..10)
.into_iter()
.par_bridge()
.map(|x| {
if x == 5 {
Err(format!("Error at {}", x))
} else {
Ok(x * 2)
}
})
.collect();
for result in results {
match result {
Ok(v) => println!("Success: {}", v),
Err(e) => println!("Error: {}", e),
}
}
// try_fold and try_reduce work with errors
let result: Result<i32, String> = (0..10)
.into_iter()
.par_bridge()
.try_fold(|| Ok(0), |acc, x| {
if x == 5 {
Err("Found 5".to_string())
} else {
Ok(acc? + x)
}
})
.try_reduce(|| Ok(0), |a, b| Ok(a + b));
}Error handling works through the parallel iterator chain.
use rayon::iter::ParallelBridge;
use rayon::prelude::*;
use std::fs::File;
use std::io::{BufRead, BufReader};
fn main() {
    // Process lines from a file in parallel. `lines()` returns a plain
    // sequential Iterator, so par_bridge is required to spread the work.
    // FIX: replace the bare unwrap() on file I/O with expect() carrying
    // context, so a missing file produces an actionable message.
    let file = File::open("large_file.txt")
        .expect("failed to open large_file.txt");
    let reader = BufReader::new(file);
    let word_count: usize = reader
        .lines()
        .filter_map(|l| l.ok())
        .par_bridge()
        .map(|line| line.split_whitespace().count())
        .sum();
    println!("Total words: {}", word_count);
}
fn process_stdin() {
// Process stdin in parallel (sequential source)
let stdin = std::io::stdin();
let reader = BufReader::new(stdin.lock());
reader
.lines()
.filter_map(|l| l.ok())
.par_bridge()
.for_each(|line| {
// Process each line in parallel
let processed = line.to_uppercase();
println!("{}", processed);
});
}File and stream processing are prime use cases for par_bridge.
use rayon::iter::ParallelBridge;
use rayon::prelude::*;
use std::time::Instant;
fn main() {
    let size = 1_000_000;
    let data: Vec<i32> = (0..size).collect();

    // FIX: all four benchmarks summed ~1e6 values into an i32, which
    // overflows (debug panic) — accumulate in i64 throughout.

    // Benchmark 1: very cheap operation (synchronization overhead dominates).
    let start = Instant::now();
    let _: i64 = data.iter().par_bridge().map(|&x| (x + 1) as i64).sum();
    let cheap_bridge = start.elapsed();
    let start = Instant::now();
    let _: i64 = data.par_iter().map(|&x| (x + 1) as i64).sum();
    let cheap_native = start.elapsed();

    // Benchmark 2: expensive operation (overhead becomes negligible).
    let start = Instant::now();
    let _: i64 = data.iter().par_bridge()
        .map(|&x| expensive_work(x) as i64)
        .sum();
    let expensive_bridge = start.elapsed();
    let start = Instant::now();
    let _: i64 = data.par_iter()
        .map(|&x| expensive_work(x) as i64)
        .sum();
    let expensive_native = start.elapsed();

    println!("Cheap operation:");
    println!(" par_bridge: {:?}", cheap_bridge);
    println!(" native: {:?}", cheap_native);
    println!(" overhead ratio: {:.2}x",
        cheap_bridge.as_nanos() as f64 / cheap_native.as_nanos() as f64);
    println!("\nExpensive operation:");
    println!(" par_bridge: {:?}", expensive_bridge);
    println!(" native: {:?}", expensive_native);
    println!(" overhead ratio: {:.2}x",
        expensive_bridge.as_nanos() as f64 / expensive_native.as_nanos() as f64);
}
/// Simulates heavy per-item work: 1000 wrapping affine rounds over `x`.
fn expensive_work(x: i32) -> i32 {
    let mut acc = x;
    for _ in 0..1000 {
        acc = acc.wrapping_mul(31).wrapping_add(17);
    }
    acc
}
// Overhead ratio decreases as per-item work increases.
use rayon::iter::ParallelBridge;
use rayon::prelude::*;
fn main() {
    // Situations that force par_bridge:
    // 1. Iterators returned by external libraries — `impl Iterator`
    //    cannot be made natively parallel from the outside.
    // 2. Streaming data — stdin, network sockets, and file streams are
    //    inherently sequential.
    // 3. Generators that cannot be split — some computations produce
    //    items one at a time from evolving state.
    // 4. Callback-based APIs that have been adapted into iterators.
}
fn generator_example() {
// A generator that can't be parallelized natively
let mut state = 0u64;
let generator = std::iter::from_fn(move || {
// Complex stateful generation
state = state.wrapping_mul(1103515245).wrapping_add(12345);
Some(state)
});
// Must use par_bridge for parallel processing
let results: Vec<u64> = generator
.take(1000)
.par_bridge()
.map(|x| x % 100)
.collect();
}Stateful or external iterators require par_bridge for parallel processing.
Core mechanism: par_bridge() wraps a sequential Iterator and allows multiple threads to pull items concurrently, converting sequential iteration into parallel execution.
How it works: worker threads pull items one at a time from the shared sequential iterator; the pulled items are then processed in parallel on rayon's thread pool, with work stealing balancing uneven per-item loads.
Performance implications: every pull is synchronized, so par_bridge adds per-item overhead compared to natively parallel iterators, and the output order is not guaranteed to match the source order.
When to use: inherently sequential sources — stdin, file lines, network streams, stateful generators, iterators returned by external libraries — especially when the per-item work is substantial.
When NOT to use: when par_iter() is available (collections, ranges) or when per-item work is trivial.
Key insight: par_bridge is a bridge of necessity, not a bridge of convenience. It enables parallelism for sources that can't natively support it, but introduces synchronization overhead. The performance impact depends entirely on the ratio of synchronization cost to per-item computation. For cheap operations, the overhead can dominate; for expensive operations, it's essentially free. Always prefer native parallel iterators when available, but use par_bridge when it's the only way to parallelize sequential data.