How do I work with mpsc channels in Rust?
Walkthrough
mpsc stands for Multiple Producer, Single Consumer — a channel pattern where multiple senders can send messages to a single receiver. Rust's std::sync::mpsc module provides channel primitives for inter-thread communication.
Key characteristics:
- Multiple senders — Clone `Sender` to share among threads
- Single receiver — Only one `Receiver` exists (not clonable)
- FIFO ordering — Messages from each sender arrive in the order they were sent
- Ownership transfer — Values sent through channels are moved
- Blocking receive — `recv()` waits for messages
Channel types in std::sync::mpsc:
- `channel()` — Creates an unbounded (infinite buffer) channel
- `sync_channel(n)` — Creates a bounded channel with a fixed buffer of `n` messages
Channels are essential for message-passing concurrency, actor patterns, and coordinating threads without shared state.
Code Examples
Basic Channel Usage
use std::thread;
use std::sync::mpsc;
/// Sends a single `String` from a spawned thread and prints it on the main thread.
fn main() {
    // `channel()` hands back the two ends as a (sender, receiver) pair.
    let (sender, receiver) = mpsc::channel();

    // The closure takes ownership of `sender`, so the value can be sent from the new thread.
    thread::spawn(move || {
        sender.send(String::from("hello from thread")).unwrap();
    });

    // `recv()` parks the main thread until the message is available.
    println!("Got: {}", receiver.recv().unwrap());
}
// Multiple Producers (Cloning Sender)
use std::thread;
use std::sync::mpsc;
/// Two producer threads each send one message through their own clone of the sender.
fn main() {
    let (tx, rx) = mpsc::channel();

    // One handle per producer thread.
    let first = tx.clone();
    let second = tx.clone();
    // Release the original handle; otherwise the receiving loop below would never
    // observe that every sender is gone.
    drop(tx);

    thread::spawn(move || first.send("Message from thread 1").unwrap());
    thread::spawn(move || second.send("Message from thread 2").unwrap());

    // Drains the channel; iteration stops once all senders have been dropped.
    rx.into_iter()
        .for_each(|msg| println!("Received: {}", msg));
}
// Sending Different Message Types
use std::thread;
use std::sync::mpsc;
/// A message passed from the producer thread to the receiving loop.
///
/// Wrapping the payloads in one enum lets a single channel carry several kinds of
/// values, since a channel has exactly one element type.
#[derive(Debug, Clone, PartialEq, Eq)]
enum Message {
    /// A text payload.
    Text(String),
    /// An integer payload.
    Number(i32),
    /// Signals the receiving loop to stop.
    Quit,
}
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let messages = vec![
Message::Text("Hello".to_string()),
Message::Number(42),
Message::Text("World".to_string()),
Message::Quit,
];
for msg in messages {
tx.send(msg).unwrap();
thread::sleep(std::time::Duration::from_millis(100));
}
});
// Process messages
for msg in rx {
match msg {
Message::Text(s) => println!("Text: {}", s),
Message::Number(n) => println!("Number: {}", n),
Message::Quit => {
println!("Quit signal received");
break;
}
}
}
}Non-Blocking Receive with try_recv
use std::thread;
use std::sync::mpsc;
use std::time::Duration;
/// Polls the channel with `try_recv` so the main thread can keep working between checks.
fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        thread::sleep(Duration::from_millis(200));
        tx.send("delayed message").unwrap();
    });

    loop {
        let polled = rx.try_recv();

        // Nothing queued yet: simulate other work, then poll again.
        if let Err(mpsc::TryRecvError::Empty) = polled {
            println!("No message yet, doing other work...");
            thread::sleep(Duration::from_millis(50));
            continue;
        }

        // Either a message arrived or the sender is gone; both end the polling loop.
        match polled {
            Ok(msg) => println!("Got: {}", msg),
            // `Empty` was handled above, so the only error left is `Disconnected`.
            Err(_) => println!("Sender disconnected"),
        }
        break;
    }
}
// Receive with Timeout
use std::thread;
use std::sync::mpsc;
use std::time::Duration;
/// Demonstrates `recv_timeout`: a short wait that expires, then a longer wait that succeeds.
fn main() {
    let (tx, rx) = mpsc::channel();

    // The producer sends only after 300 ms.
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(300));
        tx.send("late message").unwrap();
    });

    // First attempt waits at most 100 ms.
    let first_attempt = rx.recv_timeout(Duration::from_millis(100));
    match first_attempt {
        Err(mpsc::RecvTimeoutError::Timeout) => {
            println!("Timeout - no message received");
        }
        Err(mpsc::RecvTimeoutError::Disconnected) => {
            println!("Sender disconnected");
        }
        Ok(msg) => println!("Received: {}", msg),
    }

    // Second attempt allows up to 500 ms more.
    let second_attempt = rx.recv_timeout(Duration::from_millis(500));
    match second_attempt {
        Err(e) => println!("Error: {:?}", e),
        Ok(msg) => println!("Received: {}", msg),
    }
}
// Iterating Over Messages
use std::thread;
use std::sync::mpsc;
use std::time::Duration;
/// Receives a stream of integers by iterating over the receiver until the channel closes.
fn main() {
    let (tx, rx) = mpsc::channel();

    // Producer: emits 1 through 5 with a pause after each value.
    thread::spawn(move || {
        for val in 1..=5 {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_millis(100));
        }
        // The closure ends here, dropping `tx` and closing the channel.
    });

    println!("Receiving values:");
    // `iter()` blocks for each value and stops once the sender has been dropped.
    rx.iter().for_each(|val| println!(" Got: {}", val));
    println!("Channel closed");
}
// Bounded Channel (sync_channel)
use std::thread;
use std::sync::mpsc;
use std::time::Duration;
/// Demonstrates backpressure with a bounded channel: a fast producer is throttled by a
/// slow consumer once the two-slot buffer fills up.
fn main() {
    // Bounded channel with room for two queued values.
    let (tx, rx) = mpsc::sync_channel(2);

    // Producer thread. It owns the ONLY sender: keeping a second handle alive in `main`
    // would stop the channel from ever closing, and the receive loop below would hang.
    thread::spawn(move || {
        for i in 0..5 {
            // `send` blocks while the buffer is full.
            match tx.send(i) {
                Ok(_) => println!("Sent: {}", i),
                Err(_) => {
                    println!("Receiver dropped");
                    break;
                }
            }
            thread::sleep(Duration::from_millis(50));
        }
        // `tx` is dropped here, which closes the channel.
    });

    // Let the producer run ahead so the buffer fills before reading starts.
    thread::sleep(Duration::from_millis(300));

    // Slow consumer; the loop ends when the producer's sender is dropped.
    for val in rx {
        println!(" Received: {}", val);
        thread::sleep(Duration::from_millis(100));
    }
}
// Try Send with Bounded Channels
use std::thread;
use std::sync::mpsc;
use std::time::Duration;
/// Shows `try_send` on a full bounded channel, then a successful send after making room.
fn main() {
    let (tx, rx) = mpsc::sync_channel(2);

    // Occupy both buffer slots.
    for value in 1..=2 {
        tx.send(value).unwrap();
    }

    // A plain `send` would block now; `try_send` returns an error immediately instead.
    match tx.try_send(3) {
        Err(mpsc::TrySendError::Full(_)) => {
            println!("Channel is full, cannot send");
        }
        Err(mpsc::TrySendError::Disconnected(_)) => {
            println!("Receiver disconnected");
        }
        Ok(_) => println!("Sent 3"),
    }

    // Taking one value out frees a slot.
    let freed = rx.recv().unwrap();
    println!("Received: {}", freed);

    // With a free slot, a blocking `send` completes right away.
    tx.send(3).unwrap();
    println!("Sent 3 after receiving");

    // Drop the sender so draining the channel terminates.
    drop(tx);
    rx.into_iter()
        .for_each(|val| println!("Final receive: {}", val));
}
// Worker Pool Pattern
use std::thread;
use std::sync::mpsc;
/// A worker identified by a numeric id.
struct Worker {
    id: u32,
}

