Loading page…
Rust walkthroughs
Loading page…
Condvar (Condition Variable) is a synchronization primitive that allows threads to wait for a specific condition to become true. It's always used with a Mutex to protect the shared state that the condition depends on. Threads can block on a Condvar and be notified when the condition might have changed.
Key concepts:
Mutex<(bool, T)> patternWhen to use Condvar:
When NOT to use Condvar:
Once or OnceLock)use std::sync::{Arc, Mutex, Condvar};
use std::thread;
fn main() {
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair_clone = Arc::clone(&pair);
// Waiting thread
let waiter = thread::spawn(move || {
let (lock, cvar) = &*pair_clone;
let mut started = lock.lock().unwrap();
// Wait until started becomes true
while !*started {
started = cvar.wait(started).unwrap();
}
println!("Waiter: condition met!");
});
// Signaling thread
thread::sleep(std::time::Duration::from_millis(100));
{
let (lock, cvar) = &*pair;
let mut started = lock.lock().unwrap();
*started = true;
cvar.notify_one();
}
waiter.join().unwrap();
}use std::sync::{Arc, Mutex, Condvar};
use std::thread;
fn main() {
let pair = Arc::new((Mutex::new(0i32), Condvar::new()));
let pair_clone = Arc::clone(&pair);
let waiter = thread::spawn(move || {
let (lock, cvar) = &*pair_clone;
let mut value = lock.lock().unwrap();
// Wait until value >= 10
value = cvar.wait_while(value, |v| *v < 10).unwrap();
println!("Value reached: {}", *value);
});
// Increment value
for i in 1..=15 {
let (lock, cvar) = &*pair;
let mut value = lock.lock().unwrap();
*value = i;
println!("Set value to: {}", i);
cvar.notify_one();
thread::sleep(std::time::Duration::from_millis(50));
}
waiter.join().unwrap();
}use std::sync::{Arc, Mutex, Condvar};
use std::thread;
use std::time::{Duration, Instant};
fn main() {
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair_clone = Arc::clone(&pair);
let waiter = thread::spawn(move || {
let (lock, cvar) = &*pair_clone;
let mut started = lock.lock().unwrap();
let timeout = Duration::from_secs(2);
let start = Instant::now();
loop {
if *started {
println!("Condition met!");
return;
}
let result = cvar.wait_timeout(started, timeout).unwrap();
started = result.0;
if result.1.timed_out() {
println!("Timeout after {:?}", start.elapsed());
return;
}
}
});
// Don't signal - let it timeout
thread::sleep(Duration::from_millis(500));
waiter.join().unwrap();
}use std::sync::{Arc, Mutex, Condvar};
use std::thread;
use std::collections::VecDeque;
struct BoundedBuffer<T> {
queue: Mutex<VecDeque<T>>,
not_empty: Condvar,
not_full: Condvar,
capacity: usize,
}
impl<T> BoundedBuffer<T> {
fn new(capacity: usize) -> Self {
Self {
queue: Mutex::new(VecDeque::with_capacity(capacity)),
not_empty: Condvar::new(),
not_full: Condvar::new(),
capacity,
}
}
fn push(&self, item: T) {
let mut queue = self.queue.lock().unwrap();
// Wait until there's space
while queue.len() == self.capacity {
queue = self.not_full.wait(queue).unwrap();
}
queue.push_back(item);
self.not_empty.notify_one();
}
fn pop(&self) -> T {
let mut queue = self.queue.lock().unwrap();
// Wait until there's an item
while queue.is_empty() {
queue = self.not_empty.wait(queue).unwrap();
}
let item = queue.pop_front().unwrap();
self.not_full.notify_one();
item
}
}
fn main() {
let buffer = Arc::new(BoundedBuffer::new(5));
let producer_buffer = Arc::clone(&buffer);
let producer = thread::spawn(move || {
for i in 0..10 {
producer_buffer.push(i);
println!("Produced: {}", i);
}
});
let consumer_buffer = Arc::clone(&buffer);
let consumer = thread::spawn(move || {
for _ in 0..10 {
let item = consumer_buffer.pop();
println!("Consumed: {}", item);
}
});
producer.join().unwrap();
consumer.join().unwrap();
}use std::sync::{Arc, Mutex, Condvar};
use std::thread;
struct ThreadPool {
workers: Mutex<Vec<Option<thread::JoinHandle<()>>>>,
tasks: Mutex<Vec<Box<dyn FnOnce() + Send>>>,
cvar: Condvar,
shutdown: Mutex<bool>,
}
impl ThreadPool {
fn new(size: usize) -> Arc<Self> {
let pool = Arc::new(Self {
workers: Mutex::new(Vec::new()),
tasks: Mutex::new(Vec::new()),
cvar: Condvar::new(),
shutdown: Mutex::new(false),
});
for id in 0..size {
let pool_clone = Arc::clone(&pool);
let handle = thread::spawn(move || {
loop {
let task = {
let mut tasks = pool_clone.tasks.lock().unwrap();
let shutdown = pool_clone.shutdown.lock().unwrap();
while tasks.is_empty() && !*shutdown {
tasks = pool_clone.cvar.wait(tasks).unwrap();
}
if *shutdown && tasks.is_empty() {
return;
}
tasks.pop()
};
if let Some(task) = task {
println!("Worker {} executing task", id);
task();
}
}
});
pool.workers.lock().unwrap().push(Some(handle));
}
pool
}
fn submit<F: FnOnce() + Send + 'static>(&self, task: F) {
let mut tasks = self.tasks.lock().unwrap();
tasks.push(Box::new(task));
self.cvar.notify_one();
}
fn shutdown(&self) {
*self.shutdown.lock().unwrap() = true;
self.cvar.notify_all();
}
}
fn main() {
let pool = ThreadPool::new(3);
for i in 0..5 {
pool.submit(move || {
println!("Task {} running", i);
thread::sleep(std::time::Duration::from_millis(100));
});
}
thread::sleep(std::time::Duration::from_millis(500));
pool.shutdown();
}use std::sync::{Arc, Mutex, Condvar};
use std::thread;
fn main() {
let pair = Arc::new((Mutex::new(0usize), Condvar::new()));
let mut handles = vec![];
// Spawn multiple waiters
for i in 0..5 {
let pair_clone = Arc::clone(&pair);
let handle = thread::spawn(move || {
let (lock, cvar) = &*pair_clone;
let mut count = lock.lock().unwrap();
while *count == 0 {
count = cvar.wait(count).unwrap();
}
println!("Waiter {} woke up with count = {}", i, *count);
});
handles.push(handle);
}
thread::sleep(std::time::Duration::from_millis(100));
// notify_one wakes only one thread
{
let (lock, cvar) = &*pair;
let mut count = lock.lock().unwrap();
*count = 1;
println!("Signaling with notify_one...");
cvar.notify_one();
}
thread::sleep(std::time::Duration::from_millis(100));
// notify_all wakes all remaining threads
{
let (lock, cvar) = &*pair;
let mut count = lock.lock().unwrap();
*count = 2;
println!("Signaling with notify_all...");
cvar.notify_all();
}
for handle in handles {
handle.join().unwrap();
}
}use std::sync::{Arc, Mutex, Condvar};
use std::thread;
struct Event {
triggered: Mutex<bool>,
cvar: Condvar,
}
impl Event {
fn new() -> Self {
Self {
triggered: Mutex::new(false),
cvar: Condvar::new(),
}
}
fn wait(&self) {
let mut triggered = self.triggered.lock().unwrap();
while !*triggered {
triggered = self.cvar.wait(triggered).unwrap();
}
}
fn signal(&self) {
let mut triggered = self.triggered.lock().unwrap();
*triggered = true;
self.cvar.notify_all();
}
fn reset(&self) {
let mut triggered = self.triggered.lock().unwrap();
*triggered = false;
}
}
fn main() {
let event = Arc::new(Event::new());
let mut handles = vec![];
for i in 0..3 {
let event = Arc::clone(&event);
handles.push(thread::spawn(move || {
println!("Thread {} waiting for event", i);
event.wait();
println!("Thread {} event triggered!", i);
}));
}
thread::sleep(std::time::Duration::from_millis(500));
println!("Triggering event...");
event.signal();
for handle in handles {
handle.join().unwrap();
}
}use std::sync::{Arc, Mutex, Condvar};
use std::thread;
fn main() {
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair_clone = Arc::clone(&pair);
let waiter = thread::spawn(move || {
let (lock, cvar) = &*pair_clone;
let mut ready = lock.lock().unwrap();
// ALWAYS check condition in a loop to handle spurious wakeups
while !*ready {
// This atomically releases the mutex and blocks
// When it returns, mutex is re-acquired
ready = cvar.wait(ready).unwrap();
}
println!("Condition was true!");
});
thread::sleep(std::time::Duration::from_millis(100));
{
let (lock, cvar) = &*pair;
let mut ready = lock.lock().unwrap();
*ready = true;
cvar.notify_one();
}
waiter.join().unwrap();
}use std::sync::{Arc, Mutex, Condvar};
use std::thread;
struct SharedState {
data: Mutex<(bool, bool)>, // (condition_a, condition_b)
cvar_a: Condvar,
cvar_b: Condvar,
}
impl SharedState {
fn new() -> Self {
Self {
data: Mutex::new((false, false)),
cvar_a: Condvar::new(),
cvar_b: Condvar::new(),
}
}
fn set_a(&self) {
let mut data = self.data.lock().unwrap();
data.0 = true;
self.cvar_a.notify_one();
}
fn set_b(&self) {
let mut data = self.data.lock().unwrap();
data.1 = true;
self.cvar_b.notify_one();
}
fn wait_for_a(&self) {
let mut data = self.data.lock().unwrap();
while !data.0 {
data = self.cvar_a.wait(data).unwrap();
}
}
fn wait_for_b(&self) {
let mut data = self.data.lock().unwrap();
while !data.1 {
data = self.cvar_b.wait(data).unwrap();
}
}
}
fn main() {
let state = Arc::new(SharedState::new());
let state_a = Arc::clone(&state);
let waiter_a = thread::spawn(move || {
println!("Waiting for A");
state_a.wait_for_a();
println!("A is ready!");
});
let state_b = Arc::clone(&state);
let waiter_b = thread::spawn(move || {
println!("Waiting for B");
state_b.wait_for_b();
println!("B is ready!");
});
thread::sleep(std::time::Duration::from_millis(100));
state.set_a();
thread::sleep(std::time::Duration::from_millis(100));
state.set_b();
waiter_a.join().unwrap();
waiter_b.join().unwrap();
}use std::sync::{Arc, Mutex, Condvar};
use std::sync::mpsc;
use std::thread;
fn main() {
// Condvar approach - signal a condition
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair_clone = Arc::clone(&pair);
let handle1 = thread::spawn(move || {
let (lock, cvar) = &*pair_clone;
let mut ready = lock.lock().unwrap();
while !*ready {
ready = cvar.wait(ready).unwrap();
}
println!("Condvar: condition met");
});
{
let (lock, cvar) = &*pair;
*lock.lock().unwrap() = true;
cvar.notify_one();
}
handle1.join().unwrap();
// Channel approach - send a message
let (tx, rx) = mpsc::channel();
let handle2 = thread::spawn(move || {
rx.recv().unwrap();
println!("Channel: message received");
});
tx.send(()).unwrap();
handle2.join().unwrap();
}use std::sync::{Arc, Mutex, Condvar};
use std::thread;
struct Semaphore {
count: Mutex<usize>,
cvar: Condvar,
}
impl Semaphore {
fn new(count: usize) -> Self {
Self {
count: Mutex::new(count),
cvar: Condvar::new(),
}
}
fn acquire(&self) {
let mut count = self.count.lock().unwrap();
while *count == 0 {
count = self.cvar.wait(count).unwrap();
}
*count -= 1;
}
fn release(&self) {
let mut count = self.count.lock().unwrap();
*count += 1;
self.cvar.notify_one();
}
}
fn main() {
let sem = Arc::new(Semaphore::new(2)); // Max 2 concurrent
let mut handles = vec![];
for i in 0..5 {
let sem = Arc::clone(&sem);
handles.push(thread::spawn(move || {
sem.acquire();
println!("Thread {} acquired semaphore", i);
thread::sleep(std::time::Duration::from_millis(100));
println!("Thread {} releasing semaphore", i);
sem.release();
}));
}
for handle in handles {
handle.join().unwrap();
}
}use std::sync::{Arc, Mutex, Condvar};
use std::thread;
struct CountdownLatch {
count: Mutex<usize>,
cvar: Condvar,
}
impl CountdownLatch {
fn new(count: usize) -> Self {
Self {
count: Mutex::new(count),
cvar: Condvar::new(),
}
}
fn count_down(&self) {
let mut count = self.count.lock().unwrap();
*count = count.saturating_sub(1);
if *count == 0 {
self.cvar.notify_all();
}
}
fn wait(&self) {
let mut count = self.count.lock().unwrap();
while *count > 0 {
count = self.cvar.wait(count).unwrap();
}
}
}
fn main() {
let latch = Arc::new(CountdownLatch::new(3));
let mut handles = vec![];
for i in 0..3 {
let latch = Arc::clone(&latch);
handles.push(thread::spawn(move || {
println!("Worker {} started", i);
thread::sleep(std::time::Duration::from_millis(100 * (i + 1)));
println!("Worker {} done", i);
latch.count_down();
}));
}
println!("Main thread waiting...");
latch.wait();
println!("All workers complete!");
for handle in handles {
handle.join().unwrap();
}
}Condvar Methods:
| Method | Description |
|--------|-------------|
| new() | Create a new Condvar |
| wait(guard) | Block until notified, re-acquire mutex on wake |
| wait_while(guard, predicate) | Wait while predicate is true |
| wait_timeout(guard, dur) | Wait with timeout |
| notify_one() | Wake one waiting thread |
| notify_all() | Wake all waiting threads |
Condvar vs Alternatives:
| Primitive | Use Case |
|-----------|----------|
| Condvar | Wait for condition changes, broadcast to multiple waiters |
| Channel | Send data between threads, simpler for most cases |
| Barrier | Wait for multiple threads to reach a point |
| Once | One-time initialization |
| Atomic | Lock-free simple state |
Common Patterns:
| Pattern | Description | |---------|-------------| | Signal/Wait | Basic notification between threads | | Bounded Buffer | Producer-consumer with fixed capacity | | Thread Pool | Workers wait for tasks | | Event | Broadcast event to multiple listeners | | Semaphore | Limit concurrent access | | Countdown Latch | Wait for N tasks to complete |
Key Points:
wait() atomically releases mutex and blocksnotify_one() wakes one waiter, notify_all() wakes allwait_timeout() to avoid blocking foreverwait_while() is cleaner than manual loopstd::sync module