How do I work with Barrier for Thread Coordination in Rust?

Walkthrough

Barrier is a synchronization primitive that lets multiple threads synchronize at a specific point in their execution. When a thread calls wait() on a barrier, it blocks until the required number of threads (the count passed to Barrier::new) has reached the same point. Once they have all arrived, they are released simultaneously.

Key concepts:

  • Barrier point — A synchronization point where threads wait
  • Count — Number of threads that must arrive before release
  • BarrierWaitResult — Returned by wait(); its is_leader() method reports whether this thread was chosen as the "leader"
  • Reusable — Barriers can be reused after all threads pass through
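
The last two points can be demonstrated directly: across several reuse rounds of the same barrier, wait() reports is_leader() == true for exactly one thread per round. A small sketch (the count_leaders helper is illustrative, not part of std):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Barrier};
use std::thread;

// Run `threads` threads through the same barrier `rounds` times and
// count how many wait() calls reported is_leader() == true.
fn count_leaders(threads: usize, rounds: usize) -> usize {
    let barrier = Arc::new(Barrier::new(threads));
    let leaders = Arc::new(AtomicUsize::new(0));
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let barrier = Arc::clone(&barrier);
            let leaders = Arc::clone(&leaders);
            thread::spawn(move || {
                for _ in 0..rounds {
                    if barrier.wait().is_leader() {
                        leaders.fetch_add(1, Ordering::SeqCst);
                    }
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    leaders.load(Ordering::SeqCst)
}

fn main() {
    // Exactly one leader per round, regardless of thread count.
    println!("{}", count_leaders(4, 3)); // prints 3
}
```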

When to use Barrier:

  • Parallel computations with phases
  • Bulk-synchronous parallel algorithms
  • Coordinated thread startup or shutdown
  • Multi-stage parallel processing
  • Benchmarking parallel code sections

When NOT to use Barrier:

  • Producer-consumer patterns (use channels)
  • Event signaling (use Condvar or channels)
  • One-time events (use Once or OnceLock)
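
For the last case, a minimal sketch of one-time initialization with OnceLock (stable since Rust 1.70). No barrier is needed: threads do not wait for each other, only for the value to exist.

```rust
use std::sync::OnceLock;
use std::thread;

static CONFIG: OnceLock<String> = OnceLock::new();

fn config() -> &'static str {
    // Only the first caller runs the closure; every other caller
    // gets the already-initialized value.
    CONFIG.get_or_init(|| "loaded-once".to_string())
}

fn main() {
    let handles: Vec<_> = (0..4)
        .map(|i| thread::spawn(move || println!("thread {}: {}", i, config())))
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
}
```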

Code Examples

Basic Barrier Usage

use std::sync::{Arc, Barrier};
use std::thread;
 
fn main() {
    // Create a barrier for 3 threads
    let barrier = Arc::new(Barrier::new(3));
    let mut handles = vec![];
    
    for i in 0..3 {
        let barrier = Arc::clone(&barrier);
        
        let handle = thread::spawn(move || {
            println!("Thread {} before barrier", i);
            
            // Wait at barrier
            barrier.wait();
            
            println!("Thread {} after barrier", i);
        });
        
        handles.push(handle);
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
}

Barrier with Leader Detection

use std::sync::{Arc, Barrier};
use std::thread;
 
fn main() {
    let barrier = Arc::new(Barrier::new(4));
    let mut handles = vec![];
    
    for i in 0..4 {
        let barrier = Arc::clone(&barrier);
        
        let handle = thread::spawn(move || {
            println!("Thread {} working...", i);
            thread::sleep(std::time::Duration::from_millis(100 * (i as u64 + 1)));
            
            let result = barrier.wait();
            
            // One thread becomes the "leader"
            if result.is_leader() {
                println!("Thread {} is the leader!", i);
            }
            
            println!("Thread {} continues", i);
        });
        
        handles.push(handle);
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
}

Multi-Phase Parallel Computation

use std::sync::{Arc, Barrier};
use std::thread;
 
fn main() {
    let num_threads = 4;
    let barrier = Arc::new(Barrier::new(num_threads));
    let mut handles = vec![];
    
    for i in 0..num_threads {
        let barrier = Arc::clone(&barrier);
        
        let handle = thread::spawn(move || {
            // Phase 1: Initialize
            println!("Thread {}: Phase 1 - Initializing", i);
            thread::sleep(std::time::Duration::from_millis(50));
            barrier.wait();
            
            // Phase 2: Process
            println!("Thread {}: Phase 2 - Processing", i);
            thread::sleep(std::time::Duration::from_millis(100));
            barrier.wait();
            
            // Phase 3: Finalize
            println!("Thread {}: Phase 3 - Finalizing", i);
            thread::sleep(std::time::Duration::from_millis(30));
            barrier.wait();
            
            println!("Thread {}: Complete!", i);
        });
        
        handles.push(handle);
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
}

Barrier with Shared State

use std::sync::{Arc, Barrier, Mutex};
use std::thread;
 
fn main() {
    let num_threads = 4;
    let barrier = Arc::new(Barrier::new(num_threads));
    let results = Arc::new(Mutex::new(Vec::new()));
    let mut handles = vec![];
    
    for i in 0..num_threads {
        let barrier = Arc::clone(&barrier);
        let results = Arc::clone(&results);
        
        let handle = thread::spawn(move || {
            // Each thread computes a value
            let value = i * i;
            
            // Store partial result
            results.lock().unwrap().push(value);
            
            // Wait for all threads to finish computing
            barrier.wait();
            
            // Every thread can now read the complete set of results
            let sum: usize = results.lock().unwrap().iter().sum();
            
            println!("Thread {} sees sum: {}", i, sum);
        });
        
        handles.push(handle);
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
}

Parallel Matrix Computation

use std::sync::{Arc, Barrier, Mutex};
use std::thread;
 
fn main() {
    let rows = 4;
    let cols = 4;
    let matrix: Vec<Vec<i32>> = (0..rows).map(|r| (0..cols).map(|c| (r * cols + c) as i32).collect()).collect();
    let result = Arc::new(Mutex::new(vec![vec![0i32; cols]; rows]));
    let barrier = Arc::new(Barrier::new(rows));
    
    let mut handles = vec![];
    
    for row in 0..rows {
        let barrier = Arc::clone(&barrier);
        let result = Arc::clone(&result);
        let row_data = matrix[row].clone();
        
        let handle = thread::spawn(move || {
            // Phase 1: Compute row
            let computed: Vec<i32> = row_data.iter().map(|&x| x * 2).collect();
            {
                let mut result = result.lock().unwrap();
                result[row] = computed;
            }
            
            barrier.wait();
            
            // Phase 2: Read all results
            let snapshot = result.lock().unwrap().clone();
            println!("Row {} sees: {:?}", row, snapshot);
        });
        
        handles.push(handle);
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
}

Barrier for Benchmarking

use std::sync::{Arc, Barrier};
use std::thread;
use std::time::Instant;
 
fn main() {
    let num_threads = 4;
    let barrier = Arc::new(Barrier::new(num_threads + 1)); // +1 for main
    let start_barrier = Arc::new(Barrier::new(num_threads + 1));
    let mut handles = vec![];
    
    for i in 0..num_threads {
        let barrier = Arc::clone(&barrier);
        let start_barrier = Arc::clone(&start_barrier);
        
        let handle = thread::spawn(move || {
            start_barrier.wait();  // Synchronized start
            
            // Work being benchmarked
            let mut sum = 0u64;
            for n in 0..1_000_000 {
                sum = sum.wrapping_add(n);
            }
            
            barrier.wait();  // Synchronized end
            
            sum
        });
        
        handles.push(handle);
    }
    
    // Synchronized start
    start_barrier.wait();
    let start = Instant::now();
    
    // Wait for all threads to complete
    barrier.wait();
    let duration = start.elapsed();
    
    println!("All threads completed in {:?}", duration);
    
    for handle in handles {
        handle.join().unwrap();
    }
}

Cyclic Barrier Pattern

use std::sync::{Arc, Barrier, Mutex};
use std::thread;
 
fn main() {
    let num_threads = 3;
    let num_iterations = 3;
    let barrier = Arc::new(Barrier::new(num_threads));
    let iteration = Arc::new(Mutex::new(0));
    
    let mut handles = vec![];
    
    for i in 0..num_threads {
        let barrier = Arc::clone(&barrier);
        let iteration = Arc::clone(&iteration);
        
        let handle = thread::spawn(move || {
            for _ in 0..num_iterations {
                let current_iter = *iteration.lock().unwrap();
                println!("Thread {} working on iteration {}", i, current_iter);
                
                thread::sleep(std::time::Duration::from_millis(50));
                
                let result = barrier.wait();
                
                if result.is_leader() {
                    println!("--- Iteration {} complete ---", current_iter);
                    *iteration.lock().unwrap() += 1;
                }
                
                barrier.wait();  // Ensure leader updates iteration before others read
            }
        });
        
        handles.push(handle);
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
}

Staggered Arrival at a Barrier

Note: std::sync::Barrier has no timeout variant — wait() blocks indefinitely. If you need a timed wait, build one from Mutex and Condvar::wait_timeout, or use a third-party crate. This example simply shows threads arriving at a barrier at different times.

use std::sync::{Arc, Barrier};
use std::thread;
use std::time::Duration;
 
fn main() {
    let barrier = Arc::new(Barrier::new(3));
    let mut handles = vec![];
    
    for i in 0..3 {
        let barrier = Arc::clone(&barrier);
        
        let handle = thread::spawn(move || {
            // Simulate different arrival times
            thread::sleep(Duration::from_millis(100 * i as u64));
            println!("Thread {} reached barrier", i);
            
            barrier.wait();
            
            println!("Thread {} released", i);
        });
        
        handles.push(handle);
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
}
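
A timed wait can be sketched from Mutex and Condvar::wait_timeout. The TimedBarrier type below is illustrative only, not part of std; note that the timeout is re-armed on each spurious wakeup, so the total wait can slightly exceed it.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

// Illustrative: a reusable barrier with a timed wait.
struct TimedBarrier {
    // (threads arrived this round, round/generation counter)
    state: Mutex<(usize, usize)>,
    cvar: Condvar,
    n: usize,
}

impl TimedBarrier {
    fn new(n: usize) -> Self {
        TimedBarrier {
            state: Mutex::new((0, 0)),
            cvar: Condvar::new(),
            n,
        }
    }

    // Returns true if all n threads arrived, false if the timeout elapsed.
    fn wait_timeout(&self, timeout: Duration) -> bool {
        let mut state = self.state.lock().unwrap();
        let gen = state.1;
        state.0 += 1;
        if state.0 == self.n {
            // Last arrival: start a new generation and release everyone.
            state.0 = 0;
            state.1 += 1;
            self.cvar.notify_all();
            return true;
        }
        while state.1 == gen {
            let (guard, result) = self.cvar.wait_timeout(state, timeout).unwrap();
            state = guard;
            if result.timed_out() && state.1 == gen {
                state.0 -= 1; // give up our slot so later rounds still work
                return false;
            }
        }
        true
    }
}

fn main() {
    let barrier = Arc::new(TimedBarrier::new(2));
    let b = Arc::clone(&barrier);
    let handle = thread::spawn(move || b.wait_timeout(Duration::from_secs(1)));
    // Both threads arrive well within the timeout, so both see `true`.
    assert!(barrier.wait_timeout(Duration::from_secs(1)));
    assert!(handle.join().unwrap());
}
```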

Barrier vs Condvar Comparison

use std::sync::{Arc, Barrier, Condvar, Mutex};
use std::thread;
 
fn main() {
    // Barrier: All threads wait for each other
    let barrier = Arc::new(Barrier::new(3));
    
    let mut handles = vec![];
    for i in 0..3 {
        let barrier = Arc::clone(&barrier);
        handles.push(thread::spawn(move || {
            println!("Barrier Thread {} waiting", i);
            barrier.wait();
            println!("Barrier Thread {} continuing", i);
        }));
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
    
    println!("---");
    
    // Condvar: One thread signals another
    let pair = Arc::new((Mutex::new(false), Condvar::new()));
    let pair_clone = Arc::clone(&pair);
    
    let waiter = thread::spawn(move || {
        let (lock, cvar) = &*pair_clone;
        let mut ready = lock.lock().unwrap();
        while !*ready {
            ready = cvar.wait(ready).unwrap();
        }
        println!("Condvar Thread woke up");
    });
    
    thread::sleep(std::time::Duration::from_millis(100));
    
    {
        let (lock, cvar) = &*pair;
        *lock.lock().unwrap() = true;
        cvar.notify_one();
    }
    
    waiter.join().unwrap();
}

Parallel Data Processing Pipeline

use std::sync::{Arc, Barrier, Mutex};
use std::thread;
 
struct Pipeline {
    stage1_barrier: Arc<Barrier>,
    stage2_barrier: Arc<Barrier>,
    stage3_barrier: Arc<Barrier>,
    data: Arc<Mutex<Vec<i32>>>,
}
 
impl Pipeline {
    fn new(num_workers: usize) -> Self {
        Self {
            stage1_barrier: Arc::new(Barrier::new(num_workers)),
            stage2_barrier: Arc::new(Barrier::new(num_workers)),
            stage3_barrier: Arc::new(Barrier::new(num_workers)),
            data: Arc::new(Mutex::new(vec![0; num_workers])),
        }
    }
}
 
fn main() {
    let num_workers = 4;
    let pipeline = Pipeline::new(num_workers);
    let mut handles = vec![];
    
    for worker_id in 0..num_workers {
        let stage1 = Arc::clone(&pipeline.stage1_barrier);
        let stage2 = Arc::clone(&pipeline.stage2_barrier);
        let stage3 = Arc::clone(&pipeline.stage3_barrier);
        let data = Arc::clone(&pipeline.data);
        
        let handle = thread::spawn(move || {
            // Stage 1: Generate
            {
                let mut data = data.lock().unwrap();
                data[worker_id] = (worker_id + 1) as i32;
            }
            stage1.wait();
            
            // Stage 2: Transform (double values)
            {
                let mut data = data.lock().unwrap();
                data[worker_id] *= 2;
            }
            stage2.wait();
            
            // Stage 3: Aggregate (compute sum)
            let sum: i32 = {
                let data = data.lock().unwrap();
                data.iter().sum()
            };
            stage3.wait();
            
            println!("Worker {} final sum: {}", worker_id, sum);
        });
        
        handles.push(handle);
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
}

Thread Startup Synchronization

use std::sync::{Arc, Barrier};
use std::thread;
use std::time::Instant;
 
fn main() {
    let num_threads = 5;
    let start_barrier = Arc::new(Barrier::new(num_threads + 1)); // +1 for main
    
    let mut handles = vec![];
    
    for i in 0..num_threads {
        let barrier = Arc::clone(&start_barrier);
        
        let handle = thread::spawn(move || {
            println!("Thread {} ready", i);
            barrier.wait();
            
            let start = Instant::now();
            // Simulate work
            thread::sleep(std::time::Duration::from_millis(100));
            
            start.elapsed()
        });
        
        handles.push(handle);
    }
    
    println!("Main thread ready");
    start_barrier.wait();
    println!("All threads starting now!");
    
    for (i, handle) in handles.into_iter().enumerate() {
        let elapsed = handle.join().unwrap();
        println!("Thread {} took {:?}", i, elapsed);
    }
}

Barrier for Parallel Sort Phases

use std::sync::{Arc, Barrier, Mutex};
use std::thread;
 
fn main() {
    let num_threads = 4;
    let barrier = Arc::new(Barrier::new(num_threads));
    let data = Arc::new(Mutex::new(vec![64, 25, 12, 22, 11, 90, 45, 33,
                                          1, 78, 56, 89, 23, 67, 34, 99]));
    
    let mut handles = vec![];
    
    for thread_id in 0..num_threads {
        let barrier = Arc::clone(&barrier);
        let data = Arc::clone(&data);
        
        let handle = thread::spawn(move || {
            // Phase 1: each thread sorts its own chunk in parallel
            {
                let mut arr = data.lock().unwrap();
                let chunk_size = arr.len() / num_threads;
                let start = thread_id * chunk_size;
                let end = if thread_id == num_threads - 1 {
                    arr.len()
                } else {
                    (thread_id + 1) * chunk_size
                };
                
                arr[start..end].sort();
            }
            
            // Wait until every chunk is sorted; one thread becomes leader
            let result = barrier.wait();
            
            // Phase 2: the leader merges the sorted chunks. A stable sort
            // serves as the merge step here; Rust's stable sort detects
            // the pre-sorted runs, so the final pass is cheap.
            if result.is_leader() {
                let mut arr = data.lock().unwrap();
                arr.sort();
                println!("Sorted: {:?}", *arr);
            }
        });
        
        handles.push(handle);
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
}

Barrier for Map-Reduce Pattern

use std::sync::{Arc, Barrier, Mutex};
use std::thread;
use std::collections::HashMap;
 
fn main() {
    let data = vec!["apple", "banana", "apple", "cherry", "banana", 
                    "apple", "date", "cherry", "elderberry", "fig"];
    let num_workers = 3;
    
    let map_barrier = Arc::new(Barrier::new(num_workers));
    let reduce_barrier = Arc::new(Barrier::new(num_workers));
    let partial_results = Arc::new(Mutex::new(Vec::new()));
    let final_result = Arc::new(Mutex::new(HashMap::new()));
    
    let mut handles = vec![];
    
    for worker_id in 0..num_workers {
        let map_barrier = Arc::clone(&map_barrier);
        let reduce_barrier = Arc::clone(&reduce_barrier);
        let partial_results = Arc::clone(&partial_results);
        let final_result = Arc::clone(&final_result);
        let chunk = data.chunks((data.len() + num_workers - 1) / num_workers)
                        .nth(worker_id)
                        .unwrap_or(&[])
                        .to_vec();
        
        let handle = thread::spawn(move || {
            // Map phase: count words in chunk
            let mut local_counts = HashMap::new();
            for word in chunk {
                *local_counts.entry(word).or_insert(0) += 1;
            }
            
            partial_results.lock().unwrap().push(local_counts);
            // Wait until every worker has published its partial counts
            map_barrier.wait();
            
            // Reduce phase: the leader aggregates the partial results
            let result = reduce_barrier.wait();
            if result.is_leader() {
                let mut final_counts = HashMap::new();
                for partial in partial_results.lock().unwrap().iter() {
                    for (word, count) in partial {
                        *final_counts.entry(*word).or_insert(0) += count;
                    }
                }
                *final_result.lock().unwrap() = final_counts;
            }
            
            reduce_barrier.wait();
            
            if worker_id == 0 {
                println!("Final counts: {:?}", *final_result.lock().unwrap());
            }
        });
        
        handles.push(handle);
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
}

Summary

Barrier Methods:

Method              Description
new(n)              Create a barrier for n threads
wait()              Block until all n threads have called wait()
wait().is_leader()  Check whether this thread is the "leader"

Barrier vs Other Primitives:

Primitive  Purpose
Barrier    All threads wait for each other at a common synchronization point
Condvar    A thread waits for a signal from another thread
Channel    Threads send/receive data between each other
Once       One-time initialization (single use)

Common Patterns:

Pattern                    Description
Multi-phase computation    Parallel algorithms with distinct phases
Bulk-synchronous parallel  BSP model implementation
Synchronized startup       All threads begin work at the same time
Benchmarking               Measure a parallel section's execution time
Map-reduce                 Parallel map phase, then reduce phase
Pipeline stages            Phased parallel processing

Key Points:

  • Barrier::new(n) creates a barrier for n threads
  • wait() blocks until all n threads have called wait()
  • BarrierWaitResult::is_leader() identifies one thread as leader
  • Barriers are reusable after all threads pass through
  • Use for phased parallel computations
  • Combine with Mutex for shared state access
  • For signaling between threads, use Condvar or channels
  • Share a Barrier between threads by wrapping it in Arc
  • Part of std::sync module