How do I work with Condvar for Thread Synchronization in Rust?

Walkthrough

Condvar (Condition Variable) is a synchronization primitive that allows threads to wait for a specific condition to become true. It's always used with a Mutex to protect the shared state that the condition depends on. Threads can block on a Condvar and be notified when the condition might have changed.

Key concepts:

  • Wait — Thread blocks until notified, atomically releases mutex
  • Notify one — Wake up one waiting thread
  • Notify all — Wake up all waiting threads
  • Spurious wakeups — Can happen; always recheck condition in a loop
  • Mutex pairing — Always used with a Mutex<(bool, T)> pattern

When to use Condvar:

  • Producer-consumer with bounded buffer
  • Thread pools waiting for work
  • Event signaling between threads
  • Implementing higher-level synchronization primitives
  • When threads need to wait for state changes

When NOT to use Condvar:

  • Simple signaling (use channels instead)
  • One-time events (use Once or OnceLock)
  • When channels provide a cleaner abstraction

Code Examples

Basic Condvar Usage

use std::sync::{Arc, Mutex, Condvar};
use std::thread;
 
fn main() {
    let pair = Arc::new((Mutex::new(false), Condvar::new()));
    let pair_clone = Arc::clone(&pair);
    
    // Waiting thread
    let waiter = thread::spawn(move || {
        let (lock, cvar) = &*pair_clone;
        let mut started = lock.lock().unwrap();
        
        // Wait until started becomes true
        while !*started {
            started = cvar.wait(started).unwrap();
        }
        
        println!("Waiter: condition met!");
    });
    
    // Signaling thread
    thread::sleep(std::time::Duration::from_millis(100));
    
    {
        let (lock, cvar) = &*pair;
        let mut started = lock.lock().unwrap();
        *started = true;
        cvar.notify_one();
    }
    
    waiter.join().unwrap();
}

Condvar with wait_while

use std::sync::{Arc, Mutex, Condvar};
use std::thread;
 
fn main() {
    let pair = Arc::new((Mutex::new(0i32), Condvar::new()));
    
    let pair_clone = Arc::clone(&pair);
    let waiter = thread::spawn(move || {
        let (lock, cvar) = &*pair_clone;
        let mut value = lock.lock().unwrap();
        
        // Wait until value >= 10
        value = cvar.wait_while(value, |v| *v < 10).unwrap();
        
        println!("Value reached: {}", *value);
    });
    
    // Increment value
    for i in 1..=15 {
        let (lock, cvar) = &*pair;
        let mut value = lock.lock().unwrap();
        *value = i;
        println!("Set value to: {}", i);
        cvar.notify_one();
        thread::sleep(std::time::Duration::from_millis(50));
    }
    
    waiter.join().unwrap();
}

Condvar with wait_timeout

use std::sync::{Arc, Mutex, Condvar};
use std::thread;
use std::time::{Duration, Instant};
 
fn main() {
    let pair = Arc::new((Mutex::new(false), Condvar::new()));
    let pair_clone = Arc::clone(&pair);
    
    let waiter = thread::spawn(move || {
        let (lock, cvar) = &*pair_clone;
        let mut started = lock.lock().unwrap();
        
        let timeout = Duration::from_secs(2);
        let start = Instant::now();
        
        loop {
            if *started {
                println!("Condition met!");
                return;
            }
            
            let result = cvar.wait_timeout(started, timeout).unwrap();
            started = result.0;
            
            if result.1.timed_out() {
                println!("Timeout after {:?}", start.elapsed());
                return;
            }
        }
    });
    
    // Don't signal - let it timeout
    thread::sleep(Duration::from_millis(500));
    
    waiter.join().unwrap();
}

Producer-Consumer with Bounded Buffer

use std::sync::{Arc, Mutex, Condvar};
use std::thread;
use std::collections::VecDeque;
 
struct BoundedBuffer<T> {
    queue: Mutex<VecDeque<T>>,
    not_empty: Condvar,
    not_full: Condvar,
    capacity: usize,
}
 
impl<T> BoundedBuffer<T> {
    fn new(capacity: usize) -> Self {
        Self {
            queue: Mutex::new(VecDeque::with_capacity(capacity)),
            not_empty: Condvar::new(),
            not_full: Condvar::new(),
            capacity,
        }
    }
    
    fn push(&self, item: T) {
        let mut queue = self.queue.lock().unwrap();
        
        // Wait until there's space
        while queue.len() == self.capacity {
            queue = self.not_full.wait(queue).unwrap();
        }
        
        queue.push_back(item);
        self.not_empty.notify_one();
    }
    
