Rust walkthroughs
What are the memory safety and performance considerations of std::sync::RwLock with large, read-heavy workloads?

std::sync::RwLock allows multiple concurrent readers or a single exclusive writer, making it attractive for read-heavy workloads. However, it introduces several memory safety and performance considerations that go beyond simple mutual exclusion. The primary concerns are: writer starvation (writers may be indefinitely blocked by continuous readers), panic safety (a panic while holding a lock can poison it), and lock granularity (coarse-grained locks on large data structures can create contention even among readers). Additionally, RwLock in the standard library is not async-aware and can deadlock in async contexts. Understanding these trade-offs is essential for building correct concurrent systems.
use std::sync::RwLock;
use std::thread;
/// A simple shared string cache guarded by a reader-writer lock.
struct Cache {
    // Interior mutability: many concurrent readers OR one exclusive writer.
    data: RwLock<Vec<String>>,
}

impl Cache {
    /// Creates an empty cache.
    fn new() -> Self {
        Cache {
            data: RwLock::new(Vec::new()),
        }
    }

    /// Returns a clone of the entry at `index`, or `None` if out of bounds.
    fn get(&self, index: usize) -> Option<String> {
        // Multiple readers can hold read locks simultaneously.
        let data = self.data.read().unwrap();
        data.get(index).cloned()
    }

    /// Appends a value. The write lock is exclusive and blocks all readers.
    fn insert(&self, value: String) {
        let mut data = self.data.write().unwrap();
        data.push(value);
    }
}

