Rust walkthroughs
What is tokio::sync::Semaphore, and how does it differ from using Mutex for limiting concurrency?

tokio::sync::Semaphore is an async concurrency primitive that limits how many tasks can access a resource simultaneously by maintaining a count of available permits. It differs fundamentally from Mutex in purpose: a Mutex provides exclusive access (a permit count of 1) for data protection, while a Semaphore provides controlled concurrent access (any permit count) for rate limiting and resource management. Use a Semaphore when you want to limit how many operations happen in parallel without protecting specific data. Use a Mutex when you need exclusive access to shared mutable state. They solve different problems and can be combined: for example, a semaphore can limit concurrent database connections while each connection uses a mutex internally for its own state.
```rust
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // Create a semaphore with 3 permits
    let semaphore = Semaphore::new(3);

    // Acquire a permit (decrements the count)
    let permit1 = semaphore.acquire().await.unwrap();
    // Now 2 permits available
    let permit2 = semaphore.acquire().await.unwrap();
    let permit3 = semaphore.acquire().await.unwrap();
    // Now 0 permits available

    // This would wait until a permit is released:
    // let permit4 = semaphore.acquire().await.unwrap();

    // Release permits by dropping the guards
    drop(permit1);
    drop(permit2);
    drop(permit3);
    // Back to 3 permits available
}
```

A semaphore maintains a count of permits. Acquiring decrements, releasing increments.
```rust
use tokio::sync::Semaphore;
use std::sync::Arc;

async fn fetch_url(url: &str) -> String {
    // Simulate a network request
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    format!("Response from {}", url)
}

#[tokio::main]
async fn main() {
    let urls = vec![
        "https://example.com/1",
        "https://example.com/2",
        "https://example.com/3",
        "https://example.com/4",
        "https://example.com/5",
    ];

    // Limit to 2 concurrent requests
    let semaphore = Arc::new(Semaphore::new(2));
    let mut handles = vec![];

    for url in urls {
        let semaphore = semaphore.clone();
        let handle = tokio::spawn(async move {
            // Acquire a permit before making the request
            let _permit = semaphore.acquire().await.unwrap();
            println!("Starting request to {}", url);
            let response = fetch_url(url).await;
            println!("Finished request to {}", url);
            response
            // Permit released when _permit is dropped
        });
        handles.push(handle);
    }

    // Wait for all requests to complete
    for handle in handles {
        handle.await.unwrap();
    }
}
```

Semaphore limits concurrent operations without managing specific data.
```rust
use tokio::sync::Mutex;
use std::sync::Arc;

struct Counter {
    count: u32,
}

#[tokio::main]
async fn main() {
    let counter = Arc::new(Mutex::new(Counter { count: 0 }));
    let mut handles = vec![];

    for i in 0..10 {
        let counter = counter.clone();
        let handle = tokio::spawn(async move {
            // Lock to get exclusive access to the data
            let mut guard = counter.lock().await;
            guard.count += 1;
            println!("Task {} incremented to {}", i, guard.count);
            // Lock released when guard is dropped
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.await.unwrap();
    }
    println!("Final count: {}", counter.lock().await.count);
}
```

Mutex protects shared mutable data with exclusive access.
```rust
use tokio::sync::{Semaphore, Mutex};

// Semaphore: limits concurrency (HOW MANY can run)
// - Rate limiting API calls
// - Limiting database connections
// - Controlling parallel task execution
// - Does NOT protect data

// Mutex: protects data (WHO can access)
// - Exclusive access to shared state
// - Safe mutation of shared data
// - Ensures only ONE task accesses the data at a time
// - IS about data protection

// Example: a database connection pool
// - Semaphore limits how many connections are in use
// - Each connection's internal state is protected by its own Mutex

#[tokio::main]
async fn main() {
    // Semaphore: "at most 3 operations can run concurrently"
    let _concurrency_limit = Semaphore::new(3);
    // Mutex: "only one task can modify this data at a time"
    // (the element type must be spelled out here, since nothing
    // else lets the compiler infer it)
    let _shared_data = Mutex::new(Vec::<String>::new());
    // They serve different purposes and can be used together
}
```

Semaphore controls how many; Mutex controls who and protects what.
```rust
use tokio::sync::Semaphore;
use std::sync::Arc;

struct ApiClient {
    rate_limiter: Arc<Semaphore>,
    base_url: String,
}

impl ApiClient {
    fn new(base_url: &str, max_concurrent: usize) -> Self {
        Self {
            rate_limiter: Arc::new(Semaphore::new(max_concurrent)),
            base_url: base_url.to_string(),
        }
    }

    async fn get(&self, endpoint: &str) -> String {
        // Acquire a permit before making the request
        let _permit = self.rate_limiter.acquire().await.unwrap();
        // Make the request (rate-limited)
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        format!("{}{}", self.base_url, endpoint)
    }
}

#[tokio::main]
async fn main() {
    // Limit to 5 concurrent API requests. The client lives in an Arc
    // because tokio::spawn requires 'static: each task owns a handle
    let client = Arc::new(ApiClient::new("https://api.example.com", 5));
    let mut handles = vec![];
    for i in 0..20 {
        let client = Arc::clone(&client);
        let handle = tokio::spawn(async move {
            client.get(&format!("/resource/{}", i)).await
        });
        handles.push(handle);
    }
    // Even though we spawn 20 tasks, only 5 requests happen concurrently
    for handle in handles {
        handle.await.unwrap();
    }
}
```

