How do I work with Barrier for thread synchronization in Rust?

Walkthrough

A Barrier is a synchronization primitive that enables multiple threads to synchronize at a specific point in their execution. When threads reach a barrier, they block until all participating threads have reached the same barrier, then they all proceed together.

The Barrier type lives in std::sync and is useful for:

  • Phased computations — Where work happens in stages
  • Parallel algorithms — Where each phase must complete before the next begins
  • Benchmarking — Ensuring all threads start simultaneously
  • Data synchronization — Coordinating access to shared data across phases

Key methods:

  • new(n) — Create a barrier for n threads
  • wait() — Block until all threads reach the barrier, returns BarrierWaitResult
  • BarrierWaitResult::is_leader() — Returns true for exactly one thread (useful for cleanup)

The barrier automatically resets after all threads pass through, making it reusable for multiple synchronization points.

Code Examples

Basic Barrier Usage

use std::sync::{Arc, Barrier};
use std::thread;
 
fn main() {
    // Create a barrier for 3 threads
    let barrier = Arc::new(Barrier::new(3));
    let mut handles = vec![];
    
    for i in 0..3 {
        let barrier_clone = Arc::clone(&barrier);
        
        let handle = thread::spawn(move || {
            println!("Thread {} before barrier", i);
            
            // Wait for all threads to reach this point
            barrier_clone.wait();
            
            // All threads proceed together after barrier
            println!("Thread {} after barrier", i);
        });
        
        handles.push(handle);
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
}

Identifying the Leader Thread

use std::sync::{Arc, Barrier};
use std::thread;
 
fn main() {
    let barrier = Arc::new(Barrier::new(4));
    let mut handles = vec![];
    
    for i in 0..4 {
        let barrier_clone = Arc::clone(&barrier);
        
        let handle = thread::spawn(move || {
            println!("Thread {} working on phase 1", i);
            
            let result = barrier_clone.wait();
            
            // Exactly one thread will be the "leader"
            if result.is_leader() {
                println!(">>> Thread {} is the leader!", i);
            }
            
            println!("Thread {} starting phase 2", i);
        });
        
        handles.push(handle);
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
}

Multi-Phase Computation

use std::sync::{Arc, Barrier, Mutex};
use std::thread;
 
fn main() {
    let num_threads = 4;
    let barrier = Arc::new(Barrier::new(num_threads));
    
    // Shared data for computation
    let data = Arc::new(Mutex::new(vec![0; num_threads]));
    
    let mut handles = vec![];
    
    for i in 0..num_threads {
        let barrier_clone = Arc::clone(&barrier);
        let data_clone = Arc::clone(&data);
        
        let handle = thread::spawn(move || {
            // Phase 1: Initialize
            {
                let mut data = data_clone.lock().unwrap();
                data[i] = i * 10;
                println!("Thread {} initialized data[{}] = {}", i, i, data[i]);
            }
            
            barrier_clone.wait(); // Sync after initialization
            
            // Phase 2: Read others and compute
            let my_sum: usize; // the vector holds usize values (i * 10), so the sum is usize too
            {
                let data = data_clone.lock().unwrap();
                my_sum = data.iter().sum();
                println!("Thread {} sees total: {}", i, my_sum);
            }
            
            barrier_clone.wait(); // Sync after reading
            
            // Phase 3: Final computation
            {
                let mut data = data_clone.lock().unwrap();
                data[i] = my_sum;
            }
            
            let result = barrier_clone.wait(); // Sync after final phase
            
            if result.is_leader() {
                let data = data_clone.lock().unwrap();
                println!("Final data: {:?}", *data);
            }
        });
        
        handles.push(handle);
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
}

Parallel Matrix Processing

use std::sync::{Arc, Barrier};
use std::thread;
 
fn main() {
    let rows = 4;
    let cols = 4;
    let mut matrix = vec![vec![0i32; cols]; rows];
    
    // Initialize matrix
    for i in 0..rows {
        for j in 0..cols {
            matrix[i][j] = (i * cols + j) as i32;
        }
    }
    
    println!("Initial matrix:");
    for row in &matrix {
        println!("  {:?}", row);
    }
    
    let matrix = Arc::new(std::sync::Mutex::new(matrix));
    let barrier = Arc::new(Barrier::new(rows));
    
    let mut handles = vec![];
    
    // Each thread processes one row
    for thread_id in 0..rows {
        let barrier_clone = Arc::clone(&barrier);
        let matrix_clone = Arc::clone(&matrix);
        
        let handle = thread::spawn(move || {
            // Phase 1: Double all values
            {
                let mut matrix = matrix_clone.lock().unwrap();
                for j in 0..cols {
                    matrix[thread_id][j] *= 2;
                }
            }
            
            barrier_clone.wait();
            
            // Phase 2: Add neighbor values (requires all of phase 1 to be complete).
            // Note: phase 2 also runs under the lock, so threads take turns and a
            // later thread may read a neighbor row that was already updated in
            // phase 2; the result depends on lock acquisition order.
            let left_neighbor = if thread_id > 0 { thread_id - 1 } else { rows - 1 };
            
            {
                let mut matrix = matrix_clone.lock().unwrap();
                for j in 0..cols {
                    // Read the neighbor's value into a local before writing this row
                    let half = matrix[left_neighbor][j] / 2;
                    matrix[thread_id][j] += half;
                }
            }
            
            let result = barrier_clone.wait();
            
            if result.is_leader() {
                let matrix = matrix_clone.lock().unwrap();
                println!("\nProcessed matrix:");
                for row in matrix.iter() {
                    println!("  {:?}", row);
                }
            }
        });
        
        handles.push(handle);
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
}

Benchmarking with Barrier

use std::sync::{Arc, Barrier};
use std::thread;
use std::time::Instant;
 
fn main() {
    let num_threads = 4;
    let barrier = Arc::new(Barrier::new(num_threads));
    
    println!("Starting benchmark with {} threads...", num_threads);
    
    let start_time = Arc::new(std::sync::Mutex::new(None));
    let mut handles = vec![];
    
    for i in 0..num_threads {
        let barrier_clone = Arc::clone(&barrier);
        let start_clone = Arc::clone(&start_time);
        
        let handle = thread::spawn(move || {
            // First barrier: ensure all threads start together
            let result = barrier_clone.wait();
            
            if result.is_leader() {
                // Leader records the start time
                *start_clone.lock().unwrap() = Some(Instant::now());
            }
            
            // Simulated work
            let mut sum = 0u64;
            for n in 0..10_000_000 {
                sum = sum.wrapping_add(n);
            }
            
            // Second barrier: wait for all threads to finish
            let result = barrier_clone.wait();
            
            if result.is_leader() {
                let elapsed = start_clone.lock().unwrap().unwrap().elapsed();
                println!("All threads completed in {:?}", elapsed);
            }
            
            println!("Thread {} computed sum: {}", i, sum);
        });
        
        handles.push(handle);
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
}

Pipeline with Multiple Barriers

use std::sync::{Arc, Barrier};
use std::thread;
 
fn main() {
    let num_workers = 3;
    
    // Multiple barriers for different pipeline stages
    let stage1_barrier = Arc::new(Barrier::new(num_workers));
    let stage2_barrier = Arc::new(Barrier::new(num_workers));
    let stage3_barrier = Arc::new(Barrier::new(num_workers));
    
    let mut handles = vec![];
    
    for i in 0..num_workers {
        let b1 = Arc::clone(&stage1_barrier);
        let b2 = Arc::clone(&stage2_barrier);
        let b3 = Arc::clone(&stage3_barrier);
        
        let handle = thread::spawn(move || {
            // Stage 1
            println!("[Thread {}] Starting stage 1", i);
            thread::sleep(std::time::Duration::from_millis(100 * (i as u64 + 1)));
            println!("[Thread {}] Finished stage 1", i);
            b1.wait();
            
            // Stage 2 (all threads start together)
            println!("[Thread {}] Starting stage 2", i);
            thread::sleep(std::time::Duration::from_millis((50 * (num_workers - i)) as u64));
            println!("[Thread {}] Finished stage 2", i);
            b2.wait();
            
            // Stage 3
            println!("[Thread {}] Starting stage 3", i);
            let result = b3.wait();
            
            if result.is_leader() {
                println!("\n=== All threads completed all stages ===");
            }
        });
        
        handles.push(handle);
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
}

Combining Barrier with Other Primitives

use std::sync::{Arc, Barrier, Mutex, Condvar};
use std::thread;
 
fn main() {
    let num_threads = 4;
    let barrier = Arc::new(Barrier::new(num_threads));
    
    // Shared state protected by mutex
    let state = Arc::new((Mutex::new(Vec::new()), Condvar::new()));
    
    let mut handles = vec![];
    
    for i in 0..num_threads {
        let barrier_clone = Arc::clone(&barrier);
        let state_clone = Arc::clone(&state);
        
        let handle = thread::spawn(move || {
            // Phase 1: Each thread adds to shared state
            {
                let (lock, _) = &*state_clone;
                let mut data = lock.lock().unwrap();
                data.push(format!("Item from thread {}", i));
            }
            
            // Barrier ensures all threads complete phase 1;
            // its result also elects a leader to print the collected items
            let result = barrier_clone.wait();
            if result.is_leader() {
                let (lock, _cvar) = &*state_clone; // the Condvar is unused in this phase
                let data = lock.lock().unwrap();
                println!("Collected items: {:?}", *data);
            }
        });
        
        handles.push(handle);
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
}

Dynamic Thread Count with Barrier

use std::sync::{Arc, Barrier};
use std::thread;
 
fn main() {
    // Simulate dynamic work allocation
    let total_work = 20;
    let num_threads = 4;
    let work_per_thread = total_work / num_threads;
    
    let barrier = Arc::new(Barrier::new(num_threads));
    let results = Arc::new(std::sync::Mutex::new(Vec::new()));
    
    let mut handles = vec![];
    
    for thread_id in 0..num_threads {
        let barrier_clone = Arc::clone(&barrier);
        let results_clone = Arc::clone(&results);
        
        let handle = thread::spawn(move || {
            let start = thread_id * work_per_thread;
            let end = if thread_id == num_threads - 1 {
                total_work // Handle remainder
            } else {
                (thread_id + 1) * work_per_thread
            };
            
            // Compute partial results
            let mut partial = Vec::new();
            for i in start..end {
                partial.push(i * i); // Square each number
            }
            
            println!("Thread {} computed squares for range {}..{}", thread_id, start, end);
            
            // Wait for all threads to finish computing; the wait result
            // elects a leader for the final printout below
            let result = barrier_clone.wait();
            
            // Every thread stores its own partial results
            {
                let mut results = results_clone.lock().unwrap();
                results.extend(partial);
            }
            
            // Second barrier: all partial results are stored before the leader reads them
            barrier_clone.wait();
            
            if result.is_leader() {
                let mut results = results_clone.lock().unwrap();
                results.sort();
                println!("\nAll squares: {:?}", *results);
            }
        });
        
        handles.push(handle);
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
}

Error Handling with Barriers

use std::sync::{Arc, Barrier};
use std::thread;
 
fn main() {
    let num_threads = 4;
    let barrier = Arc::new(Barrier::new(num_threads));
    
    let mut handles = vec![];
    
    for i in 0..num_threads {
        let barrier_clone = Arc::clone(&barrier);
        
        let handle = thread::spawn(move || {
            // Simulated work that might fail
            let result = if i == 2 {
                Err(format!("Thread {} failed!", i))
            } else {
                Ok(format!("Thread {} succeeded", i))
            };
            
            // Still need to wait at barrier to not block other threads
            barrier_clone.wait();
            
            result
        });
        
        handles.push(handle);
    }
    
    // Collect results
    let mut success_count = 0;
    let mut failure_count = 0;
    
    for (i, handle) in handles.into_iter().enumerate() {
        match handle.join() {
            Ok(Ok(msg)) => {
                println!("Thread {}: {}", i, msg);
                success_count += 1;
            }
            Ok(Err(e)) => {
                println!("Thread {}: {}", i, e);
                failure_count += 1;
            }
            Err(e) => {
                println!("Thread {} panicked: {:?}", i, e);
                failure_count += 1;
            }
        }
    }
    
    println!("\nSummary: {} succeeded, {} failed", success_count, failure_count);
}

Summary

Methods:

  • Barrier::new(n) — Create a barrier for n threads
  • wait() — Block until all threads reach the barrier
  • BarrierWaitResult::is_leader() — Returns true for exactly one thread

Common Patterns:

  • Multi-phase computation — Divide work into synchronized stages
  • Leader election — One thread performs special work
  • Benchmarking — Ensure all threads start simultaneously
  • Data aggregation — Collect partial results after a barrier

Comparison with Other Primitives:

  • Barrier — All N threads sync at a point; reusable
  • Condvar — Wait for arbitrary conditions
  • Channel — Message passing between threads
  • Once — One-time initialization

Key Points:

  • Barriers automatically reset and are reusable
  • Exactly one thread gets is_leader() == true after each wait
  • All threads must call wait() for the barrier to release
  • Use barriers for phased parallel algorithms
  • Combine with Mutex for shared data access across phases