fn basic_usage() {
    let cache = Cache::new();
    // Insert some data
    cache.insert("hello".to_string());
    cache.insert("world".to_string());
    // FIX: the original used `thread::spawn` with closures that borrowed the
    // local `&cache`; `thread::spawn` requires `'static` captures, so that
    // does not compile. `thread::scope` lets threads borrow local data and
    // joins them automatically when the scope ends.
    thread::scope(|s| {
        for i in 0..4 {
            let cache_ref = &cache;
            s.spawn(move || {
                // Multiple concurrent reads are allowed.
                if let Some(val) = cache_ref.get(i % 2) {
                    println!("Thread {} read: {}", i, val);
                }
            });
        }
    });
}
// RwLock allows multiple concurrent readers but only one exclusive writer.
use std::sync::RwLock;
use std::thread;
use std::time::Duration;
/// Demonstrates writer starvation: continuous readers can delay a writer.
fn writer_starvation() {
    let lock = RwLock::new(0);
    // FIX: the original spawned reader threads that borrowed `lock` via
    // `thread::spawn` (requires `'static`) while the writer thread moved
    // `lock` — a borrow-of-moved-value that does not compile. `thread::scope`
    // lets every thread borrow the same local lock. The reader iteration
    // count is reduced (1000 -> 100) so the demo terminates quickly.
    thread::scope(|s| {
        // Spawn many reader threads that continuously take the read lock.
        for _ in 0..10 {
            let lock_ref = &lock;
            s.spawn(move || {
                for _ in 0..100 {
                    let _guard = lock_ref.read().unwrap();
                    // Hold the read lock briefly so reads overlap.
                    thread::sleep(Duration::from_micros(10));
                }
            });
        }
        // Writer thread may struggle to acquire the lock while readers
        // continuously hold it.
        let lock_ref = &lock;
        s.spawn(move || {
            for _ in 0..10 {
                // This may block for a long time.
                let start = std::time::Instant::now();
                let mut guard = lock_ref.write().unwrap();
                *guard += 1;
                println!("Writer acquired lock after {:?}", start.elapsed());
                thread::sleep(Duration::from_millis(1));
            }
        });
        // All scoped threads are joined here.
    });
}
// Continuous readers can prevent writers from acquiring the lock, causing writer starvation.
use std::sync::RwLock;
use std::panic;
fn lock_poisoning() {
let lock = RwLock::new(vec![1, 2, 3]);
// A panic while holding the lock "poisons" it
let result = panic::catch_unwind(|| {
let mut guard = lock.write().unwrap();
guard.push(4);
// Panic while holding the lock
panic!("oops!");
});
assert!(result.is_err());
// The lock is now poisoned
let read_result = lock.read();
match read_result {
Ok(guard) => println!("Lock not poisoned: {:?}", *guard),
Err(poisoned) => {
// Can still access the data through the error
let guard = poisoned.into_inner();
println!("Lock was poisoned, data: {:?}", *guard);
}
}
// Same with write
let write_result = lock.write();
match write_result {
Ok(mut guard) => guard.push(5),
Err(poisoned) => {
let mut guard = poisoned.into_inner();
guard.push(5);
println!("Recovered from poisoned lock");
}
}
}A panic while holding a lock poisons it, but the data can still be recovered.
use std::sync::RwLock;
use std::thread;
/// Demonstrates a classic lock-ordering deadlock between two threads.
fn deadlock_scenario() {
    use std::sync::Arc;
    // FIX: the original moved both locks into the first spawned closure and
    // then used them again from the second closure — a use-after-move that
    // does not compile. Arc gives each thread shared ownership.
    let lock_a = Arc::new(RwLock::new(1));
    let lock_b = Arc::new(RwLock::new(2));
    // Thread 1: read A, then write B
    let (a1, b1) = (Arc::clone(&lock_a), Arc::clone(&lock_b));
    let h1 = thread::spawn(move || {
        let _read_a = a1.read().unwrap();
        // Do some work...
        thread::sleep(std::time::Duration::from_millis(1));
        // Try to write B while holding read on A.
        // This can deadlock if Thread 2 holds B and wants A.
        let mut _write_b = b1.write().unwrap();
    });
    // Thread 2: read B, then write A
    let (a2, b2) = (Arc::clone(&lock_a), Arc::clone(&lock_b));
    let h2 = thread::spawn(move || {
        let _read_b = b2.read().unwrap();
        thread::sleep(std::time::Duration::from_millis(1));
        // DEADLOCK: inconsistent lock order — neither thread can proceed.
        let mut _write_a = a2.write().unwrap();
    });
    // Joining would likely hang forever, so the handles are deliberately
    // left unjoined (the threads are detached when the handles drop).
    let _ = (h1, h2);
    // h1.join().unwrap();
    // h2.join().unwrap();
}
/// Shows that std's RwLock has no read -> write upgrade operation.
fn upgrade_deadlock() {
    let lock = RwLock::new(0);
    let read_guard = lock.read().unwrap();
    // Acquiring the write lock here would DEADLOCK: this thread already
    // holds a read lock that would never be released.
    // let mut write = lock.write().unwrap();
    // The read guard must be dropped before the write lock can be taken.
    drop(read_guard);
    let mut _write = lock.write().unwrap();
}
// RwLock cannot upgrade from read to write, and lock ordering issues cause deadlocks.
use std::sync::RwLock;
use std::collections::HashMap;
/// A cache with one coarse-grained lock guarding the entire map.
struct LargeCache {
    // Coarse-grained lock on entire map
    data: RwLock<HashMap<String, Vec<u8>>>,
}

impl LargeCache {
    /// Clones the value for `key`. The shared read lock is held for the
    /// whole lookup, blocking every writer for that duration.
    fn get(&self, key: &str) -> Option<Vec<u8>> {
        self.data.read().unwrap().get(key).cloned()
    }

    /// Stores `value` under `key`. The exclusive write lock blocks all
    /// other readers and writers on the whole map.
    fn insert(&self, key: String, value: Vec<u8>) {
        self.data.write().unwrap().insert(key, value);
    }
}
// Better approach: fine-grained locking
use std::sync::Arc;
use std::collections::BTreeMap;
use std::sync::RwLock as StdRwLock;
/// A cache split into independently locked shards, so operations on
/// unrelated keys do not contend on the same lock.
struct ShardedCache {
    // Multiple smaller locks instead of one big one.
    shards: Vec<RwLock<HashMap<String, Vec<u8>>>>,
}

