How do I work with Channels for Message Passing in Rust?
Walkthrough
Channels provide a way for threads to communicate by passing messages rather than sharing memory. Rust's standard library provides mpsc (multiple producer, single consumer) channels, and the crossbeam crate offers additional channel types with different semantics.
Key concepts:
- mpsc — Multiple Producer, Single Consumer (std::sync::mpsc)
- spmc — Single Producer, Multiple Consumer (crossbeam)
- mpmc — Multiple Producer, Multiple Consumer (crossbeam, flume)
- bounded — Channel with a fixed capacity (can block)
- unbounded — Channel with unlimited capacity (no blocking on send)
When to use channels:
- Actor-based concurrency patterns
- Worker pool communication
- Event streaming between threads
- Pipeline processing
- Decoupling producers from consumers
When NOT to use channels:
- Simple shared state (use
MutexorArc) - Real-time low-latency requirements
- When you need random access to shared data
Code Examples
Basic Channel Usage
use std::sync::mpsc;
use std::thread;
fn main() {
// Create a channel
let (tx, rx) = mpsc::channel();
// Spawn a thread that sends a message
thread::spawn(move || {
let message = String::from("Hello from thread!");
tx.send(message).unwrap();
});
// Receive the message in main thread
let received = rx.recv().unwrap();
println!("Received: {}", received);
}Multiple Producers (mpsc)
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
// Clone the transmitter for multiple producers
let tx1 = tx.clone();
let tx2 = tx.clone();
// Drop original tx so channel can close properly
drop(tx);
thread::spawn(move || {
tx1.send(String::from("Message from thread 1")).unwrap();
});
thread::spawn(move || {
tx2.send(String::from("Message from thread 2")).unwrap();
});
// Receive all messages
for received in rx {
println!("Received: {}", received);
}
}Using try_recv (Non-blocking)
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
thread::sleep(Duration::from_millis(100));
tx.send(42).unwrap();
});
// Non-blocking receive
loop {
match rx.try_recv() {
Ok(value) => {
println!("Received: {}", value);
break;
}
Err(mpsc::TryRecvError::Empty) => {
println!("No message yet, doing other work...");
thread::sleep(Duration::from_millis(50));
}
Err(mpsc::TryRecvError::Disconnected) => {
println!("Channel closed");
break;
}
}
}
}Using recv_timeout
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
thread::sleep(Duration::from_millis(200));
tx.send(String::from("Delayed message")).unwrap();
});
loop {
match rx.recv_timeout(Duration::from_millis(100)) {
Ok(message) => {
println!("Received: {}", message);
break;
}
Err(mpsc::RecvTimeoutError::Timeout) => {
println!("Timeout, waiting again...");
}
Err(mpsc::RecvTimeoutError::Disconnected) => {
println!("Channel closed");
break;
}
}
}
}Worker Pool with Channels
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
fn new(size: usize) -> Self {
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(mpsc::Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
let receiver = Arc::clone(&receiver);
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {} executing job", id);
job();
}
});
workers.push(Worker { id, thread });
}
ThreadPool { workers, sender }
}
fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
fn main() {
let pool = ThreadPool::new(4);
for i in 0..8 {
pool.execute(move || {
println!("Processing task {}", i);
thread::sleep(std::time::Duration::from_millis(100));
});
}
thread::sleep(std::time::Duration::from_millis(500));
}Note: The above needs use std::sync::Mutex; for the Mutex import.
Bounded Channel (crossbeam)
// Add to Cargo.toml: crossbeam = "0.8"
use crossbeam::channel::{bounded, select};
use std::thread;
use std::time::Duration;
fn main() {
// Create a bounded channel with capacity 3
let (tx, rx) = bounded(3);
thread::spawn(move || {
for i in 0..10 {
tx.send(i).unwrap();
println!("Sent: {}", i);
}
});
thread::spawn(move || {
for received in rx {
println!("Received: {}", received);
thread::sleep(Duration::from_millis(100));
}
});
thread::sleep(Duration::from_millis(1500));
}Unbounded Channel (crossbeam)
use crossbeam::channel::unbounded;
use std::thread;
fn main() {
let (tx, rx) = unbounded();
// Send many messages without blocking
for i in 0..1000 {
tx.send(i).unwrap();
}
// Drop sender to close channel
drop(tx);
// Receive all messages
let sum: i32 = rx.iter().sum();
println!("Sum: {}", sum);
}Select Multiple Channels
use crossbeam::channel::{bounded, select};
use std::thread;
use std::time::Duration;
fn main() {
let (tx1, rx1) = bounded::<&str>(1);
let (tx2, rx2) = bounded::<&str>(1);
thread::spawn(move || {
thread::sleep(Duration::from_millis(100));
tx1.send("from channel 1").unwrap();
});
thread::spawn(move || {
thread::sleep(Duration::from_millis(50));
tx2.send("from channel 2").unwrap();
});
// Wait for first message from either channel
select! {
recv(rx1) -> msg => println!("Received {}", msg.unwrap()),
recv(rx2) -> msg => println!("Received {}", msg.unwrap()),
}
}Pipeline Pattern
use std::sync::mpsc;
use std::thread;
fn main() {
// Stage 1: Generate numbers
let (tx1, rx1) = mpsc::channel();
thread::spawn(move || {
for i in 1..=5 {
tx1.send(i).unwrap();
}
});
// Stage 2: Square numbers
let (tx2, rx2) = mpsc::channel();
thread::spawn(move || {
for num in rx1 {
tx2.send(num * num).unwrap();
}
});
// Stage 3: Print results
for result in rx2 {
println!("Squared: {}", result);
}
}Fan-out Pattern
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
// Producer
thread::spawn(move || {
for i in 1..=10 {
tx.send(i).unwrap();
}
});
// Multiple consumers (fan-out)
let rx = std::sync::Arc::new(std::sync::Mutex::new(rx));
let mut handles = vec![];
for worker_id in 0..3 {
let rx = Arc::clone(&rx);
let handle = thread::spawn(move || {
loop {
let msg = rx.lock().unwrap().recv();
match msg {
Ok(value) => println!("Worker {} got: {}", worker_id, value),
Err(_) => break,
}
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
}
use std::sync::Arc;Fan-in Pattern
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
// Multiple producers (fan-in)
let mut handles = vec![];
for producer_id in 0..3 {
let tx = tx.clone();
let handle = thread::spawn(move || {
for i in 0..3 {
let msg = format!("Producer {}: msg {}", producer_id, i);
tx.send(msg).unwrap();
}
});
handles.push(handle);
}
// Drop the original sender
drop(tx);
// Single consumer receives from all
for received in rx {
println!("Received: {}", received);
}
for handle in handles {
handle.join().unwrap();
}
}Actor Pattern
use std::sync::mpsc;
use std::thread;
enum Message {
Increment,
Decrement,
GetValue(mpsc::Sender<i32>),
}
struct CounterActor {
count: i32,
receiver: mpsc::Receiver<Message>,
}
impl CounterActor {
fn new(receiver: mpsc::Receiver<Message>) -> Self {
Self { count: 0, receiver }
}
fn run(&mut self) {
while let Ok(msg) = self.receiver.recv() {
match msg {
Message::Increment => self.count += 1,
Message::Decrement => self.count -= 1,
Message::GetValue(tx) => {
tx.send(self.count).unwrap();
}
}
}
}
}
struct CounterHandle {
sender: mpsc::Sender<Message>,
}
impl CounterHandle {
fn new() -> (Self, mpsc::Receiver<Message>) {
let (tx, rx) = mpsc::channel();
(Self { sender: tx }, rx)
}
fn increment(&self) {
self.sender.send(Message::Increment).unwrap();
}
fn decrement(&self) {
self.sender.send(Message::Decrement).unwrap();
}
fn get_value(&self) -> i32 {
let (tx, rx) = mpsc::channel();
self.sender.send(Message::GetValue(tx)).unwrap();
rx.recv().unwrap()
}
}
fn main() {
let (handle, receiver) = CounterHandle::new();
// Spawn actor thread
thread::spawn(move || {
let mut actor = CounterActor::new(receiver);
actor.run();
});
handle.increment();
handle.increment();
handle.decrement();
println!("Value: {}", handle.get_value());
}Using flume for MPMC
// Add to Cargo.toml: flume = "0.11"
use flume;
use std::thread;
fn main() {
let (tx, rx) = flume::bounded(10);
// Multiple producers
let mut producers = vec![];
for i in 0..3 {
let tx = tx.clone();
producers.push(thread::spawn(move || {
for j in 0..3 {
tx.send((i, j)).unwrap();
}
}));
}
drop(tx);
// Multiple consumers
let mut consumers = vec![];
for _ in 0..2 {
let rx = rx.clone();
consumers.push(thread::spawn(move || {
while let Ok((p, n)) = rx.recv() {
println!("Consumer received: producer={}, num={}", p, n);
}
}));
}
drop(rx);
for p in producers {
p.join().unwrap();
}
for c in consumers {
c.join().unwrap();
}
}Channel Error Handling
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
// Sender error handling
thread::spawn(move || {
if tx.send(42).is_err() {
println!("Receiver was dropped before receiving");
}
});
// Receiver error handling
match rx.recv() {
Ok(value) => println!("Received: {}", value),
Err(mpsc::RecvError) => println!("All senders were dropped"),
}
// Try_recv error handling
let (tx2, rx2) = mpsc::channel();
drop(tx2); // Close sender
match rx2.try_recv() {
Ok(value) => println!("Got: {}", value),
Err(mpsc::TryRecvError::Empty) => println!("Channel empty"),
Err(mpsc::TryRecvError::Disconnected) => println!("Channel closed"),
}
}Channel with Structured Messages
use std::sync::mpsc;
use std::thread;
#[derive(Debug)]
enum Command {
Set { key: String, value: i32 },
Get { key: String, respond_to: mpsc::Sender<Option<i32>> },
Delete { key: String },
}
struct KeyValueStore {
data: std::collections::HashMap<String, i32>,
receiver: mpsc::Receiver<Command>,
}
impl KeyValueStore {
fn new(receiver: mpsc::Receiver<Command>) -> Self {
Self {
data: std::collections::HashMap::new(),
receiver,
}
}
fn run(&mut self) {
while let Ok(cmd) = self.receiver.recv() {
match cmd {
Command::Set { key, value } => {
self.data.insert(key, value);
}
Command::Get { key, respond_to } => {
let value = self.data.get(&key).copied();
respond_to.send(value).unwrap();
}
Command::Delete { key } => {
self.data.remove(&key);
}
}
}
}
}
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let mut store = KeyValueStore::new(rx);
store.run();
});
// Set values
tx.send(Command::Set { key: String::from("a"), value: 1 }).unwrap();
tx.send(Command::Set { key: String::from("b"), value: 2 }).unwrap();
// Get value
let (respond_tx, respond_rx) = mpsc::channel();
tx.send(Command::Get { key: String::from("a"), respond_to: respond_tx }).unwrap();
println!("Value of 'a': {:?}", respond_rx.recv().unwrap());
}Summary
Standard Library Channel Types (std::sync::mpsc):
| Type | Description |
|---|---|
Sender<T> |
Sending end (cloneable for multiple producers) |
Receiver<T> |
Receiving end (single consumer) |
channel() |
Create unbounded channel |
sync_channel(n) |
Create bounded channel with capacity n |
Receiver Methods:
| Method | Blocking? | Description |
|---|---|---|
recv() |
Yes | Block until message received |
try_recv() |
No | Return immediately, may error |
recv_timeout(dur) |
Partial | Block with timeout |
iter() |
No | Iterate over received messages |
Channel Error Types:
| Error | When It Occurs |
|---|---|
RecvError |
All senders dropped |
TryRecvError::Empty |
No message available |
TryRecvError::Disconnected |
All senders dropped |
SendError |
Receiver dropped |
Channel Patterns:
| Pattern | Description |
|---|---|
| mpsc | Multiple producers, single consumer |
| Pipeline | Chain of processing stages |
| Fan-out | One producer, multiple consumers |
| Fan-in | Multiple producers, one consumer |
| Actor | Message-driven isolated state |
| Worker pool | Distribute work to workers |
Channel Crate Comparison:
| Crate | Type | Features |
|---|---|---|
| std::sync::mpsc | mpsc | Standard library, simple |
| crossbeam | mpsc, spmc, mpmc | Bounded/unbounded, select |
| flume | mpmc | Fast, ergonomic API |
| tokio::sync::mpsc | mpsc | Async channels |
Key Points:
- Channels provide message-passing concurrency
- Standard library provides mpsc channels
sync_channel(n)creates bounded channels- Clone
Senderfor multiple producers - Use
try_recv()orrecv_timeout()for non-blocking - Consider crossbeam or flume for advanced features
- Actor pattern isolates mutable state
- Pipelines chain processing stages
- Fan-out distributes work to multiple consumers
- Fan-in aggregates from multiple producers
