How do I work with Condvar for thread synchronization in Rust?

Walkthrough

A Condvar (condition variable) is a synchronization primitive that allows threads to wait for a specific condition to become true. It's used in conjunction with a Mutex to block threads until they're notified by another thread.

Condition variables are located in std::sync::Condvar and provide:

  • wait() — Release the lock and block until notified
  • wait_timeout() — Wait with a timeout
  • notify_one() — Wake up one waiting thread
  • notify_all() — Wake up all waiting threads

The typical pattern:

  1. Acquire a Mutex protecting shared state
  2. Check the condition — if not met, call wait() which atomically releases the mutex and blocks
  3. When notified, wait() reacquires the mutex and returns
  4. Re-check the condition (may be spurious wakeup)
  5. Continue when condition is satisfied

Important: Always use a MutexGuard with Condvar — the wait() method requires both.

Code Examples

Basic Condvar Usage

use std::sync::{Arc, Mutex, Condvar};
use std::thread;
 
fn main() {
    // Condvar is typically paired with a Mutex
    let pair = Arc::new((Mutex::new(false), Condvar::new()));
    let pair_clone = Arc::clone(&pair);
    
    // Waiting thread
    let waiter = thread::spawn(move || {
        let (lock, cvar) = &*pair_clone;
        let mut started = lock.lock().unwrap();
        
        // Wait until started becomes true
        // This releases the lock and blocks, reacquires when notified
        while !*started {
            started = cvar.wait(started).unwrap();
        }
        
        println!("Waiter thread proceeding!");
    });
    
    // Notifying thread
    thread::sleep(std::time::Duration::from_millis(100));
    
    {
        let (lock, cvar) = &*pair;
        let mut started = lock.lock().unwrap();
        *started = true;
        // Notify the waiting thread
        cvar.notify_one();
    }
    
    waiter.join().unwrap();
    println!("Done!");
}

Producer-Consumer Pattern

use std::sync::{Arc, Mutex, Condvar};
use std::thread;
use std::collections::VecDeque;
 
struct BoundedQueue<T> {
    queue: VecDeque<T>,
    capacity: usize,
}
 
impl<T> BoundedQueue<T> {
    fn new(capacity: usize) -> Self {
        Self { queue: VecDeque::new(), capacity }
    }
    
    fn is_full(&self) -> bool {
        self.queue.len() >= self.capacity
    }
    
    fn is_empty(&self) -> bool {
        self.queue.is_empty()
    }
}
 
fn main() {
    let queue = Arc::new((Mutex::new(BoundedQueue::<i32>::new(5)), Condvar::new()));
    
    let producer_queue = Arc::clone(&queue);
    let producer = thread::spawn(move || {
        let (lock, cvar) = &*producer_queue;
        
        for i in 0..10 {
            let mut guard = lock.lock().unwrap();
            
            // Wait while queue is full
            while guard.queue.is_full() {
                println!("Producer: queue full, waiting...");
                guard = cvar.wait(guard).unwrap();
            }
            
            guard.queue.push_back(i);
            println!("Produced: {}", i);
            
            // Notify consumer that item is available
            cvar.notify_one();
        }
        
        // Signal completion
        let mut guard = lock.lock().unwrap();
        guard.queue.push_back(-1); // Sentinel value
        cvar.notify_one();
    });
    
    let consumer_queue = Arc::clone(&queue);
    let consumer = thread::spawn(move || {
        let (lock, cvar) = &*consumer_queue;
        
        loop {
            let mut guard = lock.lock().unwrap();
            
            // Wait while queue is empty
            while guard.queue.is_empty() {
                guard = cvar.wait(guard).unwrap();
            }
            
            let item = guard.queue.pop_front().unwrap();
            
            if item == -1 {
                println!("Consumer: received end signal");
                break;
            }
            
            println!("Consumed: {}", item);
            
            // Notify producer that space is available
            cvar.notify_one();
        }
    });
    
    producer.join().unwrap();
    consumer.join().unwrap();
}

Wait with Timeout

use std::sync::{Arc, Mutex, Condvar};
use std::thread;
use std::time::Duration;
 