impl ShardedCache {
    /// Builds a cache with `shard_count` independently locked maps.
    fn new(shard_count: usize) -> Self {
        ShardedCache {
            shards: (0..shard_count)
                .map(|_| RwLock::new(HashMap::new()))
                .collect(),
        }
    }

    /// Maps a key to its shard index via a simple hash.
    fn shard_for_key(&self, key: &str) -> usize {
        (fxhash::hash(key) as usize) % self.shards.len()
    }

    /// Reads `key`, locking only the shard that owns it.
    fn get(&self, key: &str) -> Option<Vec<u8>> {
        let shard = self.shards[self.shard_for_key(key)].read().unwrap();
        shard.get(key).cloned()
    }

    /// Inserts `key`, locking only the shard that owns it.
    fn insert(&self, key: String, value: Vec<u8>) {
        let idx = self.shard_for_key(&key);
        self.shards[idx].write().unwrap().insert(key, value);
    }
}

mod fxhash {
    /// Simple multiply-then-add string hash (FxHash-style constant).
    pub fn hash(key: &str) -> u64 {
        key.bytes().fold(0u64, |acc, byte| {
            acc.wrapping_mul(0x517cc1b727220a95).wrapping_add(byte as u64)
        })
    }
}
// Coarse-grained locks on large structures create contention; sharding reduces it.
use std::sync::RwLock;
use std::thread;
use std::time::{Duration, Instant};
/// Counters tracking how many reads and writes the workload performed.
struct Metrics {
    // Two independent locks: a snapshot of both is not a single atomic view.
    read_count: RwLock<usize>,
    write_count: RwLock<usize>,
}

impl Metrics {
    /// Creates zeroed counters.
    fn new() -> Self {
        Metrics {
            read_count: RwLock::new(0),
            write_count: RwLock::new(0),
        }
    }

    /// Increments the read counter (counter mutation needs the write lock).
    fn record_read(&self) {
        *self.read_count.write().unwrap() += 1;
    }

    /// Increments the write counter.
    fn record_write(&self) {
        *self.write_count.write().unwrap() += 1;
    }

    /// Returns (reads, writes). The two locks are taken one after another,
    /// so the pair may not correspond to one instant in time.
    fn snapshot(&self) -> (usize, usize) {
        let reads = *self.read_count.read().unwrap();
        let writes = *self.write_count.read().unwrap();
        (reads, writes)
    }
}

/// Runs a read-heavy workload (8 readers, 1 writer) and prints throughput.
fn read_heavy_workload() {
    let data = RwLock::new(vec![0u64; 10_000]);
    let metrics = Metrics::new();
    let start = Instant::now();
    // Shortened from 1 s so the demo finishes quickly; scale up to observe
    // contention effects.
    let duration = Duration::from_millis(100);
    // FIX: the original borrowed `data`, `metrics`, `start` and `duration`
    // from `thread::spawn` closures, which requires `'static` captures and
    // does not compile. `thread::scope` permits borrowing locals and joins
    // the threads automatically.
    thread::scope(|s| {
        // Readers: sum the vector while it is read-locked.
        for _ in 0..8 {
            let (data, metrics) = (&data, &metrics);
            s.spawn(move || {
                while start.elapsed() < duration {
                    let guard = data.read().unwrap();
                    let _sum: u64 = guard.iter().sum();
                    drop(guard);
                    metrics.record_read();
                }
            });
        }
        // One writer mutating a single element under the exclusive lock.
        let (data, metrics) = (&data, &metrics);
        s.spawn(move || {
            while start.elapsed() < duration {
                let mut guard = data.write().unwrap();
                guard[0] += 1;
                drop(guard);
                metrics.record_write();
            }
        });
    });
    let (reads, writes) = metrics.snapshot();
    println!("Reads: {}, Writes: {}", reads, writes);
    println!("Read/Write ratio: {}", reads as f64 / writes as f64);
}
// Read-heavy workloads benefit from RwLock but writers may still be delayed.
use std::sync::RwLock;
use std::thread;
/// State shared between the writer and reader threads below.
struct SharedState {
    data: RwLock<Data>,
}