    fn pop(&self) -> T {
        let mut queue = self.queue.lock().unwrap();
        
        // Wait until there's an item
        while queue.is_empty() {
            queue = self.not_empty.wait(queue).unwrap();
        }
        
        let item = queue.pop_front().unwrap();
        self.not_full.notify_one();
        item
    }
}
 
fn main() {
    let buffer = Arc::new(BoundedBuffer::new(5));
    
    let producer_buffer = Arc::clone(&buffer);
    let producer = thread::spawn(move || {
        for i in 0..10 {
            producer_buffer.push(i);
            println!("Produced: {}", i);
        }
    });
    
    let consumer_buffer = Arc::clone(&buffer);
    let consumer = thread::spawn(move || {
        for _ in 0..10 {
            let item = consumer_buffer.pop();
            println!("Consumed: {}", item);
        }
    });
    
    producer.join().unwrap();
    consumer.join().unwrap();
}

Thread Pool with Condvar

use std::sync::{Arc, Mutex, Condvar};
use std::thread;
 
struct ThreadPool {
    workers: Mutex<Vec<Option<thread::JoinHandle<()>>>>,
    tasks: Mutex<Vec<Box<dyn FnOnce() + Send>>>,
    cvar: Condvar,
    shutdown: Mutex<bool>,
}
 
impl ThreadPool {
    fn new(size: usize) -> Arc<Self> {
        let pool = Arc::new(Self {
            workers: Mutex::new(Vec::new()),
            tasks: Mutex::new(Vec::new()),
            cvar: Condvar::new(),
            shutdown: Mutex::new(false),
        });
        
        for id in 0..size {
            let pool_clone = Arc::clone(&pool);
            let handle = thread::spawn(move || {
                loop {
                    let task = {
                        let mut tasks = pool_clone.tasks.lock().unwrap();
                        let shutdown = pool_clone.shutdown.lock().unwrap();
                        
                        while tasks.is_empty() && !*shutdown {
                            tasks = pool_clone.cvar.wait(tasks).unwrap();
                        }
                        
                        if *shutdown && tasks.is_empty() {
                            return;
                        }
                        
                        tasks.pop()
                    };
                    
                    if let Some(task) = task {
                        println!("Worker {} executing task", id);
                        task();
                    }
                }
            });
            
            pool.workers.lock().unwrap().push(Some(handle));
        }
        
        pool
    }
    
    fn submit<F: FnOnce() + Send + 'static>(&self, task: F) {
        let mut tasks = self.tasks.lock().unwrap();
        tasks.push(Box::new(task));
        self.cvar.notify_one();
    }
    
    fn shutdown(&self) {
        *self.shutdown.lock().unwrap() = true;
        self.cvar.notify_all();
    }
}
 
fn main() {
    let pool = ThreadPool::new(3);
    
    for i in 0..5 {
        pool.submit(move || {
            println!("Task {} running", i);
            thread::sleep(std::time::Duration::from_millis(100));
        });
    }
    
    thread::sleep(std::time::Duration::from_millis(500));
    pool.shutdown();
}

notify_one vs notify_all

use std::sync::{Arc, Mutex, Condvar};
use std::thread;
 
