Loading page…
Rust walkthroughs
Loading page…
Barrier is a synchronization primitive that allows multiple threads to synchronize at a specific point in their execution. When a thread reaches a barrier, it blocks until all threads have reached the same barrier point. Once all threads arrive, they are all released simultaneously.
Key concepts:
After wait(), check if this thread was the "leader" (BarrierWaitResult::is_leader()).

When to use Barrier:
When NOT to use Barrier:
use std::sync::{Arc, Barrier};
use std::thread;
/// Spawns three threads that rendezvous at one shared barrier.
///
/// No thread can print its "after barrier" line until all three have
/// called `wait()`.
fn main() {
    const THREAD_COUNT: usize = 3;
    let rendezvous = Arc::new(Barrier::new(THREAD_COUNT));

    let workers: Vec<thread::JoinHandle<()>> = (0..THREAD_COUNT)
        .map(|i| {
            // Each worker owns its own handle to the same barrier.
            let rendezvous = Arc::clone(&rendezvous);
            thread::spawn(move || {
                println!("Thread {} before barrier", i);
                rendezvous.wait();
                println!("Thread {} after barrier", i);
            })
        })
        .collect();

    workers
        .into_iter()
        .for_each(|worker| worker.join().unwrap());
}
use std::sync::{Arc, Barrier};
use std::thread;
/// Four threads arrive at the barrier at staggered times; the one whose
/// `wait()` result reports `is_leader()` announces itself.
fn main() {
    let barrier = Arc::new(Barrier::new(4));
    let mut workers = Vec::new();

    for i in 0..4 {
        let barrier = Arc::clone(&barrier);
        workers.push(thread::spawn(move || {
            println!("Thread {} working...", i);
            // Later threads sleep longer, so arrivals are spread out.
            let pause = std::time::Duration::from_millis(100 * (i + 1));
            thread::sleep(pause);

            let elected = barrier.wait().is_leader();
            if elected {
                println!("Thread {} is the leader!", i);
            }
            println!("Thread {} continues", i);
        }));
    }

    workers
        .into_iter()
        .for_each(|worker| worker.join().unwrap());
}
use std::sync::{Arc, Barrier};
use std::thread;
/// Runs four threads through three phases, using one reusable barrier as
/// the gate between consecutive phases.
fn main() {
    let num_threads = 4;
    let phase_gate = Arc::new(Barrier::new(num_threads));

    let workers: Vec<_> = (0..num_threads)
        .map(|i| {
            let phase_gate = Arc::clone(&phase_gate);
            thread::spawn(move || {
                // The sleep stands in for the phase's work; waiting on the
                // gate afterwards closes the phase for this thread.
                let finish_phase = |millis: u64| {
                    thread::sleep(std::time::Duration::from_millis(millis));
                    phase_gate.wait();
                };

                println!("Thread {}: Phase 1 - Initializing", i);
                finish_phase(50);

                println!("Thread {}: Phase 2 - Processing", i);
                finish_phase(100);

                println!("Thread {}: Phase 3 - Finalizing", i);
                finish_phase(30);

                println!("Thread {}: Complete!", i);
            })
        })
        .collect();

    for worker in workers {
        worker.join().unwrap();
    }
}
use std::sync::{Arc, Barrier, Mutex};
use std::thread;
/// Each thread publishes `i * i` into a shared vector, waits at the barrier
/// until every thread has published, then sums the complete vector.
///
/// All four threads therefore print the same sum (0 + 1 + 4 + 9 = 14).
fn main() {
    let num_threads = 4;
    let barrier = Arc::new(Barrier::new(num_threads));
    let results = Arc::new(Mutex::new(Vec::new()));
    let mut handles = vec![];
    for i in 0..num_threads {
        let barrier = Arc::clone(&barrier);
        let results = Arc::clone(&results);
        let handle = thread::spawn(move || {
            // Each thread computes a value (`usize`, because `i` is a `usize`).
            let value = i * i;
            // Store partial result; the guard is released at the end of the statement.
            results.lock().unwrap().push(value);
            // Wait for all threads to finish publishing before anyone reads.
            barrier.wait();
            // Every thread aggregates the now-complete results. The sum must be
            // `usize` to match the element type; summing `&usize` into `i32`
            // does not compile.
            let sum: usize = results.lock().unwrap().iter().sum();
            // No second barrier is needed: nothing is written after the first one.
            println!("Thread {} sees sum: {}", i, sum);
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.join().unwrap();
    }
}
use std::sync::{Arc, Barrier, Mutex};
use std::thread;
/// One thread per matrix row: each doubles its row into the shared result,
/// waits at the barrier, then prints a snapshot of the whole result.
fn main() {
    let rows = 4;
    let cols = 4;

    // Row `r` holds the consecutive values r*cols .. r*cols + cols.
    let mut matrix: Vec<Vec<i32>> = Vec::with_capacity(rows);
    for r in 0..rows {
        let first = r * cols;
        matrix.push((first..first + cols).map(|cell| cell as i32).collect());
    }

    let result = Arc::new(Mutex::new(vec![vec![0i32; cols]; rows]));
    let barrier = Arc::new(Barrier::new(rows));

    let workers: Vec<_> = matrix
        .iter()
        .cloned()
        .enumerate()
        .map(|(row, row_data)| {
            let barrier = Arc::clone(&barrier);
            let result = Arc::clone(&result);
            thread::spawn(move || {
                // Phase 1: compute this row and publish it.
                let doubled: Vec<i32> = row_data.iter().map(|&x| x * 2).collect();
                {
                    let mut grid = result.lock().unwrap();
                    grid[row] = doubled;
                }
                barrier.wait();
                // Phase 2: every row has been written; read them all.
                let snapshot = result.lock().unwrap().clone();
                println!("Row {} sees: {:?}", row, snapshot);
            })
        })
        .collect();

    for worker in workers {
        worker.join().unwrap();
    }
}
use std::sync::{Arc, Barrier};
use std::thread;
use std::time::Instant;
/// Times a parallel section: the main thread releases all workers through a
/// start barrier, then meets them again at an end barrier and reports the
/// elapsed time between the two.
fn main() {
    let num_threads = 4;
    // Both barriers count the workers plus the main thread.
    let parties = num_threads + 1;
    let barrier = Arc::new(Barrier::new(parties));
    let start_barrier = Arc::new(Barrier::new(parties));

    let workers: Vec<thread::JoinHandle<u64>> = (0..num_threads)
        .map(|_| {
            let finish = Arc::clone(&barrier);
            let begin = Arc::clone(&start_barrier);
            thread::spawn(move || {
                begin.wait(); // Synchronized start
                // Work being benchmarked
                let sum = (0..1_000_000u64).fold(0u64, |acc, n| acc.wrapping_add(n));
                finish.wait(); // Synchronized end
                sum
            })
        })
        .collect();

    // Release the workers and start the clock.
    start_barrier.wait();
    let start = Instant::now();
    // Returns only once every worker has reached the end barrier.
    barrier.wait();
    let duration = start.elapsed();
    println!("All threads completed in {:?}", duration);

    for worker in workers {
        worker.join().unwrap();
    }
}
use std::sync::{Arc, Barrier, Mutex};
use std::thread;
// Runs three worker threads through three lock-step iterations. A shared
// counter names the current iteration; the barrier leader advances it
// between rounds.
fn main() {
let num_threads = 3;
let num_iterations = 3;
let barrier = Arc::new(Barrier::new(num_threads));
// Shared iteration counter, starting at 0.
let iteration = Arc::new(Mutex::new(0));
let mut handles = vec![];
for i in 0..num_threads {
let barrier = Arc::clone(&barrier);
let iteration = Arc::clone(&iteration);
let handle = thread::spawn(move || {
for _ in 0..num_iterations {
// Copy the counter out; the lock guard is released at the end of this statement.
let current_iter = *iteration.lock().unwrap();
println!("Thread {} working on iteration {}", i, current_iter);
thread::sleep(std::time::Duration::from_millis(50));
// First rendezvous: every thread has finished this round's work.
let result = barrier.wait();
// Only the leader reports completion and bumps the counter, so it is
// incremented exactly once per round.
if result.is_leader() {
println!("--- Iteration {} complete ---", current_iter);
*iteration.lock().unwrap() += 1;
}
barrier.wait(); // Ensure leader updates iteration before others read
}
});
handles.push(handle);
}
// Wait for every worker to finish all of its iterations.
for handle in handles {
handle.join().unwrap();
}
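}
Note: std::sync::Barrier has no timeout variant of wait() (there is no Barrier::wait_timeout, on stable or nightly). To bound the wait, build your own barrier from a Mutex and Condvar::wait_timeout, or combine Barrier with other primitives. The example below only simulates threads reaching the barrier at different times.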
use std::sync::{Arc, Barrier};
use std::thread;
use std::time::Duration;
/// Three threads reach the barrier 0 ms, 100 ms and 200 ms after start;
/// none is released until the slowest one has arrived.
fn main() {
    let gate = Arc::new(Barrier::new(3));
    let mut workers = Vec::with_capacity(3);

    for i in 0..3 {
        let gate = Arc::clone(&gate);
        workers.push(thread::spawn(move || {
            // Simulate different arrival times
            let stagger = Duration::from_millis(100 * i as u64);
            thread::sleep(stagger);
            println!("Thread {} reached barrier", i);
            gate.wait();
            println!("Thread {} released", i);
        }));
    }

    workers
        .into_iter()
        .for_each(|worker| worker.join().unwrap());
}
use std::sync::{Arc, Barrier, Condvar, Mutex};
use std::thread;
/// Contrasts the two primitives: first three threads meet at a `Barrier`,
/// then a single thread blocks on a `Condvar` until main sets a flag.
fn main() {
    // Barrier: All threads wait for each other
    let barrier = Arc::new(Barrier::new(3));
    let barrier_threads: Vec<_> = (0..3)
        .map(|i| {
            let barrier = Arc::clone(&barrier);
            thread::spawn(move || {
                println!("Barrier Thread {} waiting", i);
                barrier.wait();
                println!("Barrier Thread {} continuing", i);
            })
        })
        .collect();
    for t in barrier_threads {
        t.join().unwrap();
    }

    println!("---");

    // Condvar: One thread signals another
    let pair = Arc::new((Mutex::new(false), Condvar::new()));
    let waiter = {
        let pair = Arc::clone(&pair);
        thread::spawn(move || {
            let (flag, signal) = &*pair;
            let guard = flag.lock().unwrap();
            // Sleeps until the flag is true, re-checking after every wakeup.
            let _ready = signal.wait_while(guard, |ready| !*ready).unwrap();
            println!("Condvar Thread woke up");
        })
    };

    thread::sleep(std::time::Duration::from_millis(100));
    let (flag, signal) = &*pair;
    // Set the flag first, then notify, so the waiter sees `true` when it wakes.
    *flag.lock().unwrap() = true;
    signal.notify_one();

    waiter.join().unwrap();
}
use std::sync::{Arc, Barrier, Mutex};
use std::thread;
/// Shared state for a three-stage pipeline: one barrier per stage plus a
/// mutex-protected vector with one slot per worker.
struct Pipeline {
    stage1_barrier: Arc<Barrier>,
    stage2_barrier: Arc<Barrier>,
    stage3_barrier: Arc<Barrier>,
    data: Arc<Mutex<Vec<i32>>>,
}

impl Pipeline {
    /// Builds a pipeline for `num_workers` threads: every stage barrier
    /// expects `num_workers` arrivals, and `data` starts as `num_workers`
    /// zeros.
    fn new(num_workers: usize) -> Self {
        let stage_barrier = || Arc::new(Barrier::new(num_workers));
        let slots = vec![0; num_workers];
        Pipeline {
            stage1_barrier: stage_barrier(),
            stage2_barrier: stage_barrier(),
            stage3_barrier: stage_barrier(),
            data: Arc::new(Mutex::new(slots)),
        }
    }
}
/// Drives four workers through the pipeline's generate, transform and
/// aggregate stages, with a barrier closing each stage.
fn main() {
    let num_workers = 4;
    let pipeline = Pipeline::new(num_workers);

    let workers: Vec<_> = (0..num_workers)
        .map(|worker_id| {
            let stage1 = Arc::clone(&pipeline.stage1_barrier);
            let stage2 = Arc::clone(&pipeline.stage2_barrier);
            let stage3 = Arc::clone(&pipeline.stage3_barrier);
            let data = Arc::clone(&pipeline.data);
            thread::spawn(move || {
                // Stage 1: Generate
                let seed = (worker_id + 1) as i32;
                {
                    let mut slots = data.lock().unwrap();
                    slots[worker_id] = seed;
                }
                stage1.wait();

                // Stage 2: Transform (double values)
                {
                    let mut slots = data.lock().unwrap();
                    slots[worker_id] *= 2;
                }
                stage2.wait();

                // Stage 3: Aggregate (compute sum); every slot is final by now.
                let sum = data.lock().unwrap().iter().sum::<i32>();
                stage3.wait();

                println!("Worker {} final sum: {}", worker_id, sum);
            })
        })
        .collect();

    for worker in workers {
        worker.join().unwrap();
    }
}
use std::sync::{Arc, Barrier};
use std::thread;
use std::time::Instant;
/// Releases five worker threads at the same instant by having the main
/// thread take part in their start barrier, then reports each worker's
/// own elapsed time.
fn main() {
    let num_threads = 5;
    let start_barrier = Arc::new(Barrier::new(num_threads + 1)); // +1 for main

    let runners: Vec<thread::JoinHandle<std::time::Duration>> = (0..num_threads)
        .map(|i| {
            let gate = Arc::clone(&start_barrier);
            thread::spawn(move || {
                println!("Thread {} ready", i);
                gate.wait();
                // The clock starts only after the common release.
                let began = Instant::now();
                // Simulate work
                thread::sleep(std::time::Duration::from_millis(100));
                began.elapsed()
            })
        })
        .collect();

    println!("Main thread ready");
    start_barrier.wait();
    println!("All threads starting now!");

    runners.into_iter().enumerate().for_each(|(i, runner)| {
        let elapsed = runner.join().unwrap();
        println!("Thread {} took {:?}", i, elapsed);
    });
}
use std::sync::{Arc, Barrier, Mutex};
use std::thread;
/// Sorts a shared vector with a barrier-synchronised odd-even transposition
/// sort.
///
/// In pass `p`, every adjacent pair `(i, i + 1)` whose `i` has the same
/// parity as `p` is compare-exchanged; the pairs are split among the threads
/// by the chunk that contains `i`. `len` passes are enough to sort `len`
/// elements, and the loop stops early once thread 0 finds the data sorted.
///
/// Two defects of the earlier version are fixed here:
/// * `for pass in 0..data.lock().unwrap().len()` kept the `MutexGuard`
///   temporary alive for the whole loop, so the `data.lock()` inside the
///   loop body deadlocked. The length is now read in its own statement.
/// * Each thread re-sorted only its own fixed chunk, so elements never
///   crossed chunk boundaries and the result was not globally sorted.
fn main() {
    let num_threads = 4;
    let barrier = Arc::new(Barrier::new(num_threads));
    let data = Arc::new(Mutex::new(vec![
        64, 25, 12, 22, 11, 90, 45, 33, 1, 78, 56, 89, 23, 67, 34, 99,
    ]));
    let sorted = Arc::new(Mutex::new(false));
    let mut handles = vec![];
    for thread_id in 0..num_threads {
        let barrier = Arc::clone(&barrier);
        let data = Arc::clone(&data);
        let sorted = Arc::clone(&sorted);
        let handle = thread::spawn(move || {
            // Read the length in a separate statement so the guard is dropped
            // before the loop starts.
            let len = data.lock().unwrap().len();
            // Ceiling division, so the chunks cover the whole vector.
            let chunk_size = (len + num_threads - 1) / num_threads;
            let start = (thread_id * chunk_size).min(len);
            let end = ((thread_id + 1) * chunk_size).min(len);

            for pass in 0..len {
                {
                    let mut arr = data.lock().unwrap();
                    // Pairs handled in one pass are disjoint, so threads never
                    // touch the same element within a pass.
                    for left in (start..end).filter(|&idx| idx % 2 == pass % 2) {
                        if left + 1 < len && arr[left] > arr[left + 1] {
                            arr.swap(left, left + 1);
                        }
                    }
                }
                // Everyone has finished this pass's exchanges.
                barrier.wait();
                // Thread 0 checks if fully sorted and publishes the verdict.
                if thread_id == 0 {
                    let arr = data.lock().unwrap();
                    let is_sorted = arr.windows(2).all(|w| w[0] <= w[1]);
                    *sorted.lock().unwrap() = is_sorted;
                }
                // All threads read the same verdict, so they all leave the
                // loop in the same pass and the barrier counts stay matched.
                barrier.wait();
                if *sorted.lock().unwrap() {
                    break;
                }
            }
            if thread_id == 0 {
                println!("Sorted: {:?}", *data.lock().unwrap());
            }
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.join().unwrap();
    }
}
use std::sync::{Arc, Barrier, Mutex};
use std::thread;
use std::collections::HashMap;
// Word count in map-reduce style with three workers: each worker counts the
// words in its own chunk (map), then the barrier leader merges the partial
// counts into one map (reduce), and worker 0 prints the result.
fn main() {
let data = vec!["apple", "banana", "apple", "cherry", "banana",
"apple", "date", "cherry", "elderberry", "fig"];
let num_workers = 3;
let map_barrier = Arc::new(Barrier::new(num_workers));
let reduce_barrier = Arc::new(Barrier::new(num_workers));
// One HashMap of word -> count per worker, pushed in arrival order.
let partial_results = Arc::new(Mutex::new(Vec::new()));
let final_result = Arc::new(Mutex::new(HashMap::new()));
let mut handles = vec![];
for worker_id in 0..num_workers {
let map_barrier = Arc::clone(&map_barrier);
let reduce_barrier = Arc::clone(&reduce_barrier);
let partial_results = Arc::clone(&partial_results);
let final_result = Arc::clone(&final_result);
// Chunk length is ceil(len / num_workers); a worker with no chunk gets an
// empty slice. The chunk is copied into an owned Vec so the thread can take it.
let chunk = data.chunks((data.len() + num_workers - 1) / num_workers)
.nth(worker_id)
.unwrap_or(&[])
.to_vec();
let handle = thread::spawn(move || {
// Map phase: count words in chunk
let mut local_counts = HashMap::new();
for word in chunk {
*local_counts.entry(word).or_insert(0) += 1;
}
partial_results.lock().unwrap().push(local_counts);
// All partial counts are published once every worker passes this point.
map_barrier.wait();
// Reduce phase: aggregate partial results
let result = reduce_barrier.wait();
// Only the leader of this wait merges, so the merge runs exactly once.
if result.is_leader() {
let mut final_counts = HashMap::new();
for partial in partial_results.lock().unwrap().iter() {
for (word, count) in partial {
*final_counts.entry(*word).or_insert(0) += count;
}
}
*final_result.lock().unwrap() = final_counts;
}
// Second wait on the same barrier: nobody reads final_result until the
// leader has stored it.
reduce_barrier.wait();
if worker_id == 0 {
println!("Final counts: {:?}", *final_result.lock().unwrap());
}
});
handles.push(handle);
}
// Wait for every worker to finish.
for handle in handles {
handle.join().unwrap();
}
}

Barrier Methods:
| Method | Description |
|--------|-------------|
| new(n) | Create a barrier for n threads |
| wait() | Block until all threads reach barrier |
| wait().is_leader() | Check if this thread is the "leader" |
Barrier vs Other Primitives:
| Primitive | Purpose |
|-----------|---------|
| Barrier | All threads wait for each other (synchronization point) |
| Condvar | Thread waits for signal from another thread |
| Channel | Thread sends/receives data to/from another thread |
| Barrier | Multiple threads synchronize at same point |
| Once | One-time initialization (single use) |
Common Patterns:
| Pattern | Description |
|---------|-------------|
| Multi-phase computation | Parallel algorithms with distinct phases |
| Bulk-synchronous parallel | BSP model implementation |
| Synchronized startup | All threads start at same time |
| Benchmarking | Measure parallel section execution |
| Map-reduce | Parallel map, then reduce phase |
| Pipeline stages | Phased parallel processing |
Key Points:
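- Barrier::new(n) creates a barrier for n threads
- wait() blocks until all n threads have called wait()
- BarrierWaitResult::is_leader() identifies one thread as leader
- Combine with Mutex for shared state access
- For one-to-one signalling or passing data between threads, use Condvar or channels
- Barrier is part of the std::sync module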