Loading page…
Rust walkthroughs
Loading page…
mpsc stands for Multiple Producer, Single Consumer — a channel pattern where multiple senders can send messages to a single receiver. Rust's std::sync::mpsc module provides channel primitives for inter-thread communication.
Key characteristics:
Sender to share among threadsReceiver exists (not clonable)recv() waits for messagesChannel types in std::sync::mpsc:
channel() — Creates unbounded (infinite buffer) channelsync_channel(n) — Creates bounded (fixed buffer) channelChannels are essential for message-passing concurrency, actor patterns, and coordinating threads without shared state.
use std::thread;
use std::sync::mpsc;
fn main() {
// Create a channel: (sender, receiver)
let (tx, rx) = mpsc::channel();
// Spawn a thread that sends a message
thread::spawn(move || {
let message = String::from("hello from thread");
tx.send(message).unwrap();
});
// Receive the message (blocks until available)
let received = rx.recv().unwrap();
println!("Got: {}", received);
}use std::thread;
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::channel();
// Clone sender for multiple threads
let tx1 = tx.clone();
let tx2 = tx.clone();
// Drop original so receiver knows when all senders are done
drop(tx);
thread::spawn(move || {
tx1.send("Message from thread 1").unwrap();
});
thread::spawn(move || {
tx2.send("Message from thread 2").unwrap();
});
// Receive all messages
for msg in rx {
println!("Received: {}", msg);
}
// Iterator ends when all senders are dropped
}use std::thread;
use std::sync::mpsc;
enum Message {
Text(String),
Number(i32),
Quit,
}
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let messages = vec![
Message::Text("Hello".to_string()),
Message::Number(42),
Message::Text("World".to_string()),
Message::Quit,
];
for msg in messages {
tx.send(msg).unwrap();
thread::sleep(std::time::Duration::from_millis(100));
}
});
// Process messages
for msg in rx {
match msg {
Message::Text(s) => println!("Text: {}", s),
Message::Number(n) => println!("Number: {}", n),
Message::Quit => {
println!("Quit signal received");
break;
}
}
}
}use std::thread;
use std::sync::mpsc;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
thread::sleep(Duration::from_millis(200));
tx.send("delayed message").unwrap();
});
// Poll for messages without blocking
loop {
match rx.try_recv() {
Ok(msg) => {
println!("Got: {}", msg);
break;
}
Err(mpsc::TryRecvError::Empty) => {
println!("No message yet, doing other work...");
thread::sleep(Duration::from_millis(50));
}
Err(mpsc::TryRecvError::Disconnected) => {
println!("Sender disconnected");
break;
}
}
}
}use std::thread;
use std::sync::mpsc;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
thread::sleep(Duration::from_millis(300));
tx.send("late message").unwrap();
});
// Try to receive with timeout
match rx.recv_timeout(Duration::from_millis(100)) {
Ok(msg) => println!("Received: {}", msg),
Err(mpsc::RecvTimeoutError::Timeout) => {
println!("Timeout - no message received");
}
Err(mpsc::RecvTimeoutError::Disconnected) => {
println!("Sender disconnected");
}
}
// Wait longer for the message
match rx.recv_timeout(Duration::from_millis(500)) {
Ok(msg) => println!("Received: {}", msg),
Err(e) => println!("Error: {:?}", e),
}
}use std::thread;
use std::sync::mpsc;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
// Spawn producer
thread::spawn(move || {
let values = vec![1, 2, 3, 4, 5];
for val in values {
tx.send(val).unwrap();
thread::sleep(Duration::from_millis(100));
}
// tx is dropped here, closing the channel
});
// Iterator automatically ends when sender is dropped
println!("Receiving values:");
for val in rx.iter() {
println!(" Got: {}", val);
}
println!("Channel closed");
}use std::thread;
use std::sync::mpsc;
use std::time::Duration;
fn main() {
// Create bounded channel with capacity of 2
let (tx, rx) = mpsc::sync_channel(2);
let tx_clone = tx.clone();
// Producer thread
thread::spawn(move || {
for i in 0..5 {
// send() blocks when buffer is full
match tx_clone.send(i) {
Ok(_) => println!("Sent: {}", i),
Err(_) => {
println!("Receiver dropped");
break;
}
}
thread::sleep(Duration::from_millis(50));
}
});
// Consumer reads slowly
thread::sleep(Duration::from_millis(300));
for val in rx {
println!(" Received: {}", val);
thread::sleep(Duration::from_millis(100));
}
}use std::thread;
use std::sync::mpsc;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::sync_channel(2);
// Fill the bounded channel
tx.send(1).unwrap();
tx.send(2).unwrap();
// Now try_send would block (or fail with try_send)
match tx.try_send(3) {
Ok(_) => println!("Sent 3"),
Err(mpsc::TrySendError::Full(_)) => {
println!("Channel is full, cannot send");
}
Err(mpsc::TrySendError::Disconnected(_)) => {
println!("Receiver disconnected");
}
}
// Receive to make room
println!("Received: {}", rx.recv().unwrap());
// Now send should work
tx.send(3).unwrap();
println!("Sent 3 after receiving");
// Clean up
drop(tx);
for val in rx {
println!("Final receive: {}", val);
}
}use std::thread;
use std::sync::mpsc;
struct Worker {
id: u32,
}
impl Worker {
fn new(id: u32) -> Self {
Self { id }
}
fn process(&self, job: u32) -> u32 {
println!("Worker {} processing job {}", self.id, job);
job * 2
}
}
fn main() {
let (job_tx, job_rx) = mpsc::channel();
let (result_tx, result_rx) = mpsc::channel();
// Spawn worker threads
let num_workers = 4;
for id in 0..num_workers {
let job_rx = job_rx.clone(); // Clone for each worker
let result_tx = result_tx.clone();
thread::spawn(move || {
let worker = Worker::new(id);
// Process jobs until channel closes
for job in job_rx.iter() {
let result = worker.process(job);
result_tx.send((id, result)).unwrap();
}
});
}
// Drop the cloned receivers so the original can be used
drop(job_rx);
drop(result_tx);
// Send jobs
for job in 0..10 {
job_tx.send(job).unwrap();
}
// Close job channel
drop(job_tx);
// Collect results
let mut results: Vec<_> = result_rx.iter().collect();
results.sort_by_key(|(_, r)| *r);
for (worker_id, result) in results {
println!("Worker {} returned: {}", worker_id, result);
}
}use std::thread;
use std::sync::mpsc;
fn main() {
// Stage 1: Generate numbers
let (tx1, rx1) = mpsc::channel();
thread::spawn(move || {
for i in 1..=5 {
tx1.send(i).unwrap();
}
});
// Stage 2: Square numbers
let (tx2, rx2) = mpsc::channel();
thread::spawn(move || {
for num in rx1 {
tx2.send(num * num).unwrap();
}
});
// Stage 3: Add 10
let (tx3, rx3) = mpsc::channel();
thread::spawn(move || {
for num in rx2 {
tx3.send(num + 10).unwrap();
}
});
// Collect results
for result in rx3 {
println!("Result: {}", result);
}
}use std::thread;
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::channel();
// Drop sender immediately
drop(tx);
// Receiving on closed channel
match rx.recv() {
Ok(msg) => println!("Got: {}", msg),
Err(e) => println!("Error: {}", e), // "receiving on a closed channel"
}
// Create new channel for demonstration
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
tx.send("hello").unwrap();
// tx dropped here
});
// Using recv properly
println!("Got: {}", rx.recv().unwrap());
// Channel is now closed
match rx.recv() {
Ok(msg) => println!("Got: {}", msg),
Err(_) => println!("Channel closed"),
}
}use std::thread;
use std::sync::mpsc;
fn main() {
// Create two channels for bidirectional communication
let (tx1, rx1) = mpsc::channel(); // Main -> Thread
let (tx2, rx2) = mpsc::channel(); // Thread -> Main
thread::spawn(move || {
// Receive request
let request = rx1.recv().unwrap();
println!("Thread received: {}", request);
// Send response
tx2.send(format!("Response to: {}", request)).unwrap();
});
// Send request
tx1.send("Hello from main".to_string()).unwrap();
// Receive response
let response = rx2.recv().unwrap();
println!("Main received: {}", response);
}use std::thread;
use std::sync::mpsc;
use std::time::{Duration, Instant};
fn main() {
let (tx, rx) = mpsc::sync_channel(10);
// Producer
thread::spawn(move || {
for i in 0..20 {
if tx.send(i).is_err() {
println!("Consumer dropped");
break;
}
println!("Produced: {}", i);
}
});
// Consumer with rate limiting
let mut last_time = Instant::now();
let rate = Duration::from_millis(200);
for msg in rx {
// Rate limit
let elapsed = last_time.elapsed();
if elapsed < rate {
thread::sleep(rate - elapsed);
}
last_time = Instant::now();
println!("Consumed: {}", msg);
}
}Channel Types:
| Function | Type | Buffer | Send Behavior |
|----------|------|--------|---------------|
| channel() | Unbounded | Infinite | Never blocks |
| sync_channel(n) | Bounded | n items | Blocks when full |
Key Methods:
| Method | Description | Blocking? |
|--------|-------------|-----------|
| send(v) | Send value | Unbounded: No, Bounded: When full |
| recv() | Receive value | Yes |
| try_recv() | Receive or error | No |
| recv_timeout(d) | Receive with timeout | Partial |
| try_send(v) | Send or error | No (bounded only) |
| iter() | Iterate over messages | N/A |
Error Types:
| Error | When it occurs |
|-------|---------------|
| RecvError | All senders dropped, no more messages |
| TryRecvError::Empty | No message available |
| TryRecvError::Disconnected | All senders dropped |
| SendError | Receiver dropped |
| TrySendError::Full | Bounded channel is full |
| TrySendError::Disconnected | Receiver dropped |
When to Use:
| Scenario | Recommended Channel |
|----------|-------------------|
| High throughput, no backpressure | channel() (unbounded) |
| Memory constrained | sync_channel(n) (bounded) |
| Producer-consumer | Standard mpsc pattern |
| Multi-stage processing | Pipeline with multiple channels |
Key Points:
Sender can be cloned for multiple producersReceiver cannot be cloned — single consumer onlyrecv() blocks; try_recv() doesn'tsync_channel for backpressure