fn main() {
    let pair = Arc::new((Mutex::new(0usize), Condvar::new()));
    let mut handles = vec![];
    
    // Spawn multiple waiters
    for i in 0..5 {
        let pair_clone = Arc::clone(&pair);
        let handle = thread::spawn(move || {
            let (lock, cvar) = &*pair_clone;
            let mut count = lock.lock().unwrap();
            
            while *count == 0 {
                count = cvar.wait(count).unwrap();
            }
            
            println!("Waiter {} woke up with count = {}", i, *count);
        });
        handles.push(handle);
    }
    
    thread::sleep(std::time::Duration::from_millis(100));
    
    // notify_one wakes only one thread
    {
        let (lock, cvar) = &*pair;
        let mut count = lock.lock().unwrap();
        *count = 1;
        println!("Signaling with notify_one...");
        cvar.notify_one();
    }
    
    thread::sleep(std::time::Duration::from_millis(100));
    
    // notify_all wakes all remaining threads
    {
        let (lock, cvar) = &*pair;
        let mut count = lock.lock().unwrap();
        *count = 2;
        println!("Signaling with notify_all...");
        cvar.notify_all();
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
}

Condvar for Event Signaling

use std::sync::{Arc, Mutex, Condvar};
use std::thread;
 
struct Event {
    triggered: Mutex<bool>,
    cvar: Condvar,
}
 
impl Event {
    fn new() -> Self {
        Self {
            triggered: Mutex::new(false),
            cvar: Condvar::new(),
        }
    }
    
    fn wait(&self) {
        let mut triggered = self.triggered.lock().unwrap();
        while !*triggered {
            triggered = self.cvar.wait(triggered).unwrap();
        }
    }
    
    fn signal(&self) {
        let mut triggered = self.triggered.lock().unwrap();
        *triggered = true;
        self.cvar.notify_all();
    }
    
    fn reset(&self) {
        let mut triggered = self.triggered.lock().unwrap();
        *triggered = false;
    }
}
 
fn main() {
    let event = Arc::new(Event::new());
    
    let mut handles = vec![];
    
    for i in 0..3 {
        let event = Arc::clone(&event);
        handles.push(thread::spawn(move || {
            println!("Thread {} waiting for event", i);
            event.wait();
            println!("Thread {} event triggered!", i);
        }));
    }
    
    thread::sleep(std::time::Duration::from_millis(500));
    println!("Triggering event...");
    event.signal();
    
    for handle in handles {
        handle.join().unwrap();
    }
}

Spurious Wakeup Handling

use std::sync::{Arc, Mutex, Condvar};
use std::thread;
 
fn main() {
    let pair = Arc::new((Mutex::new(false), Condvar::new()));
    
    let pair_clone = Arc::clone(&pair);
    let waiter = thread::spawn(move || {
        let (lock, cvar) = &*pair_clone;
        let mut ready = lock.lock().unwrap();
        
        // ALWAYS check condition in a loop to handle spurious wakeups
        while !*ready {
            // This atomically releases the mutex and blocks
            // When it returns, mutex is re-acquired
            ready = cvar.wait(ready).unwrap();
        }
        
        println!("Condition was true!");
    });
    
    thread::sleep(std::time::Duration::from_millis(100));
    
    {
        let (lock, cvar) = &*pair;
        let mut ready = lock.lock().unwrap();
        *ready = true;
        cvar.notify_one();
    }
    
    waiter.join().unwrap();
}

Multiple Conditions with One Mutex

use std::sync::{Arc, Mutex, Condvar};
use std::thread;
 
struct SharedState {
    data: Mutex<(bool, bool)>,  // (condition_a, condition_b)
    cvar_a: Condvar,
    cvar_b: Condvar,
}
 
impl SharedState {
    fn new() -> Self {
        Self {
            data: Mutex::new((false, false)),
            cvar_a: Condvar::new(),
            cvar_b: Condvar::new(),
        }
    }
    
    fn set_a(&self) {
        let mut data = self.data.lock().unwrap();
        data.0 = true;
        self.cvar_a.notify_one();
    }
    
    fn set_b(&self) {
        let mut data = self.data.lock().unwrap();
        data.1 = true;
        self.cvar_b.notify_one();
    }
    
    fn wait_for_a(&self) {
        let mut data = self.data.lock().unwrap();
        while !data.0 {
            data = self.cvar_a.wait(data).unwrap();
        }
    }
    
    fn wait_for_b(&self) {
        let mut data = self.data.lock().unwrap();
        while !data.1 {
            data = self.cvar_b.wait(data).unwrap();
        }
    }
}
 
fn main() {
    let state = Arc::new(SharedState::new());
    
    let state_a = Arc::clone(&state);
    let waiter_a = thread::spawn(move || {
        println!("Waiting for A");
        state_a.wait_for_a();
        println!("A is ready!");
    });
    
    let state_b = Arc::clone(&state);
    let waiter_b = thread::spawn(move || {
        println!("Waiting for B");
        state_b.wait_for_b();
        println!("B is ready!");
    });
    
    thread::sleep(std::time::Duration::from_millis(100));
    state.set_a();
    
    thread::sleep(std::time::Duration::from_millis(100));
    state.set_b();
    
    waiter_a.join().unwrap();
    waiter_b.join().unwrap();
}

Condvar vs Channel Comparison

use std::sync::{Arc, Mutex, Condvar};
use std::sync::mpsc;
use std::thread;
 
fn main() {
    // Condvar approach - signal a condition
    let pair = Arc::new((Mutex::new(false), Condvar::new()));
    let pair_clone = Arc::clone(&pair);
    
    let handle1 = thread::spawn(move || {
        let (lock, cvar) = &*pair_clone;
        let mut ready = lock.lock().unwrap();
        while !*ready {
            ready = cvar.wait(ready).unwrap();
        }
        println!("Condvar: condition met");
    });
    
    {
        let (lock, cvar) = &*pair;
        *lock.lock().unwrap() = true;
        cvar.notify_one();
    }
    
    handle1.join().unwrap();
    
    // Channel approach - send a message
    let (tx, rx) = mpsc::channel();
    
    let handle2 = thread::spawn(move || {
        rx.recv().unwrap();
        println!("Channel: message received");
    });
    
    tx.send(()).unwrap();
    
    handle2.join().unwrap();
}

Semaphore-like Pattern with Condvar

use std::sync::{Arc, Mutex, Condvar};
use std::thread;
 
struct Semaphore {
    count: Mutex<usize>,
    cvar: Condvar,
}
 
impl Semaphore {
    fn new(count: usize) -> Self {
        Self {
            count: Mutex::new(count),
            cvar: Condvar::new(),
        }
    }
    
    fn acquire(&self) {
        let mut count = self.count.lock().unwrap();
        while *count == 0 {
            count = self.cvar.wait(count).unwrap();
        }
        *count -= 1;
    }
    
    fn release(&self) {
        let mut count = self.count.lock().unwrap();
        *count += 1;
        self.cvar.notify_one();
    }
}
 
fn main() {
    let sem = Arc::new(Semaphore::new(2));  // Max 2 concurrent
    
    let mut handles = vec![];
    
    for i in 0..5 {
        let sem = Arc::clone(&sem);
        handles.push(thread::spawn(move || {
            sem.acquire();
            println!("Thread {} acquired semaphore", i);
            thread::sleep(std::time::Duration::from_millis(100));
            println!("Thread {} releasing semaphore", i);
            sem.release();
        }));
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
}

Countdown Latch Pattern

use std::sync::{Arc, Mutex, Condvar};
use std::thread;
 
struct CountdownLatch {
    count: Mutex<usize>,
    cvar: Condvar,
}
 
impl CountdownLatch {
    fn new(count: usize) -> Self {
        Self {
            count: Mutex::new(count),
            cvar: Condvar::new(),
        }
    }
    
    fn count_down(&self) {
        let mut count = self.count.lock().unwrap();
        *count = count.saturating_sub(1);
        if *count == 0 {
            self.cvar.notify_all();
        }
    }
    
    fn wait(&self) {
        let mut count = self.count.lock().unwrap();
        while *count > 0 {
            count = self.cvar.wait(count).unwrap();
        }
    }
}
 
fn main() {
    let latch = Arc::new(CountdownLatch::new(3));
    
    let mut handles = vec![];
    
    for i in 0..3 {
        let latch = Arc::clone(&latch);
        handles.push(thread::spawn(move || {
            println!("Worker {} started", i);
            thread::sleep(std::time::Duration::from_millis(100 * (i + 1)));
            println!("Worker {} done", i);
            latch.count_down();
        }));
    }
    
    println!("Main thread waiting...");
    latch.wait();
    println!("All workers complete!");
    
    for handle in handles {
        handle.join().unwrap();
    }
}

Summary

Condvar Methods:

Method Description
new() Create a new Condvar
wait(guard) Block until notified, re-acquire mutex on wake
wait_while(guard, predicate) Wait while predicate is true
wait_timeout(guard, dur) Wait with timeout
notify_one() Wake one waiting thread
notify_all() Wake all waiting threads

Condvar vs Alternatives:

Primitive Use Case
Condvar Wait for condition changes, broadcast to multiple waiters
Channel Send data between threads, simpler for most cases
Barrier Wait for multiple threads to reach a point
Once One-time initialization
Atomic Lock-free simple state

Common Patterns:

Pattern Description
Signal/Wait Basic notification between threads
Bounded Buffer Producer-consumer with fixed capacity
Thread Pool Workers wait for tasks
Event Broadcast event to multiple listeners
Semaphore Limit concurrent access
Countdown Latch Wait for N tasks to complete

Key Points:

  • Condvar is always used with a Mutex
  • Always check condition in a loop (spurious wakeups)
  • wait() atomically releases mutex and blocks
  • On wake, mutex is re-acquired before returning
  • notify_one() wakes one waiter, notify_all() wakes all
  • Use wait_timeout() to avoid blocking forever
  • wait_while() is cleaner than manual loop
  • Consider channels for simpler message-passing use cases
  • Thread-safe for multi-threaded signaling
  • Part of std::sync module