How does tokio::sync::broadcast channel differ from tokio::sync::mpsc for multi-consumer scenarios?

tokio::sync::broadcast delivers every message to all active receivers: each receiver gets its own clone of each message. tokio::sync::mpsc with multiple consumers distributes messages among them, so each message goes to exactly one consumer (note that tokio's mpsc Receiver is not Clone, so multiple consumers must share a single receiver, e.g. behind a Mutex). The fundamental difference is delivery semantics: broadcast is a "fan-out" pattern where all consumers receive all messages, while mpsc is a "work distribution" pattern where messages are load-balanced across consumers. This affects memory usage, ordering guarantees, backpressure behavior, and use-case suitability. Broadcast is ideal for event notification, while mpsc excels at parallel task processing.

Basic mpsc Multi-Consumer Pattern

use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
 
#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<i32>(10);
    // tokio's mpsc Receiver is not Clone: consumers share one receiver
    let rx1 = Arc::new(Mutex::new(rx));
    let rx2 = Arc::clone(&rx1);
    
    // Consumer 1
    let h1 = tokio::spawn(async move {
        while let Some(val) = rx1.lock().await.recv().await {
            println!("Consumer 1 received: {}", val);
        }
    });
    
    // Consumer 2
    let h2 = tokio::spawn(async move {
        while let Some(val) = rx2.lock().await.recv().await {
            println!("Consumer 2 received: {}", val);
        }
    });
    
    // Send messages
    for i in 0..4 {
        tx.send(i).await.unwrap();
    }
    drop(tx);
    
    h1.await.unwrap();
    h2.await.unwrap();
}
 
// Output might be:
// Consumer 1 received: 0
// Consumer 1 received: 2
// Consumer 2 received: 1
// Consumer 2 received: 3
// (Messages distributed between consumers)

With mpsc, each message goes to exactly one consumer.

Basic broadcast Pattern

use tokio::sync::broadcast;
 
#[tokio::main]
async fn main() {
    let (tx, rx1) = broadcast::channel::<i32>(10);
    let rx2 = tx.subscribe();  // Create another subscriber
    
    // Consumer 1
    let h1 = tokio::spawn(async move {
        while let Ok(val) = rx1.recv().await {
            println!("Consumer 1 received: {}", val);
        }
    });
    
    // Consumer 2
    let h2 = tokio::spawn(async move {
        while let Ok(val) = rx2.recv().await {
            println!("Consumer 2 received: {}", val);
        }
    });
    
    // Send messages
    for i in 0..4 {
        tx.send(i).unwrap();
    }
    drop(tx);
    
    h1.await.unwrap();
    h2.await.unwrap();
}
 
// Output (each consumer's order is fixed; interleaving between consumers may vary):
// Consumer 1 received: 0
// Consumer 1 received: 1
// Consumer 1 received: 2
// Consumer 1 received: 3
// Consumer 2 received: 0
// Consumer 2 received: 1
// Consumer 2 received: 2
// Consumer 2 received: 3
// (All consumers receive ALL messages)

With broadcast, every consumer receives every message.

Message Distribution Semantics

use tokio::sync::{mpsc, broadcast};
 
async fn distribution_comparison() {
    // mpsc: Work distribution
    // tokio's Receiver is not Clone: two consumers share it behind a Mutex
    let (tx, rx) = mpsc::channel::<&str>(10);
    let rx1 = std::sync::Arc::new(tokio::sync::Mutex::new(rx));
    let rx2 = std::sync::Arc::clone(&rx1);
    
    tx.send("task1").await.unwrap();
    tx.send("task2").await.unwrap();
    tx.send("task3").await.unwrap();
    tx.send("task4").await.unwrap();
    
    // rx1 might receive: task1, task3
    // rx2 might receive: task2, task4
    // Each message goes to ONE receiver
    
    // broadcast: Message replication
    let (tx, mut rx1) = broadcast::channel::<&str>(10);
    let mut rx2 = tx.subscribe();
    
    tx.send("event1").unwrap();
    tx.send("event2").unwrap();
    tx.send("event3").unwrap();
    
    // rx1 receives: event1, event2, event3
    // rx2 receives: event1, event2, event3
    // Each message goes to ALL receivers
}

The core difference: mpsc distributes work, broadcast replicates events.

Adding Receivers: Different Semantics

use tokio::sync::{mpsc, broadcast};
 
