Loading page…
Rust walkthroughs
Loading page…
Channels provide a way for threads to communicate by passing messages rather than sharing memory. Rust's standard library provides mpsc (multiple producer, single consumer) channels, and the crossbeam crate offers additional channel types with different semantics.
Key concepts:
When to use channels:
When NOT to use channels:
Mutex or Arc)
use std::sync::mpsc;
use std::thread;
/// Passes a single `String` from a spawned thread to the main thread over an
/// mpsc channel and prints it.
fn main() {
    let (sender, receiver) = mpsc::channel();

    // The spawned thread takes ownership of the sending half.
    thread::spawn(move || {
        let greeting = String::from("Hello from thread!");
        sender.send(greeting).unwrap();
    });

    // `recv` blocks the main thread until the message arrives.
    println!("Received: {}", receiver.recv().unwrap());
}
use std::sync::mpsc;
use std::thread;
/// Two producer threads share one channel through cloned senders; the main
/// thread prints every message until the channel closes.
fn main() {
    let (tx, rx) = mpsc::channel();

    // One cloned sender per producer, paired with the text it will send.
    let producers = vec![
        (tx.clone(), "Message from thread 1"),
        (tx.clone(), "Message from thread 2"),
    ];
    // The original sender must go away, otherwise the receive loop never ends.
    drop(tx);

    for (producer, text) in producers {
        thread::spawn(move || {
            producer.send(String::from(text)).unwrap();
        });
    }

    // `recv` returns `Err` once every sender has been dropped.
    while let Ok(received) = rx.recv() {
        println!("Received: {}", received);
    }
}
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
/// Polls a channel with `try_recv`, doing "other work" between polls, until a
/// value arrives or the channel is disconnected.
fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        thread::sleep(Duration::from_millis(100));
        tx.send(42).unwrap();
    });

    let poll_interval = Duration::from_millis(50);

    // Keep polling while the channel is merely empty; any other outcome
    // (a value, or disconnection) ends the loop.
    let outcome = loop {
        match rx.try_recv() {
            Err(mpsc::TryRecvError::Empty) => {
                println!("No message yet, doing other work...");
                thread::sleep(poll_interval);
            }
            finished => break finished,
        }
    };

    match outcome {
        Ok(value) => println!("Received: {}", value),
        // Only `Disconnected` can reach this arm; `Empty` never leaves the loop.
        Err(_) => println!("Channel closed"),
    }
}
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
/// Waits for a delayed message using `recv_timeout`, retrying after each
/// timeout until a message arrives or the channel is disconnected.
fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        thread::sleep(Duration::from_millis(200));
        tx.send(String::from("Delayed message")).unwrap();
    });

    let wait = Duration::from_millis(100);
    loop {
        let attempt = rx.recv_timeout(wait);

        // A timeout just means "not yet": report it and try again.
        if let Err(mpsc::RecvTimeoutError::Timeout) = attempt {
            println!("Timeout, waiting again...");
            continue;
        }

        match attempt {
            Ok(message) => println!("Received: {}", message),
            // Timeouts were handled above, so this is a disconnection.
            Err(_) => println!("Channel closed"),
        }
        break;
    }
}
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;
/// One pool worker: its index and the handle of the thread running its job loop.
struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

/// A fixed-size pool of worker threads fed through a single mpsc channel.
struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

