When using futures::stream::select, how does fairness work among multiple streams?
futures::stream::select combines two streams into a single stream that yields items from either as they become available. Its fairness model is round-robin: select is built on select_with_strategy with a strategy that alternates which stream is polled first on each poll, so when both streams have items ready they interleave evenly and neither can starve the other. What select does not provide is priority control; you cannot tell it to prefer one stream. For merging more than two streams there is SelectAll (which polls its streams in wake-up order, approximately fair), and for an explicit policy futures::stream::select_with_strategy accepts a closure returning PollNext, while the select_biased! macro polls its branches strictly in declaration order.
Basic select Usage
use futures::stream::{self, select, StreamExt};
use tokio::time::Duration;

#[tokio::main]
async fn main() {
// Create two periodic streams
let stream1 = stream::repeat(1)
.then(|x| async move {
tokio::time::sleep(Duration::from_millis(10)).await;
x
});
let stream2 = stream::repeat(2)
.then(|x| async move {
tokio::time::sleep(Duration::from_millis(10)).await;
x
});
// select combines them into one stream
let mut combined = select(stream1, stream2);
// Take first 10 items
let mut results = vec![];
for _ in 0..10 {
if let Some(item) = combined.next().await {
results.push(item);
}
}
println!("Results: {:?}", results);
// Order depends on which stream is ready first
}

select combines two streams, yielding items as either becomes ready.
Polling Order and Priority
use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
// Two streams that are always ready
let stream1 = stream::iter(vec![1, 2, 3, 4, 5]);
let stream2 = stream::iter(vec![10, 20, 30, 40, 50]);
let mut combined = futures::stream::select(stream1, stream2);
let mut results = vec![];
while let Some(item) = combined.next().await {
results.push(item);
}
println!("Results: {:?}", results);
// select alternates which stream it polls first, so the two
// always-ready streams interleave evenly
// Output: [1, 10, 2, 20, 3, 30, 4, 40, 5, 50]
}

When both streams are ready, select alternates between them instead of favoring the first argument.
select_all for Multiple Streams
use futures::stream::{self, StreamExt, SelectAll};
#[tokio::main]
async fn main() {
let stream1 = stream::iter(vec![1, 2, 3]);
let stream2 = stream::iter(vec![10, 20, 30]);
let stream3 = stream::iter(vec![100, 200, 300]);
// SelectAll can combine any number of streams
let mut combined: SelectAll<_> = SelectAll::new();
combined.push(stream1);
combined.push(stream2);
combined.push(stream3);
let mut results = vec![];
while let Some(item) = combined.next().await {
results.push(item);
}
println!("Results: {:?}", results);
// Items come from whichever stream is ready
}

SelectAll merges any number of streams; polling order follows task wake-ups rather than strict rotation.
Observing the Round-Robin Split
use futures::stream::{self, StreamExt};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
#[tokio::main]
async fn main() {
let counter1 = Arc::new(AtomicUsize::new(0));
let counter2 = Arc::new(AtomicUsize::new(0));
// Both streams are immediately ready
let c1 = counter1.clone();
let stream1 = stream::repeat_with(move || {
c1.fetch_add(1, Ordering::SeqCst);
1
}).take(100);
let c2 = counter2.clone();
let stream2 = stream::repeat_with(move || {
c2.fetch_add(1, Ordering::SeqCst);
2
}).take(100);
let mut combined = futures::stream::select(stream1, stream2);
let mut from_stream1 = 0;
let mut from_stream2 = 0;
for _ in 0..100 {
if let Some(item) = combined.next().await {
if item == 1 {
from_stream1 += 1;
} else {
from_stream2 += 1;
}
}
}
println!("From stream1: {}", from_stream1);
println!("From stream2: {}", from_stream2);
// The round-robin strategy gives an exact 50/50 split here
}

With both streams always ready, items are split evenly between the two.
Using select_with_strategy
use futures::stream::{self, select_with_strategy, PollNext, StreamExt};

#[tokio::main]
async fn main() {
let stream1 = stream::iter(vec![1, 2, 3, 4, 5]);
let stream2 = stream::iter(vec![10, 20, 30, 40, 50]);
// The strategy closure receives mutable state and returns which side
// to poll first; always answering Left gives stream1 strict priority
let strategy = |_: &mut ()| PollNext::Left;
let mut prioritized = select_with_strategy(stream1, stream2, strategy);
let mut priority_results = vec![];
while let Some(item) = prioritized.next().await {
priority_results.push(item);
}
println!("Priority results: {:?}", priority_results);
// Alternating preference by hand is also possible with select_biased!
let mut stream1 = stream::iter(vec![1, 2, 3, 4, 5]).fuse();
let mut stream2 = stream::iter(vec![10, 20, 30, 40, 50]).fuse();
let mut results = vec![];
let mut prefer_first = true;
loop {
let item = if prefer_first {
futures::select_biased! {
item = stream1.next() => item,
item = stream2.next() => item,
complete => break,
}
} else {
futures::select_biased! {
item = stream2.next() => item,
item = stream1.next() => item,
complete => break,
}
};
if let Some(x) = item {
results.push(x);
}
prefer_first = !prefer_first;
}
println!("Alternating results: {:?}", results);
}