async fn adding_receivers() {
    // mpsc: sharing the single receiver creates competing consumers
    let (tx, rx) = mpsc::channel::<i32>(10);
    let rx1 = std::sync::Arc::new(tokio::sync::Mutex::new(rx));
    let rx2 = std::sync::Arc::clone(&rx1);  // rx1 and rx2 COMPETE for messages
    
    tx.send(1).await.unwrap();
    tx.send(2).await.unwrap();
    
    // Whichever consumer locks the receiver first gets 1, the other gets 2
    // They don't both get both messages
    
    // broadcast: Subscribing creates independent consumers
    let (tx, mut rx1) = broadcast::channel::<i32>(10);
    let mut rx2 = tx.subscribe();  // rx2 gets its OWN copy of each message
    
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    
    // rx1 receives: 1, 2
    // rx2 receives: 1, 2
    // Both get ALL messages
}

tokio's mpsc has no Receiver::clone; sharing the single receiver (e.g. via Arc<Mutex<_>>) creates competing consumers, while broadcast::Sender::subscribe creates independent subscribers.

Memory and Performance Implications

use tokio::sync::broadcast;
 
async fn memory_considerations() {
    // broadcast keeps ONE copy in a shared ring buffer, then clones the
    // value for each receiver when it calls recv()
    let (tx, _rx1) = broadcast::channel::<Vec<u8>>(10);
    let _rx2 = tx.subscribe();
    let _rx3 = tx.subscribe();
    
    // Large message
    let large_data = vec![0u8; 1_000_000];  // 1 MB
    
    // Each of the 3 subscribers clones this value on recv:
    // roughly 3 MB of copying in total
    tx.send(large_data).unwrap();
    
    // Consider: is the message cheap to clone?
    // Arc<T> can reduce copying overhead
    use std::sync::Arc;
    let (tx, _rx1) = broadcast::channel::<Arc<Vec<u8>>>(10);
    let _rx2 = tx.subscribe();
    let _rx3 = tx.subscribe();
    
    let shared_data = Arc::new(vec![0u8; 1_000_000]);
    tx.send(shared_data.clone()).unwrap();  // Only clones Arc, not data
}

Broadcast clones each message once per receiver, at recv() time. Use Arc for large messages.

Backpressure Behavior

use tokio::sync::{mpsc, broadcast};
 
async fn backpressure() {
    // mpsc: Backpressure propagates to sender
    let (tx, mut rx) = mpsc::channel::<i32>(2);
    
    tx.send(1).await.unwrap();  // Succeeds
    tx.send(2).await.unwrap();  // Succeeds
    // tx.send(3).await would WAIT (suspend this task) until there's room
    // The sender waits when the channel is full
    
    // broadcast: Different behavior
    let (tx, mut rx1) = broadcast::channel::<i32>(2);
    let mut rx2 = tx.subscribe();
    
    tx.send(1).unwrap();  // Succeeds as long as at least one receiver exists
    tx.send(2).unwrap();
    tx.send(3).unwrap();  // Overwrites the oldest buffered message (1)
    // rx2 might have missed message 1 if it was slow!
    
    // Slow consumers get Lagged error
    match rx2.recv().await {
        Ok(_) => {},
        Err(broadcast::error::RecvError::Lagged(n)) => {
            println!("Receiver fell behind, missed {} messages", n);
        }
        Err(broadcast::error::RecvError::Closed) => {
            println!("Channel closed");
        }
    }
}

mpsc blocks senders when full; broadcast drops old messages and reports lag.

Handling Slow Consumers

use tokio::sync::broadcast;
 
#[tokio::main]
async fn main() {
    let (tx, mut rx1) = broadcast::channel::<i32>(3);
    let mut rx2 = tx.subscribe();
    
    // Fast consumer
    let h1 = tokio::spawn(async move {
        while let Ok(val) = rx1.recv().await {
            println!("Fast consumer: {}", val);
        }
    });
    
    // Slow consumer (simulated)
    let h2 = tokio::spawn(async move {
        loop {
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
            match rx2.recv().await {
                Ok(val) => println!("Slow consumer: {}", val),
                Err(broadcast::error::RecvError::Lagged(n)) => {
                    println!("Slow consumer lagged, missed {} messages", n);
                    continue;
                }
                Err(broadcast::error::RecvError::Closed) => break,
            }
        }
    });
    
    // Send messages rapidly
    for i in 0..10 {
        tx.send(i).unwrap();
    }
    
    drop(tx);
    h1.await.unwrap();
    h2.await.unwrap();
}
 
// Fast consumer receives all messages
// Slow consumer might see:
// Slow consumer lagged, missed 7 messages
// Slow consumer: 7
// Slow consumer: 8
// Slow consumer: 9

