How do I work with Channels for Message Passing in Rust?

Walkthrough

Channels provide a way for threads to communicate by passing messages rather than sharing memory. Rust's standard library provides mpsc (multiple producer, single consumer) channels, and the crossbeam crate offers additional channel types with different semantics.

Key concepts:

  • mpsc — Multiple Producer, Single Consumer (std::sync::mpsc)
  • spmc — Single Producer, Multiple Consumer (crossbeam)
  • mpmc — Multiple Producer, Multiple Consumer (crossbeam, flume)
  • bounded — Channel with a fixed capacity (can block)
  • unbounded — Channel with unlimited capacity (no blocking on send)
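The bounded/unbounded distinction also exists in the standard library: mpsc::channel() is unbounded, while mpsc::sync_channel(n) is bounded. The sketch below is a minimal illustration, not a recommended utility; fill_until_full is a hypothetical helper name. It uses try_send to show that a bounded channel stops accepting values once its buffer is full.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Hypothetical helper: counts how many values a bounded channel accepts
/// before it reports that its buffer is full.
fn fill_until_full(capacity: usize) -> usize {
    // Keep the receiver alive (but idle) so the channel stays connected.
    let (tx, _rx) = sync_channel::<u32>(capacity);
    let mut accepted = 0;
    loop {
        // try_send never blocks; a plain send() would block here instead.
        match tx.try_send(0) {
            Ok(()) => accepted += 1,
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}

fn main() {
    println!("Accepted before full: {}", fill_until_full(3));
}
```

With a blocking send() in place of try_send(), the producer would simply wait for the consumer to free a slot, which is how bounded channels apply backpressure.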

When to use channels:

  • Actor-based concurrency patterns
  • Worker pool communication
  • Event streaming between threads
  • Pipeline processing
  • Decoupling producers from consumers

When NOT to use channels:

  • Simple shared state (use Arc<Mutex<T>> or an atomic type instead)
  • Hard real-time or latency-critical paths, where queueing and thread wake-ups add delay
  • When you need random access to shared data
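For the simple shared-state case, a mutex-protected value is usually less machinery than a channel. The following is a minimal sketch under that assumption; parallel_count is a hypothetical helper, not part of any library.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical helper: several threads increment one shared counter.
fn parallel_count(threads: usize, increments: u64) -> u64 {
    let counter = Arc::new(Mutex::new(0u64));

    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..increments {
                    // The lock is held only for the duration of this statement.
                    *counter.lock().unwrap() += 1;
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    let total = *counter.lock().unwrap();
    total
}

fn main() {
    println!("Total: {}", parallel_count(4, 1000));
}
```

No messages are queued and no dedicated receiver thread is needed, which is why a channel would be overkill here.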

Code Examples

Basic Channel Usage

use std::sync::mpsc;
use std::thread;
 
fn main() {
    // Create a channel
    let (tx, rx) = mpsc::channel();
    
    // Spawn a thread that sends a message
    thread::spawn(move || {
        let message = String::from("Hello from thread!");
        tx.send(message).unwrap();
    });
    
    // Receive the message in main thread
    let received = rx.recv().unwrap();
    println!("Received: {}", received);
}

Multiple Producers (mpsc)

use std::sync::mpsc;
use std::thread;
 
fn main() {
    let (tx, rx) = mpsc::channel();
    
    // Clone the transmitter for multiple producers
    let tx1 = tx.clone();
    let tx2 = tx.clone();
    
    // Drop original tx so channel can close properly
    drop(tx);
    
    thread::spawn(move || {
        tx1.send(String::from("Message from thread 1")).unwrap();
    });
    
    thread::spawn(move || {
        tx2.send(String::from("Message from thread 2")).unwrap();
    });
    
    // Receive all messages
    for received in rx {
        println!("Received: {}", received);
    }
}

Using try_recv (Non-blocking)

use std::sync::mpsc;
use std::thread;
use std::time::Duration;
 
fn main() {
    let (tx, rx) = mpsc::channel();
    
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(100));
        tx.send(42).unwrap();
    });
    
    // Non-blocking receive
    loop {
        match rx.try_recv() {
            Ok(value) => {
                println!("Received: {}", value);
                break;
            }
            Err(mpsc::TryRecvError::Empty) => {
                println!("No message yet, doing other work...");
                thread::sleep(Duration::from_millis(50));
            }
            Err(mpsc::TryRecvError::Disconnected) => {
                println!("Channel closed");
                break;
            }
        }
    }
}

Using recv_timeout

use std::sync::mpsc;
use std::thread;
use std::time::Duration;
 
fn main() {
    let (tx, rx) = mpsc::channel();
    
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(200));
        tx.send(String::from("Delayed message")).unwrap();
    });
    
    loop {
        match rx.recv_timeout(Duration::from_millis(100)) {
            Ok(message) => {
                println!("Received: {}", message);
                break;
            }
            Err(mpsc::RecvTimeoutError::Timeout) => {
                println!("Timeout, waiting again...");
            }
            Err(mpsc::RecvTimeoutError::Disconnected) => {
                println!("Channel closed");
                break;
            }
        }
    }
}

Worker Pool with Channels

use std::sync::mpsc;
use std::sync::Arc;
use std::thread;
 
struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}
 
struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}
 
type Job = Box<dyn FnOnce() + Send + 'static>;
 
impl ThreadPool {
    fn new(size: usize) -> Self {
        let (sender, receiver) = mpsc::channel();
        // Mutex lives in std::sync (not std::sync::mpsc); it lets the workers share one Receiver
        let receiver = Arc::new(std::sync::Mutex::new(receiver));
        
        let mut workers = Vec::with_capacity(size);
        
        for id in 0..size {
            let receiver = Arc::clone(&receiver);
            
            let thread = thread::spawn(move || {
                loop {
                    let job = receiver.lock().unwrap().recv().unwrap();
                    println!("Worker {} executing job", id);
                    job();
                }
            });
            
            workers.push(Worker { id, thread });
        }
        
        ThreadPool { workers, sender }
    }
    
    fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
        self.sender.send(job).unwrap();
    }
}
 
fn main() {
    let pool = ThreadPool::new(4);
    
    for i in 0..8 {
        pool.execute(move || {
            println!("Processing task {}", i);
            thread::sleep(std::time::Duration::from_millis(100));
        });
    }
    
    thread::sleep(std::time::Duration::from_millis(500));
}

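Note: Mutex lives in std::sync; there is no std::sync::mpsc::Mutex. The receiver must be wrapped in Arc<Mutex<...>> because std's Receiver cannot be cloned, so the workers take turns locking it to pull the next job.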
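The pool above relies on a sleep in main and never shuts its workers down. One possible refinement, sketched here under the assumption that a Drop implementation is acceptable for the pool, is to close the channel and join the workers when the pool goes out of scope. The run_jobs function is a hypothetical driver added only to exercise the sketch.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

type Job = Box<dyn FnOnce() + Send + 'static>;

struct ThreadPool {
    workers: Vec<thread::JoinHandle<()>>,
    // Wrapped in Option so Drop can take the sender and close the channel.
    sender: Option<mpsc::Sender<Job>>,
}

impl ThreadPool {
    fn new(size: usize) -> Self {
        let (sender, receiver) = mpsc::channel::<Job>();
        let receiver = Arc::new(Mutex::new(receiver));
        let workers = (0..size)
            .map(|_| {
                let receiver = Arc::clone(&receiver);
                thread::spawn(move || loop {
                    // The lock guard is a temporary, released before the job runs.
                    let job = receiver.lock().unwrap().recv();
                    match job {
                        Ok(job) => job(),
                        Err(_) => break, // all senders dropped: stop this worker
                    }
                })
            })
            .collect();
        ThreadPool { workers, sender: Some(sender) }
    }

    fn execute<F: FnOnce() + Send + 'static>(&self, f: F) {
        if let Some(sender) = &self.sender {
            sender.send(Box::new(f)).unwrap();
        }
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        // Dropping the only sender makes recv() return Err in every worker.
        drop(self.sender.take());
        for worker in self.workers.drain(..) {
            worker.join().unwrap();
        }
    }
}

/// Hypothetical driver: runs `jobs` tasks and reports how many completed.
fn run_jobs(workers: usize, jobs: usize) -> usize {
    let done = Arc::new(AtomicUsize::new(0));
    let pool = ThreadPool::new(workers);
    for _ in 0..jobs {
        let done = Arc::clone(&done);
        pool.execute(move || {
            done.fetch_add(1, Ordering::SeqCst);
        });
    }
    drop(pool); // waits for queued jobs to finish
    done.load(Ordering::SeqCst)
}

fn main() {
    println!("Completed jobs: {}", run_jobs(4, 8));
}
```

Because the workers drain the queue before seeing the disconnect, every job submitted before the drop still runs.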

Bounded Channel (crossbeam)

// Add to Cargo.toml: crossbeam = "0.8"
use crossbeam::channel::bounded;
use std::thread;
use std::time::Duration;
 
fn main() {
    // Create a bounded channel with capacity 3
    let (tx, rx) = bounded(3);
    
    thread::spawn(move || {
        for i in 0..10 {
            tx.send(i).unwrap();
            println!("Sent: {}", i);
        }
    });
    
    thread::spawn(move || {
        for received in rx {
            println!("Received: {}", received);
            thread::sleep(Duration::from_millis(100));
        }
    });
    
    thread::sleep(Duration::from_millis(1500));
}

Unbounded Channel (crossbeam)

use crossbeam::channel::unbounded;
 
fn main() {
    let (tx, rx) = unbounded();
    
    // Send many messages without blocking
    for i in 0..1000 {
        tx.send(i).unwrap();
    }
    
    // Drop sender to close channel
    drop(tx);
    
    // Receive all messages
    let sum: i32 = rx.iter().sum();
    println!("Sum: {}", sum);
}

Select Multiple Channels

use crossbeam::channel::{bounded, select};
use std::thread;
use std::time::Duration;
 
fn main() {
    let (tx1, rx1) = bounded::<&str>(1);
    let (tx2, rx2) = bounded::<&str>(1);
    
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(100));
        tx1.send("from channel 1").unwrap();
    });
    
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        tx2.send("from channel 2").unwrap();
    });
    
    // Wait for first message from either channel
    select! {
        recv(rx1) -> msg => println!("Received {}", msg.unwrap()),
        recv(rx2) -> msg => println!("Received {}", msg.unwrap()),
    }
}

Pipeline Pattern

use std::sync::mpsc;
use std::thread;
 
fn main() {
    // Stage 1: Generate numbers
    let (tx1, rx1) = mpsc::channel();
    
    thread::spawn(move || {
        for i in 1..=5 {
            tx1.send(i).unwrap();
        }
    });
    
    // Stage 2: Square numbers
    let (tx2, rx2) = mpsc::channel();
    
    thread::spawn(move || {
        for num in rx1 {
            tx2.send(num * num).unwrap();
        }
    });
    
    // Stage 3: Print results
    for result in rx2 {
        println!("Squared: {}", result);
    }
}
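When a pipeline grows beyond two or three stages, the spawn-and-forward boilerplate can be factored out. The sketch below assumes a hypothetical generic helper named stage that wraps one processing step; run_pipeline is likewise a made-up driver for illustration.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;

/// Hypothetical helper: spawns a thread that applies `f` to every item
/// from `input` and returns the receiving end of the stage's output.
fn stage<T, U, F>(input: Receiver<T>, f: F) -> Receiver<U>
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        for item in input {
            // Stop early if the downstream stage has gone away.
            if tx.send(f(item)).is_err() {
                break;
            }
        }
        // tx is dropped here, which closes the next stage's input.
    });
    rx
}

/// Hypothetical driver: square each value, then add one.
fn run_pipeline(values: Vec<i32>) -> Vec<i32> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        for v in values {
            if tx.send(v).is_err() {
                break;
            }
        }
    });

    let squared = stage(rx, |n| n * n);
    let plus_one = stage(squared, |n| n + 1);
    plus_one.iter().collect()
}

fn main() {
    println!("{:?}", run_pipeline(vec![1, 2, 3])); // prints [2, 5, 10]
}
```

Each stage runs on a single thread, so ordering is preserved from input to output.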

Fan-out Pattern

use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;
 
fn main() {
    let (tx, rx) = mpsc::channel();
    
    // Producer
    thread::spawn(move || {
        for i in 1..=10 {
            tx.send(i).unwrap();
        }
    });
    
    // Multiple consumers (fan-out): std's Receiver cannot be cloned,
    // so the workers share it behind Arc<Mutex<...>>
    let rx = Arc::new(Mutex::new(rx));
    let mut handles = vec![];
    
    for worker_id in 0..3 {
        let rx = Arc::clone(&rx);
        let handle = thread::spawn(move || {
            loop {
                // The lock is released at the end of this statement
                let msg = rx.lock().unwrap().recv();
                match msg {
                    Ok(value) => println!("Worker {} got: {}", worker_id, value),
                    // The producer finished and dropped its sender
                    Err(_) => break,
                }
            }
        });
        handles.push(handle);
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
}

Fan-in Pattern

use std::sync::mpsc;
use std::thread;
 
fn main() {
    let (tx, rx) = mpsc::channel();
    
    // Multiple producers (fan-in)
    let mut handles = vec![];
    
    for producer_id in 0..3 {
        let tx = tx.clone();
        let handle = thread::spawn(move || {
            for i in 0..3 {
                let msg = format!("Producer {}: msg {}", producer_id, i);
                tx.send(msg).unwrap();
            }
        });
        handles.push(handle);
    }
    
    // Drop the original sender
    drop(tx);
    
    // Single consumer receives from all
    for received in rx {
        println!("Received: {}", received);
    }
    
    for handle in handles {
        handle.join().unwrap();
    }
}

Actor Pattern

use std::sync::mpsc;
use std::thread;
 
enum Message {
    Increment,
    Decrement,
    GetValue(mpsc::Sender<i32>),
}
 
struct CounterActor {
    count: i32,
    receiver: mpsc::Receiver<Message>,
}
 
impl CounterActor {
    fn new(receiver: mpsc::Receiver<Message>) -> Self {
        Self { count: 0, receiver }
    }
    
    fn run(&mut self) {
        while let Ok(msg) = self.receiver.recv() {
            match msg {
                Message::Increment => self.count += 1,
                Message::Decrement => self.count -= 1,
                Message::GetValue(tx) => {
                    tx.send(self.count).unwrap();
                }
            }
        }
    }
}
 
struct CounterHandle {
    sender: mpsc::Sender<Message>,
}
 
impl CounterHandle {
    fn new() -> (Self, mpsc::Receiver<Message>) {
        let (tx, rx) = mpsc::channel();
        (Self { sender: tx }, rx)
    }
    
    fn increment(&self) {
        self.sender.send(Message::Increment).unwrap();
    }
    
    fn decrement(&self) {
        self.sender.send(Message::Decrement).unwrap();
    }
    
    fn get_value(&self) -> i32 {
        let (tx, rx) = mpsc::channel();
        self.sender.send(Message::GetValue(tx)).unwrap();
        rx.recv().unwrap()
    }
}
 
fn main() {
    let (handle, receiver) = CounterHandle::new();
    
    // Spawn actor thread
    thread::spawn(move || {
        let mut actor = CounterActor::new(receiver);
        actor.run();
    });
    
    handle.increment();
    handle.increment();
    handle.decrement();
    
    println!("Value: {}", handle.get_value());
}

Using flume for MPMC

// Add to Cargo.toml: flume = "0.11"
use std::thread;
 
fn main() {
    let (tx, rx) = flume::bounded(10);
    
    // Multiple producers
    let mut producers = vec![];
    for i in 0..3 {
        let tx = tx.clone();
        producers.push(thread::spawn(move || {
            for j in 0..3 {
                tx.send((i, j)).unwrap();
            }
        }));
    }
    drop(tx);
    
    // Multiple consumers
    let mut consumers = vec![];
    for _ in 0..2 {
        let rx = rx.clone();
        consumers.push(thread::spawn(move || {
            while let Ok((p, n)) = rx.recv() {
                println!("Consumer received: producer={}, num={}", p, n);
            }
        }));
    }
    drop(rx);
    
    for p in producers {
        p.join().unwrap();
    }
    for c in consumers {
        c.join().unwrap();
    }
}

Channel Error Handling

use std::sync::mpsc;
use std::thread;
 
fn main() {
    let (tx, rx) = mpsc::channel();
    
    // Sender error handling
    thread::spawn(move || {
        if tx.send(42).is_err() {
            println!("Receiver was dropped before receiving");
        }
    });
    
    // Receiver error handling
    match rx.recv() {
        Ok(value) => println!("Received: {}", value),
        Err(mpsc::RecvError) => println!("All senders were dropped"),
    }
    
    // Try_recv error handling
    // Nothing is ever sent on this channel, so the element type must be given explicitly
    let (tx2, rx2) = mpsc::channel::<i32>();
    drop(tx2);  // Close sender
    
    match rx2.try_recv() {
        Ok(value) => println!("Got: {}", value),
        Err(mpsc::TryRecvError::Empty) => println!("Channel empty"),
        Err(mpsc::TryRecvError::Disconnected) => println!("Channel closed"),
    }
}

Channel with Structured Messages

use std::sync::mpsc;
use std::thread;
 
#[derive(Debug)]
enum Command {
    Set { key: String, value: i32 },
    Get { key: String, respond_to: mpsc::Sender<Option<i32>> },
    Delete { key: String },
}
 
struct KeyValueStore {
    data: std::collections::HashMap<String, i32>,
    receiver: mpsc::Receiver<Command>,
}
 
impl KeyValueStore {
    fn new(receiver: mpsc::Receiver<Command>) -> Self {
        Self {
            data: std::collections::HashMap::new(),
            receiver,
        }
    }
    
    fn run(&mut self) {
        while let Ok(cmd) = self.receiver.recv() {
            match cmd {
                Command::Set { key, value } => {
                    self.data.insert(key, value);
                }
                Command::Get { key, respond_to } => {
                    let value = self.data.get(&key).copied();
                    respond_to.send(value).unwrap();
                }
                Command::Delete { key } => {
                    self.data.remove(&key);
                }
            }
        }
    }
}
 
fn main() {
    let (tx, rx) = mpsc::channel();
    
    thread::spawn(move || {
        let mut store = KeyValueStore::new(rx);
        store.run();
    });
    
    // Set values
    tx.send(Command::Set { key: String::from("a"), value: 1 }).unwrap();
    tx.send(Command::Set { key: String::from("b"), value: 2 }).unwrap();
    
    // Get value
    let (respond_tx, respond_rx) = mpsc::channel();
    tx.send(Command::Get { key: String::from("a"), respond_to: respond_tx }).unwrap();
    println!("Value of 'a': {:?}", respond_rx.recv().unwrap());
}

Summary

Standard Library Channel Types (std::sync::mpsc):

Type             Description
Sender<T>        Sending end (cloneable for multiple producers)
Receiver<T>      Receiving end (single consumer)
channel()        Create unbounded channel
sync_channel(n)  Create bounded channel with capacity n

Receiver Methods:

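Method             Blocking?       Description
recv()             Yes             Block until a message arrives or all senders are dropped
try_recv()         No              Return immediately with a message or an error
recv_timeout(dur)  Yes, up to dur  Block until a message arrives or the timeout elapses
iter()             Yes             Iterate over messages, blocking for each until the channel closes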
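Besides the methods listed, std's Receiver also offers try_iter(), which yields only the messages that are already queued and never blocks. A minimal sketch, with drain_pending as a hypothetical helper name:

```rust
use std::sync::mpsc;

/// Hypothetical helper: collects whatever is queued right now without waiting.
fn drain_pending(rx: &mpsc::Receiver<i32>) -> Vec<i32> {
    // try_iter() stops as soon as the channel is empty or disconnected.
    rx.try_iter().collect()
}

fn main() {
    let (tx, rx) = mpsc::channel();
    for i in 0..3 {
        tx.send(i).unwrap();
    }

    println!("First drain: {:?}", drain_pending(&rx));
    println!("Second drain: {:?}", drain_pending(&rx));
}
```

This is handy in a loop that does other work between polls, since it picks up a whole batch of messages per iteration instead of one.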

Channel Error Types:

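Error                           When It Occurs
RecvError                       All senders dropped and the channel is empty
TryRecvError::Empty             No message available right now
TryRecvError::Disconnected      All senders dropped and the channel is empty
RecvTimeoutError::Timeout       No message arrived within the timeout
RecvTimeoutError::Disconnected  All senders dropped and the channel is empty
SendError                       Receiver dropped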
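A detail worth knowing about SendError is that it hands the unsent value back, so a failed send does not lose the message. The sketch below illustrates this; send_or_recover is a hypothetical helper name.

```rust
use std::sync::mpsc::{self, SendError};

/// Hypothetical helper: tries to send `msg`; on failure returns the
/// original message so the caller can log it or route it elsewhere.
fn send_or_recover(tx: &mpsc::Sender<String>, msg: String) -> Result<(), String> {
    // SendError<T> is a tuple struct wrapping the value that could not be sent.
    tx.send(msg).map_err(|SendError(unsent)| unsent)
}

fn main() {
    let (tx, rx) = mpsc::channel::<String>();

    // Receiver is alive, so this send succeeds.
    println!("{:?}", send_or_recover(&tx, String::from("first")));

    // After the receiver is dropped, the message comes back in the error.
    drop(rx);
    println!("{:?}", send_or_recover(&tx, String::from("second")));
}
```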

Channel Patterns:

Pattern      Description
mpsc         Multiple producers, single consumer
Pipeline     Chain of processing stages
Fan-out      One producer, multiple consumers
Fan-in       Multiple producers, one consumer
Actor        Message-driven isolated state
Worker pool  Distribute work to workers

Channel Crate Comparison:

Crate              Type              Features
std::sync::mpsc    mpsc              Standard library, simple
crossbeam          mpsc, spmc, mpmc  Bounded/unbounded, select
flume              mpmc              Fast, ergonomic API
tokio::sync::mpsc  mpsc              Async channels

Key Points:

  • Channels provide message-passing concurrency
  • Standard library provides mpsc channels
  • sync_channel(n) creates bounded channels
  • Clone Sender for multiple producers
  • Use try_recv() for non-blocking receives and recv_timeout() to wait with an upper bound
  • Consider crossbeam or flume for advanced features
  • Actor pattern isolates mutable state
  • Pipelines chain processing stages
  • Fan-out distributes work to multiple consumers
  • Fan-in aggregates from multiple producers