/// A boxed closure that can be moved to a worker thread and run exactly once.
type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Spawns `size` worker threads that all pull jobs from one shared channel.
    ///
    /// Each worker loops until the channel is closed, which happens when the
    /// pool's `sender` is dropped.
    fn new(size: usize) -> Self {
        let (sender, receiver) = mpsc::channel::<Job>();
        // `mpsc::Receiver` is single-consumer, so the workers share it behind
        // a mutex. The mutex lives in `std::sync`, not in `std::sync::mpsc`.
        let receiver = Arc::new(std::sync::Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);
        for id in 0..size {
            let receiver = Arc::clone(&receiver);
            let thread = thread::spawn(move || loop {
                // The lock guard is a temporary of this statement, so it is
                // released before the job runs and other workers can receive.
                let next = receiver.lock().unwrap().recv();
                match next {
                    Ok(job) => {
                        println!("Worker {} executing job", id);
                        job();
                    }
                    // All senders are gone: no more work will ever arrive.
                    Err(_) => break,
                }
            });
            workers.push(Worker { id, thread });
        }

        ThreadPool { workers, sender }
    }

    /// Queues `f` to be run by whichever worker receives it first.
    ///
    /// # Panics
    ///
    /// Panics if the job cannot be sent because the receiving end of the
    /// channel no longer exists (for example, a pool created with `size == 0`).
    fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
        self.sender.send(job).unwrap();
    }
}
/// Submits eight short tasks to a four-worker pool, then sleeps long enough
/// for them to finish before the program exits.
fn main() {
    let pool = ThreadPool::new(4);
    let task_time = std::time::Duration::from_millis(100);

    (0..8).for_each(|i| {
        pool.execute(move || {
            println!("Processing task {}", i);
            thread::sleep(task_time);
        });
    });

    // The pool offers no way to wait for jobs, so give the workers time to run.
    thread::sleep(std::time::Duration::from_millis(500));
}
Note: The above needs use std::sync::Mutex; for the Mutex import.
// Add to Cargo.toml: crossbeam = "0.8"
use crossbeam::channel::{bounded, select};
use std::thread;
use std::time::Duration;
/// Demonstrates back-pressure: a fast producer is throttled by a bounded
/// crossbeam channel of capacity 3 that a slow consumer drains.
fn main() {
    // At most three values can be queued; further sends block.
    let (tx, rx) = bounded(3);

    // Producer.
    thread::spawn(move || {
        for i in 0..10 {
            tx.send(i).unwrap();
            println!("Sent: {}", i);
        }
    });

    // Consumer: deliberately slow so the channel fills up.
    thread::spawn(move || {
        let pause = Duration::from_millis(100);
        while let Ok(received) = rx.recv() {
            println!("Received: {}", received);
            thread::sleep(pause);
        }
    });

    // Neither thread is joined, so keep the process alive while they work.
    thread::sleep(Duration::from_millis(1500));
}
use crossbeam::channel::unbounded;
use std::thread;
/// Queues 1000 integers on an unbounded crossbeam channel, closes it, and
/// prints the sum of everything received.
fn main() {
    let (tx, rx) = unbounded();

    // Sends on an unbounded channel never block.
    for value in 0..1000 {
        tx.send(value).unwrap();
    }

    // Closing the channel is what lets the receiving iterator terminate.
    drop(tx);

    let sum: i32 = rx.iter().sum();
    println!("Sum: {}", sum);
}
use crossbeam::channel::{bounded, select};
use std::thread;
use std::time::Duration;
/// Races two delayed senders and prints whichever message `select!` sees first.
fn main() {
    let (tx1, rx1) = bounded::<&str>(1);
    let (tx2, rx2) = bounded::<&str>(1);

    // (sender, delay in milliseconds, message) for each background thread.
    let senders = vec![
        (tx1, 100, "from channel 1"),
        (tx2, 50, "from channel 2"),
    ];
    for (tx, delay_ms, text) in senders {
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(delay_ms));
            tx.send(text).unwrap();
        });
    }

    // Blocks until one of the two receivers is ready, then runs only that arm.
    select! {
        recv(rx1) -> msg => println!("Received {}", msg.unwrap()),
        recv(rx2) -> msg => println!("Received {}", msg.unwrap()),
    }
}
use std::sync::mpsc;
use std::thread;
/// Three-stage pipeline: generate 1..=5, square each number on a second
/// thread, and print the results on the main thread.
fn main() {
    let (numbers_tx, numbers_rx) = mpsc::channel();
    let (squares_tx, squares_rx) = mpsc::channel();

    // Stage 1: emit the inputs. Dropping the sender at thread exit closes stage 2's input.
    thread::spawn(move || {
        (1..=5).for_each(|n| numbers_tx.send(n).unwrap());
    });

    // Stage 2: square until stage 1 hangs up.
    thread::spawn(move || {
        while let Ok(n) = numbers_rx.recv() {
            squares_tx.send(n * n).unwrap();
        }
    });

    // Stage 3: print until stage 2 hangs up.
    squares_rx
        .iter()
        .for_each(|squared| println!("Squared: {}", squared));
}
use std::sync::mpsc;
use std::thread;
/// Fan-out: one producer sends 1..=10 and three workers compete for the
/// values through a receiver shared behind `Arc<Mutex<_>>`.
fn main() {
    let (tx, rx) = mpsc::channel();

    // Producer
    thread::spawn(move || {
        (1..=10).for_each(|i| tx.send(i).unwrap());
    });

    // The receiver is single-consumer, so the workers take turns via a mutex.
    let rx = std::sync::Arc::new(std::sync::Mutex::new(rx));

    let handles: Vec<_> = (0..3)
        .map(|worker_id| {
            let rx = Arc::clone(&rx);
            thread::spawn(move || loop {
                // Lock only for the duration of the receive itself.
                let msg = rx.lock().unwrap().recv();
                if let Ok(value) = msg {
                    println!("Worker {} got: {}", worker_id, value);
                } else {
                    // The producer finished and dropped its sender.
                    break;
                }
            })
        })
        .collect();

    handles
        .into_iter()
        .for_each(|handle| handle.join().unwrap());
}
use std::sync::Arc;use std::sync::mpsc;
use std::thread;
/// Fan-in: three producer threads each send three messages into one channel
/// that the main thread drains.
fn main() {
    let (tx, rx) = mpsc::channel();

    // Each producer gets its own clone of the sender.
    let handles: Vec<_> = (0..3)
        .map(|producer_id| {
            let tx = tx.clone();
            thread::spawn(move || {
                (0..3)
                    .map(|i| format!("Producer {}: msg {}", producer_id, i))
                    .for_each(|msg| tx.send(msg).unwrap());
            })
        })
        .collect();

    // Without this the receive loop below would wait forever on the original sender.
    drop(tx);

    // Single consumer receives from all producers.
    while let Ok(received) = rx.recv() {
        println!("Received: {}", received);
    }

    handles
        .into_iter()
        .for_each(|handle| handle.join().unwrap());
}
use std::sync::mpsc;
use std::thread;
/// Requests understood by [`CounterActor`].
enum Message {
    /// Add one to the counter.
    Increment,
    /// Subtract one from the counter.
    Decrement,
    /// Send the current count back on the enclosed reply channel.
    GetValue(mpsc::Sender<i32>),
}