impl Worker {
    /// Builds a worker carrying the given id.
    fn new(id: u32) -> Self {
        Worker { id }
    }

    /// Logs which worker handles `job` and returns the job value doubled.
    fn process(&self, job: u32) -> u32 {
        let Worker { id } = self;
        println!("Worker {} processing job {}", id, job);
        2 * job
    }
}
fn main() {
let (job_tx, job_rx) = mpsc::channel();
let (result_tx, result_rx) = mpsc::channel();
// Spawn worker threads
let num_workers = 4;
for id in 0..num_workers {
let job_rx = job_rx.clone(); // Clone for each worker
let result_tx = result_tx.clone();
thread::spawn(move || {
let worker = Worker::new(id);
// Process jobs until channel closes
for job in job_rx.iter() {
let result = worker.process(job);
result_tx.send((id, result)).unwrap();
}
});
}
// Drop the cloned receivers so the original can be used
drop(job_rx);
drop(result_tx);
// Send jobs
for job in 0..10 {
job_tx.send(job).unwrap();
}
// Close job channel
drop(job_tx);
// Collect results
let mut results: Vec<_> = result_rx.iter().collect();
results.sort_by_key(|(_, r)| *r);
for (worker_id, result) in results {
println!("Worker {} returned: {}", worker_id, result);
}
}Pipeline Pattern
use std::thread;
use std::sync::mpsc;
/// Three-stage pipeline: generate 1..=5, square each value, add 10, then print.
fn main() {
    // One channel between each pair of neighbouring stages.
    let (numbers_tx, numbers_rx) = mpsc::channel();
    let (squares_tx, squares_rx) = mpsc::channel();
    let (shifted_tx, shifted_rx) = mpsc::channel();

    // Stage 1: generate numbers.
    thread::spawn(move || {
        (1..=5).for_each(|i| numbers_tx.send(i).unwrap());
    });

    // Stage 2: square numbers.
    thread::spawn(move || {
        numbers_rx
            .iter()
            .for_each(|n| squares_tx.send(n * n).unwrap());
    });

    // Stage 3: add 10.
    thread::spawn(move || {
        squares_rx
            .iter()
            .for_each(|n| shifted_tx.send(n + 10).unwrap());
    });

    // Each stage drops its sender when it finishes, which ends the next stage's loop
    // and finally this one.
    shifted_rx
        .iter()
        .for_each(|result| println!("Result: {}", result));
}
// Error Handling
use std::thread;
use std::sync::mpsc;
/// Shows what `recv` returns once a channel has no senders left.
fn main() {
    // Nothing is ever sent on this channel, so the compiler cannot infer the payload
    // type; it has to be written out explicitly.
    let (tx, rx) = mpsc::channel::<&str>();

    // Drop sender immediately.
    drop(tx);

    // Receiving on a closed channel yields `Err(RecvError)`.
    match rx.recv() {
        Ok(msg) => println!("Got: {}", msg),
        Err(e) => println!("Error: {}", e), // "receiving on a closed channel"
    }

    // Create a new channel for demonstration.
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        tx.send("hello").unwrap();
        // `tx` is dropped here.
    });

    // A message queued before the sender was dropped is still delivered.
    println!("Got: {}", rx.recv().unwrap());

    // The queue is empty and the sender is gone, so this reports the closed channel.
    match rx.recv() {
        Ok(msg) => println!("Got: {}", msg),
        Err(_) => println!("Channel closed"),
    }
}
// Bidirectional Communication
use std::thread;
use std::sync::mpsc;
/// Request/response between `main` and one thread, using one channel per direction.
fn main() {
    let (request_tx, request_rx) = mpsc::channel(); // Main -> Thread
    let (response_tx, response_rx) = mpsc::channel(); // Thread -> Main

    thread::spawn(move || {
        // Wait for the request, then answer it on the other channel.
        let request: String = request_rx.recv().unwrap();
        println!("Thread received: {}", request);
        let reply = format!("Response to: {}", request);
        response_tx.send(reply).unwrap();
    });

    request_tx.send(String::from("Hello from main")).unwrap();

    // Blocks until the thread has replied.
    println!("Main received: {}", response_rx.recv().unwrap());
}
// Rate Limiting Pattern
use std::thread;
use std::sync::mpsc;
use std::time::{Duration, Instant};
/// A consumer that handles at most one message per 200 ms, fed through a bounded channel.
fn main() {
    let (tx, rx) = mpsc::sync_channel(10);

    // Producer: pushes 20 values as fast as the bounded buffer allows.
    thread::spawn(move || {
        for i in 0..20 {
            match tx.send(i) {
                Ok(_) => println!("Produced: {}", i),
                Err(_) => {
                    println!("Consumer dropped");
                    break;
                }
            }
        }
    });

    // Consumer: enforce a minimum gap between consecutive messages.
    let min_gap = Duration::from_millis(200);
    let mut previous = Instant::now();
    for msg in rx {
        let waited = previous.elapsed();
        if waited < min_gap {
            // Sleep off whatever is left of the gap.
            thread::sleep(min_gap - waited);
        }
        previous = Instant::now();
        println!("Consumed: {}", msg);
    }
}
// Summary
Channel Types:
| Function | Type | Buffer | Send Behavior |
|---|---|---|---|
| `channel()` | Unbounded | Infinite | Never blocks |
| `sync_channel(n)` | Bounded | n items | Blocks when full |
Key Methods:
| Method | Description | Blocking? |
|---|---|---|
| `send(v)` | Send value | Unbounded: No, Bounded: When full |
| `recv()` | Receive value | Yes |
| `try_recv()` | Receive or error | No |
| `recv_timeout(d)` | Receive with timeout | Yes, for at most `d` |
| `try_send(v)` | Send or error | No (bounded only) |
| `iter()` | Iterate over messages | Yes, while waiting for each message |
Error Types:
| Error | When it occurs |
|---|---|
| `RecvError` | All senders dropped, no more messages |
| `TryRecvError::Empty` | No message available |
| `TryRecvError::Disconnected` | All senders dropped |
| `SendError` | Receiver dropped |
| `TrySendError::Full` | Bounded channel is full |
| `TrySendError::Disconnected` | Receiver dropped |
When to Use:
| Scenario | Recommended Channel |
|---|---|
| High throughput, no backpressure | channel() (unbounded) |
| Memory constrained | sync_channel(n) (bounded) |
| Producer-consumer | Standard mpsc pattern |
| Multi-stage processing | Pipeline with multiple channels |
Key Points:
- `Sender` can be cloned for multiple producers
- `Receiver` cannot be cloned — single consumer only
- Channel closes when all senders are dropped
- `recv()` blocks; `try_recv()` doesn't
- Use `sync_channel` for backpressure
- Values are moved through the channel
- The receiver iterator ends when channel closes
