Loading page…
Rust walkthroughs
Loading page…
A Condvar (condition variable) is a synchronization primitive that allows threads to wait for a specific condition to become true. It's used in conjunction with a Mutex to block threads until they're notified by another thread.
Condition variables are located in std::sync::Condvar and provide:
- wait() — Release the lock and block until notified
- wait_timeout() — Wait with a timeout
- notify_one() — Wake up one waiting thread
- notify_all() — Wake up all waiting threads

The typical pattern:
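1. Lock the Mutex protecting shared state
2. While the condition is not met, call wait(), which atomically releases the mutex and blocks
3. When notified, wait() reacquires the mutex and returns

Important: Always pair a Condvar with a Mutex — wait() is called on the Condvar and takes the MutexGuard as its argument, so it requires both.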
use std::sync::{Arc, Mutex, Condvar};
use std::thread;
/// Minimal `Condvar` handshake: a spawned thread blocks until the main thread
/// flips a shared `bool` to `true` and notifies it.
fn main() {
    // The flag and the condition variable that announces changes to it travel together.
    let shared = Arc::new((Mutex::new(false), Condvar::new()));

    let waiter = {
        let shared = Arc::clone(&shared);
        thread::spawn(move || {
            let (flag_lock, signal) = &*shared;
            let mut flag = flag_lock.lock().unwrap();
            loop {
                if *flag {
                    break;
                }
                // `wait` gives up the mutex while blocked and holds it again on return;
                // the flag is re-checked after every wakeup.
                flag = signal.wait(flag).unwrap();
            }
            println!("Waiter thread proceeding!");
        })
    };

    // Pause before signaling. The waiter re-checks the flag under the lock, so the
    // handshake does not depend on who gets there first.
    let head_start = std::time::Duration::from_millis(100);
    thread::sleep(head_start);

    let (flag_lock, signal) = &*shared;
    let mut flag = flag_lock.lock().unwrap();
    *flag = true;
    // Wake the single waiting thread, then release the lock so it can proceed.
    signal.notify_one();
    drop(flag);

    waiter.join().unwrap();
    println!("Done!");
}
use std::sync::{Arc, Mutex, Condvar};
use std::thread;
use std::collections::VecDeque;
/// A FIFO buffer bundled with a capacity limit.
///
/// The limit is not enforced by this type: nothing here blocks or rejects a push.
/// Callers consult [`BoundedQueue::is_full`] / [`BoundedQueue::is_empty`] themselves.
struct BoundedQueue<T> {
    /// The buffered items.
    queue: VecDeque<T>,
    /// Length at which `is_full` starts reporting `true`.
    capacity: usize,
}

impl<T> BoundedQueue<T> {
    /// Creates an empty queue that reports itself full once it holds `capacity` items.
    fn new(capacity: usize) -> Self {
        let queue = VecDeque::new();
        Self { capacity, queue }
    }

    /// Returns `true` when the number of buffered items has reached (or exceeded) the capacity.
    fn is_full(&self) -> bool {
        let used = self.queue.len();
        self.capacity <= used
    }

    /// Returns `true` when no items are buffered.
    fn is_empty(&self) -> bool {
        self.queue.front().is_none()
    }
}
/// Producer/consumer demo: one thread pushes `0..10` followed by a `-1` sentinel into a
/// `BoundedQueue<i32>` of capacity 5, and another thread pops items until it sees the sentinel.
///
/// Both sides share one mutex and one condition variable. With exactly one producer and
/// one consumer, `notify_one` can only ever wake the opposite side.
fn main() {
    let queue = Arc::new((Mutex::new(BoundedQueue::<i32>::new(5)), Condvar::new()));

    let producer_queue = Arc::clone(&queue);
    let producer = thread::spawn(move || {
        let (lock, cvar) = &*producer_queue;
        for i in 0..10 {
            let mut guard = lock.lock().unwrap();
            // Wait while the queue is full. The capacity check lives on `BoundedQueue`,
            // not on the inner `VecDeque`.
            while guard.is_full() {
                println!("Producer: queue full, waiting...");
                guard = cvar.wait(guard).unwrap();
            }
            guard.queue.push_back(i);
            println!("Produced: {}", i);
            // Notify the consumer that an item is available.
            cvar.notify_one();
        }

        // Signal completion with a sentinel, still respecting the capacity bound.
        let mut guard = lock.lock().unwrap();
        while guard.is_full() {
            guard = cvar.wait(guard).unwrap();
        }
        guard.queue.push_back(-1); // Sentinel value
        cvar.notify_one();
    });

    let consumer_queue = Arc::clone(&queue);
    let consumer = thread::spawn(move || {
        let (lock, cvar) = &*consumer_queue;
        loop {
            let mut guard = lock.lock().unwrap();
            // Wait while the queue is empty.
            while guard.is_empty() {
                guard = cvar.wait(guard).unwrap();
            }
            // The loop above guarantees at least one item, so this cannot be `None`.
            let item = guard.queue.pop_front().unwrap();
            if item == -1 {
                println!("Consumer: received end signal");
                break;
            }
            println!("Consumed: {}", item);
            // Notify the producer that space is available.
            cvar.notify_one();
        }
    });

    producer.join().unwrap();
    consumer.join().unwrap();
}
use std::sync::{Arc, Mutex, Condvar};
use std::thread;
use std::time::Duration;
/// Demonstrates bounded waiting on a `Condvar`: first a wait that is never signaled and
/// times out after 500 ms, then a wait with a 2 s budget that is signaled after ~100 ms.
///
/// Both waits use `wait_timeout_while` with the shared flag as the predicate. A bare
/// `wait_timeout` can return early on a spurious wakeup (misreported as "signaled") and
/// misses a notification sent before the waiter blocks (misreported as a timeout).
fn main() {
    let pair = Arc::new((Mutex::new(false), Condvar::new()));
    let pair_clone = Arc::clone(&pair);

    // Thread that will time out: nobody ever sets the flag.
    let waiter = thread::spawn(move || {
        let (lock, cvar) = &*pair_clone;
        let guard = lock.lock().unwrap();
        // Keeps waiting while the flag is still false, for at most 500 ms in total.
        let (guard, outcome) = cvar
            .wait_timeout_while(guard, Duration::from_millis(500), |signaled| !*signaled)
            .unwrap();
        if outcome.timed_out() {
            println!("Wait timed out after 500ms");
        } else {
            println!("Condition was signaled");
        }
        // The guard is handed back alongside the timeout result.
        println!("Condition value: {}", *guard);
    });

    // Don't signal - let it time out.
    waiter.join().unwrap();

    // Now test with a signal before the timeout.
    let pair2 = Arc::new((Mutex::new(false), Condvar::new()));
    let pair2_clone = Arc::clone(&pair2);
    let waiter2 = thread::spawn(move || {
        let (lock, cvar) = &*pair2_clone;
        let guard = lock.lock().unwrap();
        // If the flag is already true the predicate is false on entry and this returns
        // immediately, so the outcome does not depend on which thread runs first.
        let (_guard, outcome) = cvar
            .wait_timeout_while(guard, Duration::from_secs(2), |signaled| !*signaled)
            .unwrap();
        if outcome.timed_out() {
            println!("Wait 2 timed out");
        } else {
            println!("Wait 2 was signaled before timeout");
        }
    });

    thread::sleep(Duration::from_millis(100));

    // Signal before the timeout.
    {
        let (lock, cvar) = &*pair2;
        let mut guard = lock.lock().unwrap();
        *guard = true;
        cvar.notify_one();
    }
    waiter2.join().unwrap();
}
use std::sync::{Arc, Mutex, Condvar};
use std::thread;
/// Broadcast demo: five threads block until a shared counter becomes non-zero, then the
/// main thread sets it to 1 and releases all of them at once with `notify_all`.
fn main() {
    let gate = Arc::new((Mutex::new(0), Condvar::new()));

    // Spawn the five waiters; `collect` forces every spawn to happen right here.
    let handles: Vec<_> = (0..5)
        .map(|i| {
            let gate = Arc::clone(&gate);
            thread::spawn(move || {
                let (lock, cvar) = &*gate;
                let mut value = lock.lock().unwrap();
                // Keep waiting for as long as the value is still 0.
                loop {
                    if *value != 0 {
                        break;
                    }
                    println!("Thread {} waiting...", i);
                    value = cvar.wait(value).unwrap();
                }
                println!("Thread {} proceeding!", i);
            })
        })
        .collect();

    // Pause before signaling. Waiters re-check the value under the lock, so a late
    // starter simply skips the wait.
    let settle = std::time::Duration::from_millis(100);
    thread::sleep(settle);

    // Change the state and wake every waiter while holding the lock.
    let (lock, cvar) = &*gate;
    let mut value = lock.lock().unwrap();
    *value = 1;
    println!("Signaling all threads to proceed");
    cvar.notify_all();
    drop(value);

    for handle in handles {
        handle.join().unwrap();
    }
    println!("All threads completed");
}
use std::sync::{Arc, Mutex, Condvar};
use std::thread;
/// A unit of work handed to the pool: a one-shot closure that may be sent to another thread.
type Job = Box<dyn FnOnce() + Send + 'static>;

/// State shared between the pool handle and its workers, protected by one mutex.
struct PoolState {
    /// Jobs submitted but not yet picked up by a worker, oldest first.
    jobs: std::collections::VecDeque<Job>,
    /// Set when the pool is dropped; workers exit once this is true and `jobs` is empty.
    shutting_down: bool,
}

/// A fixed-size thread pool whose idle workers sleep on a `Condvar`.
///
/// `execute` pushes a job and wakes one worker. Dropping the pool raises the shutdown
/// flag, wakes every worker, and joins them after the queued jobs have been run.
struct ThreadPool {
    workers: Vec<thread::JoinHandle<()>>,
    shared: Arc<(Mutex<PoolState>, Condvar)>,
}

impl ThreadPool {
    /// Spawns `size` worker threads that wait for jobs.
    ///
    /// With `size == 0` no workers exist, so submitted jobs are never run.
    fn new(size: usize) -> Self {
        let shared = Arc::new((
            Mutex::new(PoolState {
                jobs: std::collections::VecDeque::new(),
                shutting_down: false,
            }),
            Condvar::new(),
        ));

        let workers = (0..size)
            .map(|id| {
                let shared = Arc::clone(&shared);
                thread::spawn(move || Self::worker_loop(id, &shared))
            })
            .collect();

        Self { workers, shared }
    }

    /// Body of each worker thread: take the next job, run it, and repeat until shutdown.
    fn worker_loop(id: usize, shared: &(Mutex<PoolState>, Condvar)) {
        let (lock, cvar) = shared;
        loop {
            let next = {
                let idle = lock.lock().unwrap();
                // Sleep while there is nothing to do and the pool is still alive. The
                // predicate only inspects state, so checking it never consumes a job.
                let mut state = cvar
                    .wait_while(idle, |s| s.jobs.is_empty() && !s.shutting_down)
                    .unwrap();
                state.jobs.pop_front()
            }; // The lock is released here so other workers can run jobs concurrently.

            match next {
                Some(job) => {
                    println!("Worker {} executing job", id);
                    job();
                }
                // Empty queue after the wait means shutdown was requested.
                None => return,
            }
        }
    }

    /// Queues `f` to run on one of the worker threads.
    fn execute<F: FnOnce() + Send + 'static>(&self, f: F) {
        let (lock, cvar) = &*self.shared;
        lock.lock().unwrap().jobs.push_back(Box::new(f));
        // One new job needs at most one worker.
        cvar.notify_one();
    }
}

impl Drop for ThreadPool {
    /// Requests shutdown, then blocks until every worker has drained the queue and exited.
    fn drop(&mut self) {
        {
            let (lock, cvar) = &*self.shared;
            lock.lock().unwrap().shutting_down = true;
            // Every idle worker must observe the flag, not just one.
            cvar.notify_all();
        }
        for worker in self.workers.drain(..) {
            worker.join().unwrap();
        }
    }
}
/// Submits eight short tasks to a four-worker `ThreadPool`; the pool is dropped when
/// `main` returns.
fn main() {
    let pool = ThreadPool::new(4);

    (0..8).for_each(|i| {
        pool.execute(move || {
            println!("Processing task {}", i);
            let work = std::time::Duration::from_millis(100);
            thread::sleep(work);
        });
    });
    // `pool` goes out of scope here, which runs its `Drop` implementation.
}
use std::sync::{Arc, Mutex, Condvar};
use std::thread;
/// `wait_while` demo: one thread blocks until a shared `Vec` holds at least three items
/// while another thread appends `0..5`, one item every 200 ms.
fn main() {
    let shared = Arc::new((Mutex::new(Vec::new()), Condvar::new()));

    let waiter_side = Arc::clone(&shared);
    let waiter = thread::spawn(move || {
        let (items_lock, changed) = &*waiter_side;
        // `wait_while` keeps blocking for as long as the predicate is true and hands
        // the guard back once it turns false (here: three or more items).
        let items = changed
            .wait_while(items_lock.lock().unwrap(), |items| items.len() < 3)
            .unwrap();
        println!("Got 3 items: {:?}", *items);
    });

    let producer_side = Arc::clone(&shared);
    let producer = thread::spawn(move || {
        let (items_lock, changed) = &*producer_side;
        let pause = std::time::Duration::from_millis(200);
        for i in 0..5 {
            thread::sleep(pause);
            let mut items = items_lock.lock().unwrap();
            items.push(i);
            println!("Added item {}, total: {}", i, items.len());
            // Let the waiter re-evaluate its predicate.
            changed.notify_one();
        }
    });

    waiter.join().unwrap();
    producer.join().unwrap();
}
use std::sync::{Arc, Mutex, Condvar};
use std::thread;
/// A counting semaphore built from a mutex-protected permit count and a condition variable.
struct Semaphore {
    /// Number of permits currently available.
    count: Mutex<usize>,
    /// Notified whenever a permit is returned.
    cvar: Condvar,
}

impl Semaphore {
    /// Creates a semaphore holding `initial` permits.
    fn new(initial: usize) -> Self {
        let count = Mutex::new(initial);
        let cvar = Condvar::new();
        Self { count, cvar }
    }

    /// Takes one permit, blocking while none are available.
    fn acquire(&self) {
        let guard = self.count.lock().unwrap();
        // Blocks for as long as the permit count is zero.
        let mut permits = self
            .cvar
            .wait_while(guard, |available| *available == 0)
            .unwrap();
        *permits -= 1;
    }

    /// Returns one permit and wakes a single blocked acquirer, if any.
    fn release(&self) {
        let mut permits = self.count.lock().unwrap();
        *permits += 1;
        self.cvar.notify_one();
    }
}
/// Runs five threads that each hold a permit from a two-permit `Semaphore` for 500 ms,
/// so at most two of them are in the "working" section at any time.
fn main() {
    let permits = Arc::new(Semaphore::new(2)); // Max 2 concurrent

    let workers: Vec<_> = (0..5)
        .map(|i| {
            let sem = Arc::clone(&permits);
            thread::spawn(move || {
                println!("Thread {} trying to acquire", i);
                sem.acquire();

                println!("Thread {} acquired, working...", i);
                let work = std::time::Duration::from_millis(500);
                thread::sleep(work);

                println!("Thread {} releasing", i);
                sem.release();
            })
        })
        .collect();

    workers.into_iter().for_each(|worker| worker.join().unwrap());
}
use std::sync::{Arc, Mutex, Condvar};
use std::thread;
/// Shows the two ways to guard against spurious wakeups: an explicit re-check loop
/// around `wait`, and `wait_while`, which takes the predicate as a closure.
fn main() {
    let pause = std::time::Duration::from_millis(100);

    // Part 1: explicit loop around `wait`.
    let state = Arc::new((Mutex::new(false), Condvar::new()));
    let waiter = {
        let state = Arc::clone(&state);
        thread::spawn(move || {
            let (lock, cvar) = &*state;
            let mut ready = lock.lock().unwrap();
            // ALWAYS re-check the condition after waking up:
            // a wakeup can happen without any matching signal.
            loop {
                if *ready {
                    break;
                }
                println!("Waiting for signal...");
                ready = cvar.wait(ready).unwrap();
            }
            println!("Condition satisfied!");
        })
    };

    thread::sleep(pause);
    {
        let (lock, cvar) = &*state;
        let mut ready = lock.lock().unwrap();
        *ready = true;
        cvar.notify_one();
    }
    waiter.join().unwrap();

    // Part 2: `wait_while` takes the predicate and does the re-checking itself.
    println!("\nUsing wait_while:");
    let state2 = Arc::new((Mutex::new(0), Condvar::new()));
    let waiter2 = {
        let state2 = Arc::clone(&state2);
        thread::spawn(move || {
            let (lock, cvar) = &*state2;
            // Blocks for as long as the value is below 10.
            let value = cvar
                .wait_while(lock.lock().unwrap(), |v| *v < 10)
                .unwrap();
            println!("Got value: {}", *value);
        })
    };

    thread::sleep(pause);
    {
        let (lock, cvar) = &*state2;
        let mut value = lock.lock().unwrap();
        *value = 10;
        cvar.notify_one();
    }
    waiter2.join().unwrap();
}
| Method | Description |
|--------|-------------|
| wait(guard) | Block until notified, must hold MutexGuard |
| wait_timeout(guard, duration) | Wait with timeout, returns (guard, WaitTimeoutResult) |
| wait_while(guard, predicate) | Wait while predicate returns true |
| notify_one() | Wake up one waiting thread |
| notify_all() | Wake up all waiting threads |
Condvar Pattern:
- Use a Mutex protecting the condition
- Use notify_one() for a single waiter, notify_all() for multiple

Comparison:
| Synchronization | Use Case |
|----------------|----------|
| Condvar | Complex conditions, multiple wait conditions |
| Channel | Simple producer-consumer, message passing |
| Barrier | Wait for N threads to reach a point |
| Atomic | Simple flags and counters |
Key Points:
- A Condvar is always used together with a Mutex
- notify_all() wakes all waiters; notify_one() wakes one
- wait_timeout() allows bounded waiting