Rust walkthroughs
rayon: how does par_bridge convert a sequential iterator to parallel, and what are the costs?

par_bridge (provided by the rayon::iter::ParallelBridge trait) converts a sequential Iterator into a ParallelIterator by pulling items from the iterator and distributing them across rayon's thread pool. The method wraps the sequential iterator in a bridge type (IterBridge) that internally uses shared state, effectively a queue, to hand work to worker threads. The costs: synchronization overhead on that shared state, load imbalance because work is discovered sequentially as it is distributed, and the loss of parallel optimizations that are possible when the parallel iterator knows the full data upfront. Use par_bridge when you have an existing sequential iterator you cannot restructure; prefer native parallel iterators whenever possible.
use rayon::prelude::*;
fn main() {
let data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
// Convert sequential iterator to parallel
let sum: i32 = data.iter()
.map(|x| x * 2)
.par_bridge() // Bridge from sequential to parallel
.sum();
println!("Sum: {}", sum);
}
par_bridge wraps any iterator to make it parallel-capable.
use rayon::prelude::*;
// par_bridge creates a ParallelBridge wrapper:
// 1. Wraps the sequential Iterator
// 2. Worker threads pull from a shared queue
// 3. The queue is populated from the sequential iterator
// 4. Workers process items in parallel
fn main() {
let data: Vec<i32> = (0..100).collect();
// Behind the scenes:
// - Main thread or one worker drives the iterator
// - Items are pushed to a concurrent queue
// - Workers pop items and process them
let result: i32 = data.iter()
.par_bridge()
.map(|x| x * 2)
.sum();
println!("Result: {}", result);
}
The iterator is consumed by one thread, items are queued, and workers dequeue them.
use rayon::prelude::*;
use std::time::Duration;
fn main() {
// Items are discovered sequentially
// But processing happens in parallel
let result: Vec<i32> = (0..10)
.inspect(|x| println!("Discovering: {}", x)) // Sequential
.par_bridge()
.map(|x| {
std::thread::sleep(Duration::from_millis(100));
println!("Processing: {}", x); // Parallel
x * 2
})
.collect();
// Discovery order: 0, 1, 2, ... (sequential)
// Processing order: varies (parallel)
}
Item discovery is sequential; processing is parallel.
use rayon::prelude::*;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;
fn main() {
let iterations = 1_000_000;
let counter = AtomicUsize::new(0);
// Native parallel iterator: minimal synchronization
let start = Instant::now();
(0..iterations)
.into_par_iter() // Native parallel
.for_each(|_| {
counter.fetch_add(1, Ordering::Relaxed);
});
let native_time = start.elapsed();
counter.store(0, Ordering::SeqCst);
// par_bridge: more synchronization
let start = Instant::now();
(0..iterations)
.par_bridge() // Bridged
.for_each(|_| {
counter.fetch_add(1, Ordering::Relaxed);
});
let bridge_time = start.elapsed();
println!("Native parallel: {:?}", native_time);
println!("par_bridge: {:?}", bridge_time);
// par_bridge typically slower due to synchronization
}
par_bridge has synchronization overhead that native parallel iterators avoid.
use rayon::prelude::*;
use std::time::Instant;
fn main() {
// i64 sums: 0..100_000 totals 4_999_950_000, which overflows i32
let iterations: i64 = 100_000;
// Very fast operations
// Queue contention dominates
let start = Instant::now();
let sum: i64 = (0..iterations)
.into_par_iter() // Native
.sum();
println!("Native sum: {:?} = {}", start.elapsed(), sum);
let start = Instant::now();
let sum: i64 = (0..iterations)
.par_bridge() // Bridge
.sum();
println!("Bridge sum: {:?} = {}", start.elapsed(), sum);
// For cheap operations, par_bridge overhead is significant
}
Cheap operations make queue contention visible.
use rayon::prelude::*;
use std::fs::File;
use std::io::{BufRead, BufReader};
fn main() -> std::io::Result<()> {
// File lines cannot be a native parallel iterator
// Because we don't know how many lines there are
// And lines must be read sequentially
let file = File::open("large_file.txt")?;
let reader = BufReader::new(file);
let line_count: usize = reader
.lines()
.par_bridge() // Necessary: Lines() is sequential
.map(|line| {
// Process each line in parallel
line.unwrap().len()
})
.count();
println!("Lines: {}", line_count);
Ok(())
}
Sequential-only sources like file readers require par_bridge.
use rayon::prelude::*;
fn main() {
let data: Vec<i32> = (0..100).collect();
// Chain sequential transformations before par_bridge
let result: i32 = data
.iter()
.filter(|x| *x % 2 == 0) // Sequential filter
.map(|x| x * 2) // Sequential map
.par_bridge() // Then parallel
.map(|x| x + 1) // Parallel map
.sum();
println!("Result: {}", result);
// Operations BEFORE par_bridge run sequentially on one thread
// Operations AFTER par_bridge run in parallel
}
Place par_bridge where parallel processing should begin.
use rayon::prelude::*;
use std::time::Duration;
fn main() {
// Uneven work distribution
let work_times: Vec<u64> = vec![10, 20, 1000, 10, 20, 1000, 10, 20];
// par_bridge discovers items sequentially
// If slow items come late, early workers sit idle
let start = std::time::Instant::now();
let _: Vec<_> = work_times
.iter()
.par_bridge()
.map(|&ms| {
std::thread::sleep(Duration::from_millis(ms));
ms
})
.collect();
println!("Time: {:?}", start.elapsed());
// Not optimal: workers can't steal work they haven't seen
}
par_bridge cannot pre-balance work that is unknown at the start.
use rayon::prelude::*;
use std::time::Duration;
fn main() {
let work_times: Vec<u64> = vec![10, 20, 1000, 10, 20, 1000, 10, 20];
// Native parallel iterator can split work optimally
// Knows all items upfront, can balance across workers
let start = std::time::Instant::now();
let _: Vec<_> = work_times
.par_iter() // Native parallel
.map(|&ms| {
std::thread::sleep(Duration::from_millis(ms));
ms
})
.collect();
println!("Time: {:?}", start.elapsed());
// Better: work stealing can redistribute
}
Native parallel iterators know the work distribution in advance.
use rayon::prelude::*;
fn main() {
// Rayon uses work stealing for load balancing
// But par_bridge limits stealing effectiveness
// Native: worker can split its chunk and share half
// Bridge: items come from queue, can't split unknown work
let data: Vec<i32> = (0..1000).collect();
// Native: work stealing works well
data.par_iter()
.for_each(|_| { /* work */ });
// Bridge: limited work stealing
data.iter()
.par_bridge()
.for_each(|_| { /* work */ });
}
Work stealing is less effective with par_bridge.
use rayon::prelude::*;
fn main() {
// par_bridge can handle infinite/unknown-length iterators
// Native parallel iterators need known bounds
let result: i32 = (0..)
.take(1000) // Must bound the infinite stream
.par_bridge()
.sum();
println!("Sum: {}", result);
// take must come before par_bridge: the bridged iterator is not
// indexed, so IndexedParallelIterator adapters like take are
// unavailable after it
// into_par_iter() would not work on (0..) because rayon has no
// parallel iterator implementation for an unbounded range
}
par_bridge works with infinite iterators (bounded with take).
use rayon::prelude::*;
use std::time::Duration;
fn main() {
// The thread producing items can become a bottleneck
// If items are produced slowly
// Workers wait for the producer
let start = std::time::Instant::now();
let result: Vec<_> = (0..100)
.inspect(|_| {
// Slow production
std::thread::sleep(Duration::from_micros(100));
})
.par_bridge()
.map(|x| {
// Fast processing
x * 2
})
.collect();
println!("Time: {:?}", start.elapsed());
// Total time dominated by sequential production
// Workers spend time waiting
}
Slow producers become bottlenecks in par_bridge.
use rayon::prelude::*;
use std::sync::mpsc;
use std::thread;
fn main() {
// Alternative 1: Collect first, then parallel
let data: Vec<i32> = (0..100).collect();
let collected: i32 = data.par_iter().map(|x| x * 2).sum();
// Alternative 2: Use channel with par_bridge
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
for i in 0..100 {
tx.send(i).unwrap();
}
});
let streamed: i32 = rx.into_iter()
.par_bridge()
.map(|x| x * 2)
.sum();
println!("Results: {}, {}", collected, streamed);
}
Collecting first enables native parallel iteration.
use rayon::prelude::*;
fn main() {
// Collecting: requires memory for all items
// par_bridge: processes as items arrive
// i64: the sum of 0..10_000_000 overflows i32
let large_data = 0..10_000_000i64;
// Option 1: Collect first
// Memory: ~80MB for Vec<i64>
// let collected: Vec<i64> = large_data.collect();
// collected.par_iter()...
// Option 2: par_bridge
// Memory: only the bridge's internal buffering
let sum: i64 = large_data
.par_bridge()
.sum();
println!("Sum: {}", sum);
}
par_bridge uses less memory than collecting first.
use rayon::prelude::*;
fn main() {
let data: Vec<i32> = (0..10).collect();
// par_bridge does NOT preserve order by default
let result: Vec<i32> = data
.iter()
.par_bridge()
.map(|x| x * 2)
.collect();
// Result order may differ from input order
println!("Result: {:?}", result);
// For ordered output, enumerate before the bridge and sort by
// index afterwards, or use a native parallel iterator, which
// preserves order when collecting
}
Parallel execution means output order is not guaranteed.
use rayon::prelude::*;
fn main() {
// par_bridge buffers items internally as it hands them to workers
// The buffering strategy is an implementation detail of rayon
// and is not user-configurable
// Workers pull from shared state; how much is buffered trades
// contention against memory
let result: i32 = (0..1000)
.par_bridge()
.map(|x| x * 2)
.sum();
println!("Result: {}", result);
// More buffering: more memory, less contention
// Less buffering: less memory, more contention
}
Internal buffering trades memory for contention.
use rayon::prelude::*;
use std::time::Instant;
fn main() {
// i64: the doubled sum of 0..1_000_000 overflows i32
let data: Vec<i64> = (0..1_000_000).collect();
// Native parallel iterator
let start = Instant::now();
let sum1: i64 = data.par_iter().map(|x| x * 2).sum();
let native = start.elapsed();
// par_bridge on Vec iterator
let start = Instant::now();
let sum2: i64 = data.iter().par_bridge().map(|x| x * 2).sum();
let bridge = start.elapsed();
println!("Native: {:?} ({}), Bridge: {:?} ({})", native, sum1, bridge, sum2);
// Native is faster because it:
// 1. Knows the data size upfront
// 2. Can split work optimally
// 3. Has no queue contention
}
Native parallel iteration is consistently faster for in-memory collections.
use rayon::prelude::*;
use std::io::{BufRead, BufReader};
use std::fs::File;
fn main() -> std::io::Result<()> {
// Use par_bridge when:
// 1. Source is inherently sequential
let file = File::open("data.txt")?;
let reader = BufReader::new(file);
reader.lines()
.par_bridge()
.for_each(|line| {
// Process each line in parallel
if let Ok(line) = line {
let _ = line.len();
}
});
// 2. Cannot collect due to memory constraints
// 3. Data arrives from stream/iterator
// Don't use par_bridge when:
// 1. You have a Vec/slice - use par_iter()
// 2. You can collect first - collect then par_iter()
Ok(())
}
Use par_bridge only for truly sequential sources.
| Aspect | par_bridge | Native Parallel |
|--------|--------------|-----------------|
| Source type | Any Iterator | IntoParallelIterator types |
| Work distribution | Sequential discovery | Known upfront |
| Load balancing | Limited | Full work stealing |
| Synchronization | Queue contention | Minimal |
| Memory usage | Buffer only | May need collection |
| Use case | Sequential sources | Collections, slices |
par_bridge exists to bridge the gap between sequential iterators and parallel processing, but it's not a free conversion:
How it works: One thread drives the sequential iterator, pushing items into a concurrent queue. Worker threads pull items from this queue and process them in parallel. This means item discovery is sequential (the iterator must be driven by one thread) while item processing is parallel (workers compete for queue items).
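That producer/worker shape can be modeled with std threads and a channel; a simplified sketch, not rayon's actual implementation (rayon uses its own work-stealing deques, and the names bridge_model and workers are illustrative):

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

// Simplified model of par_bridge: one producer drives the
// sequential iterator into a queue; workers compete to pull
// items and process them in parallel.
fn bridge_model(items: Vec<i32>, workers: usize) -> i32 {
    let (tx, rx) = mpsc::channel::<i32>();
    let rx = Arc::new(Mutex::new(rx)); // shared queue end
    // Producer: discovery is sequential.
    let producer = thread::spawn(move || {
        for item in items {
            tx.send(item).unwrap();
        }
        // tx dropped here: workers see the channel close.
    });
    // Workers: processing is parallel.
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let rx = Arc::clone(&rx);
            thread::spawn(move || {
                let mut local = 0;
                loop {
                    // The guard is a temporary: the lock is
                    // released once recv returns.
                    let item = rx.lock().unwrap().recv();
                    match item {
                        Ok(x) => local += x * 2, // the parallel work
                        Err(_) => break,         // queue drained and closed
                    }
                }
                local
            })
        })
        .collect();
    producer.join().unwrap();
    handles.into_iter().map(|h| h.join().unwrap()).sum()
}

fn main() {
    println!("Sum: {}", bridge_model((0..100).collect(), 4));
}
```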
The costs are fundamental to the architecture: The shared queue creates synchronization overhead. Every item passes through this bottleneck, requiring atomic operations or mutex acquisition. Native parallel iterators avoid this by splitting data into independent chunks that workers process without coordination. Additionally, load balancing is limited because the bridge doesn't know future work; it can only distribute what it has seen. If slow items come late in the iteration, early workers sit idle.
When par_bridge is necessary: File lines, network streams, or any source where items are discovered sequentially and cannot be indexed into. These sources cannot implement IntoParallelIterator without buffering, which defeats the purpose for large or streaming data.
Key insight: par_bridge is a compromise that enables parallel processing of sequential sources at the cost of reduced efficiency. If you can restructure to use native parallel iterators (by collecting into a Vec first, or by designing your data source to support parallel iteration), the performance improvement is substantial. The synchronization overhead of the shared queue is not trivial, and the inability to pre-balance work means some parallelism is lost compared to what native parallel iterators achieve. Use par_bridge when the source forces sequential access, not as a convenience wrapper for collections that could use par_iter() directly.