select_with_strategy fixes the polling policy up front; select_biased! controls poll order at each await point.
select_biased Macro
use futures::stream::{self, StreamExt};
use futures::select_biased;
#[tokio::main]
async fn main() {
let mut stream1 = stream::iter(vec![1, 2, 3]).fuse();
let mut stream2 = stream::iter(vec![10, 20, 30]).fuse();
let mut stream3 = stream::iter(vec![100, 200, 300]).fuse();
let mut results = vec![];
loop {
select_biased! {
item = stream1.next() => {
if let Some(x) = item {
results.push(("stream1", x));
}
}
item = stream2.next() => {
if let Some(x) = item {
results.push(("stream2", x));
}
}
item = stream3.next() => {
if let Some(x) = item {
results.push(("stream3", x));
}
}
complete => break,
}
}
println!("Results: {:?}", results);
// stream1 has highest priority, then stream2, then stream3
}

select_biased! gives explicit control over polling priority.
Fairness with Timing
use futures::stream::{self, select, StreamExt};
use tokio::time::Duration;

#[tokio::main]
async fn main() {
// Stream1 produces every 10ms; tag items with their source
let stream1 = stream::unfold(0, |count| async move {
tokio::time::sleep(Duration::from_millis(10)).await;
Some((count, count + 1))
}).map(|x| (1, x));
// Stream2 produces every 30ms
let stream2 = stream::unfold(0, |count| async move {
tokio::time::sleep(Duration::from_millis(30)).await;
Some((count, count + 1))
}).map(|x| (2, x));
let mut combined = select(stream1, stream2);
// Stream1 will produce more items due to faster timing
let mut stream1_count = 0;
let mut stream2_count = 0;
for _ in 0..20 {
if let Some((source, _)) = combined.next().await {
if source == 1 {
stream1_count += 1;
} else {
stream2_count += 1;
}
}
}
println!("stream1: {}, stream2: {}", stream1_count, stream2_count);
}

When streams have different timing, the faster stream naturally yields more items; readiness, not polling order, drives the split.
Practical Fairness Pattern
use futures::stream::{self, StreamExt};
use std::collections::VecDeque;
#[tokio::main]
async fn main() {
// Round-robin across multiple ready streams
let streams = vec![
stream::iter(vec![1, 2, 3]).fuse(),
stream::iter(vec![10, 20, 30]).fuse(),
stream::iter(vec![100, 200, 300]).fuse(),
];
let mut results = vec![];
let mut queue: VecDeque<_> = streams.into_iter().collect();
while !queue.is_empty() {
// Take from the front; pushing it back after use rotates the queue,
// giving each stream a turn in strict order
let mut current = queue.pop_front().unwrap();
if let Some(item) = current.next().await {
results.push(item);
// Put back if not exhausted
queue.push_back(current);
}
// If exhausted, don't put back
}
println!("Round-robin results: {:?}", results);
}

Manual round-robin ensures fairness across streams in strict turn order.
Understanding Poll Mechanics
use futures::stream::{Stream, StreamExt};
use std::pin::Pin;
use std::task::{Context, Poll};
// A stream that logs when it's polled
struct LoggingStream {
name: &'static str,
items: Vec<i32>,
}
impl LoggingStream {
fn new(name: &'static str, items: Vec<i32>) -> Self {
LoggingStream { name, items }
}
}
impl Stream for LoggingStream {
type Item = i32;
fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
println!("Polling {}", self.name);
if self.items.is_empty() {
Poll::Ready(None)
} else {
let item = self.items.remove(0);
println!("{} returning {}", self.name, item);
Poll::Ready(Some(item))
}
}
}
#[tokio::main]
async fn main() {
let stream1 = LoggingStream::new("stream1", vec![1, 2]);
let stream2 = LoggingStream::new("stream2", vec![10, 20]);
let mut combined = futures::stream::select(stream1, stream2);
println!("Getting first item:");
let _ = combined.next().await;
println!("\nGetting second item:");
let _ = combined.next().await;
println!("\nGetting third item:");
let _ = combined.next().await;
// select alternates which stream it polls first on each call, so the
// log shows stream1 and stream2 taking turns
}