struct Data {
    value: u64,
    initialized: bool,
}

/// Demonstrates that RwLock release/acquire makes a writer's updates
/// visible to a subsequent reader.
fn memory_ordering() {
    use std::sync::Arc;
    // FIX: the original's spawned closures borrowed the local `state`, which
    // does not meet `thread::spawn`'s `'static` bound and fails to compile.
    // Arc provides owned, shareable handles for both threads.
    let state = Arc::new(SharedState {
        data: RwLock::new(Data {
            value: 0,
            initialized: false,
        }),
    });
    // Writer thread
    let writer_state = Arc::clone(&state);
    let writer = thread::spawn(move || {
        let mut guard = writer_state.data.write().unwrap();
        guard.value = 42;
        guard.initialized = true;
        // Releasing the lock has release semantics: both writes become
        // visible to whoever acquires the lock next.
    });
    writer.join().unwrap();
    // Reader thread - joined after the writer, sees consistent state.
    let reader_state = Arc::clone(&state);
    let reader = thread::spawn(move || {
        let guard = reader_state.data.read().unwrap();
        // Acquiring the lock has acquire semantics: all prior writes are seen.
        if guard.initialized {
            assert_eq!(guard.value, 42);
        }
    });
    reader.join().unwrap();
}
// RwLock provides acquire-release semantics, ensuring visibility of writes.
use std::sync::{RwLock, Mutex};
use std::thread;
use std::time::Instant;
/// Compares uncontended-read throughput of RwLock vs Mutex across 8 threads.
fn compare_rwlock_mutex() {
    let rwlock = RwLock::new(0u64);
    let mutex = Mutex::new(0u64);
    // FIX: the original borrowed `rwlock`/`mutex` from `thread::spawn`
    // closures (requires `'static`; does not compile). `thread::scope`
    // lets the benchmark threads borrow the locks. The iteration count is
    // reduced so the demo finishes quickly.
    let iterations = 50_000;
    let reader_threads = 8;
    // RwLock benchmark: all threads take shared read locks.
    let start = Instant::now();
    thread::scope(|s| {
        for _ in 0..reader_threads {
            let rwlock = &rwlock;
            s.spawn(move || {
                for _ in 0..iterations {
                    let _guard = rwlock.read().unwrap();
                }
            });
        }
    });
    let rwlock_time = start.elapsed();
    // Mutex benchmark: every acquisition is exclusive.
    let start = Instant::now();
    thread::scope(|s| {
        for _ in 0..reader_threads {
            let mutex = &mutex;
            s.spawn(move || {
                for _ in 0..iterations {
                    let _guard = mutex.lock().unwrap();
                }
            });
        }
    });
    let mutex_time = start.elapsed();
    println!("RwLock: {:?}", rwlock_time);
    println!("Mutex: {:?}", mutex_time);
    // RwLock typically faster for read-heavy with multiple readers;
    // Mutex typically faster for write-heavy or single reader.
}
// RwLock shines with multiple concurrent readers; Mutex may be faster for write-heavy workloads.
use std::sync::RwLock;
/// Demonstrates non-blocking lock acquisition with try_read / try_write.
fn try_lock_pattern() {
    let lock = RwLock::new(vec![1, 2, 3]);
    // Non-blocking read attempt: Err means the lock is currently held
    // in a conflicting mode.
    if let Ok(items) = lock.try_read() {
        println!("Got read lock: {:?}", *items);
    } else {
        println!("Read lock not available, do something else");
    }
    // Non-blocking write attempt (the read guard above is already dropped).
    if let Ok(mut items) = lock.try_write() {
        items.push(4);
        println!("Got write lock");
    } else {
        println!("Write lock not available, retry later");
    }
}
/// Spins on try_write a bounded number of times, then falls back to a
/// blocking write() so the increment always happens.
fn try_lock_with_backoff() {
    let lock = RwLock::new(0);
    let mut attempts = 0;
    loop {
        if let Ok(mut value) = lock.try_write() {
            *value += 1;
            break;
        }
        attempts += 1;
        if attempts > 10 {
            // Too many failed attempts: block until the lock is free.
            *lock.write().unwrap() += 1;
            break;
        }
        // Let other threads run before retrying.
        std::thread::yield_now();
    }
}
// try_read() and try_write() allow non-blocking lock acquisition attempts.
use std::sync::RwLock;
use tokio::sync::RwLock as AsyncRwLock;
// PROBLEM: std::sync::RwLock in async context
async fn problematic_async_rwlock() {
    let lock = RwLock::new(0);
    // Blocking acquisition: this can block the async runtime's worker thread!
    let _guard = lock.read().unwrap();
    // If we hold the guard across an await point, we block other tasks
    // that need this lock from running on the same thread
    some_async_operation().await;
    // The guard is still held here (it is only dropped at end of function)
}
/// Stand-in for real async work: an async sleep that yields to the runtime.
async fn some_async_operation() {
    tokio::time::sleep(std::time::Duration::from_millis(1)).await;
}
// SOLUTION: Use tokio::sync::RwLock for async
async fn proper_async_rwlock() {
    let lock = AsyncRwLock::new(0);
    // Async acquisition: this yields to the runtime if lock is contended,
    // instead of blocking the worker thread.
    let _guard = lock.read().await;
    // The async-aware guard is safe to hold across await points.
    some_async_operation().await;
}
// Even better: don't hold locks across await points
async fn best_practice() {
    let lock = AsyncRwLock::new(0);
    // Copy the value out inside a tight scope so the guard drops immediately.
    let data = {
        let guard = lock.read().await;
        *guard
    };
    // Now safe to await without holding lock
    some_async_operation().await;
    // Re-acquire the lock only for the duration of the mutation.
    {
        let mut guard = lock.write().await;
        *guard += 1;
    }
}
// std::sync::RwLock blocks the async runtime; use tokio::sync::RwLock for async code.
use std::sync::RwLock;
use std::sync::Arc;
/// A key/value cache whose interior state is guarded by one RwLock.
struct ThreadSafeCache {
    data: RwLock<CacheData>,
}