Broadcast channels have a fixed capacity; slow consumers may miss messages.

Order of Consumers Joining

use tokio::sync::broadcast;
 
#[tokio::main]
async fn main() {
    let (tx, mut rx1) = broadcast::channel::<i32>(10);
    
    // Send messages BEFORE subscriber 2 joins
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    
    // rx1 can still receive 1 and 2 (they're in the buffer)
    
    // rx2 MISSES messages sent before subscription
    let mut rx2 = tx.subscribe();
    
    tx.send(3).unwrap();
    
    // rx1 receives: 1, 2, 3
    // rx2 receives: 3 (missed 1 and 2)
    
    assert_eq!(rx1.recv().await.unwrap(), 1);  // early subscriber sees the backlog
    
    match rx2.recv().await {
        Ok(val) => println!("rx2 received: {}", val),  // 3
        Err(e) => println!("Error: {}", e),
    }
}

Broadcast subscribers only receive messages sent after they subscribe.

mpsc for Work Distribution

use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{mpsc, Mutex};
 
#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<Job>(10);
    // Workers share the single receiver behind a Mutex
    let rx = Arc::new(Mutex::new(rx));
    
    // Spawn multiple workers
    let mut workers = Vec::new();
    for id in 0..3 {
        let rx = Arc::clone(&rx);
        workers.push(tokio::spawn(async move {
            loop {
                // Lock only long enough to take one job, then release
                let job = rx.lock().await.recv().await;
                match job {
                    Some(job) => {
                        println!("Worker {} processing: {}", id, job.name);
                        job.process().await;
                    }
                    None => break,  // channel closed and drained
                }
            }
        }));
    }
    
    // Submit jobs
    for i in 0..10 {
        tx.send(Job { name: format!("job{}", i) }).await.unwrap();
    }
    drop(tx);
    
    for worker in workers {
        worker.await.unwrap();
    }
}
 
struct Job {
    name: String,
}
 