Logging poll_next calls makes select's alternating poll order visible.
Buffering for Fairness
use futures::stream::{self, StreamExt};
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
// Use channels to decouple production from consumption
let (tx1, rx1) = mpsc::channel::<i32>(10);
let (tx2, rx2) = mpsc::channel::<i32>(10);
// Spawn producers
tokio::spawn(async move {
for i in 0..5 {
tx1.send(i).await.unwrap();
tokio::time::sleep(Duration::from_millis(1)).await;
}
});
tokio::spawn(async move {
for i in 100..105 {
tx2.send(i).await.unwrap();
tokio::time::sleep(Duration::from_millis(1)).await;
}
});
// Wrap receivers in tokio_stream's ReceiverStream so they implement Stream
let stream1 = tokio_stream::wrappers::ReceiverStream::new(rx1).map(|x| (1, x));
let stream2 = tokio_stream::wrappers::ReceiverStream::new(rx2).map(|x| (2, x));
let mut combined = futures::stream::select(stream1, stream2);
let mut from_1 = 0;
let mut from_2 = 0;
while let Some((source, _)) = combined.next().await {
if source == 1 {
from_1 += 1;
} else {
from_2 += 1;
}
}
println!("From stream 1: {}, from stream 2: {}", from_1, from_2);
}
use tokio::time::Duration;

Channels with buffering provide natural fairness as producers compete.
When Fairness Matters
use futures::stream::{self, StreamExt};
use tokio::time::Duration;

#[tokio::main]
async fn main() {
// Scenario: multiple event sources with different priorities
// Without care, high-frequency sources can starve others
// High-frequency events (every 1ms)
let high_freq = stream::unfold(0, |count| async move {
tokio::time::sleep(Duration::from_millis(1)).await;
Some((("high", count), count + 1))
});
// Low-frequency events (every 10ms)
let low_freq = stream::unfold(0, |count| async move {
tokio::time::sleep(Duration::from_millis(10)).await;
Some((("low", count), count + 1))
});
// Critical events (every 5ms, but should be prioritized)
let critical = stream::unfold(0, |count| async move {
tokio::time::sleep(Duration::from_millis(5)).await;
Some((("critical", count), count + 1))
});
// select polls fairly, but the busiest stream still dominates because
// it is ready more often; low_freq and critical also share one slot of
// the outer round-robin
let mut combined = futures::stream::select(high_freq, futures::stream::select(low_freq, critical));
// Process with timeout to demonstrate
let start = tokio::time::Instant::now();
let mut counts = std::collections::HashMap::new();
while start.elapsed() < Duration::from_millis(50) {
if let Some((source, _)) = combined.next().await {
*counts.entry(source).or_insert(0) += 1;
}
}
println!("Event counts: {:?}", counts);
}

High-frequency streams dominate the merged output because they are ready more often, not because of polling bias.
Priority-Based Selection
use futures::stream::{self, StreamExt};
use futures::select_biased;

#[tokio::main]
async fn main() {
let mut high_priority = stream::iter(vec!["urgent1", "urgent2"]).fuse();
let mut normal_priority = stream::iter(vec!["normal1", "normal2", "normal3"]).fuse();
let mut low_priority = stream::iter(vec!["low1", "low2", "low3", "low4"]).fuse();
let mut results = vec![];
loop {
// Poll in priority order
select_biased! {
item = high_priority.next() => {
match item {
Some(x) => results.push(("high", x)),
None => continue,
}
}
item = normal_priority.next() => {
match item {
Some(x) => results.push(("normal", x)),
None => continue,
}
}
item = low_priority.next() => {
match item {
Some(x) => results.push(("low", x)),
None => continue,
}
}
complete => break,
}
}
println!("Priority-ordered results: {:?}", results);
// High priority items come first when available
}

select_biased! enables explicit priority ordering.
Comparing Selection Strategies
| Strategy | Fairness | Priority Control | Complexity |
|---|---|---|---|
| select | Round-robin between two streams | No | Low |
| SelectAll | Approximate (wake-up order) | No | Low |
| select_with_strategy | Custom closure decides | Yes | Medium |
| select_biased! | Explicit poll order | Yes | Medium |
| Manual round-robin | Strict round-robin | No | High |
| Channels + select | Natural timing | Partial | Medium |
Synthesis
futures::stream::select is fair between its two streams, but offers no priority control:
How select works:
- Built on select_with_strategy with a round-robin strategy
- Alternates which stream is polled first on each poll
- If the preferred stream is pending, the other stream is polled
- After one stream completes, the remaining one is polled exclusively
Fairness implications:
- Two always-ready streams interleave their items evenly
- Fairness is per-poll, not per-rate: a stream that is ready more often still yields more items
- Nested selects skew the split, since both inner streams share one slot of the outer round-robin
- select_biased! polls branches in declaration order, so later branches can be starved
Achieving priority or strict fairness:
- Use select_with_strategy with a closure returning PollNext for a custom policy
- Use select_biased! for explicit per-call poll ordering
- Implement manual round-robin with queue rotation for strict turns across many streams
- Use channels to decouple fast producers from the consumer
Key insight: select gives you round-robin fairness between two ready streams for free, but fairness is not priority. When one source must preempt others, reach for select_biased! or an always-Left select_with_strategy; when many streams must each get a strict turn, use a manual round-robin queue. And remember that readiness dominates everything: a stream that produces ten times as often contributes ten times as many items no matter how fair the polling is.
