Loading page…
Rust walkthroughs
Loading page…
tokio::sync::mpsc::channel differ from tokio::sync::mpsc::unbounded_channel in backpressure handling?tokio::sync::mpsc::channel creates a bounded channel with a fixed capacity that enforces backpressure—when the channel is full, senders must wait (or fail) until space becomes available. tokio::sync::mpsc::unbounded_channel creates an unbounded channel that accepts messages without limit, effectively disabling backpressure and allowing senders to continue indefinitely. The bounded channel protects the receiver from being overwhelmed by controlling memory usage and providing natural flow control, while the unbounded channel offers simplicity and throughput at the cost of potential memory exhaustion. Choose bounded channels when you need to control memory usage and signal backpressure to producers; choose unbounded channels only when you can guarantee that message rates are bounded by other means or when memory exhaustion is acceptable.
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
// Bounded channel with capacity of 32
let (tx, rx) = mpsc::channel::<i32>(32);
// Unbounded channel with no capacity limit
let (tx_unbounded, rx_unbounded) = mpsc::unbounded_channel::<i32>();
}Bounded channels require a capacity; unbounded channels don't.
use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(2);
// First two sends succeed immediately
tx.send(1).await.unwrap();
tx.send(2).await.unwrap();
// Third send blocks until receiver makes space
// This is backpressure: sender waits for capacity
tx.send(3).await.unwrap();
// Receive one item
let val = rx.recv().await.unwrap();
println!("Received: {}", val); // 1
// Now the third send completes
}send().await blocks when the channel is full, implementing backpressure.
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::unbounded_channel::<i32>();
// All sends succeed immediately, no waiting
for i in 0..1000000 {
tx.send(i).unwrap(); // Never blocks
}
// Messages queue up in memory
// No backpressure to slow down the sender
}Unbounded channels accept unlimited messages without blocking.
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
// Bounded channel limits memory usage
let (tx, _rx) = mpsc::channel::<Vec<u8>>(100);
// Each message is 1MB
let big_message = vec![0u8; 1024 * 1024];
// At most 100 * 1MB = 100MB queued at any time
for _ in 0..100 {
tx.send(big_message.clone()).await.unwrap();
}
// Next send would wait (backpressure)
// tx.send(big_message).await; // Blocks until space available
}Bounded channels cap memory usage to capacity * message_size.
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, rx) = mpsc::unbounded_channel::<Vec<u8>>();
// No limit on queued messages
let big_message = vec![0u8; 1024 * 1024]; // 1MB
// This could grow memory without bound
// until the system runs out of memory
for _ in 0..1_000_000 {
tx.send(big_message.clone()).unwrap();
}
// Potentially 1TB of memory used
// System may OOM before this completes
}Unbounded channels can exhaust memory if senders outpace receivers.
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
#[tokio::main]
async fn main() {
let (tx, rx) = mpsc::channel(2);
// Non-blocking send attempts
tx.try_send(1).unwrap();
tx.try_send(2).unwrap();
// Channel is full
match tx.try_send(3) {
Ok(()) => println!("Sent"),
Err(TrySendError::Full(_)) => println!("Channel full, implement backpressure"),
Err(TrySendError::Closed(_)) => println!("Channel closed"),
}
// try_send returns immediately without blocking
// Gives control back to handle backpressure explicitly
}try_send returns immediately, letting you handle full channels explicitly.
use tokio::sync::mpsc;
use tokio::time::{timeout, Duration};
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(1);
tx.send(1).await.unwrap();
// Wait with timeout for capacity
match timeout(Duration::from_millis(100), tx.send(2)).await {
Ok(Ok(())) => println!("Sent successfully"),
Ok(Err(_)) => println!("Channel closed"),
Err(_) => println!("Timeout - channel full too long"),
}
}Combine send with timeout for bounded waiting.
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, rx) = mpsc::unbounded_channel();
// send returns Result, but only fails if receiver is dropped
for i in 0..1000 {
tx.send(i).unwrap(); // Succeeds as long as rx exists
}
// No capacity to check, no blocking to worry about
}Unbounded send only fails when the receiver is closed.
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel::<i32>(10);
let (tx_unbounded, mut rx_unbounded) = mpsc::unbounded_channel::<i32>();
// Both receivers work the same way
// recv().await returns None when channel is closed
drop(tx);
let val = rx.recv().await; // None
drop(tx_unbounded);
let val = rx_unbounded.recv().await; // None
}Both bounded and unbounded receivers use the same recv API.
use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(5);
// Spawn slow consumer
tokio::spawn(async move {
while let Some(val) = rx.recv().await {
println!("Processing: {}", val);
sleep(Duration::from_millis(100)).await; // Slow processing
}
});
// Fast producer
for i in 0..100 {
// Backpressure: send blocks when consumer falls behind
tx.send(i).await.unwrap();
println!("Sent: {}", i);
}
// Sender slows down automatically to match consumer speed
}Bounded channels propagate backpressure from consumer to producer.
use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::unbounded_channel();
// Spawn slow consumer
tokio::spawn(async move {
while let Some(val) = rx.recv().await {
println!("Processing: {}", val);
sleep(Duration::from_millis(100)).await; // Slow processing
}
});
// Fast producer - no backpressure
for i in 0..1000000 {
tx.send(i).unwrap(); // Never blocks
}
// Producer runs full speed, queue grows without bound
}Unbounded channels disconnect producer rate from consumer rate.
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, _rx) = mpsc::channel(100);
// Check capacity (how many more items can be sent without blocking)
let current_capacity = tx.capacity();
println!("Capacity: {}", current_capacity); // 100
// Send some items
for i in 0..50 {
tx.send(i).await.unwrap();
}
// Capacity decreases as items are queued
println!("Remaining capacity: {}", tx.capacity()); // ~50
}tx.capacity() shows remaining space in a bounded channel.
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
// Small capacity: tighter backpressure, lower latency
let (tx_tight, rx) = mpsc::channel(1);
// Large capacity: more buffering, higher throughput burst handling
let (tx_loose, rx) = mpsc::channel(10000);
// Rule of thumb:
// - Capacity should handle brief processing spikes
// - Capacity * message_size should fit comfortably in memory
// - Smaller = more immediate backpressure signal
// - Larger = more tolerance for producer bursts
}Capacity tuning balances memory use and backpressure responsiveness.
use tokio::sync::mpsc;
// Use bounded when:
// 1. Producers may outpace consumers
// 2. Memory must be bounded
// 3. Backpressure is needed for flow control
// 4. You want producers to slow down
#[tokio::main]
async fn main() {
// Example: HTTP request handler -> database writer
// Limit queue to prevent memory exhaustion under load
let (tx, rx) = mpsc::channel::<Request>(100);
// Handler can only queue 100 requests before blocking
// Natural backpressure to clients
}Use bounded channels when producers should respect consumer capacity.
use tokio::sync::mpsc;
// Use unbounded when:
// 1. Producers are already rate-limited elsewhere
// 2. Messages are tiny or bounded in size
// 3. Consumer is always faster than producer
// 4. Simplicity outweighs memory risk
#[tokio::main]
async fn main() {
// Example: Logging channel with bounded input rate
// HTTP server already limits concurrent connections
let (tx, rx) = mpsc::unbounded_channel::<LogEntry>();
// Log rate is bounded by connection limit
// Unbounded is fine, log messages are small
}Use unbounded only when message rate is bounded by other means.
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
// Start with unbounded for prototyping
let (tx, rx) = mpsc::unbounded_channel();
// Switch to bounded for production
let (tx, rx) = mpsc::channel(100);
// Both have similar APIs, conversion is straightforward:
// - tx.send() vs tx.send().await (both async for bounded)
// - rx.recv().await works the same
}Switching between bounded and unbounded is easy during development.
use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(10);
// Multiple producers share the same capacity
let tx1 = tx.clone();
let tx2 = tx.clone();
tokio::spawn(async move {
for i in 0..100 {
tx1.send(i).await.unwrap(); // Blocks when channel full
}
});
tokio::spawn(async move {
for i in 100..200 {
tx2.send(i).await.unwrap(); // Also blocks when full
}
});
// Single consumer
while let Some(val) = rx.recv().await {
println!("Received: {}", val);
sleep(Duration::from_millis(10)).await;
}
}All producers share the bounded capacity.
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
// Bounded: Sender implements Clone
let (tx, rx) = mpsc::channel(10);
let tx2 = tx.clone();
// Both senders share the same capacity
tx.send(1).await.unwrap();
tx2.send(2).await.unwrap();
// Unbounded: Sender also implements Clone
let (tx_u, rx_u) = mpsc::unbounded_channel();
let tx_u2 = tx_u.clone();
// Both work similarly for cloning
}Both bounded and unbounded senders can be cloned.
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(3);
// Reserve capacity before sending
let permit1 = tx.reserve().await.unwrap(); // Waits if full
let permit2 = tx.reserve().await.unwrap();
let permit3 = tx.reserve().await.unwrap();
// Channel now full, next reserve would block
// Send using permits
permit1.send(1);
permit2.send(2);
permit3.send(3);
// Permits allow reserving space before creating the value
}reserve() reserves capacity before sending, useful when creating values is expensive.
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::{SendError, TrySendError};
#[tokio::main]
async fn main() {
// Bounded: send() can fail if receiver dropped
let (tx, rx) = mpsc::channel(10);
drop(rx);
match tx.send(1).await {
Ok(()) => println!("Sent"),
Err(SendError(_)) => println!("Receiver dropped"),
}
// Unbounded: same error handling
let (tx, rx) = mpsc::unbounded_channel();
drop(rx);
match tx.send(1) {
Ok(()) => println!("Sent"),
Err(SendError(_)) => println!("Receiver dropped"),
}
// try_send for bounded returns TrySendError::Full or TrySendError::Closed
let (tx, rx) = mpsc::channel(1);
tx.try_send(1).unwrap();
match tx.try_send(2) {
Err(TrySendError::Full(_)) => println!("Full"),
Err(TrySendError::Closed(_)) => println!("Closed"),
Ok(()) => println!("Sent"),
}
}Error types differ: bounded has TrySendError::Full, unbounded doesn't.
use tokio::sync::mpsc;
// Unbounded:
// - Faster send (no capacity check, no blocking)
// - Lower overhead per message
// - Memory grows linearly with queue depth
// Bounded:
// - Slower send under contention (capacity check, possible blocking)
// - Fixed memory ceiling
// - Better for production reliability
// In practice:
// - Use bounded for production code
// - Use unbounded for prototyping or when rate is guaranteed boundedUnbounded has lower overhead but higher risk; bounded has controlled memory.
mpsc::channel and mpsc::unbounded_channel represent two different approaches to producer-consumer communication:
Bounded channel implements explicit backpressure. When the queue fills, send().await blocks the producer until the consumer makes space. This creates a natural feedback loop: slow consumers automatically slow down producers. Memory usage is bounded to capacity × message_size. Use bounded channels when:
sendUnbounded channel removes backpressure entirely. send always succeeds immediately (if the receiver exists). This gives maximum throughput but no feedback mechanism. Memory grows as producers outpace consumers, potentially to the point of OOM. Use unbounded channels when:
Key insight: Backpressure isn't just about memory—it's about system stability. A bounded channel creates a feedback loop where producer rate adjusts to consumer rate. An unbounded channel breaks that feedback loop, letting producers run at full speed regardless of consumer capacity. In production systems, bounded channels are the safer default because they prevent cascading failures when a component slows down. Unbounded channels trade safety for simplicity and should only be used when you can guarantee bounded input rates by other means.