Rust walkthroughs
A Barrier is a synchronization primitive that enables multiple threads to synchronize at a specific point in their execution. When threads reach a barrier, they block until all participating threads have reached the same barrier, then they all proceed together.
Barriers are located in std::sync::Barrier and are useful for: coordinating multi-phase computations, making all threads start work at the same moment (e.g. for benchmarking), and electing a single leader thread to perform work after a synchronization point.
Key methods:
new(n) — Create a barrier for n threads
wait() — Block until all threads reach the barrier, returns BarrierWaitResult
BarrierWaitResult::is_leader() — Returns true for exactly one thread (useful for cleanup)
The barrier automatically resets after all threads pass through, making it reusable for multiple synchronization points.
use std::sync::{Arc, Barrier};
use std::thread;
fn main() {
    // A barrier that releases once 3 threads are waiting on it.
    let barrier = Arc::new(Barrier::new(3));

    // Spawn the workers, keeping their join handles.
    let handles: Vec<_> = (0..3)
        .map(|id| {
            let gate = Arc::clone(&barrier);
            thread::spawn(move || {
                println!("Thread {} before barrier", id);
                // Block here until every participant has arrived.
                gate.wait();
                // Once released, all threads continue together.
                println!("Thread {} after barrier", id);
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
}
use std::sync::{Arc, Barrier};
use std::thread;
fn main() {
    let barrier = Arc::new(Barrier::new(4));

    let handles: Vec<_> = (0..4)
        .map(|id| {
            let gate = Arc::clone(&barrier);
            thread::spawn(move || {
                println!("Thread {} working on phase 1", id);
                // wait() hands exactly one of the released threads a
                // "leader" result — handy for one-off follow-up work.
                if gate.wait().is_leader() {
                    println!(">>> Thread {} is the leader!", id);
                }
                println!("Thread {} starting phase 2", id);
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
}
use std::sync::{Arc, Barrier, Mutex};
use std::thread;
fn main() {
    let num_threads = 4;
    let barrier = Arc::new(Barrier::new(num_threads));
    // Shared scratch vector, one slot per thread.
    let data = Arc::new(Mutex::new(vec![0; num_threads]));
    let mut handles = vec![];
    for i in 0..num_threads {
        let barrier_clone = Arc::clone(&barrier);
        let data_clone = Arc::clone(&data);
        let handle = thread::spawn(move || {
            // Phase 1: each thread initializes its own slot.
            {
                let mut data = data_clone.lock().unwrap();
                data[i] = i * 10;
                println!("Thread {} initialized data[{}] = {}", i, i, data[i]);
            }
            barrier_clone.wait(); // Sync after initialization
            // Phase 2: every thread reads the full vector and computes the total.
            // BUG FIX: the elements are usize (they are assigned `i * 10` with
            // `i: usize`), so the sum must be usize too — the original's
            // `let my_sum: i32` did not compile.
            let my_sum: usize;
            {
                let data = data_clone.lock().unwrap();
                my_sum = data.iter().sum();
                println!("Thread {} sees total: {}", i, my_sum);
            }
            barrier_clone.wait(); // Sync after reading
            // Phase 3: overwrite the slot with the total.
            {
                let mut data = data_clone.lock().unwrap();
                data[i] = my_sum;
            }
            let result = barrier_clone.wait(); // Sync after final phase
            if result.is_leader() {
                let data = data_clone.lock().unwrap();
                println!("Final data: {:?}", *data);
            }
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.join().unwrap();
    }
}
use std::sync::{Arc, Barrier};
use std::thread;
fn main() {
    let rows = 4;
    let cols = 4;
    let mut matrix = vec![vec![0i32; cols]; rows];
    // Initialize matrix with 0..rows*cols laid out row-major.
    for i in 0..rows {
        for j in 0..cols {
            matrix[i][j] = (i * cols + j) as i32;
        }
    }
    println!("Initial matrix:");
    for row in &matrix {
        println!(" {:?}", row);
    }
    let matrix = Arc::new(std::sync::Mutex::new(matrix));
    let barrier = Arc::new(Barrier::new(rows));
    let mut handles = vec![];
    // Each thread processes one row.
    for thread_id in 0..rows {
        let barrier_clone = Arc::clone(&barrier);
        let matrix_clone = Arc::clone(&matrix);
        let handle = thread::spawn(move || {
            // Phase 1: double all values in this thread's row.
            {
                let mut matrix = matrix_clone.lock().unwrap();
                for j in 0..cols {
                    matrix[thread_id][j] *= 2;
                }
            }
            barrier_clone.wait();
            // Phase 2: add half of the left neighbor's row (wrapping around).
            let left_neighbor = if thread_id > 0 { thread_id - 1 } else { rows - 1 };
            // BUG FIX: snapshot the neighbor's row first. The original read
            // the neighbor while other threads were already mutating their own
            // rows in phase 2, so the output depended on thread scheduling.
            // Copy first, sync so every snapshot is taken, then update.
            let neighbor_row: Vec<i32> = matrix_clone.lock().unwrap()[left_neighbor].clone();
            barrier_clone.wait(); // all snapshots taken before any update
            {
                let mut matrix = matrix_clone.lock().unwrap();
                for j in 0..cols {
                    matrix[thread_id][j] += neighbor_row[j] / 2;
                }
            }
            let result = barrier_clone.wait();
            if result.is_leader() {
                let matrix = matrix_clone.lock().unwrap();
                println!("\nProcessed matrix:");
                for row in matrix.iter() {
                    println!(" {:?}", row);
                }
            }
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.join().unwrap();
    }
}
use std::sync::{Arc, Barrier};
use std::thread;
use std::time::Instant;
fn main() {
    let num_threads = 4;
    let barrier = Arc::new(Barrier::new(num_threads));
    println!("Starting benchmark with {} threads...", num_threads);

    // Shared slot the first-barrier leader fills with the common start instant.
    let start_time = Arc::new(std::sync::Mutex::new(None));
    let mut handles = Vec::with_capacity(num_threads);

    for id in 0..num_threads {
        let gate = Arc::clone(&barrier);
        let start_slot = Arc::clone(&start_time);
        handles.push(thread::spawn(move || {
            // Rendezvous so every worker begins at the same moment;
            // the single leader stamps the shared start time.
            if gate.wait().is_leader() {
                *start_slot.lock().unwrap() = Some(Instant::now());
            }

            // Simulated CPU-bound work.
            let mut sum = 0u64;
            for n in 0..10_000_000 {
                sum = sum.wrapping_add(n);
            }

            // Rendezvous again: once everyone is done, the leader reports.
            // The slot is guaranteed Some here — the first leader wrote it
            // before calling this second wait().
            if gate.wait().is_leader() {
                let elapsed = start_slot.lock().unwrap().unwrap().elapsed();
                println!("All threads completed in {:?}", elapsed);
            }
            println!("Thread {} computed sum: {}", id, sum);
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }
}
use std::sync::{Arc, Barrier};
use std::thread;
fn main() {
    let num_workers = 3;
    // One barrier per pipeline stage. (A single Barrier would also work,
    // since a barrier resets after each full round — three are used here
    // for clarity.)
    let stage1_barrier = Arc::new(Barrier::new(num_workers));
    let stage2_barrier = Arc::new(Barrier::new(num_workers));
    let stage3_barrier = Arc::new(Barrier::new(num_workers));
    let mut handles = vec![];
    for i in 0..num_workers {
        let b1 = Arc::clone(&stage1_barrier);
        let b2 = Arc::clone(&stage2_barrier);
        let b3 = Arc::clone(&stage3_barrier);
        let handle = thread::spawn(move || {
            // Stage 1: workers finish at staggered times.
            println!("[Thread {}] Starting stage 1", i);
            // BUG FIX: Duration::from_millis takes u64, but `i` is usize
            // (pinned by Barrier::new); the original `100 * (i + 1)` and
            // `50 * (num_workers - i)` did not compile without a cast.
            thread::sleep(std::time::Duration::from_millis(100 * (i as u64 + 1)));
            println!("[Thread {}] Finished stage 1", i);
            b1.wait();
            // Stage 2 (all threads start together).
            println!("[Thread {}] Starting stage 2", i);
            thread::sleep(std::time::Duration::from_millis(50 * (num_workers - i) as u64));
            println!("[Thread {}] Finished stage 2", i);
            b2.wait();
            // Stage 3.
            println!("[Thread {}] Starting stage 3", i);
            let result = b3.wait();
            if result.is_leader() {
                println!("\n=== All threads completed all stages ===");
            }
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.join().unwrap();
    }
}
use std::sync::{Arc, Barrier, Mutex, Condvar};
use std::thread;
fn main() {
    let num_threads = 4;
    let barrier = Arc::new(Barrier::new(num_threads));
    // Shared state protected by a mutex. The Condvar keeps the common
    // (Mutex, Condvar) pairing, but this example never waits on it.
    let state = Arc::new((Mutex::new(Vec::new()), Condvar::new()));
    let mut handles = vec![];
    for i in 0..num_threads {
        let barrier_clone = Arc::clone(&barrier);
        let state_clone = Arc::clone(&state);
        let handle = thread::spawn(move || {
            // Phase 1: each thread appends one item to the shared vector.
            {
                let (lock, _) = &*state_clone;
                let mut data = lock.lock().unwrap();
                data.push(format!("Item from thread {}", i));
            }
            // One wait is enough: it guarantees phase 1 is complete in every
            // thread AND elects a leader. (The original called wait() twice
            // back-to-back, which was redundant, and bound the Condvar to an
            // unused `cvar` variable, causing a warning.)
            let result = barrier_clone.wait();
            if result.is_leader() {
                let (lock, _) = &*state_clone;
                let data = lock.lock().unwrap();
                println!("Collected items: {:?}", *data);
            }
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.join().unwrap();
    }
}
use std::sync::{Arc, Barrier};
use std::thread;
fn main() {
    // Simulate dynamic work allocation: split 0..total_work across threads.
    let total_work = 20;
    let num_threads = 4;
    let work_per_thread = total_work / num_threads;
    let barrier = Arc::new(Barrier::new(num_threads));
    let results = Arc::new(std::sync::Mutex::new(Vec::new()));
    let mut handles = vec![];
    for thread_id in 0..num_threads {
        let barrier_clone = Arc::clone(&barrier);
        let results_clone = Arc::clone(&results);
        let handle = thread::spawn(move || {
            // This thread's slice; the last thread also absorbs any remainder.
            let start = thread_id * work_per_thread;
            let end = if thread_id == num_threads - 1 {
                total_work // Handle remainder
            } else {
                (thread_id + 1) * work_per_thread
            };
            // Compute partial results.
            let mut partial = Vec::new();
            for i in start..end {
                partial.push(i * i); // Square each number
            }
            println!("Thread {} computed squares for range {}..{}", thread_id, start, end);
            // Every thread stores its partial results; the mutex alone
            // serializes the writes, so no barrier is needed before this.
            {
                let mut results = results_clone.lock().unwrap();
                results.extend(partial);
            }
            // A single wait both guarantees all partials are stored and
            // elects one leader to aggregate them. (The original performed
            // three waits — two of them back-to-back — where one suffices,
            // and its "only leader does this" comment sat above code that
            // every thread executed.)
            let result = barrier_clone.wait();
            if result.is_leader() {
                let mut results = results_clone.lock().unwrap();
                results.sort();
                println!("\nAll squares: {:?}", *results);
            }
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.join().unwrap();
    }
}
use std::sync::{Arc, Barrier};
use std::thread;
fn main() {
    let num_threads = 4;
    let barrier = Arc::new(Barrier::new(num_threads));
    let mut handles = vec![];

    for i in 0..num_threads {
        let gate = Arc::clone(&barrier);
        handles.push(thread::spawn(move || {
            // Simulated work: thread 2 reports a failure, the rest succeed.
            let outcome = if i == 2 {
                Err(format!("Thread {} failed!", i))
            } else {
                Ok(format!("Thread {} succeeded", i))
            };
            // Even a failing thread must reach the barrier — otherwise the
            // remaining threads would block on wait() forever.
            gate.wait();
            outcome
        }));
    }

    // Tally successes, failures, and panics from the join handles.
    let mut success_count = 0;
    let mut failure_count = 0;
    for (i, handle) in handles.into_iter().enumerate() {
        match handle.join() {
            Ok(Ok(msg)) => {
                println!("Thread {}: {}", i, msg);
                success_count += 1;
            }
            Ok(Err(e)) => {
                println!("Thread {}: {}", i, e);
                failure_count += 1;
            }
            Err(e) => {
                println!("Thread {} panicked: {:?}", i, e);
                failure_count += 1;
            }
        }
    }
    println!("\nSummary: {} succeeded, {} failed", success_count, failure_count);
}
| Method | Description |
|--------|-------------|
| Barrier::new(n) | Create a barrier for n threads |
| wait() | Block until all threads reach the barrier |
| BarrierWaitResult::is_leader() | Returns true for exactly one thread |
Common Patterns:
| Pattern | Use Case |
|---------|----------|
| Multi-phase computation | Divide work into synchronized stages |
| Leader election | One thread performs special work |
| Benchmarking | Ensure all threads start simultaneously |
| Data aggregation | Collect partial results after barrier |
Comparison with Other Primitives:
| Primitive | Use Case |
|-----------|----------|
| Barrier | All N threads sync at a point, reusable |
| Condvar | Wait for arbitrary conditions |
| Channel | Message passing between threads |
| Once | One-time initialization |
Key Points:
Exactly one thread observes is_leader() == true after each wait. Every participating thread must call wait() for the barrier to release. Pair the barrier with a Mutex for shared data access across phases.