Semaphore rate-limits without knowing or caring about the data.
```rust
use tokio::sync::Mutex;
use std::sync::Arc;

struct AppState {
    users: Vec<String>,
    request_count: u64,
}

#[tokio::main]
async fn main() {
    let state = Arc::new(Mutex::new(AppState {
        users: vec!["admin".to_string()],
        request_count: 0,
    }));

    // Multiple tasks need to modify the shared state
    let mut handles = vec![];
    for i in 0..5 {
        let state = state.clone();
        let handle = tokio::spawn(async move {
            let mut guard = state.lock().await;
            guard.users.push(format!("user_{}", i));
            guard.request_count += 1;
            println!("Added user_{}", i);
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.await.unwrap();
    }

    let final_state = state.lock().await;
    println!("Users: {:?}", final_state.users);
    println!("Requests: {}", final_state.request_count);
}
```

Mutex ensures data integrity when multiple tasks modify shared state.
```rust
use tokio::sync::{Semaphore, Mutex};
use std::sync::Arc;

struct DatabasePool {
    // Semaphore limits how many connections are in use
    connection_semaphore: Arc<Semaphore>,
    // Mutex protects the pool of available connections
    available_connections: Arc<Mutex<Vec<String>>>,
}

impl DatabasePool {
    fn new(pool_size: usize) -> Self {
        let connections: Vec<String> = (0..pool_size)
            .map(|i| format!("connection_{}", i))
            .collect();
        Self {
            connection_semaphore: Arc::new(Semaphore::new(pool_size)),
            available_connections: Arc::new(Mutex::new(connections)),
        }
    }

    async fn get_connection(&self) -> ConnectionGuard {
        // First: acquire an owned permit (limits concurrent access; an
        // owned permit can be stored in the guard without borrowing self)
        let permit = self
            .connection_semaphore
            .clone()
            .acquire_owned()
            .await
            .unwrap();
        // Second: take a connection from the pool (mutex protects the pool)
        let connection = self.available_connections.lock().await.pop().unwrap();
        ConnectionGuard {
            connection,
            pool: self.available_connections.clone(),
            permit: Some(permit),
        }
    }
}

struct ConnectionGuard {
    connection: String,
    pool: Arc<Mutex<Vec<String>>>,
    permit: Option<tokio::sync::OwnedSemaphorePermit>,
}

impl Drop for ConnectionGuard {
    fn drop(&mut self) {
        // Return the connection to the pool; the async lock means the
        // return has to happen on a spawned task
        let pool = self.pool.clone();
        let connection = std::mem::take(&mut self.connection);
        let permit = self.permit.take();
        tokio::spawn(async move {
            pool.lock().await.push(connection);
            // Release the permit only after the connection is back, so
            // a waiter never holds a permit while the pool is empty
            drop(permit);
        });
    }
}
```

Combine both: Semaphore controls concurrency, Mutex protects shared state.
```rust
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let semaphore = Semaphore::new(2);

    // Try to acquire without waiting
    match semaphore.try_acquire() {
        Ok(permit) => {
            println!("Got a permit immediately");
            // Use the permit...
            drop(permit);
        }
        Err(_) => {
            println!("No permits available, would need to wait");
            // Could: queue the request, return an error, or acquire().await
        }
    }

    // Acquire all the permits
    let _p1 = semaphore.acquire().await.unwrap();
    let _p2 = semaphore.acquire().await.unwrap();

    // Now try_acquire fails
    match semaphore.try_acquire() {
        Ok(_) => println!("Unexpected success"),
        Err(_) => println!("Correctly failed: no permits available"),
    }
}
```

try_acquire returns immediately without waiting.
```rust
use tokio::sync::Semaphore;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(3));

    // An OwnedSemaphorePermit is not tied to a borrow of the semaphore,
    // so it can be returned from a task or moved between tasks.
    // acquire_owned consumes an Arc<Semaphore> to produce one.
    let semaphore_clone = semaphore.clone();
    let handle = tokio::spawn(async move {
        let permit = semaphore_clone.acquire_owned().await.unwrap();
        println!("Task acquired permit");
        permit
    });
    let _permit = handle.await.unwrap();
    // The permit is now held by the main task
    println!("Main has the permit");

    // Owned permits can be stored in structs, sent across channels, etc.
    let _owned_permit = semaphore.clone().acquire_owned().await.unwrap();
}
```

