Rust walkthroughs
How does tokio::sync::watch channel differ from tokio::sync::broadcast for state distribution?

tokio::sync::watch is a single-value channel that always contains exactly one value representing the current state, overwriting the previous value on each send. tokio::sync::broadcast is a multi-producer, multi-consumer channel that buffers a configurable number of messages and delivers them in order to all active receivers. Use watch when consumers only need the current state (configuration updates, metrics, shared settings), and use broadcast when consumers need to receive every message in order (event streams, logs, notifications). The key distinction is that watch is state-oriented (what's the current value?) while broadcast is stream-oriented (what events have occurred?).
```rust
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    // Create a watch channel with an initial value
    let (tx, rx) = watch::channel(0);

    // Receiver immediately has access to the current value
    println!("Initial value: {}", *rx.borrow());

    // Sender updates the value
    tx.send(42).unwrap();

    // Receiver sees the new value
    println!("After send: {}", *rx.borrow());

    // Watch always has exactly one value
    tx.send(100).unwrap();
    tx.send(200).unwrap(); // Previous value (100) is lost
    println!("Current value: {}", *rx.borrow()); // 200
}
```
watch stores only the most recent value; intermediate values are overwritten.
```rust
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    // Create a broadcast channel with capacity for 16 messages
    let (tx, mut rx1) = broadcast::channel(16);
    let mut rx2 = tx.subscribe();

    // Send multiple messages
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    tx.send(3).unwrap();

    // Each receiver gets all messages in order
    println!("Receiver 1: {:?}", rx1.recv().await); // Ok(1)
    println!("Receiver 1: {:?}", rx1.recv().await); // Ok(2)
    println!("Receiver 1: {:?}", rx1.recv().await); // Ok(3)
    println!("Receiver 2: {:?}", rx2.recv().await); // Ok(1)
    println!("Receiver 2: {:?}", rx2.recv().await); // Ok(2)
    println!("Receiver 2: {:?}", rx2.recv().await); // Ok(3)
}
```
broadcast buffers messages and delivers each to all receivers in order.
```rust
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (tx, _rx) = watch::channel("initial".to_string());

    // Send some values
    tx.send("first".to_string()).unwrap();
    tx.send("second".to_string()).unwrap();
    tx.send("current".to_string()).unwrap();

    // Subscribe a new receiver
    let rx = tx.subscribe();

    // New receiver immediately sees the current value
    println!("New receiver sees: {}", *rx.borrow()); // "current"

    // Past values are not accessible:
    // "first" and "second" are lost forever
}
```
New watch receivers get the current value; history is not preserved.
```rust
use tokio::sync::broadcast::{self, error::TryRecvError};

#[tokio::main]
async fn main() {
    let (tx, _rx) = broadcast::channel(16);

    // Send some values before subscribing
    tx.send("first".to_string()).unwrap();
    tx.send("second".to_string()).unwrap();
    tx.send("third".to_string()).unwrap();

    // Subscribe a new receiver AFTER the sends
    let mut rx = tx.subscribe();

    // New receiver does NOT get past messages;
    // it only receives messages sent AFTER subscription.
    // recv() would wait here, and try_recv() reports an empty queue.
    assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
}
```
New broadcast receivers only get messages sent after subscription.
```rust
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel(0);

    // First update
    tx.send(1).unwrap();

    // Receiver can see the current value at any time
    println!("Current: {}", *rx.borrow()); // 1

    // Multiple sends: only the latest matters
    tx.send(2).unwrap();
    tx.send(3).unwrap();

    // Still just the current value
    println!("Still current: {}", *rx.borrow()); // 3

    // Subscribe another receiver
    let rx2 = tx.subscribe();
    println!("New subscriber sees: {}", *rx2.borrow()); // 3
}
```
watch always provides immediate access to the current value.
```rust
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (tx, mut rx1) = broadcast::channel::<u32>(16);

    tx.send(1).unwrap();
    tx.send(2).unwrap();

    // rx1 gets both messages
    assert_eq!(rx1.recv().await.unwrap(), 1);
    assert_eq!(rx1.recv().await.unwrap(), 2);

    // rx2 subscribed after the sends, so it misses them
    let mut rx2 = tx.subscribe();
    tx.send(3).unwrap();

    // rx1 sees 3
    assert_eq!(rx1.recv().await.unwrap(), 3);

    // rx2 sees only 3 (missed 1 and 2)
    assert_eq!(rx2.recv().await.unwrap(), 3);
}
```
broadcast receivers miss messages sent before subscription.
```rust
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel(0);

    // changed() waits until a value is sent that this receiver has not yet seen
    let handle = tokio::spawn(async move {
        // The loop ends when the sender is dropped and changed() returns Err
        while rx.changed().await.is_ok() {
            println!("Value changed to: {}", *rx.borrow());
        }
    });

    tx.send(1).unwrap();
    tx.send(2).unwrap();
    tx.send(2).unwrap(); // Still counts as a change: send() does not compare values
    tx.send(3).unwrap();

    // Rapid sends may be coalesced: the task is only guaranteed
    // to observe the latest value, not every intermediate one
    drop(tx);
    handle.await.unwrap();
}
```
watch::changed() waits until a value is sent that the receiver has not yet seen; it does not compare the new value with the old one.
```rust
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = broadcast::channel::<u32>(16);

    let handle = tokio::spawn(async move {
        loop {
            // recv() waits for the next message
            match rx.recv().await {
                Ok(value) => println!("Received: {}", value),
                Err(e) => {
                    // Reached once all senders are dropped and the buffer is drained
                    println!("Error: {}", e);
                    break;
                }
            }
        }
    });

    tx.send(1).unwrap(); // "Received: 1"
    tx.send(2).unwrap(); // "Received: 2"
    tx.send(2).unwrap(); // "Received: 2" (same value still delivered)

    // Drop the sender so the loop ends, then wait for the task to finish
    drop(tx);
    handle.await.unwrap();
}
```
broadcast::recv() waits for any message, regardless of value.
```rust
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel(String::new());

    // borrow() returns a watch::Ref that derefs to &T: no waiting, immediate access.
    // The Ref holds a read lock on the value, so clone the value out and let the
    // Ref drop before sending; calling send() while this task still holds a Ref
    // would deadlock.
    let current: String = rx.borrow().clone();
    println!("Current: '{}'", current);

    tx.send("hello".to_string()).unwrap();

    // Multiple borrows can coexist
    {
        let b1 = rx.borrow();
        let b2 = rx.borrow();
        println!("b1: '{}', b2: '{}'", *b1, *b2);
    } // both read locks are released here

    // borrow() doesn't wait for a new value; changed() does
}
```
watch::borrow() gives immediate synchronous access to the current value, but the returned guard should be short-lived.
```rust
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = broadcast::channel::<String>(16);

    tx.send("hello".to_string()).unwrap();

    // recv() is async and must be awaited.
    // There is nothing to borrow; the receiver gets its own clone of the message.
    let value: String = rx.recv().await.unwrap();
    println!("Received: '{}'", value);

    // No in-place access like watch::borrow()
    // Messages must be consumed in order
}
```
broadcast has no borrow(); messages must be received, in order, with recv() (or the non-blocking try_recv()).
```rust
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    // watch has an implicit capacity of 1 (the current value)
    let (tx, rx) = watch::channel(0);

    // send() never waits for receivers;
    // it just overwrites the current value
    for i in 0..1000 {
        tx.send(i).unwrap();
    }

    // Only the latest value exists
    println!("Current: {}", *rx.borrow()); // 999
}
```
watch cannot overflow; it always holds exactly one value.
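```rust
use tokio::sync::broadcast::{self, error::RecvError};

#[tokio::main]
async fn main() {
    // Capacity 4 (a power of two; tokio may round other capacities up)
    let (tx, mut rx) = broadcast::channel(4);

    // Fill the buffer
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    tx.send(3).unwrap();
    tx.send(4).unwrap();

    // Sending more overflows
    tx.send(5).unwrap(); // Oldest message (1) is dropped

    // rx never read message 1, so its next recv() reports the loss
    assert!(matches!(rx.recv().await, Err(RecvError::Lagged(1))));

    // After the error it continues with the messages still in the buffer
    assert_eq!(rx.recv().await.unwrap(), 2);
    assert_eq!(rx.recv().await.unwrap(), 3);
    assert_eq!(rx.recv().await.unwrap(), 4);
    assert_eq!(rx.recv().await.unwrap(), 5);
}
```
broadcast drops old messages when capacity is exceeded, and receivers that had not read them get a Lagged error.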
```rust
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel(0);

    // Fast sender
    for i in 1..=100 {
        tx.send(i).unwrap(); // Never waits for the receiver
    }

    // Slow receiver: doesn't affect the sender.
    // It just sees the latest value when it looks.
    println!("Receiver sees: {}", *rx.borrow()); // 100

    // No overflow, no backpressure
    // Sender is never blocked by slow receivers
}
```
watch sender is never blocked by slow receivers.
```rust
use tokio::sync::broadcast::{self, error::RecvError};

#[tokio::main]
async fn main() {
    let (tx, mut rx) = broadcast::channel::<u32>(2);

    // Fast sender fills the buffer
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    tx.send(3).unwrap(); // Drops message 1

    // Slow receiver: it never read message 1, so it gets a Lagged error first
    assert!(matches!(rx.recv().await, Err(RecvError::Lagged(1))));

    // It can still receive what's left in the buffer
    assert_eq!(rx.recv().await.unwrap(), 2);
    assert_eq!(rx.recv().await.unwrap(), 3);
}
```
broadcast can overflow; slow receivers miss messages.
```rust
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (tx, rx1) = watch::channel(0);

    // Multiple receivers from the same sender
    let rx2 = tx.subscribe();
    let rx3 = tx.subscribe();

    tx.send(42).unwrap();

    // All receivers see the same current value
    println!("rx1: {}", *rx1.borrow()); // 42
    println!("rx2: {}", *rx2.borrow()); // 42
    println!("rx3: {}", *rx3.borrow()); // 42

    // Each receiver tracks its own "last seen" for changed()
}
```
watch supports multiple receivers, each tracking its own last-seen value.
```rust
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (tx, mut rx1) = broadcast::channel(16);
    let mut rx2 = tx.subscribe();
    let mut rx3 = tx.subscribe();

    tx.send(42).unwrap();
    tx.send(100).unwrap();

    // Each receiver independently receives all messages
    assert_eq!(rx1.recv().await.unwrap(), 42);
    assert_eq!(rx1.recv().await.unwrap(), 100);
    assert_eq!(rx2.recv().await.unwrap(), 42);
    assert_eq!(rx2.recv().await.unwrap(), 100);
    assert_eq!(rx3.recv().await.unwrap(), 42);
    assert_eq!(rx3.recv().await.unwrap(), 100);
}
```
broadcast delivers each message to all active receivers.
```rust
use std::collections::HashMap;
use tokio::sync::watch;

#[derive(Clone, Debug, Default)]
struct AppConfig {
    debug_mode: bool,
    max_connections: usize,
    timeouts: HashMap<String, u64>,
}

#[tokio::main]
async fn main() {
    let (tx, _rx) = watch::channel(AppConfig::default());

    // Multiple components subscribe to config
    let rx1 = tx.subscribe();
    let rx2 = tx.subscribe();

    // Components can also wait for changes
    let mut rx_watcher = tx.subscribe();
    let watcher = tokio::spawn(async move {
        // The loop ends once the sender is dropped
        while rx_watcher.changed().await.is_ok() {
            println!("Config updated: {:?}", *rx_watcher.borrow());
        }
    });

    // Update config
    let new_config = AppConfig {
        debug_mode: true,
        max_connections: 100,
        ..AppConfig::default()
    };
    tx.send(new_config).unwrap();

    // All components see the current config
    println!("Component 1 config: {:?}", *rx1.borrow());
    println!("Component 2 config: {:?}", *rx2.borrow());

    // Dropping the sender lets the watcher task finish
    drop(tx);
    watcher.await.unwrap();
}
```
watch is ideal for distributing current configuration state.
```rust
use tokio::sync::broadcast;

#[derive(Clone, Debug)]
enum SystemEvent {
    UserLoggedIn { user_id: u64 },
    UserLoggedOut { user_id: u64 },
    FileUploaded { name: String, size: u64 },
}

#[tokio::main]
async fn main() {
    let (tx, mut rx1) = broadcast::channel(128);
    let mut rx2 = tx.subscribe();

    // Emit events
    tx.send(SystemEvent::UserLoggedIn { user_id: 1 }).unwrap();
    tx.send(SystemEvent::FileUploaded {
        name: "document.pdf".to_string(),
        size: 1024,
    })
    .unwrap();

    // Multiple consumers process events in order
    // Logging consumer
    let logger = tokio::spawn(async move {
        while let Ok(event) = rx1.recv().await {
            println!("Log: {:?}", event);
        }
    });

    // Metrics consumer
    let metrics = tokio::spawn(async move {
        while let Ok(event) = rx2.recv().await {
            if let SystemEvent::UserLoggedIn { user_id } = event {
                println!("Active users increment for {}", user_id);
            }
        }
    });

    // Drop the sender so both loops end after draining, then wait for the tasks
    drop(tx);
    logger.await.unwrap();
    metrics.await.unwrap();
}
```
broadcast is ideal for event streaming where all events matter.
```rust
use tokio::sync::broadcast::{self, error::RecvError};

#[tokio::main]
async fn main() {
    let (tx, mut rx) = broadcast::channel::<u32>(2);

    tx.send(1).unwrap();
    tx.send(2).unwrap();
    tx.send(3).unwrap(); // Message 1 is now gone

    // rx never read message 1, so the next recv() returns Lagged
    // with the number of messages it missed
    assert!(matches!(rx.recv().await, Err(RecvError::Lagged(1))));

    // After the error, the receiver resumes at the oldest retained message
    assert_eq!(rx.recv().await.unwrap(), 2);
    assert_eq!(rx.recv().await.unwrap(), 3);

    // Falling further behind increases the reported count
    let (tx2, mut rx2) = broadcast::channel::<u32>(2);
    tx2.send(1).unwrap();
    tx2.send(2).unwrap();
    tx2.send(3).unwrap(); // Drops 1
    tx2.send(4).unwrap(); // Drops 2

    assert!(matches!(rx2.recv().await, Err(RecvError::Lagged(2))));
    assert_eq!(rx2.recv().await.unwrap(), 3);
    assert_eq!(rx2.recv().await.unwrap(), 4);
}
```
broadcast receivers get Lagged errors if they fall too far behind.
```rust
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel(0);

    // No matter how slow the receiver is,
    // it always gets the current value
    for i in 0..1_000_000 {
        tx.send(i).unwrap(); // Never waits for the receiver
    }

    // Even a very late receiver
    println!("Current: {}", *rx.borrow()); // 999999

    // No Lagged errors possible
}
```
watch cannot lag; it always provides the current value.
```rust
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel(0);

    // Sender can be cloned (in recent tokio versions);
    // all clones update the same value
    let tx2 = tx.clone();
    let tx3 = tx.clone();

    tx.send(1).unwrap();
    tx2.send(2).unwrap(); // All update the same channel
    tx3.send(3).unwrap();

    // Single current value
    println!("Value: {}", *rx.borrow()); // 3
}
```
watch sender clones all update the same single value.
```rust
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = broadcast::channel(16);

    // Sender clones all publish to the same channel
    let tx2 = tx.clone();
    let tx3 = tx.clone();

    tx.send(1).unwrap();
    tx2.send(2).unwrap();
    tx3.send(3).unwrap();

    // All messages preserved in order
    assert_eq!(rx.recv().await.unwrap(), 1);
    assert_eq!(rx.recv().await.unwrap(), 2);
    assert_eq!(rx.recv().await.unwrap(), 3);
}
```
broadcast sender clones all publish to the same channel.
```rust
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel(42);

    // has_changed() returns Ok while the sender is alive
    println!("Sender alive: {}", rx.has_changed().is_ok());

    // Drop sender
    drop(tx);

    // Receiver still has the last value
    println!("Last value: {}", *rx.borrow()); // 42

    // But changed() now returns an error (sender dropped, nothing new to see)
    assert!(rx.changed().await.is_err());
}
```
watch receivers keep the last value after sender drops.
```rust
use tokio::sync::broadcast::{self, error::RecvError};

#[tokio::main]
async fn main() {
    let (tx, mut rx) = broadcast::channel::<u32>(16);

    tx.send(1).unwrap();
    tx.send(2).unwrap();

    // Drop sender
    drop(tx);

    // Receiver can still get buffered messages
    assert_eq!(rx.recv().await.unwrap(), 1);
    assert_eq!(rx.recv().await.unwrap(), 2);

    // Once the buffer is exhausted, recv() reports that all senders are gone
    assert!(matches!(rx.recv().await, Err(RecvError::Closed)));
}
```
broadcast receivers can drain remaining messages after sender drops.
| Aspect | watch | broadcast |
|--------|-------|-----------|
| Value storage | Single current value | Buffered message history |
| New receivers | Get current value immediately | Miss pre-subscription messages |
| Capacity | 1 (implicit) | Configurable |
| Overflow | Impossible | Drops oldest messages |
| Slow receivers | Never block sender; see only the latest value | Never block sender; get Lagged errors and lose messages |
| Access pattern | borrow() (sync) or changed() (async) | recv() (async) or try_recv() (non-blocking) |
| Use case | State distribution | Event streaming |
| History | None (overwritten) | Up to capacity |
| Backpressure | None | None (drops on overflow) |
The fundamental difference is the mental model:
watch is state: It holds exactly one value at any time, the current state. Receivers can query this state synchronously with borrow() or wait for state changes with changed(). History doesn't exist; old values are overwritten. This makes it perfect for configuration, metrics, feature flags, and any shared state where only the current value matters. A slow receiver never affects the sender, and there's no concept of "missing" updates: you just see the current value.
broadcast is a stream: It buffers a sequence of messages and delivers each to all active receivers in order. Receivers consume messages one at a time with recv(). History exists up to the capacity limit, then the oldest messages drop. This makes it perfect for events, logs, signals, and any scenario where the sequence of occurrences matters. A slow receiver can lag and lose messages, but receivers that keep up all see the same sequence.
Key insight: Choose based on whether consumers need "what's the current state?" (watch) or "what events have occurred?" (broadcast). If a consumer joining late should immediately see the current value, use watch. If a consumer should only see events that happen after subscription, use broadcast. If missing intermediate values is acceptable as long as the latest is available, watch suffices. If consumers need every event within a window, broadcast with adequate capacity is appropriate.
The capacity behavior is telling: watch implicitly has capacity 1 because it's all about "now." broadcast has explicit capacity because it's about "recent history" within bounds. This distinction determines everything else about their semantics.