/// An actor that owns a counter and changes it only in response to messages.
struct CounterActor {
    count: i32,
    receiver: mpsc::Receiver<Message>,
}

impl CounterActor {
    /// Creates an actor starting at zero that reads requests from `receiver`.
    fn new(receiver: mpsc::Receiver<Message>) -> Self {
        Self { count: 0, receiver }
    }

    /// Processes messages until every sender for the actor's channel is dropped.
    fn run(&mut self) {
        while let Ok(msg) = self.receiver.recv() {
            match msg {
                Message::Increment => self.count += 1,
                Message::Decrement => self.count -= 1,
                Message::GetValue(reply_to) => {
                    // The requester may have dropped its receiver already.
                    // That is the requester's problem; it must not take the
                    // actor down for everyone else, so the error is ignored.
                    let _ = reply_to.send(self.count);
                }
            }
        }
    }
}
/// Client-side handle that talks to a counter actor over its message channel.
struct CounterHandle {
    sender: mpsc::Sender<Message>,
}

impl CounterHandle {
    /// Creates the handle together with the receiving end the actor must be given.
    fn new() -> (Self, mpsc::Receiver<Message>) {
        let (sender, receiver) = mpsc::channel();
        (Self { sender }, receiver)
    }

    /// Asks the actor to add one. Panics if the actor's receiver is gone.
    fn increment(&self) {
        let request = Message::Increment;
        self.sender.send(request).unwrap();
    }

    /// Asks the actor to subtract one. Panics if the actor's receiver is gone.
    fn decrement(&self) {
        let request = Message::Decrement;
        self.sender.send(request).unwrap();
    }

    /// Requests the current count and blocks until the reply arrives.
    /// Panics if the request cannot be sent or no reply is ever delivered.
    fn get_value(&self) -> i32 {
        // A fresh one-shot channel carries the reply for this request only.
        let (reply_tx, reply_rx) = mpsc::channel();
        let request = Message::GetValue(reply_tx);
        self.sender.send(request).unwrap();
        reply_rx.recv().unwrap()
    }
}
/// Runs the counter actor on its own thread and drives it through a handle.
fn main() {
    let (handle, receiver) = CounterHandle::new();

    // The actor lives entirely on this background thread.
    thread::spawn(move || CounterActor::new(receiver).run());

    handle.increment();
    handle.increment();
    handle.decrement();

    let value = handle.get_value();
    println!("Value: {}", value);
}
// Add to Cargo.toml: flume = "0.11"
use flume;
use std::thread;
/// Multi-producer, multi-consumer demo on a bounded flume channel: three
/// producers send `(producer, n)` pairs and two consumers print them.
fn main() {
    let (tx, rx) = flume::bounded(10);

    // Multiple producers, each with its own sender clone.
    let producers: Vec<_> = (0..3)
        .map(|i| {
            let tx = tx.clone();
            thread::spawn(move || {
                for j in 0..3 {
                    tx.send((i, j)).unwrap();
                }
            })
        })
        .collect();
    // Only the producers' clones should keep the channel open.
    drop(tx);

    // Multiple consumers, each with its own receiver clone.
    let consumers: Vec<_> = (0..2)
        .map(|_| {
            let rx = rx.clone();
            thread::spawn(move || {
                while let Ok((p, n)) = rx.recv() {
                    println!("Consumer received: producer={}, num={}", p, n);
                }
            })
        })
        .collect();
    drop(rx);

    producers
        .into_iter()
        .chain(consumers)
        .for_each(|worker| worker.join().unwrap());
}
use std::sync::mpsc;
use std::thread;
/// Shows the error cases of `send`, `recv` and `try_recv` on std mpsc channels.
fn main() {
    let (tx, rx) = mpsc::channel();

    // Sender error handling: `send` fails only when the receiver no longer exists.
    thread::spawn(move || {
        if tx.send(42).is_err() {
            println!("Receiver was dropped before receiving");
        }
    });

    // Receiver error handling: `recv` fails only when every sender is gone
    // and nothing is left in the queue.
    match rx.recv() {
        Ok(value) => println!("Received: {}", value),
        Err(mpsc::RecvError) => println!("All senders were dropped"),
    }

    // Try_recv error handling. Nothing is ever sent on this channel, so the
    // compiler has nothing to infer the element type from; it must be spelled
    // out or the example fails with "type annotations needed".
    let (tx2, rx2) = mpsc::channel::<i32>();
    drop(tx2); // Close sender
    match rx2.try_recv() {
        Ok(value) => println!("Got: {}", value),
        Err(mpsc::TryRecvError::Empty) => println!("Channel empty"),
        Err(mpsc::TryRecvError::Disconnected) => println!("Channel closed"),
    }
}
use std::sync::mpsc;
use std::thread;
/// Requests understood by [`KeyValueStore`].
#[derive(Debug)]
enum Command {
    /// Insert or overwrite `key` with `value`.
    Set { key: String, value: i32 },
    /// Look up `key` and send the result (`None` if absent) on `respond_to`.
    Get { key: String, respond_to: mpsc::Sender<Option<i32>> },
    /// Remove `key` if it is present.
    Delete { key: String },
}

