How do I work with mpsc channels in Rust?

Walkthrough

mpsc stands for Multiple Producer, Single Consumer — a channel pattern where multiple senders can send messages to a single receiver. Rust's std::sync::mpsc module provides channel primitives for inter-thread communication.

Key characteristics:

  • Multiple senders — Clone Sender to share among threads
  • Single receiver — Only one Receiver exists (not clonable)
  • FIFO ordering — Messages arrive in send order
  • Ownership transfer — Values sent through channels are moved
  • Blocking receive — recv() waits for messages

Channel types in std::sync::mpsc:

  • channel() — Creates unbounded (infinite buffer) channel
  • sync_channel(n) — Creates bounded (fixed buffer) channel

Channels are essential for message-passing concurrency, actor patterns, and coordinating threads without shared state.

Code Examples

Basic Channel Usage

use std::thread;
use std::sync::mpsc;
 
/// Hands one `String` from a spawned thread to the main thread over a channel.
fn main() {
    // `channel()` yields the sending half and the receiving half as a pair.
    let (sender, receiver) = mpsc::channel::<String>();

    // The closure takes ownership of `sender` and sends exactly one value.
    thread::spawn(move || {
        let greeting = "hello from thread".to_owned();
        sender.send(greeting).unwrap();
    });

    // `recv` parks the main thread until the value has been delivered.
    let delivered = receiver.recv().unwrap();
    println!("Got: {}", delivered);
}

Multiple Producers (Cloning Sender)

use std::thread;
use std::sync::mpsc;
 
/// Two producer threads share one channel by each owning a clone of the sender.
fn main() {
    let (sender, receiver) = mpsc::channel();

    // One sender handle per producer thread.
    let first = sender.clone();
    let second = sender.clone();

    // Release the original handle; otherwise the channel would never be
    // considered closed and the receive loop below would not finish.
    drop(sender);

    thread::spawn(move || first.send("Message from thread 1").unwrap());
    thread::spawn(move || second.send("Message from thread 2").unwrap());

    // Drains the channel; iteration stops once every sender has been dropped.
    receiver
        .into_iter()
        .for_each(|line| println!("Received: {}", line));
}

Sending Different Message Types

use std::thread;
use std::sync::mpsc;
 
/// The kinds of message the producer thread sends through the channel.
enum Message {
    /// Carries an owned string payload.
    Text(String),
    /// Carries a signed 32-bit integer payload.
    Number(i32),
    /// Carries no payload; the receiving loop stops when it sees this variant.
    Quit,
}
 
/// Sends a mix of `Message` variants from a producer thread and dispatches on
/// each one in the main thread until `Message::Quit` arrives.
fn main() {
    let (sender, receiver) = mpsc::channel();

    thread::spawn(move || {
        let pause = std::time::Duration::from_millis(100);
        let outgoing = vec![
            Message::Text("Hello".to_string()),
            Message::Number(42),
            Message::Text("World".to_string()),
            Message::Quit,
        ];

        // Send one message, then wait a little before the next one.
        outgoing.into_iter().for_each(|item| {
            sender.send(item).unwrap();
            thread::sleep(pause);
        });
    });

    // Keep receiving until the channel closes or a quit request shows up.
    while let Ok(incoming) = receiver.recv() {
        match incoming {
            Message::Quit => {
                println!("Quit signal received");
                break;
            }
            Message::Number(n) => println!("Number: {}", n),
            Message::Text(s) => println!("Text: {}", s),
        }
    }
}

Non-Blocking Receive with try_recv

use std::thread;
use std::sync::mpsc;
use std::time::Duration;
 
/// Polls the channel with `try_recv` so the main thread can keep working
/// while it waits for a message that arrives after a delay.
fn main() {
    let (sender, receiver) = mpsc::channel();

    thread::spawn(move || {
        thread::sleep(Duration::from_millis(200));
        sender.send("delayed message").unwrap();
    });

    let poll_interval = Duration::from_millis(50);
    loop {
        // `try_recv` returns immediately, whether or not a message is ready.
        let outcome = receiver.try_recv();

        if let Err(mpsc::TryRecvError::Empty) = outcome {
            println!("No message yet, doing other work...");
            thread::sleep(poll_interval);
            continue;
        }

        // Anything other than `Empty` ends the polling loop.
        match outcome {
            Ok(text) => println!("Got: {}", text),
            Err(_) => println!("Sender disconnected"),
        }
        break;
    }
}

Receive with Timeout

use std::thread;
use std::sync::mpsc;
use std::time::Duration;
 
/// Shows `recv_timeout`: a short wait that expires before the message is
/// sent, followed by a longer wait that is long enough to receive it.
fn main() {
    let (sender, receiver) = mpsc::channel();

    thread::spawn(move || {
        thread::sleep(Duration::from_millis(300));
        sender.send("late message").unwrap();
    });

    // First attempt: the wait is shorter than the sender's delay.
    let short_wait = receiver.recv_timeout(Duration::from_millis(100));
    match short_wait {
        Err(mpsc::RecvTimeoutError::Disconnected) => println!("Sender disconnected"),
        Err(mpsc::RecvTimeoutError::Timeout) => println!("Timeout - no message received"),
        Ok(text) => println!("Received: {}", text),
    }

    // Second attempt: a longer wait.
    let long_wait = receiver.recv_timeout(Duration::from_millis(500));
    match long_wait {
        Err(err) => println!("Error: {:?}", err),
        Ok(text) => println!("Received: {}", text),
    }
}

Iterating Over Messages

use std::thread;
use std::sync::mpsc;
use std::time::Duration;
 
/// Consumes a channel through `Receiver::iter`, which ends by itself once the
/// producer's sender has been dropped.
fn main() {
    let (sender, receiver) = mpsc::channel();

    // Producer: emits 1 through 5 with a pause after each value.
    thread::spawn(move || {
        let pause = Duration::from_millis(100);
        for n in 1..=5 {
            sender.send(n).unwrap();
            thread::sleep(pause);
        }
        // `sender` goes out of scope here, which closes the channel.
    });

    println!("Receiving values:");
    // The iterator blocks for each value and stops when the channel closes.
    receiver.iter().for_each(|n| println!("  Got: {}", n));

    println!("Channel closed");
}

Bounded Channel (sync_channel)

use std::thread;
use std::sync::mpsc;
use std::time::Duration;
 
/// Demonstrates backpressure with a bounded channel of capacity 2.
///
/// The producer thread owns the only `SyncSender`, so the channel closes as
/// soon as the producer finishes and the consumer loop terminates.
fn main() {
    // Create bounded channel with capacity of 2
    let (tx, rx) = mpsc::sync_channel(2);

    // Producer thread. `tx` is moved in rather than cloned: a second sender
    // kept alive in `main` would prevent the channel from ever closing, and
    // the `for val in rx` loop below would block forever after the last value.
    thread::spawn(move || {
        for i in 0..5 {
            // send() blocks when buffer is full
            match tx.send(i) {
                Ok(_) => println!("Sent: {}", i),
                Err(_) => {
                    println!("Receiver dropped");
                    break;
                }
            }
            thread::sleep(Duration::from_millis(50));
        }
        // `tx` is dropped here, closing the channel.
    });

    // Consumer starts late so the producer fills the buffer and has to wait.
    thread::sleep(Duration::from_millis(300));

    // Consumer reads slowly; the loop ends once the producer has dropped `tx`
    // and the buffer has been drained.
    for val in rx {
        println!("  Received: {}", val);
        thread::sleep(Duration::from_millis(100));
    }
}

Try Send with Bounded Channels

use std::thread;
use std::sync::mpsc;
use std::time::Duration;
 
/// Shows `try_send` on a full bounded channel: it reports `Full` instead of
/// blocking, and a normal `send` succeeds again once a slot is freed.
fn main() {
    let (sender, receiver) = mpsc::sync_channel(2);

    // Occupy both buffer slots.
    for n in 1..=2 {
        sender.send(n).unwrap();
    }

    // The buffer is full: a plain `send` would block here, whereas
    // `try_send` returns an error immediately.
    let attempt = sender.try_send(3);
    match attempt {
        Err(mpsc::TrySendError::Disconnected(_)) => println!("Receiver disconnected"),
        Err(mpsc::TrySendError::Full(_)) => println!("Channel is full, cannot send"),
        Ok(()) => println!("Sent 3"),
    }

    // Taking one value out frees a slot.
    let freed = receiver.recv().unwrap();
    println!("Received: {}", freed);

    // With a free slot, `send` completes without blocking.
    sender.send(3).unwrap();
    println!("Sent 3 after receiving");

    // Close the channel, then drain whatever is still buffered.
    drop(sender);
    receiver
        .into_iter()
        .for_each(|leftover| println!("Final receive: {}", leftover));
}

Worker Pool Pattern

use std::thread;
use std::sync::mpsc;
 
/// A worker identified by a numeric id.
struct Worker {
    id: u32,
}

impl Worker {
    /// Creates a worker with the given id.
    fn new(id: u32) -> Self {
        Self { id }
    }

    /// Logs the job being handled and returns the job value doubled.
    fn process(&self, job: u32) -> u32 {
        println!("Worker {} processing job {}", self.id, job);
        job * 2
    }
}

/// Distributes ten jobs over four worker threads and prints the results
/// sorted by result value.
fn main() {
    let (job_tx, job_rx) = mpsc::channel::<u32>();
    let (result_tx, result_rx) = mpsc::channel();

    // `Receiver` is not `Clone` (mpsc is single-consumer), so the workers
    // share the one receiver behind an `Arc<Mutex<_>>` instead.
    let job_rx = std::sync::Arc::new(std::sync::Mutex::new(job_rx));

    // Spawn worker threads
    let num_workers = 4;
    for id in 0..num_workers {
        let job_rx = std::sync::Arc::clone(&job_rx);
        let result_tx = result_tx.clone();

        thread::spawn(move || {
            let worker = Worker::new(id);

            // Process jobs until channel closes
            loop {
                // The lock guard is a temporary that is released at the end of
                // this statement, so the lock is held only while waiting for a
                // job and other workers can pick up jobs during processing.
                let next_job = job_rx.lock().unwrap().recv();
                match next_job {
                    Ok(job) => {
                        let result = worker.process(job);
                        result_tx.send((id, result)).unwrap();
                    }
                    // All job senders are gone and the queue is empty.
                    Err(_) => break,
                }
            }
        });
    }

    // Drop main's result sender so `result_rx` closes once every worker
    // (each holding its own clone) has finished.
    drop(result_tx);

    // Send jobs
    for job in 0..10 {
        job_tx.send(job).unwrap();
    }

    // Close job channel so the workers' `recv` calls start returning `Err`.
    drop(job_tx);

    // Collect results
    let mut results: Vec<_> = result_rx.iter().collect();
    results.sort_by_key(|(_, r)| *r);

    for (worker_id, result) in results {
        println!("Worker {} returned: {}", worker_id, result);
    }
}

Pipeline Pattern

use std::thread;
use std::sync::mpsc;
 
/// Chains three threads into a pipeline: generate 1..=5, square each number,
/// add 10, then print the results in the main thread.
fn main() {
    // One channel per hand-off between stages.
    let (numbers_tx, numbers_rx) = mpsc::channel::<i32>();
    let (squares_tx, squares_rx) = mpsc::channel();
    let (shifted_tx, shifted_rx) = mpsc::channel();

    // Stage 1: produce the raw numbers.
    thread::spawn(move || (1..=5).for_each(|n| numbers_tx.send(n).unwrap()));

    // Stage 2: square every number coming out of stage 1.
    thread::spawn(move || {
        while let Ok(n) = numbers_rx.recv() {
            squares_tx.send(n * n).unwrap();
        }
    });

    // Stage 3: add 10 to every square.
    thread::spawn(move || {
        squares_rx
            .into_iter()
            .for_each(|sq| shifted_tx.send(sq + 10).unwrap());
    });

    // Each stage drops its sender when it finishes, closing the next
    // channel in turn, so this loop ends after the last value.
    shifted_rx
        .iter()
        .for_each(|value| println!("Result: {}", value));
}

Error Handling

use std::thread;
use std::sync::mpsc;
 
/// Shows what `recv` returns on a closed channel: first on a channel whose
/// sender was dropped before anything was sent, then after a sender thread
/// has delivered its only message and exited.
fn main() {
    // The element type must be spelled out: nothing is ever sent on this
    // channel, so the compiler has no other way to infer it.
    let (tx, rx) = mpsc::channel::<String>();

    // Drop sender immediately
    drop(tx);

    // Receiving on closed channel
    match rx.recv() {
        Ok(msg) => println!("Got: {}", msg),
        Err(e) => println!("Error: {}", e), // "receiving on a closed channel"
    }

    // Create new channel for demonstration
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        tx.send("hello").unwrap();
        // tx dropped here
    });

    // Using recv properly
    println!("Got: {}", rx.recv().unwrap());

    // The only sender is gone once the thread exits, so this `recv`
    // returns `Err` instead of blocking forever.
    match rx.recv() {
        Ok(msg) => println!("Got: {}", msg),
        Err(_) => println!("Channel closed"),
    }
}

Bidirectional Communication

use std::thread;
use std::sync::mpsc;
 
/// Request/response between the main thread and a spawned thread using one
/// channel per direction.
fn main() {
    let (request_tx, request_rx) = mpsc::channel(); // main -> spawned thread
    let (response_tx, response_rx) = mpsc::channel(); // spawned thread -> main

    thread::spawn(move || {
        // Block until the request arrives.
        let incoming: String = request_rx.recv().unwrap();
        println!("Thread received: {}", incoming);

        // Answer on the second channel.
        let reply = format!("Response to: {}", incoming);
        response_tx.send(reply).unwrap();
    });

    // Kick off the exchange.
    request_tx.send(String::from("Hello from main")).unwrap();

    // Wait for the answer.
    println!("Main received: {}", response_rx.recv().unwrap());
}

Rate Limiting Pattern

use std::thread;
use std::sync::mpsc;
use std::time::{Duration, Instant};
 
/// A producer pushes 20 values into a bounded channel while the consumer
/// spaces out its processing to at most one message per 200 ms.
fn main() {
    let (sender, receiver) = mpsc::sync_channel(10);

    // Producer: stops early if the consumer has gone away.
    thread::spawn(move || {
        for n in 0..20 {
            match sender.send(n) {
                Ok(()) => println!("Produced: {}", n),
                Err(_) => {
                    println!("Consumer dropped");
                    break;
                }
            }
        }
    });

    // Consumer: enforce a minimum gap between consecutive messages.
    let min_gap = Duration::from_millis(200);
    let mut previous = Instant::now();

    while let Ok(item) = receiver.recv() {
        // Sleep for whatever is left of the gap since the last message.
        let waited = previous.elapsed();
        if waited < min_gap {
            thread::sleep(min_gap - waited);
        }
        previous = Instant::now();

        println!("Consumed: {}", item);
    }
}

Summary

Channel Types:

Function Type Buffer Send Behavior
channel() Unbounded Infinite Never blocks
sync_channel(n) Bounded n items Blocks when full

Key Methods:

Method Description Blocking?
send(v) Send value Unbounded: No, Bounded: When full
recv() Receive value Yes
try_recv() Receive or error No
recv_timeout(d) Receive with timeout Yes, for at most the timeout d
try_send(v) Send or error No (bounded only)
iter() Iterate over messages N/A

Error Types:

Error When it occurs
RecvError All senders dropped, no more messages
TryRecvError::Empty No message available
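TryRecvError::Disconnected All senders dropped and no buffered messages remain
RecvTimeoutError::Timeout No message arrived within the timeout
RecvTimeoutError::Disconnected All senders dropped and no buffered messages remain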
SendError Receiver dropped
TrySendError::Full Bounded channel is full
TrySendError::Disconnected Receiver dropped

When to Use:

Scenario Recommended Channel
High throughput, no backpressure channel() (unbounded)
Memory constrained sync_channel(n) (bounded)
Producer-consumer Standard mpsc pattern
Multi-stage processing Pipeline with multiple channels

Key Points:

  • Sender can be cloned for multiple producers
  • Receiver cannot be cloned — single consumer only
  • Channel closes when all senders are dropped
  • recv() blocks; try_recv() doesn't
  • Use sync_channel for backpressure
  • Values are moved through the channel
  • The receiver iterator ends when channel closes