Loading pageā¦
Rust walkthroughs
Loading pageā¦
tokio::sync::broadcast channel differ from tokio::sync::mpsc for multi-consumer scenarios?tokio::sync::broadcast sends every message to all active receivers, creating a copy of each message for each receiver. tokio::sync::mpsc with multiple consumers distributes messages among receiversāeach message goes to exactly one receiver. The fundamental difference is delivery semantics: broadcast is a "fan-out" pattern where all consumers receive all messages, while mpsc is a "work distribution" pattern where messages are load-balanced across consumers. This affects memory usage, ordering guarantees, backpressure behavior, and use case suitability. Broadcast is ideal for event notification, while mpsc excels at parallel task processing.
use tokio::sync::mpsc;
use std::time::Duration;
#[tokio::main]
async fn main() {
let (tx, rx1) = mpsc::channel::<i32>(10);
let rx2 = rx1.clone(); // Clone to get another receiver
// Consumer 1
let h1 = tokio::spawn(async move {
while let Some(val) = rx1.recv().await {
println!("Consumer 1 received: {}", val);
}
});
// Consumer 2
let h2 = tokio::spawn(async move {
while let Some(val) = rx2.recv().await {
println!("Consumer 2 received: {}", val);
}
});
// Send messages
for i in 0..4 {
tx.send(i).await.unwrap();
}
drop(tx);
h1.await.unwrap();
h2.await.unwrap();
}
// Output might be:
// Consumer 1 received: 0
// Consumer 1 received: 2
// Consumer 2 received: 1
// Consumer 2 received: 3
// (Messages distributed between consumers)With mpsc, each message goes to exactly one consumer.
use tokio::sync::broadcast;
use std::time::Duration;
#[tokio::main]
async fn main() {
let (tx, rx1) = broadcast::channel::<i32>(10);
let rx2 = tx.subscribe(); // Create another subscriber
// Consumer 1
let h1 = tokio::spawn(async move {
while let Ok(val) = rx1.recv().await {
println!("Consumer 1 received: {}", val);
}
});
// Consumer 2
let h2 = tokio::spawn(async move {
while let Ok(val) = rx2.recv().await {
println!("Consumer 2 received: {}", val);
}
});
// Send messages
for i in 0..4 {
tx.send(i).unwrap();
}
drop(tx);
h1.await.unwrap();
h2.await.unwrap();
}
// Output:
// Consumer 1 received: 0
// Consumer 1 received: 1
// Consumer 1 received: 2
// Consumer 1 received: 3
// Consumer 2 received: 0
// Consumer 2 received: 1
// Consumer 2 received: 2
// Consumer 2 received: 3
// (All consumers receive ALL messages)With broadcast, every consumer receives every message.
use tokio::sync::{mpsc, broadcast};
async fn distribution_comparison() {
// mpsc: Work distribution
let (tx, mut rx1) = mpsc::channel::<&str>(10);
let mut rx2 = rx1.clone();
tx.send("task1").await.unwrap();
tx.send("task2").await.unwrap();
tx.send("task3").await.unwrap();
tx.send("task4").await.unwrap();
// rx1 might receive: task1, task3
// rx2 might receive: task2, task4
// Each message goes to ONE receiver
// broadcast: Message replication
let (tx, mut rx1) = broadcast::channel::<&str>(10);
let mut rx2 = tx.subscribe();
tx.send("event1").unwrap();
tx.send("event2").unwrap();
tx.send("event3").unwrap();
// rx1 receives: event1, event2, event3
// rx2 receives: event1, event2, event3
// Each message goes to ALL receivers
}The core difference: mpsc distributes work, broadcast replicates events.
use tokio::sync::{mpsc, broadcast};
async fn receiver_cloning() {
// mpsc: Cloning creates competing consumers
let (tx, rx1) = mpsc::channel::<i32>(10);
let mut rx2 = rx1.clone(); // rx1 and rx2 COMPETE for messages
tx.send(1).await.unwrap();
tx.send(2).await.unwrap();
// One of them gets 1, the other might get 2
// They don't both get both messages
// broadcast: Subscribing creates independent consumers
let (tx, mut rx1) = broadcast::channel::<i32>(10);
let mut rx2 = tx.subscribe(); // rx2 gets its OWN copy of each message
tx.send(1).unwrap();
tx.send(2).unwrap();
// rx1 receives: 1, 2
// rx2 receives: 1, 2
// Both get ALL messages
}mpsc::Receiver::clone creates competing consumers; broadcast::Sender::subscribe creates independent subscribers.
use tokio::sync::broadcast;
async fn memory_considerations() {
// broadcast copies each message to each subscriber's buffer
let (tx, _rx1) = broadcast::channel::<Vec<u8>>(10);
let _rx2 = tx.subscribe();
let _rx3 = tx.subscribe();
// Large message
let large_data = vec![0u8; 1_000_000]; // 1 MB
// This creates COPIES for each subscriber
// With 3 subscribers: 3 MB of data copied into buffers
tx.send(large_data).unwrap();
// Consider: is the message cheap to clone?
// Arc<T> can reduce copying overhead
use std::sync::Arc;
let (tx, _rx1) = broadcast::channel::<Arc<Vec<u8>>>(10);
let _rx2 = tx.subscribe();
let _rx3 = tx.subscribe();
let shared_data = Arc::new(vec![0u8; 1_000_000]);
tx.send(shared_data.clone()).unwrap(); // Only clones Arc, not data
}Broadcast clones each message for each subscriber. Use Arc for large messages.
use tokio::sync::{mpsc, broadcast};
async fn backpressure() {
// mpsc: Backpressure propagates to sender
let (tx, mut rx) = mpsc::channel::<i32>(2);
tx.send(1).await.unwrap(); // Succeeds
tx.send(2).await.unwrap(); // Succeeds
// tx.send(3).await would BLOCK until there's room
// Sender waits when channel is full
// broadcast: Different behavior
let (tx, mut rx1) = broadcast::channel::<i32>(2);
let mut rx2 = tx.subscribe();
tx.send(1).unwrap(); // Succeeds if ANY subscriber can receive
tx.send(2).unwrap();
tx.send(3).unwrap(); // Overwrites oldest for slow receivers
// rx2 might have missed message 1 if it was slow!
// Slow consumers get Lagged error
match rx2.recv().await {
Ok(_) => {},
Err(broadcast::error::RecvError::Lagged(n)) => {
println!("Receiver fell behind, missed {} messages", n);
}
Err(broadcast::error::RecvError::Closed) => {
println!("Channel closed");
}
}
}mpsc blocks senders when full; broadcast drops old messages and reports lag.
use tokio::sync::broadcast;
#[tokio::main]
async fn main() {
let (tx, mut rx1) = broadcast::channel::<i32>(3);
let mut rx2 = tx.subscribe();
// Fast consumer
let h1 = tokio::spawn(async move {
while let Ok(val) = rx1.recv().await {
println!("Fast consumer: {}", val);
}
});
// Slow consumer (simulated)
let h2 = tokio::spawn(async move {
loop {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
match rx2.recv().await {
Ok(val) => println!("Slow consumer: {}", val),
Err(broadcast::error::RecvError::Lagged(n)) => {
println!("Slow consumer lagged, missed {} messages", n);
continue;
}
Err(broadcast::error::RecvError::Closed) => break,
}
}
});
// Send messages rapidly
for i in 0..10 {
tx.send(i).unwrap();
}
drop(tx);
h1.await.unwrap();
h2.await.unwrap();
}
// Fast consumer receives all messages
// Slow consumer might see:
// Slow consumer lagged, missed 7 messages
// Slow consumer: 7
// Slow consumer: 8
// Slow consumer: 9Broadcast channels have a fixed capacity; slow consumers may miss messages.
use tokio::sync::broadcast;
#[tokio::main]
async fn main() {
let (tx, _rx1) = broadcast::channel::<i32>(10);
// Send messages BEFORE subscriber 2 joins
tx.send(1).unwrap();
tx.send(2).unwrap();
// rx1 can still receive 1 and 2 (they're in its buffer)
// rx2 MISSES messages sent before subscription
let mut rx2 = tx.subscribe();
tx.send(3).unwrap();
// rx1 receives: 1, 2, 3
// rx2 receives: 3 (missed 1 and 2)
match rx2.recv().await {
Ok(val) => println!("rx2 received: {}", val), // 3
Err(e) => println!("Error: {}", e),
}
}Broadcast subscribers only receive messages sent after they subscribe.
use tokio::sync::mpsc;
use std::time::Duration;
#[tokio::main]
async fn main() {
let (tx, rx) = mpsc::channel::<Job>(10);
// Spawn multiple workers
let mut workers = Vec::new();
for id in 0..3 {
let mut rx = rx.clone();
workers.push(tokio::spawn(async move {
while let Some(job) = rx.recv().await {
println!("Worker {} processing: {}", id, job.name);
job.process().await;
}
}));
}
drop(rx); // Important: drop original receiver
// Submit jobs
for i in 0..10 {
tx.send(Job { name: format!("job{}", i) }).await.unwrap();
}
drop(tx);
for worker in workers {
worker.await.unwrap();
}
}
struct Job {
name: String,
}
impl Job {
async fn process(&self) {
tokio::time::sleep(Duration::from_millis(10)).await;
}
}
// Each job processed by exactly ONE worker
// Natural load balancing across workersmpsc is ideal for distributing work across parallel workers.
use tokio::sync::broadcast;
#[tokio::main]
async fn main() {
let (tx, _rx) = broadcast::channel::<Event>(10);
// Multiple subsystems subscribe to events
let mut logger_rx = tx.subscribe();
let mut metrics_rx = tx.subscribe();
let mut cache_rx = tx.subscribe();
// Logger
tokio::spawn(async move {
while let Ok(event) = logger_rx.recv().await {
println!("[LOG] {:?}", event);
}
});
// Metrics
tokio::spawn(async move {
while let Ok(event) = metrics_rx.recv().await {
println!("[METRICS] {:?}", event);
}
});
// Cache invalidation
tokio::spawn(async move {
while let Ok(event) = cache_rx.recv().await {
match event {
Event::UserDeleted(id) => println!("[CACHE] Invalidate user {}", id),
Event::UserUpdated(id) => println!("[CACHE] Refresh user {}", id),
}
}
});
// Broadcast events
tx.send(Event::UserUpdated(1)).unwrap();
tx.send(Event::UserDeleted(2)).unwrap();
}
#[derive(Debug)]
enum Event {
UserUpdated(u32),
UserDeleted(u32),
}
// All subscribers receive ALL eventsBroadcast is ideal when multiple subsystems need to react to the same events.
use tokio::sync::{mpsc, broadcast};
async fn capacity_behavior() {
// mpsc capacity is total messages in channel
let (tx, rx) = mpsc::channel::<i32>(10);
// Channel holds up to 10 messages total
// broadcast capacity is per-subscriber
let (tx, rx) = broadcast::channel::<i32>(10);
// Each subscriber has its OWN buffer of 10 messages
// With 5 subscribers, potentially 50 messages buffered total
// When capacity exceeded:
// mpsc: send().await blocks until room available
// broadcast: send() overwrites oldest message for slow receivers
}Capacity semantics differ: mpsc uses shared capacity, broadcast uses per-subscriber buffers.
use tokio::sync::broadcast;
#[tokio::main]
async fn main() {
let (tx, mut rx) = broadcast::channel::<i32>(3);
// Fill the buffer
tx.send(1).unwrap();
tx.send(2).unwrap();
tx.send(3).unwrap();
tx.send(4).unwrap(); // Overwrites oldest (1)
tx.send(5).unwrap(); // Overwrites oldest (2)
// Receiver has fallen behind
match rx.recv().await {
Ok(val) => println!("Received: {}", val),
Err(broadcast::error::RecvError::Lagged(missed)) => {
println!("Lagged, missed {} messages", missed);
// Now can receive recent messages
}
Err(broadcast::error::RecvError::Closed) => {
println!("Channel closed");
}
}
// After handling Lagged, can continue receiving
while let Ok(val) = rx.recv().await {
println!("Received: {}", val);
}
}Broadcast receivers must handle Lagged errors for slow consumption scenarios.
use tokio::sync::{mpsc, broadcast};
// Use mpsc when:
// - You want work distribution (each task processed once)
// - You need backpressure (slow consumers slow down producers)
// - You have a pool of workers
// - Message order matters within a single consumer stream
// Use broadcast when:
// - All consumers need all messages
// - You're broadcasting events/notifications
// - Consumers can tolerate missing messages if slow
// - You want decoupled subscribers (can join/leave dynamically)
// Example: Web server
async fn web_server_example() {
// mpsc: HTTP request handling
let (request_tx, request_rx) = mpsc::channel::<Request>(100);
// Requests distributed among worker tasks
// broadcast: Server events (shutdown, metrics, logging)
let (event_tx, _) = broadcast::channel::<ServerEvent>(10);
// All subsystems receive shutdown signal
let mut shutdown_rx1 = event_tx.subscribe();
let mut shutdown_rx2 = event_tx.subscribe();
// When shutting down:
// event_tx.send(ServerEvent::Shutdown).unwrap();
// Both subscribers receive it
}
struct Request;
enum ServerEvent {
Shutdown,
Metrics,
}Choose based on whether you need distribution (mpsc) or replication (broadcast).
| Aspect | mpsc | broadcast |
|--------|------|-----------|
| Message delivery | Each message to ONE consumer | Each message to ALL consumers |
| Receiver creation | rx.clone() | tx.subscribe() |
| Backpressure | Sender waits when full | Old messages dropped, Lagged error |
| Slow consumers | Block sender | May miss messages |
| Memory usage | Single buffer shared | Buffer per subscriber |
| Use case | Work distribution | Event notification |
| Capacity meaning | Total messages | Per-subscriber buffer |
| Cloning receivers | Competing consumers | Independent subscribers |
| Sender behavior | send().await can block | send() returns immediately |
| Join behavior | No missed messages | Misses messages before subscription |
tokio::sync::mpsc and tokio::sync::broadcast serve fundamentally different purposes despite both being multi-consumer channels:
mpsc (Multi-Producer Single-Consumer, extended to multi-consumer via cloning):
broadcast:
Lagged errorsKey architectural insight: The choice determines how you think about consumers. With mpsc, consumers are workers processing a shared workloadāeach message is a unit of work. With broadcast, consumers are subscribers to a stream of eventsāeach message is something to be observed. A slow worker in mpsc slows down the whole pipeline (backpressure), while a slow subscriber in broadcast simply falls behind and may miss messages (no backpressure). This is why broadcast works well for things like shutdown signals and metrics, where every component needs to know, while mpsc works well for request handling where you want parallel processing without duplication.