/// A key-value map owned by a single thread and driven purely by commands.
struct KeyValueStore {
    data: std::collections::HashMap<String, i32>,
    receiver: mpsc::Receiver<Command>,
}

impl KeyValueStore {
    /// Creates an empty store that reads commands from `receiver`.
    fn new(receiver: mpsc::Receiver<Command>) -> Self {
        Self {
            data: std::collections::HashMap::new(),
            receiver,
        }
    }

    /// Processes commands until every sender for the store's channel is dropped.
    fn run(&mut self) {
        while let Ok(cmd) = self.receiver.recv() {
            match cmd {
                Command::Set { key, value } => {
                    self.data.insert(key, value);
                }
                Command::Get { key, respond_to } => {
                    let value = self.data.get(&key).copied();
                    // The requester may have stopped listening. A failed reply
                    // must not panic the store and cut off every other client.
                    let _ = respond_to.send(value);
                }
                Command::Delete { key } => {
                    self.data.remove(&key);
                }
            }
        }
    }
}
/// Runs the key-value store on a background thread, stores two entries, and
/// reads one of them back through a reply channel.
fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        KeyValueStore::new(rx).run();
    });

    // Set values
    for (key, value) in vec![("a", 1), ("b", 2)] {
        let command = Command::Set { key: String::from(key), value };
        tx.send(command).unwrap();
    }

    // Get value: the store answers on the channel carried inside the command.
    let (respond_tx, respond_rx) = mpsc::channel();
    let query = Command::Get { key: String::from("a"), respond_to: respond_tx };
    tx.send(query).unwrap();

    let answer = respond_rx.recv().unwrap();
    println!("Value of 'a': {:?}", answer);
}
Standard Library Channel Types (std::sync::mpsc):
| Type | Description |
|------|-------------|
| Sender<T> | Sending end (cloneable for multiple producers) |
| Receiver<T> | Receiving end (single consumer) |
| channel() | Create unbounded channel |
| sync_channel(n) | Create bounded channel with capacity n |
Receiver Methods:
| Method | Blocking? | Description |
|--------|-----------|-------------|
| recv() | Yes | Block until message received |
| try_recv() | No | Return immediately, may error |
| recv_timeout(dur) | Partial | Block with timeout |
| iter() | Yes | Iterate over received messages; blocks waiting for each one and ends when all senders are dropped |
Channel Error Types:
| Error | When It Occurs |
|-------|---------------|
| RecvError | All senders dropped |
| TryRecvError::Empty | No message available |
| TryRecvError::Disconnected | All senders dropped |
| SendError | Receiver dropped |
Channel Patterns:
| Pattern | Description |
|---------|-------------|
| mpsc | Multiple producers, single consumer |
| Pipeline | Chain of processing stages |
| Fan-out | One producer, multiple consumers |
| Fan-in | Multiple producers, one consumer |
| Actor | Message-driven isolated state |
| Worker pool | Distribute work to workers |
Channel Crate Comparison:
| Crate | Type | Features |
|-------|------|----------|
| std::sync::mpsc | mpsc | Standard library, simple |
| crossbeam | mpsc, spmc, mpmc | Bounded/unbounded, select |
| flume | mpmc | Fast, ergonomic API |
| tokio::sync::mpsc | mpsc | Async channels |
Key Points:
- sync_channel(n) creates bounded channels
- Clone Sender for multiple producers
- Use try_recv() or recv_timeout() for non-blocking or time-limited receives