fn main() {
    let pair = Arc::new((Mutex::new(false), Condvar::new()));
    let pair_clone = Arc::clone(&pair);
    
    // Thread that will timeout
    let waiter = thread::spawn(move || {
        let (lock, cvar) = &*pair_clone;
        let mut guard = lock.lock().unwrap();
        
        let result = cvar.wait_timeout(guard, Duration::from_millis(500)).unwrap();
        
        if result.1.timed_out() {
            println!("Wait timed out after 500ms");
        } else {
            println!("Condition was signaled");
        }
        
        // result.0 is the MutexGuard
        let guard = result.0;
        println!("Condition value: {}", *guard);
    });
    
    // Don't signal - let it timeout
    waiter.join().unwrap();
    
    // Now test with a signal before timeout
    let pair2 = Arc::new((Mutex::new(false), Condvar::new()));
    let pair2_clone = Arc::clone(&pair2);
    
    let waiter2 = thread::spawn(move || {
        let (lock, cvar) = &*pair2_clone;
        let mut guard = lock.lock().unwrap();
        
        let result = cvar.wait_timeout(guard, Duration::from_secs(2)).unwrap();
        
        if result.1.timed_out() {
            println!("Wait 2 timed out");
        } else {
            println!("Wait 2 was signaled before timeout");
        }
    });
    
    thread::sleep(Duration::from_millis(100));
    
    // Signal before timeout
    {
        let (lock, cvar) = &*pair2;
        let mut guard = lock.lock().unwrap();
        *guard = true;
        cvar.notify_one();
    }
    
    waiter2.join().unwrap();
}

Multiple Waiters with notify_all

use std::sync::{Arc, Mutex, Condvar};
use std::thread;
 
fn main() {
    let pair = Arc::new((Mutex::new(0), Condvar::new()));
    let mut handles = vec![];
    
    // Spawn 5 waiting threads
    for i in 0..5 {
        let pair_clone = Arc::clone(&pair);
        let handle = thread::spawn(move || {
            let (lock, cvar) = &*pair_clone;
            let mut value = lock.lock().unwrap();
            
            // Wait until value becomes 1
            while *value == 0 {
                println!("Thread {} waiting...", i);
                value = cvar.wait(value).unwrap();
            }
            
            println!("Thread {} proceeding!", i);
        });
        handles.push(handle);
    }
    
    // Wait a bit for all threads to start waiting
    thread::sleep(std::time::Duration::from_millis(100));
    
    // Wake ALL waiting threads
    {
        let (lock, cvar) = &*pair;
        let mut value = lock.lock().unwrap();
        *value = 1;
        println!("Signaling all threads to proceed");
        cvar.notify_all();
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
    
    println!("All threads completed");
}

Thread Pool with Condvar

use std::sync::{Arc, Mutex, Condvar};
use std::thread;
 
type Job = Box<dyn FnOnce() + Send + 'static>;
 
struct ThreadPool {
    workers: Vec<thread::JoinHandle<()>>,
    sender: Option<std::sync::mpsc::Sender<Job>>,
}
 
impl ThreadPool {
    fn new(size: usize) -> Self {
        let (sender, receiver) = std::sync::mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));
        let cvar = Arc::new(Condvar::new());
        
        let mut workers = Vec::with_capacity(size);
        
        for id in 0..size {
            let receiver = Arc::clone(&receiver);
            let cvar = Arc::clone(&cvar);
            
            let handle = thread::spawn(move || {
                loop {
                    let guard = receiver.lock().unwrap();
                    
                    // Wait for a job to be available
                    // We use a closure to check if there's work
                    let result = cvar.wait_while(guard, |r| {
                        r.try_recv().is_err()
                    });
                    
                    match result {
                        Ok(guard) => {
                            // Got signaled, try to get a job
                            if let Ok(job) = guard.try_recv() {
                                println!("Worker {} executing job", id);
                                job();
                            }
                        }
                        Err(_) => break, // Channel poisoned
                    }
                }
            });
            
            workers.push(handle);
        }
        
        Self { workers, sender: Some(sender) }
    }
    
    fn execute<F: FnOnce() + Send + 'static>(&self, f: F) {
        let job = Box::new(f);
        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}
 
impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.sender.take());
        for worker in self.workers.drain(..) {
            worker.join().unwrap();
        }
    }
}
 
fn main() {
    let pool = ThreadPool::new(4);
    
    for i in 0..8 {
        pool.execute(move || {
            println!("Processing task {}", i);
            thread::sleep(std::time::Duration::from_millis(100));
        });
    }
    
    // Pool drops and waits for workers
}

Wait While Predicate

use std::sync::{Arc, Mutex, Condvar};
use std::thread;
 
