Loading page…
Rust walkthroughs
Loading page…
Channels in Rust provide a way to communicate between threads by sending messages. They are a core part of Rust's concurrency model, following the principle: "Do not communicate by sharing memory; instead, share memory by communicating."
Rust's standard library provides std::sync::mpsc (multiple producer, single consumer) channels. The mpsc module offers:
channel() — Creates a sender and receiver for asynchronous communicationsync_channel() — Creates a bounded channel with a fixed buffer sizeKey concepts:
mpsc::Sender) — Used to send messages; can be cloned for multiple producersmpsc::Receiver) — Used to receive messages; cannot be cloned (single consumer)channel(), can grow indefinitelysync_channel(n), block when fullFor async code, external crates like tokio::sync::mpsc provide async-aware channels.
use std::sync::mpsc;
use std::thread;
fn main() {
// Create an unbounded channel
let (sender, receiver) = mpsc::channel();
// Spawn a thread that sends a message
thread::spawn(move || {
let message = String::from("Hello from the spawned thread!");
sender.send(message).unwrap();
});
// Receive the message in the main thread
let received = receiver.recv().unwrap();
println!("Received: {}", received);
}use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (sender, receiver) = mpsc::channel();
thread::spawn(move || {
let messages = vec![
"First message",
"Second message",
"Third message",
];
for msg in messages {
sender.send(msg.to_string()).unwrap();
thread::sleep(Duration::from_millis(100));
}
});
// recv() blocks until a message arrives
for received in receiver {
println!("Received: {}", received);
}
// The for loop ends when the sender is dropped
println!("Channel closed (all senders dropped)");
}use std::sync::mpsc;
use std::thread;
fn main() {
let (sender, receiver) = mpsc::channel();
// Clone the sender for multiple producers
let sender1 = sender.clone();
let sender2 = sender.clone();
// Drop the original sender (not strictly necessary, but good practice)
drop(sender);
// Spawn multiple producer threads
let handle1 = thread::spawn(move || {
let messages = vec!["A1", "A2", "A3"];
for msg in messages {
sender1.send(format!("Thread 1: {}", msg)).unwrap();
}
});
let handle2 = thread::spawn(move || {
let messages = vec!["B1", "B2", "B3"];
for msg in messages {
sender2.send(format!("Thread 2: {}", msg)).unwrap();
}
});
// Wait for both threads to finish
handle1.join().unwrap();
handle2.join().unwrap();
// Collect all messages
for received in receiver {
println!("Got: {}", received);
}
}use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
// Create a bounded channel with buffer size of 2
let (sender, receiver) = mpsc::sync_channel(2);
let producer = thread::spawn(move || {
for i in 0..5 {
// This will block when the buffer is full
sender.send(i).unwrap();
println!("Sent: {}", i);
}
// Sender is dropped here
});
// Give producer time to fill the buffer
thread::sleep(Duration::from_millis(500));
let consumer = thread::spawn(move || {
thread::sleep(Duration::from_millis(100));
for received in receiver {
println!("Received: {}", received);
thread::sleep(Duration::from_millis(100));
}
});
producer.join().unwrap();
consumer.join().unwrap();
}use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (sender, receiver) = mpsc::channel();
thread::spawn(move || {
thread::sleep(Duration::from_millis(500));
sender.send("Delayed message").unwrap();
});
// try_recv() doesn't block
loop {
match receiver.try_recv() {
Ok(msg) => {
println!("Received: {}", msg);
break;
}
Err(mpsc::TryRecvError::Empty) => {
println!("No message yet, doing other work...");
thread::sleep(Duration::from_millis(100));
}
Err(mpsc::TryRecvError::Disconnected) => {
println!("Channel closed");
break;
}
}
}
}use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (sender, receiver) = mpsc::channel();
thread::spawn(move || {
thread::sleep(Duration::from_millis(200));
sender.send("Hello").unwrap();
});
// Wait for a message with a timeout
match receiver.recv_timeout(Duration::from_millis(100)) {
Ok(msg) => println!("Received: {}", msg),
Err(mpsc::RecvTimeoutError::Timeout) => {
println!("Timeout, no message received in 100ms");
}
Err(mpsc::RecvTimeoutError::Disconnected) => {
println!("Channel closed");
}
}
// Try again with longer timeout
match receiver.recv_timeout(Duration::from_millis(500)) {
Ok(msg) => println!("Received: {}", msg),
Err(e) => println!("Error: {:?}", e),
}
}use std::sync::mpsc;
use std::thread;
// Define the message types
enum Message {
Text(String),
Number(i32),
Quit,
}
fn main() {
let (sender, receiver) = mpsc::channel();
// Spawn a worker thread
let worker = thread::spawn(move || {
sender.send(Message::Text("Hello".to_string())).unwrap();
sender.send(Message::Number(42)).unwrap();
sender.send(Message::Quit).unwrap();
});
// Process messages based on type
for msg in receiver {
match msg {
Message::Text(s) => println!("Text: {}", s),
Message::Number(n) => println!("Number: {}", n),
Message::Quit => {
println!("Received quit signal");
break;
}
}
}
worker.join().unwrap();
}use std::sync::mpsc;
use std::thread;
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
struct Job {
id: usize,
work: Box<dyn FnOnce() + Send + 'static>,
}
fn main() {
let (sender, receiver) = mpsc::channel::<Job>();
let receiver = mpsc::Receiver::into_iter(receiver);
// Create worker threads
let mut workers: Vec<Worker> = Vec::new();
for id in 0..4 {
let rx = receiver.clone(); // Note: into_iter() allows cloning
let handle = thread::spawn(move || {
for job in rx {
println!("Worker {} executing job {}", id, job.id);
(job.work)();
}
});
workers.push(Worker { id, thread: handle });
}
// Send jobs
for i in 0..10 {
let job = Job {
id: i,
work: Box::new(move || {
thread::sleep(std::time::Duration::from_millis(100));
println!("Job {} completed", i);
}),
};
sender.send(job).unwrap();
}
// Drop sender to signal workers to stop
drop(sender);
// Wait for all workers to finish
for worker in workers {
worker.thread.join().unwrap();
}
}use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx1, rx1) = mpsc::channel();
let (tx2, rx2) = mpsc::channel();
// Producer 1
thread::spawn(move || {
for i in 0..3 {
tx1.send(format!("Channel 1: {}", i)).unwrap();
thread::sleep(Duration::from_millis(100));
}
});
// Producer 2
thread::spawn(move || {
for i in 0..3 {
tx2.send(format!("Channel 2: {}", i)).unwrap();
thread::sleep(Duration::from_millis(150));
}
});
// Use select-like pattern with non-blocking receive
loop {
let msg1 = rx1.try_recv();
let msg2 = rx2.try_recv();
if let Ok(msg) = msg1 {
println!("From rx1: {}", msg);
}
if let Ok(msg) = msg2 {
println!("From rx2: {}", msg);
}
// Check if both channels are closed
if msg1.is_err() && msg2.is_err() {
match (msg1.unwrap_err(), msg2.unwrap_err()) {
(mpsc::TryRecvError::Disconnected, mpsc::TryRecvError::Disconnected) => {
println!("Both channels closed");
break;
}
_ => thread::sleep(Duration::from_millis(10)),
}
}
}
}// Add to Cargo.toml:
// [dependencies]
// crossbeam = "0.8"
use crossbeam_channel::{bounded, unbounded};
use std::thread;
fn main() {
// Crossbeam supports multiple consumers
let (sender, receiver) = bounded(3);
// Clone receiver for multiple consumers
let receiver1 = receiver.clone();
let receiver2 = receiver.clone();
// Producer
thread::spawn(move || {
for i in 0..6 {
sender.send(i).unwrap();
println!("Sent: {}", i);
}
});
// Consumer 1
let h1 = thread::spawn(move || {
for msg in receiver1 {
println!("Consumer 1: {}", msg);
}
});
// Consumer 2
let h2 = thread::spawn(move || {
for msg in receiver2 {
println!("Consumer 2: {}", msg);
}
});
// Drop the original receiver so the channel can close
drop(receiver);
h1.join().unwrap();
h2.join().unwrap();
}| Channel Type | Function | Description |
|-------------|----------|-------------|
| Unbounded | mpsc::channel() | No limit on messages; may cause memory issues |
| Bounded | mpsc::sync_channel(n) | Fixed buffer; blocks when full |
| Async (tokio) | tokio::sync::mpsc | Async-aware channels for async code |
| Multi-consumer | crossbeam_channel | Supports multiple receivers |
Key Methods:
send(msg) — Send a message (blocks on bounded channels if full)recv() — Receive a message (blocks until one is available)try_recv() — Non-blocking receiverecv_timeout(duration) — Receive with timeoutiter() / into_iter() — Iterate over received messagesBest Practices:
crossbeam or tokio channels for advanced use cases