Owned permits allow permit transfer between tasks.
```rust
use tokio::sync::{Semaphore, Mutex};

#[tokio::main]
async fn main() {
    // SEMAPHORE BEHAVIOR
    let sem = Semaphore::new(3);
    // Multiple permits can be held simultaneously
    let p1 = sem.acquire().await.unwrap();
    let p2 = sem.acquire().await.unwrap();
    let p3 = sem.acquire().await.unwrap();
    // 3 permits held at once! p1, p2, p3 are all active concurrently
    drop(p1);
    drop(p2);
    drop(p3);

    // MUTEX BEHAVIOR
    let mutex = Mutex::new(());
    let g1 = mutex.lock().await;
    // let g2 = mutex.lock().await; // This would WAIT
    // Only ONE guard can exist at a time
    drop(g1);
    // Now another lock can be acquired
    let _g2 = mutex.lock().await;
}
```

Semaphore allows multiple holders; Mutex allows exactly one.
```rust
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let semaphore = Semaphore::new(2);

    // Close the semaphore to prevent new acquisitions
    semaphore.close();

    // try_acquire fails after close
    assert!(semaphore.try_acquire().is_err());
    // acquire returns an error after close
    let result = semaphore.acquire().await;
    assert!(result.is_err());

    // Useful for graceful shutdown:
    // 1. Close the semaphore
    // 2. Wait for existing permit holders to finish
    // 3. Proceed with shutdown
}
```

Closing a semaphore prevents new permits while allowing existing ones to finish.
```rust
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let semaphore = Semaphore::new(10);

    // Acquire 3 permits at once
    let permits = semaphore.acquire_many(3).await.unwrap();
    // 7 permits remain

    // `permits` holds all 3; dropping it releases all 3
    drop(permits);
    // Back to 10 permits

    // Useful when a single operation needs multiple "slots",
    // e.g. a large download consuming more bandwidth quota
}
```

acquire_many acquires multiple permits atomically.
```rust
use tokio::sync::{Semaphore, Mutex};
use std::sync::Arc;

struct Connection {
    id: usize,
    // Connection state...
}

struct Pool {
    // Semaphore: limits concurrent usage
    semaphore: Arc<Semaphore>,
    // Mutex: protects the list of available connections
    connections: Arc<Mutex<Vec<Connection>>>,
}

impl Pool {
    fn new(max_size: usize) -> Self {
        let connections: Vec<Connection> = (0..max_size)
            .map(|id| Connection { id })
            .collect();
        Self {
            semaphore: Arc::new(Semaphore::new(max_size)),
            connections: Arc::new(Mutex::new(connections)),
        }
    }

    async fn get(&self) -> PooledConnection {
        // Acquire an owned permit (waits if at capacity); owned so the
        // guard can hold it without borrowing the pool
        let permit = self.semaphore.clone().acquire_owned().await.unwrap();
        // Take a connection from the pool (brief lock)
        let connection = {
            let mut pool = self.connections.lock().await;
            pool.pop().expect("semaphore guarantees availability")
        };
        PooledConnection {
            connection: Some(connection),
            pool: self.connections.clone(),
            permit: Some(permit),
        }
    }
}

struct PooledConnection {
    connection: Option<Connection>,
    pool: Arc<Mutex<Vec<Connection>>>,
    permit: Option<tokio::sync::OwnedSemaphorePermit>,
}

impl Drop for PooledConnection {
    fn drop(&mut self) {
        if let Some(conn) = self.connection.take() {
            let pool = self.pool.clone();
            let permit = self.permit.take();
            // Returning the connection needs an async lock, so hand it
            // off to a task; release the permit only once the connection
            // is back, so get() can never find the pool empty
            tokio::spawn(async move {
                pool.lock().await.push(conn);
                drop(permit);
            });
        }
    }
}
```

Real pools use both: semaphore limits concurrency, mutex protects the pool.
| Aspect | Semaphore | Mutex |
|--------|-----------|-------|
| Purpose | Limit concurrency | Protect data |
| Permit count | Configurable (N) | Always 1 |
| Data protection | No | Yes |
| Use case | Rate limiting, connection pools | Shared mutable state |
| Multiple holders | Yes (up to N) | No |
| Awareness of data | Unaware | Protects specific data |
Semaphore and Mutex serve fundamentally different purposes despite both being concurrency primitives:
Semaphore is for quantity control: it answers "how many can proceed?" Use it when you need to limit concurrent operations regardless of what data they access. The semaphore neither knows nor cares about the data; it is purely a counter. Common uses:

- Rate limiting API calls
- Limiting concurrent database connections
- Controlling how many tasks execute in parallel
Mutex is for data protection: it answers "who can access this data?" Use it when multiple tasks need to read and write shared state. The mutex is tied to specific data and ensures exclusive access. Common uses:

- Exclusive access to shared application state
- Safe mutation of shared data such as counters and collections
Key insight: They're orthogonal concepts that often work together. A connection pool uses a semaphore to limit how many connections can be used (concurrency limit) and a mutex to protect the list of available connections (data protection). The semaphore doesn't protect the connection list—the mutex does. The mutex doesn't limit concurrency—the semaphore does.
When to choose which:
- Semaphore when you care about limiting concurrent operations
- Mutex when you care about protecting shared data

The mistake to avoid: using a Mutex purely for limiting concurrency when there is no shared data (overkill, wrong abstraction), or using a Semaphore to protect data (it doesn't actually protect anything; with more than one permit, multiple holders can still corrupt shared state).