fn main() {
    let state = Arc::new((Mutex::new(Vec::new()), Condvar::new()));
    
    let state_clone = Arc::clone(&state);
    let waiter = thread::spawn(move || {
        let (lock, cvar) = &*state_clone;
        let mut data = lock.lock().unwrap();
        
        // wait_while continues waiting while predicate returns true
        // Returns when predicate returns false (condition met)
        data = cvar.wait_while(data, |v| v.len() < 3).unwrap();
        
        println!("Got 3 items: {:?}", *data);
    });
    
    let state_clone2 = Arc::clone(&state);
    let producer = thread::spawn(move || {
        let (lock, cvar) = &*state_clone2;
        
        for i in 0..5 {
            thread::sleep(std::time::Duration::from_millis(200));
            let mut data = lock.lock().unwrap();
            data.push(i);
            println!("Added item {}, total: {}", i, data.len());
            cvar.notify_one();
        }
    });
    
    waiter.join().unwrap();
    producer.join().unwrap();
}

Implementing a Semaphore

use std::sync::{Arc, Mutex, Condvar};
use std::thread;
 
struct Semaphore {
    count: Mutex<usize>,
    cvar: Condvar,
}
 
impl Semaphore {
    fn new(initial: usize) -> Self {
        Self {
            count: Mutex::new(initial),
            cvar: Condvar::new(),
        }
    }
    
    fn acquire(&self) {
        let mut count = self.count.lock().unwrap();
        while *count == 0 {
            count = self.cvar.wait(count).unwrap();
        }
        *count -= 1;
    }
    
    fn release(&self) {
        let mut count = self.count.lock().unwrap();
        *count += 1;
        self.cvar.notify_one();
    }
}
 
fn main() {
    let semaphore = Arc::new(Semaphore::new(2)); // Max 2 concurrent
    let mut handles = vec![];
    
    for i in 0..5 {
        let sem = Arc::clone(&semaphore);
        let handle = thread::spawn(move || {
            println!("Thread {} trying to acquire", i);
            sem.acquire();
            println!("Thread {} acquired, working...", i);
            
            thread::sleep(std::time::Duration::from_millis(500));
            
            println!("Thread {} releasing", i);
            sem.release();
        });
        handles.push(handle);
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
}

Avoiding Spurious Wakeups

use std::sync::{Arc, Mutex, Condvar};
use std::thread;
 
fn main() {
    let state = Arc::new((Mutex::new(false), Condvar::new()));
    let state_clone = Arc::clone(&state);
    
    let waiter = thread::spawn(move || {
        let (lock, cvar) = &*state_clone;
        let mut ready = lock.lock().unwrap();
        
        // ALWAYS use a loop to check the condition!
        // Spurious wakeups can occur without a signal
        while !*ready {
            println!("Waiting for signal...");
            ready = cvar.wait(ready).unwrap();
        }
        
        println!("Condition satisfied!");
    });
    
    thread::sleep(std::time::Duration::from_millis(100));
    
    {
        let (lock, cvar) = &*state;
        let mut ready = lock.lock().unwrap();
        *ready = true;
        cvar.notify_one();
    }
    
    waiter.join().unwrap();
    
    // Alternative: use wait_while which handles the loop internally
    println!("\nUsing wait_while:");
    
    let state2 = Arc::new((Mutex::new(0), Condvar::new()));
    let state2_clone = Arc::clone(&state2);
    
    let waiter2 = thread::spawn(move || {
        let (lock, cvar) = &*state2_clone;
        let guard = lock.lock().unwrap();
        
        // wait_while automatically handles spurious wakeups
        let value = cvar.wait_while(guard, |v| *v < 10).unwrap();
        println!("Got value: {}", *value);
    });
    
    thread::sleep(std::time::Duration::from_millis(100));
    
    {
        let (lock, cvar) = &*state2;
        let mut value = lock.lock().unwrap();
        *value = 10;
        cvar.notify_one();
    }
    
    waiter2.join().unwrap();
}

Summary

Method Description
wait(guard) Block until notified, must hold MutexGuard
wait_timeout(guard, duration) Wait with timeout, returns (guard, WaitTimeoutResult)
wait_while(guard, predicate) Wait while predicate returns true
notify_one() Wake up one waiting thread
notify_all() Wake up all waiting threads

Condvar Pattern:

  1. Always pair with a Mutex protecting the condition
  2. Always check condition in a loop (spurious wakeups)
  3. Use notify_one() for single waiter, notify_all() for multiple
  4. Consider channels for simpler producer-consumer scenarios

Comparison:

Synchronization Use Case
Condvar Complex conditions, multiple wait conditions
Channel Simple producer-consumer, message passing
Barrier Wait for N threads to reach a point
Atomic Simple flags and counters

Key Points:

  • Condvar enables threads to wait for arbitrary conditions
  • Always protect shared state with a Mutex
  • Use loops to recheck conditions after wakeup
  • notify_all() wakes all waiters; notify_one() wakes one
  • wait_timeout() allows bounded waiting