How do I work with channels for message passing in Rust?

Walkthrough

Channels in Rust provide a way to communicate between threads by sending messages. They are a core part of Rust's concurrency model, following the principle: "Do not communicate by sharing memory; instead, share memory by communicating."

Rust's standard library provides std::sync::mpsc (multiple producer, single consumer) channels. The mpsc module offers:

  • channel() — Creates a sender and receiver for asynchronous communication
  • sync_channel() — Creates a bounded channel with a fixed buffer size

Key concepts:

  • Sender (mpsc::Sender) — Used to send messages; can be cloned for multiple producers
  • Receiver (mpsc::Receiver) — Used to receive messages; cannot be cloned (single consumer)
  • Unbounded channels — Created with channel(), can grow indefinitely
  • Bounded channels — Created with sync_channel(n), block when full

For async code, external crates like tokio::sync::mpsc provide async-aware channels.

Code Examples

Basic Channel Communication

use std::sync::mpsc;
use std::thread;
 
fn main() {
    // Create an unbounded channel
    let (sender, receiver) = mpsc::channel();
    
    // Spawn a thread that sends a message
    thread::spawn(move || {
        let message = String::from("Hello from the spawned thread!");
        sender.send(message).unwrap();
    });
    
    // Receive the message in the main thread
    let received = receiver.recv().unwrap();
    println!("Received: {}", received);
}

Sending Multiple Messages

use std::sync::mpsc;
use std::thread;
use std::time::Duration;
 
fn main() {
    let (sender, receiver) = mpsc::channel();
    
    thread::spawn(move || {
        let messages = vec![
            "First message",
            "Second message",
            "Third message",
        ];
        
        for msg in messages {
            sender.send(msg.to_string()).unwrap();
            thread::sleep(Duration::from_millis(100));
        }
    });
    
    // recv() blocks until a message arrives
    for received in receiver {
        println!("Received: {}", received);
    }
    
    // The for loop ends when the sender is dropped
    println!("Channel closed (all senders dropped)");
}

Multiple Producers (mpsc)

use std::sync::mpsc;
use std::thread;
 
fn main() {
    let (sender, receiver) = mpsc::channel();
    
    // Clone the sender for multiple producers
    let sender1 = sender.clone();
    let sender2 = sender.clone();
    
    // Drop the original sender (not strictly necessary, but good practice)
    drop(sender);
    
    // Spawn multiple producer threads
    let handle1 = thread::spawn(move || {
        let messages = vec!["A1", "A2", "A3"];
        for msg in messages {
            sender1.send(format!("Thread 1: {}", msg)).unwrap();
        }
    });
    
    let handle2 = thread::spawn(move || {
        let messages = vec!["B1", "B2", "B3"];
        for msg in messages {
            sender2.send(format!("Thread 2: {}", msg)).unwrap();
        }
    });
    
    // Wait for both threads to finish
    handle1.join().unwrap();
    handle2.join().unwrap();
    
    // Collect all messages
    for received in receiver {
        println!("Got: {}", received);
    }
}

Bounded (Synchronous) Channel

use std::sync::mpsc;
use std::thread;
use std::time::Duration;
 
fn main() {
    // Create a bounded channel with buffer size of 2
    let (sender, receiver) = mpsc::sync_channel(2);
    
    let producer = thread::spawn(move || {
        for i in 0..5 {
            // This will block when the buffer is full
            sender.send(i).unwrap();
            println!("Sent: {}", i);
        }
        // Sender is dropped here
    });
    
    // Give producer time to fill the buffer
    thread::sleep(Duration::from_millis(500));
    
    let consumer = thread::spawn(move || {
        thread::sleep(Duration::from_millis(100));
        for received in receiver {
            println!("Received: {}", received);
            thread::sleep(Duration::from_millis(100));
        }
    });
    
    producer.join().unwrap();
    consumer.join().unwrap();
}

Non-Blocking Receive with try_recv

use std::sync::mpsc;
use std::thread;
use std::time::Duration;
 
fn main() {
    let (sender, receiver) = mpsc::channel();
    
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(500));
        sender.send("Delayed message").unwrap();
    });
    
    // try_recv() doesn't block
    loop {
        match receiver.try_recv() {
            Ok(msg) => {
                println!("Received: {}", msg);
                break;
            }
            Err(mpsc::TryRecvError::Empty) => {
                println!("No message yet, doing other work...");
                thread::sleep(Duration::from_millis(100));
            }
            Err(mpsc::TryRecvError::Disconnected) => {
                println!("Channel closed");
                break;
            }
        }
    }
}

Receiving with Timeout (recv_timeout)

use std::sync::mpsc;
use std::thread;
use std::time::Duration;
 
fn main() {
    let (sender, receiver) = mpsc::channel();
    
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(200));
        sender.send("Hello").unwrap();
    });
    
    // Wait for a message with a timeout
    match receiver.recv_timeout(Duration::from_millis(100)) {
        Ok(msg) => println!("Received: {}", msg),
        Err(mpsc::RecvTimeoutError::Timeout) => {
            println!("Timeout, no message received in 100ms");
        }
        Err(mpsc::RecvTimeoutError::Disconnected) => {
            println!("Channel closed");
        }
    }
    
    // Try again with longer timeout
    match receiver.recv_timeout(Duration::from_millis(500)) {
        Ok(msg) => println!("Received: {}", msg),
        Err(e) => println!("Error: {:?}", e),
    }
}