struct CacheData {
    entries: Vec<Entry>,
    last_update: std::time::Instant,
}

struct Entry {
    key: String,
    value: String,
}

impl ThreadSafeCache {
    /// Creates an empty cache stamped with the current time.
    fn new() -> Self {
        ThreadSafeCache {
            data: RwLock::new(CacheData {
                entries: Vec::new(),
                last_update: std::time::Instant::now(),
            }),
        }
    }

    /// Linear-scans for `key` under the read lock and clones the value.
    fn get(&self, key: &str) -> Option<String> {
        let data = self.data.read().unwrap();
        let found = data.entries.iter().find(|entry| entry.key == key);
        found.map(|entry| entry.value.clone())
    }

    /// Inserts or overwrites `key` under the write lock and refreshes the
    /// last-update timestamp.
    fn insert(&self, key: String, value: String) {
        let mut data = self.data.write().unwrap();
        if let Some(entry) = data.entries.iter_mut().find(|entry| entry.key == key) {
            entry.value = value;
        } else {
            data.entries.push(Entry { key, value });
        }
        data.last_update = std::time::Instant::now();
    }

    /// Number of entries, read under a briefly held read lock.
    fn len(&self) -> usize {
        self.data.read().unwrap().entries.len()
    }
}

// Shared ownership with Arc
fn shared_cache() {
    let cache = Arc::new(ThreadSafeCache::new());
    let handles: Vec<_> = (0..4)
        .map(|i| {
            // Each thread gets its own owned Arc handle, satisfying 'static.
            let cache = Arc::clone(&cache);
            std::thread::spawn(move || {
                cache.insert(format!("key{}", i), format!("value{}", i));
                if let Some(v) = cache.get(&format!("key{}", i)) {
                    println!("Got: {}", v);
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    println!("Total entries: {}", cache.len());
}
// RwLock enables interior mutability for shared state across threads.
use std::sync::RwLock;
use std::thread;
/// Demonstrates that reader/writer acquisition order is unspecified.
fn fairness_example() {
    let lock = RwLock::new(0);
    // The standard library RwLock behavior varies by platform:
    // - on some platforms, writers can be starved by continuous readers;
    // - on others, writers may have priority (blocking new readers).
    // Linux (glibc) uses pthread_rwlock, which may be writer-preferring;
    // Windows may behave differently. Rust guarantees neither policy.
    //
    // FIX: the original moved `lock` into the writer's closure while the
    // reader's closure still borrowed it via `lock_ref` — that does not
    // compile (`'static` bound + borrow of moved value). `thread::scope`
    // lets both threads borrow the same local lock.
    thread::scope(|s| {
        // Continuous reader.
        let lock_ref = &lock;
        s.spawn(move || {
            for _ in 0..100 {
                let _guard = lock_ref.read().unwrap();
                // Do work
            }
        });
        // Writer competing with the reader; acquisition order is unspecified.
        let lock_ref = &lock;
        s.spawn(move || {
            let mut guard = lock_ref.write().unwrap();
            *guard += 1;
        });
        // Both scoped threads are joined when the scope ends.
    });
}
// RwLock fairness behavior is platform-dependent; don't rely on specific ordering.
use std::sync::RwLock;
/// Wraps a locked vector and shows how guard lifetimes constrain APIs.
struct Container {
    data: RwLock<Vec<String>>,
}

impl Container {
    /// Clones the element at `index` out of the locked vector; the read
    /// guard only needs to live for the duration of this call.
    fn get_ref(&self, index: usize) -> Option<String> {
        let items = self.data.read().unwrap();
        items.get(index).cloned()
    }

    // Returning `Option<&String>` would NOT compile: the guard is dropped
    // when the function returns, invalidating any reference into the data.
    // fn get_ref_lifetime(&self, index: usize) -> Option<&String> {
    //     let guard = self.data.read().unwrap();
    //     guard.get(index) // Error: guard dropped, reference invalid
    // }

    /// Runs `f` against the locked slice. The guard outlives the callback,
    /// so borrowing inside `f` is sound.
    fn process<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&[String]) -> R,
    {
        let items = self.data.read().unwrap();
        f(&items)
    }
}

fn guard_lifetime() {
    let container = Container {
        data: RwLock::new(vec!["hello".to_string(), "world".to_string()]),
    };
    // Good: get owned data
    if let Some(word) = container.get_ref(0) {
        println!("{}", word);
    }
    // Good: process within callback
    container.process(|items| {
        if let Some(word) = items.get(0) {
            println!("{}", word);
        }
    });
}
// Read guards must stay alive for references to be valid; cloning or callbacks work around this.
std::sync::RwLock has several memory safety and correctness considerations for read-heavy workloads:

Memory Safety Guarantees: lock release/acquire provides acquire-release ordering, so readers observe all writes made before the lock was released; a panic while holding a guard poisons the lock, and the data remains recoverable via PoisonError::into_inner().

Performance Considerations: a coarse-grained lock on a large structure creates contention even among readers; sharding the data across multiple smaller locks reduces it, and Mutex can outperform RwLock for write-heavy workloads or very short read holds.

Key Risks: writer starvation under continuous readers; deadlock from inconsistent lock ordering or from attempting a read-to-write upgrade (std's RwLock does not support upgrading); blocking the async runtime when used in async code (use tokio::sync::RwLock instead).

Best Practices: hold guards for the minimum scope; use try_read()/try_write() for non-blocking patterns; use tokio::sync::RwLock for async code.

Key insight: RwLock is not a universal improvement over Mutex for read-heavy workloads. The overhead of tracking readers can outweigh benefits when read hold times are short or contention is high. The writer starvation problem means that workloads requiring timely writes may need alternative approaches (like parking_lot::RwLock with fair policies, or write-preferring implementations). Always measure actual performance under realistic contention patterns.