impl Job {
    async fn process(&self) {
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
}
 
// Each job processed by exactly ONE worker
// Natural load balancing across workers

mpsc is ideal for distributing work across parallel workers.

broadcast for Event Notification

use tokio::sync::broadcast;
 
#[tokio::main]
async fn main() {
    let (tx, _rx) = broadcast::channel::<Event>(10);
    
    // Multiple subsystems subscribe to events
    let mut logger_rx = tx.subscribe();
    let mut metrics_rx = tx.subscribe();
    let mut cache_rx = tx.subscribe();
    
    // Logger
    tokio::spawn(async move {
        while let Ok(event) = logger_rx.recv().await {
            println!("[LOG] {:?}", event);
        }
    });
    
    // Metrics
    tokio::spawn(async move {
        while let Ok(event) = metrics_rx.recv().await {
            println!("[METRICS] {:?}", event);
        }
    });
    
    // Cache invalidation
    tokio::spawn(async move {
        while let Ok(event) = cache_rx.recv().await {
            match event {
                Event::UserDeleted(id) => println!("[CACHE] Invalidate user {}", id),
                Event::UserUpdated(id) => println!("[CACHE] Refresh user {}", id),
            }
        }
    });
    
    // Broadcast events
    tx.send(Event::UserUpdated(1)).unwrap();
    tx.send(Event::UserDeleted(2)).unwrap();
    
    // Give the subscriber tasks a moment to run before main returns
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
 
#[derive(Debug, Clone)]  // broadcast requires T: Clone
enum Event {
    UserUpdated(u32),
    UserDeleted(u32),
}
 
// All subscribers receive ALL events

Broadcast is ideal when multiple subsystems need to react to the same events.

Capacity and Buffer Behavior

use tokio::sync::{mpsc, broadcast};
 
async fn capacity_behavior() {
    // mpsc capacity is total messages in channel
    let (tx, rx) = mpsc::channel::<i32>(10);
    // Channel holds up to 10 messages total
    
    // broadcast capacity is the size of ONE shared ring buffer;
    // each subscriber keeps its own read position in that buffer
    // (tokio may round the capacity up to the next power of two)
    let (tx, rx) = broadcast::channel::<i32>(10);
    // All subscribers read from the same buffer; values are cloned
    // per receiver at recv() time
    
    // When capacity is exceeded:
    // mpsc: send().await waits until room is available
    // broadcast: send() overwrites the oldest message; receivers that
    // haven't read it yet get a Lagged error
}

Capacity semantics differ: mpsc bounds the total queue, while broadcast sizes a shared ring buffer that slow receivers can fall out of.

Receiver Error Handling

use tokio::sync::broadcast;
 
#[tokio::main]
async fn main() {
    // tokio rounds broadcast capacity up to a power of two,
    // so ask for 2 to get exactly 2 slots
    let (tx, mut rx) = broadcast::channel::<i32>(2);
    
    // Fill the buffer, then keep sending
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    tx.send(3).unwrap();  // Overwrites oldest (1)
    tx.send(4).unwrap();  // Overwrites oldest (2)
    
    // Receiver has fallen behind
    match rx.recv().await {
        Ok(val) => println!("Received: {}", val),
        Err(broadcast::error::RecvError::Lagged(missed)) => {
            println!("Lagged, missed {} messages", missed);
            // Now can receive recent messages
        }
        Err(broadcast::error::RecvError::Closed) => {
            println!("Channel closed");
        }
    }
    
    // Close the channel so the drain loop terminates once the buffer is empty
    drop(tx);
    
    // After handling Lagged, receiving continues from the oldest retained message
    while let Ok(val) = rx.recv().await {
        println!("Received: {}", val);
    }
}

Broadcast receivers must handle Lagged errors for slow consumption scenarios.

When to Use Each

use tokio::sync::{mpsc, broadcast};
 
// Use mpsc when:
// - You want work distribution (each task processed once)
// - You need backpressure (slow consumers slow down producers)
// - You have a pool of workers
// - Message order matters within a single consumer stream
 
// Use broadcast when:
// - All consumers need all messages
// - You're broadcasting events/notifications
// - Consumers can tolerate missing messages if slow
// - You want decoupled subscribers (can join/leave dynamically)
 
// Example: Web server
async fn web_server_example() {
    // mpsc: HTTP request handling
    let (request_tx, request_rx) = mpsc::channel::<Request>(100);
    // Requests distributed among worker tasks
    
    // broadcast: Server events (shutdown, metrics, logging)
    let (event_tx, _) = broadcast::channel::<ServerEvent>(10);
    
    // All subsystems receive shutdown signal
    let mut shutdown_rx1 = event_tx.subscribe();
    let mut shutdown_rx2 = event_tx.subscribe();
    
    // When shutting down:
    // event_tx.send(ServerEvent::Shutdown).unwrap();
    // Both subscribers receive it
}
 
struct Request;
enum ServerEvent {
    Shutdown,
    Metrics,
}

Choose based on whether you need distribution (mpsc) or replication (broadcast).

Summary Comparison

Aspect            | mpsc                            | broadcast
------------------|---------------------------------|--------------------------------------------
Message delivery  | Each message to ONE consumer    | Each message to ALL consumers
Receiver creation | Share rx (e.g. Arc<Mutex<_>>)   | tx.subscribe()
Backpressure      | Sender waits when full          | Old messages dropped, Lagged error
Slow consumers    | Slow down the sender            | May miss messages
Memory usage      | One shared queue                | One shared ring buffer; clone per receiver
Use case          | Work distribution               | Event notification
Capacity meaning  | Total queued messages           | Ring-buffer size (rounded up to power of 2)
Adding receivers  | Competing consumers             | Independent subscribers
Sender behavior   | send().await can wait           | send() returns immediately
Join behavior     | No missed messages              | Misses messages sent before subscribe

Synthesis

tokio::sync::mpsc and tokio::sync::broadcast serve fundamentally different purposes despite both being multi-consumer channels:

mpsc (multi-producer, single-consumer; extended to multiple consumers by sharing the one receiver, e.g. behind Arc<Mutex<_>>):

  • Distributes messages among competing consumers
  • Each message is processed exactly once by one consumer
  • Provides backpressure—full channels block senders
  • Ideal for work queues, task pools, job distribution
  • Sharing the single receiver adds another competing consumer on the same message stream

broadcast:

  • Replicates messages to all subscribers
  • Each message is received by all active consumers
  • No backpressure—slow consumers may miss messages and receive Lagged errors
  • Ideal for event streams, notifications, state broadcasting
  • Subscribing creates an independent consumer that only sees messages from that point forward

Key architectural insight: The choice determines how you think about consumers. With mpsc, consumers are workers processing a shared workload—each message is a unit of work. With broadcast, consumers are subscribers to a stream of events—each message is something to be observed. A slow worker in mpsc slows down the whole pipeline (backpressure), while a slow subscriber in broadcast simply falls behind and may miss messages (no backpressure). This is why broadcast works well for things like shutdown signals and metrics, where every component needs to know, while mpsc works well for request handling where you want parallel processing without duplication.