Sending Different Types with Enums

use std::sync::mpsc;
use std::thread;
 
// Define the message types
enum Message {
    Text(String),
    Number(i32),
    Quit,
}
 
fn main() {
    let (sender, receiver) = mpsc::channel();
    
    // Spawn a worker thread
    let worker = thread::spawn(move || {
        sender.send(Message::Text("Hello".to_string())).unwrap();
        sender.send(Message::Number(42)).unwrap();
        sender.send(Message::Quit).unwrap();
    });
    
    // Process messages based on type
    for msg in receiver {
        match msg {
            Message::Text(s) => println!("Text: {}", s),
            Message::Number(n) => println!("Number: {}", n),
            Message::Quit => {
                println!("Received quit signal");
                break;
            }
        }
    }
    
    worker.join().unwrap();
}

Worker Pool Pattern

use std::sync::mpsc;
use std::thread;
 
struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}
 
struct Job {
    id: usize,
    work: Box<dyn FnOnce() + Send + 'static>,
}
 
fn main() {
    let (sender, receiver) = mpsc::channel::<Job>();
    let receiver = mpsc::Receiver::into_iter(receiver);
    
    // Create worker threads
    let mut workers: Vec<Worker> = Vec::new();
    
    for id in 0..4 {
        let rx = receiver.clone(); // Note: into_iter() allows cloning
        
        let handle = thread::spawn(move || {
            for job in rx {
                println!("Worker {} executing job {}", id, job.id);
                (job.work)();
            }
        });
        
        workers.push(Worker { id, thread: handle });
    }
    
    // Send jobs
    for i in 0..10 {
        let job = Job {
            id: i,
            work: Box::new(move || {
                thread::sleep(std::time::Duration::from_millis(100));
                println!("Job {} completed", i);
            }),
        };
        sender.send(job).unwrap();
    }
    
    // Drop sender to signal workers to stop
    drop(sender);
    
    // Wait for all workers to finish
    for worker in workers {
        worker.thread.join().unwrap();
    }
}

Channel with Select-like Behavior

use std::sync::mpsc;
use std::thread;
use std::time::Duration;
 
fn main() {
    let (tx1, rx1) = mpsc::channel();
    let (tx2, rx2) = mpsc::channel();
    
    // Producer 1
    thread::spawn(move || {
        for i in 0..3 {
            tx1.send(format!("Channel 1: {}", i)).unwrap();
            thread::sleep(Duration::from_millis(100));
        }
    });
    
    // Producer 2
    thread::spawn(move || {
        for i in 0..3 {
            tx2.send(format!("Channel 2: {}", i)).unwrap();
            thread::sleep(Duration::from_millis(150));
        }
    });
    
    // Use select-like pattern with non-blocking receive
    loop {
        let msg1 = rx1.try_recv();
        let msg2 = rx2.try_recv();
        
        if let Ok(msg) = msg1 {
            println!("From rx1: {}", msg);
        }
        if let Ok(msg) = msg2 {
            println!("From rx2: {}", msg);
        }
        
        // Check if both channels are closed
        if msg1.is_err() && msg2.is_err() {
            match (msg1.unwrap_err(), msg2.unwrap_err()) {
                (mpsc::TryRecvError::Disconnected, mpsc::TryRecvError::Disconnected) => {
                    println!("Both channels closed");
                    break;
                }
                _ => thread::sleep(Duration::from_millis(10)),
            }
        }
    }
}

Using crossbeam for Multi-Consumer Channels

// Add to Cargo.toml:
// [dependencies]
// crossbeam = "0.8"
 
use crossbeam_channel::{bounded, unbounded};
use std::thread;
 
fn main() {
    // Crossbeam supports multiple consumers
    let (sender, receiver) = bounded(3);
    
    // Clone receiver for multiple consumers
    let receiver1 = receiver.clone();
    let receiver2 = receiver.clone();
    
    // Producer
    thread::spawn(move || {
        for i in 0..6 {
            sender.send(i).unwrap();
            println!("Sent: {}", i);
        }
    });
    
    // Consumer 1
    let h1 = thread::spawn(move || {
        for msg in receiver1 {
            println!("Consumer 1: {}", msg);
        }
    });
    
    // Consumer 2
    let h2 = thread::spawn(move || {
        for msg in receiver2 {
            println!("Consumer 2: {}", msg);
        }
    });
    
    // Drop the original receiver so the channel can close
    drop(receiver);
    
    h1.join().unwrap();
    h2.join().unwrap();
}

Summary

Channel Type Function Description
Unbounded mpsc::channel() No limit on messages; may cause memory issues
Bounded mpsc::sync_channel(n) Fixed buffer; blocks when full
Async (tokio) tokio::sync::mpsc Async-aware channels for async code
Multi-consumer crossbeam_channel Supports multiple receivers

Key Methods:

  • send(msg) — Send a message (blocks on bounded channels if full)
  • recv() — Receive a message (blocks until one is available)
  • try_recv() — Non-blocking receive
  • recv_timeout(duration) — Receive with timeout
  • iter() / into_iter() — Iterate over received messages

Best Practices:

  • Use bounded channels to prevent memory exhaustion
  • Drop senders when done to close the channel properly
  • Use enums to send different message types
  • Consider crossbeam or tokio channels for advanced use cases