Loading page…
Rust walkthroughs
Loading page…
parking_lot::Condvar for thread synchronization with condition variables?parking_lot::Condvar is a condition variable implementation that coordinates thread execution based on shared state, allowing threads to wait for a condition to become true while releasing an associated lock, then reacquiring it when signaled. Condition variables solve the problem of efficient waiting for state changes: without them, threads would need to poll (spin) or sleep with timeouts, both wasteful approaches. parking_lot::Condvar integrates with parking_lot::Mutex and parking_lot::MutexGuard, providing wait, wait_while, notify_one, and notify_all methods. The parking_lot implementation offers advantages over std::sync::Condvar including smaller memory footprint, faster operations in uncontended cases, and the ability to use wait_while for convenient predicate-based waiting without manually writing the wait loop pattern.
use parking_lot::{Mutex, Condvar};
use std::sync::Arc;
use std::thread;
fn main() {
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair2 = Arc::clone(&pair);
// Spawn a thread that will signal the condition
thread::spawn(move || {
let (lock, cvar) = &*pair2;
thread::sleep(std::time::Duration::from_millis(100));
// Acquire the lock and change state
let mut started = lock.lock();
*started = true;
// Notify waiting thread
cvar.notify_one();
});
// Wait for the condition
let (lock, cvar) = &*pair;
let mut started = lock.lock();
// wait_while automatically handles the loop and spurious wakeups
cvar.wait_while(&mut started, |started| !*started);
println!("Condition was signaled! started = {}", *started);
}Condition variables allow threads to wait efficiently until a condition is met.
use parking_lot::{Mutex, Condvar};
use std::sync::Arc;
use std::thread;
fn main() {
// wait_while: convenient predicate-based waiting
let pair = Arc::new((Mutex::new(0usize), Condvar::new()));
let pair_clone = Arc::clone(&pair);
thread::spawn(move || {
let (lock, cvar) = &*pair_clone;
thread::sleep(std::time::Duration::from_millis(50));
let mut counter = lock.lock();
*counter = 10;
cvar.notify_one();
});
// Using wait_while - cleaner, handles predicate automatically
{
let (lock, cvar) = &*pair;
let mut counter = lock.lock();
cvar.wait_while(&mut counter, |counter| *counter < 10);
println!("wait_while: counter = {}", *counter);
}
// Equivalent manual loop pattern
let pair2 = Arc::new((Mutex::new(0usize), Condvar::new()));
let pair2_clone = Arc::clone(&pair2);
thread::spawn(move || {
let (lock, cvar) = &*pair2_clone;
thread::sleep(std::time::Duration::from_millis(50));
let mut counter = lock.lock();
*counter = 10;
cvar.notify_one();
});
// Manual wait loop with spurious wakeup handling
{
let (lock, cvar) = &*pair2;
let mut counter = lock.lock();
// Must check condition in a loop due to spurious wakeups
while *counter < 10 {
cvar.wait(&mut counter);
}
println!("manual loop: counter = {}", *counter);
}
}wait_while handles the predicate check loop automatically, including spurious wakeups.
use parking_lot::{Mutex, Condvar};
use std::collections::VecDeque;
use std::sync::Arc;
use std::thread;
fn main() {
let queue = Arc::new((Mutex::new(VecDeque::new()), Condvar::new()));
let producer_queue = Arc::clone(&queue);
let producer = thread::spawn(move || {
let (lock, cvar) = &*producer_queue;
for i in 0..5 {
thread::sleep(std::time::Duration::from_millis(100));
let mut q = lock.lock();
q.push_back(i);
println!("Produced: {}", i);
// Signal consumer that data is available
cvar.notify_one();
}
});
let consumer_queue = Arc::clone(&queue);
let consumer = thread::spawn(move || {
let (lock, cvar) = &*consumer_queue;
for _ in 0..5 {
let mut q = lock.lock();
// Wait until queue has data
cvar.wait_while(&mut q, |q| q.is_empty());
if let Some(item) = q.pop_front() {
println!("Consumed: {}", item);
}
}
});
producer.join().unwrap();
consumer.join().unwrap();
}Condition variables enable efficient producer-consumer coordination.
use parking_lot::{Mutex, Condvar};
use std::sync::Arc;
use std::thread;
fn main() {
// notify_one: wakes a single waiting thread
// Use when only one thread can make progress
let state = Arc::new((Mutex::new(false), Condvar::new()));
let mut handles = vec![];
for i in 0..3 {
let state_clone = Arc::clone(&state);
let handle = thread::spawn(move || {
let (lock, cvar) = &*state_clone;
let mut ready = lock.lock();
cvar.wait_while(&mut ready, |ready| !*ready);
println!("Thread {} woke up", i);
});
handles.push(handle);
}
thread::sleep(std::time::Duration::from_millis(100));
let (lock, cvar) = &*state;
// notify_one wakes only one thread
{
let mut ready = lock.lock();
*ready = true;
}
cvar.notify_one(); // Only one thread will wake
thread::sleep(std::time::Duration::from_millis(100));
// notify_all wakes all waiting threads
cvar.notify_all(); // Remaining threads wake
for handle in handles {
handle.join().unwrap();
}
}Use notify_one when one thread should consume the state change; notify_all when all should respond.
use parking_lot::{Mutex, Condvar};
use std::collections::VecDeque;
use std::sync::Arc;
struct BoundedBuffer<T> {
queue: Mutex<VecDeque<T>>,
not_empty: Condvar,
not_full: Condvar,
capacity: usize,
}
impl<T> BoundedBuffer<T> {
fn new(capacity: usize) -> Self {
BoundedBuffer {
queue: Mutex::new(VecDeque::with_capacity(capacity)),
not_empty: Condvar::new(),
not_full: Condvar::new(),
capacity,
}
}
fn put(&self, item: T) {
let mut queue = self.queue.lock();
// Wait until there's space
self.not_full.wait_while(&mut queue, |q| q.len() >= self.capacity);
queue.push_back(item);
// Signal that queue is not empty
self.not_empty.notify_one();
}
fn take(&self) -> T {
let mut queue = self.queue.lock();
// Wait until there's data
self.not_empty.wait_while(&mut queue, |q| q.is_empty());
let item = queue.pop_front().unwrap();
// Signal that queue is not full
self.not_full.notify_one();
item
}
}
fn main() {
let buffer = Arc::new(BoundedBuffer::new(3));
let buffer_producer = Arc::clone(&buffer);
let producer = std::thread::spawn(move || {
for i in 0..10 {
buffer_producer.put(i);
println!("Produced: {}", i);
}
});
let buffer_consumer = Arc::clone(&buffer);
let consumer = std::thread::spawn(move || {
for _ in 0..10 {
let item = buffer_consumer.take();
println!("Consumed: {}", item);
}
});
producer.join().unwrap();
consumer.join().unwrap();
}Multiple condition variables allow fine-grained waiting for different conditions.
use parking_lot::{Mutex as PlMutex, Condvar as PlCondvar};
use std::sync::{Mutex as StdMutex, Condvar as StdCondvar};
fn main() {
// std::sync::Condvar
let std_pair = (StdMutex::new(0i32), StdCondvar::new());
// Manual wait with std - must handle Result (poisoning)
{
let (lock, cvar) = &std_pair;
let mut guard = lock.lock().unwrap(); // Result from lock
// std::sync::Condvar::wait returns Result
while *guard < 10 {
guard = cvar.wait(guard).unwrap(); // Result from wait
}
}
// parking_lot::Condvar
let pl_pair = (PlMutex::new(0i32), PlCondvar::new());
// parking_lot - no poisoning, no Result types
{
let (lock, cvar) = &pl_pair;
let mut guard = lock.lock(); // Direct MutexGuard, not Result
// parking_lot::Condvar::wait doesn't return Result
while *guard < 10 {
cvar.wait(&mut guard); // No unwrap needed
}
// Or use wait_while for cleaner code
cvar.wait_while(&mut guard, |g| *g < 10);
}
println!("parking_lot::Condvar: cleaner API, no poisoning");
}parking_lot::Condvar avoids Result types and poisoning, simplifying error handling.
use parking_lot::{Mutex, Condvar};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
fn main() {
let state = Arc::new((Mutex::new(false), Condvar::new()));
// wait_until with timeout returns whether condition was met
let (lock, cvar) = &*state;
let mut ready = lock.lock();
let start = Instant::now();
let result = cvar.wait_for(&mut ready, Duration::from_millis(100));
match result {
parking_lot::WaitResult { timed_out: true } => {
println!("Timed out after {:?}", start.elapsed());
}
parking_lot::WaitResult { timed_out: false } => {
println!("Condition was signaled");
}
}
// wait_while_for combines predicate with timeout
let state2 = Arc::new((Mutex::new(0usize), Condvar::new()));
let (lock, cvar) = &*state2;
let mut counter = lock.lock();
let result = cvar.wait_while_for(&mut counter, |c| *c < 10, Duration::from_millis(50));
if result.timed_out {
println!("Timed out waiting for counter >= 10, value: {}", *counter);
} else {
println!("Counter reached: {}", *counter);
}
}wait_for and wait_while_for support timeouts for bounded waiting.
use parking_lot::{Mutex, Condvar};
use std::sync::Arc;
use std::thread;
fn main() {
struct WorkerState {
active_workers: usize,
shutdown: bool,
}
let state = Arc::new((
Mutex::new(WorkerState {
active_workers: 3,
shutdown: false,
}),
Condvar::new(),
));
// Worker threads
let mut handles = vec![];
for id in 0..3 {
let worker_state = Arc::clone(&state);
let handle = thread::spawn(move || {
let (lock, cvar) = &*worker_state;
let mut state = lock.lock();
// Wait for shutdown signal
cvar.wait_while(&mut state, |s| !s.shutdown);
println!("Worker {} shutting down", id);
state.active_workers -= 1;
// Signal that this worker is done
cvar.notify_all();
});
handles.push(handle);
}
// Simulate work then shutdown
thread::sleep(std::time::Duration::from_millis(100));
{
let (lock, cvar) = &*state;
let mut state = lock.lock();
state.shutdown = true;
cvar.notify_all(); // Wake all workers
}
// Wait for all workers to finish
{
let (lock, cvar) = &*state;
let mut state = lock.lock();
cvar.wait_while(&mut state, |s| s.active_workers > 0);
println!("All workers shut down");
}
for handle in handles {
handle.join().unwrap();
}
}Condition variables coordinate complex multi-threaded shutdown sequences.
use parking_lot::{Mutex, Condvar};
use std::sync::Arc;
use std::thread;
fn main() {
// Condition variables can wake up spuriously
// Always use predicates with wait
let state = Arc::new((Mutex::new(0u32), Condvar::new()));
// INCORRECT: Don't do this
// {
// let (lock, cvar) = &*state;
// let mut counter = lock.lock();
// cvar.wait(&mut counter); // May wake up spuriously!
// // Counter may still be 0
// }
// CORRECT: Always use wait_while or check predicate
{
let (lock, cvar) = &*state;
let mut counter = lock.lock();
// wait_while handles spurious wakeups correctly
cvar.wait_while(&mut counter, |c| *c == 0);
// Now counter is definitely non-zero (unless signaled with non-zero)
}
// The predicate must be checked in a loop
// wait_while does this automatically
}wait_while handles spurious wakeups by re-checking the predicate after each wakeup.
use parking_lot::{Mutex, Condvar};
use std::sync::Arc;
use std::thread;
use std::sync::atomic::{AtomicUsize, Ordering};
fn main() {
// parking_lot uses a fair wake queue
// Threads are woken in the order they started waiting
let state = Arc::new((Mutex::new(false), Condvar::new()));
let wake_order = Arc::new(AtomicUsize::new(0));
let mut handles = vec![];
for i in 0..5 {
let state_clone = Arc::clone(&state);
let wake_order_clone = Arc::clone(&wake_order);
let handle = thread::spawn(move || {
let (lock, cvar) = &*state_clone;
let mut ready = lock.lock();
cvar.wait_while(&mut ready, |r| !*r);
// Record wake order
let order = wake_order_clone.fetch_add(1, Ordering::SeqCst);
println!("Thread {} woke up (order {})", i, order);
});
handles.push(handle);
}
thread::sleep(std::time::Duration::from_millis(50));
{
let (lock, cvar) = &*state;
let mut ready = lock.lock();
*ready = true;
// Wake all threads
cvar.notify_all();
}
for handle in handles {
handle.join().unwrap();
}
// parking_lot provides fair wake ordering
// Threads wake in a deterministic order
}parking_lot::Condvar uses fair wake ordering, waking threads in FIFO order.
use parking_lot::{Mutex, Condvar};
use std::sync::{Mutex as StdMutex, Condvar as StdCondvar};
use std::mem::size_of;
fn main() {
// Size comparison
println!("parking_lot::Mutex: {} bytes", size_of::<Mutex<()>>());
println!("std::sync::Mutex: {} bytes", size_of::<StdMutex<()>>());
println!("parking_lot::Condvar: {} bytes", size_of::<Condvar>());
println!("std::sync::Condvar: {} bytes", size_of::<StdCondvar>());
// parking_lot types are typically smaller
// parking_lot::Mutex<()> is often 1 pointer (size_of::<usize>())
// std::sync::Mutex<()> is larger due to different internal structure
// Performance characteristics:
// parking_lot::Condvar:
// - Faster uncontended operations
// - Smaller memory footprint
// - Fair wake ordering (FIFO)
// - No poisoning
// std::sync::Condvar:
// - System-level implementation
// - Larger memory footprint
// - May have different wake ordering
// - Handles poisoning
}parking_lot types are typically smaller and faster for uncontended cases.
use parking_lot::{Mutex, Condvar};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
fn main() {
// Pattern: wait until a complex condition is met
struct ProcessState {
items_processed: usize,
items_total: usize,
started: Instant,
}
let state = Arc::new((
Mutex::new(ProcessState {
items_processed: 0,
items_total: 10,
started: Instant::now(),
}),
Condvar::new(),
));
// Worker thread
let worker_state = Arc::clone(&state);
let worker = thread::spawn(move || {
let (lock, cvar) = &*worker_state;
for i in 0..10 {
thread::sleep(Duration::from_millis(50));
let mut state = lock.lock();
state.items_processed += 1;
println!("Processed {}/{}", state.items_processed, state.items_total);
cvar.notify_one();
}
});
// Main thread waits for completion
{
let (lock, cvar) = &*state;
let mut state = lock.lock();
cvar.wait_while(&mut state, |s| {
s.items_processed < s.items_total
&& s.started.elapsed() < Duration::from_secs(5)
});
if state.items_processed == state.items_total {
println!("All items processed!");
} else {
println!("Timed out waiting for completion");
}
}
worker.join().unwrap();
}Complex predicates enable sophisticated waiting conditions.
use parking_lot::{Mutex, Condvar};
use std::time::Duration;
fn main() {
let mutex = Mutex::new(0);
let cvar = Condvar::new();
// Basic waiting
{
let mut guard = mutex.lock();
cvar.wait(&mut guard);
// guard is held again
}
// Predicate-based waiting (recommended)
{
let mut guard = mutex.lock();
cvar.wait_while(&mut guard, |value| *value < 10);
// value >= 10 is guaranteed
}
// Timeout-based waiting
{
let mut guard = mutex.lock();
let result = cvar.wait_for(&mut guard, Duration::from_secs(1));
if result.timed_out {
println!("Timed out");
}
}
// Predicate with timeout
{
let mut guard = mutex.lock();
let result = cvar.wait_while_for(&mut guard, |v| *v < 10, Duration::from_secs(1));
if result.timed_out {
println!("Timed out, value = {}", *guard);
}
}
// Notification
{
let mut guard = mutex.lock();
*guard = 10;
}
cvar.notify_one(); // Wake one waiting thread
cvar.notify_all(); // Wake all waiting threads
}Key concepts:
| Concept | Purpose |
|---------|---------|
| Condvar | Coordinate threads based on shared state |
| wait | Block until notified, releasing lock |
| wait_while | Wait until predicate is true (recommended) |
| notify_one | Wake one waiting thread |
| notify_all | Wake all waiting threads |
parking_lot::Condvar advantages over std::sync::Condvar:
| Aspect | parking_lot | std::sync |
|--------|---------------|-------------|
| Lock poisoning | No poisoning | Handles poisoning via Result |
| Memory size | Smaller | Larger |
| Uncontended speed | Faster | Slower |
| Wake ordering | Fair (FIFO) | Implementation-defined |
| wait_while | Built-in predicate loop | Manual implementation needed |
Common patterns:
| Pattern | Use case |
|---------|----------|
| wait_while | Always use instead of plain wait |
| notify_one | Single consumer, or signal that one thread can proceed |
| notify_all | Multiple consumers, or state change affects all waiters |
| Multiple Condvar | Different conditions (e.g., "not empty" and "not full") |
Key insight: Condition variables solve the fundamental problem of efficient waiting for state changes in shared memory concurrency. Without Condvar, threads would need to poll (busy-wait) or sleep with timeouts, both wasteful approaches that consume CPU or introduce latency. Condvar::wait_while atomically releases the lock and blocks the thread, then reacquires the lock before returning, ensuring no state changes are missed. The predicate check in wait_while handles spurious wakeups—the implementation may wake threads without signals, so the predicate must be re-evaluated upon each wakeup. parking_lot::Condvar improves on std::sync::Condvar by eliminating poisoning (converting panics into poison), providing smaller memory footprint, fair wake ordering, and built-in predicate waiting through wait_while. The fair wake ordering ensures that when notify_all wakes multiple threads, they acquire the lock in FIFO order rather than randomly, which can prevent starvation in some patterns. Multiple condition variables on the same mutex enable fine-grained coordination, as shown in the bounded buffer example where producers wait on "not full" while consumers wait on "not empty".