Loading page…
Rust walkthroughs
Loading page…
futures::stream::select, how does fairness work among multiple streams?
futures::stream::select combines two streams into a single stream that yields items from either source as they become available. Its fairness model is simple round-robin: on each poll it alternates which stream it tries first, so when both streams have items ready simultaneously the output interleaves them instead of favouring the first argument. It offers no priority control, and a stream that is ready more often still contributes more items. For an explicit policy, futures::stream::select_with_strategy takes a closure that chooses which side to poll first, the select_biased! macro polls its branches in the order they are written, and select_all merges any number of streams, yielding values as they become ready.
use futures::stream::{self, StreamExt, select};
use tokio::time::{interval, Duration};
/// Merges two timer-driven streams with `select` and prints the first ten
/// items that come out of the merged stream.
#[tokio::main]
async fn basic_select() {
    // Each source yields its constant after a 10 ms sleep.
    let stream1 = stream::repeat(1).then(|x| async move {
        tokio::time::sleep(Duration::from_millis(10)).await;
        x
    });
    let stream2 = stream::repeat(2).then(|x| async move {
        tokio::time::sleep(Duration::from_millis(10)).await;
        x
    });

    // In futures 0.3 `select` is a free function, not a `StreamExt` method.
    let combined = select(stream1, stream2);

    // `then` with an async block is `!Unpin`, so `combined.next()` would not
    // compile without pinning. `take` + `collect` consume the stream by value
    // and therefore need no explicit pin.
    let results: Vec<i32> = combined.take(10).collect().await;

    println!("Results: {:?}", results);
    // Order depends on which stream is ready first
}
select combines two streams, yielding items as either becomes ready.
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn polling_order() {
// Two streams that are always ready
let stream1 = stream::iter(vec![1, 2, 3, 4, 5]);
let stream2 = stream::iter(vec![10, 20, 30, 40, 50]);
let mut combined = stream1.select(stream2);
let mut results = vec![];
while let Some(item) = combined.next().await {
results.push(item);
}
println!("Results: {:?}", results);
// When both are ready, stream1 (first argument) gets priority
// Output: [1, 10, 2, 20, 3, 30, 4, 40, 5, 50]
// Or: [1, 2, 3, 4, 5, 10, 20, 30, 40, 50]
// Depends on implementation details of select
}When multiple streams are ready, the first stream has priority.
use futures::stream::{self, StreamExt, SelectAll};
/// Merges three in-memory streams through a `SelectAll` set and prints every
/// item the merged stream produces.
#[tokio::main]
async fn select_all_example() {
    // SelectAll can hold any number of streams of the same type.
    let mut merged: SelectAll<_> = SelectAll::new();
    for values in [vec![1, 2, 3], vec![10, 20, 30], vec![100, 200, 300]] {
        merged.push(stream::iter(values));
    }

    // Drain the set until every inner stream is exhausted.
    let collected: Vec<_> = merged.collect().await;

    println!("Results: {:?}", collected);
    // Items come from whichever stream is ready
}
SelectAll handles more than two streams with similar ordering behavior.
use futures::stream::{self, StreamExt};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
/// Counts how many of the first 100 merged items come from each of two
/// always-ready streams, and how many items each source actually generated.
#[tokio::main]
async fn observe_unfairness() {
    let counter1 = Arc::new(AtomicUsize::new(0));
    let counter2 = Arc::new(AtomicUsize::new(0));

    // Both streams are immediately ready; each bumps its own counter every
    // time it produces a value.
    let c1 = counter1.clone();
    let stream1 = stream::repeat_with(move || {
        c1.fetch_add(1, Ordering::SeqCst);
        1
    })
    .take(100);
    let c2 = counter2.clone();
    let stream2 = stream::repeat_with(move || {
        c2.fetch_add(1, Ordering::SeqCst);
        2
    })
    .take(100);

    // `select` is a free function in futures 0.3, not a `StreamExt` method.
    let mut combined = stream::select(stream1, stream2);

    let mut from_stream1 = 0;
    let mut from_stream2 = 0;
    for _ in 0..100 {
        match combined.next().await {
            Some(1) => from_stream1 += 1,
            Some(_) => from_stream2 += 1,
            None => {}
        }
    }

    println!("From stream1: {}", from_stream1);
    println!("From stream2: {}", from_stream2);
    // The counters were previously written but never read; report them so the
    // number of values each source generated is visible as well.
    println!("Generated by stream1: {}", counter1.load(Ordering::SeqCst));
    println!("Generated by stream2: {}", counter2.load(Ordering::SeqCst));
    // `select` alternates between ready streams, so the split is expected to
    // be 50/50 here.
}
When both streams are always ready, select alternates between them, so the split is even.
use futures::stream::{self, StreamExt};
use futures::future::Either;
/// Alternates which of two streams is polled first by flipping the branch
/// order of `select_biased!` on every iteration, then prints the items in the
/// order they were received.
#[tokio::main]
async fn select_with_strategy() {
    let stream1 = stream::iter(vec![1, 2, 3, 4, 5]);
    let stream2 = stream::iter(vec![10, 20, 30, 40, 50]);

    // NOTE(review): the previous draft built an unused `strategy` closure
    // returning `Either` over `&mut ()`. That does not match
    // `futures::stream::select_with_strategy`, whose closure receives
    // `&mut State` and returns `PollNext`, so the dead closure was removed.
    // The manual alternation below is the approach this example demonstrates.
    let mut stream1 = stream1.fuse();
    let mut stream2 = stream2.fuse();
    let mut results = vec![];
    let mut prefer_first = true;

    loop {
        // Whichever branch is listed first is polled first.
        let item = if prefer_first {
            futures::select_biased! {
                item = stream1.next() => item,
                item = stream2.next() => item,
                complete => break,
            }
        } else {
            futures::select_biased! {
                item = stream2.next() => item,
                item = stream1.next() => item,
                complete => break,
            }
        };
        // A stream's final poll yields `None`; only real items are recorded.
        if let Some(x) = item {
            results.push(x);
        }
        prefer_first = !prefer_first;
    }

    println!("Alternating results: {:?}", results);
}
select_biased! allows controlling poll order explicitly.
use futures::stream::{self, StreamExt};
use futures::select_biased;
/// Drains three fused streams with `select_biased!`, labelling every item
/// with the stream it came from, and prints the labelled items.
#[tokio::main]
async fn select_biased_example() {
    let mut first = stream::iter(vec![1, 2, 3]).fuse();
    let mut second = stream::iter(vec![10, 20, 30]).fuse();
    let mut third = stream::iter(vec![100, 200, 300]).fuse();
    let mut results = vec![];

    loop {
        // Branches are polled top to bottom; `extend` on an `Option` pushes
        // the labelled item when there is one and does nothing on `None`.
        select_biased! {
            item = first.next() => results.extend(item.map(|x| ("stream1", x))),
            item = second.next() => results.extend(item.map(|x| ("stream2", x))),
            item = third.next() => results.extend(item.map(|x| ("stream3", x))),
            complete => break,
        }
    }

    println!("Results: {:?}", results);
    // stream1 has highest priority, then stream2, then stream3
}
select_biased! gives explicit control over polling priority.
use futures::stream::{self, StreamExt};
use tokio::time::{interval, Duration};
/// Merges a 10 ms stream with a 30 ms stream, takes the first 20 items and
/// reports how many came from each source.
#[tokio::main]
async fn fairness_with_timing() {
    // Stream1 produces every 10ms. Each item carries its source label so the
    // consumer can tell the two streams apart after merging.
    let stream1 = stream::unfold(0, |count| async move {
        tokio::time::sleep(Duration::from_millis(10)).await;
        Some((("stream1", count), count + 1))
    });
    // Stream2 produces every 30ms
    let stream2 = stream::unfold(0, |count| async move {
        tokio::time::sleep(Duration::from_millis(30)).await;
        Some((("stream2", count), count + 1))
    });

    // `select` is a free function in futures 0.3. `unfold` over an async
    // block is `!Unpin`, so the merged stream is boxed and pinned before
    // `next()` is called on it.
    let mut combined = Box::pin(stream::select(stream1, stream2).take(20));

    // Stream1 will produce more items due to faster timing
    let mut stream1_count = 0;
    let mut stream2_count = 0;
    while let Some((source, _)) = combined.next().await {
        if source == "stream1" {
            stream1_count += 1;
        } else {
            stream2_count += 1;
        }
    }

    println!("stream1: {}, stream2: {}", stream1_count, stream2_count);
    println!("Timing-based selection complete");
}
When streams have different timing, fairness naturally emerges from readiness.
use futures::stream::{self, StreamExt};
use std::collections::VecDeque;
/// Pulls one item at a time from each stream in turn (round-robin) until all
/// streams are exhausted, then prints the items in the order received.
#[tokio::main]
async fn practical_fairness() {
    // Round-robin across multiple ready streams
    let streams = vec![
        stream::iter(vec![1, 2, 3]).fuse(),
        stream::iter(vec![10, 20, 30]).fuse(),
        stream::iter(vec![100, 200, 300]).fuse(),
    ];
    let mut results = vec![];
    let mut queue: VecDeque<_> = streams.into_iter().collect();

    // Popping from the front and pushing to the back already rotates the
    // queue. The extra `rotate_left(1)` that used to run first skipped the
    // front stream, and with two live streams it served the same stream on
    // every turn.
    while let Some(mut current) = queue.pop_front() {
        if let Some(item) = current.next().await {
            results.push(item);
            // Put back if not exhausted
            queue.push_back(current);
        }
        // If exhausted, don't put back
    }

    println!("Round-robin results: {:?}", results);
}
Manual round-robin ensures fairness across ready streams.
use futures::stream::{Stream, StreamExt};
use std::pin::Pin;
use std::task::{Context, Poll};
// A stream that logs when it's polled
struct LoggingStream {
    // Label printed in every log line.
    name: &'static str,
    // Remaining items, yielded front to back. A deque makes taking the front
    // item O(1) instead of shifting the whole buffer on every poll.
    items: std::collections::VecDeque<i32>,
}

impl LoggingStream {
    /// Creates a stream named `name` that yields `items` in order.
    fn new(name: &'static str, items: Vec<i32>) -> Self {
        LoggingStream {
            name,
            items: items.into(),
        }
    }
}

impl Stream for LoggingStream {
    type Item = i32;

    /// Logs the poll, then returns the next item immediately, or `None` once
    /// the buffer is empty. This stream never returns `Poll::Pending`, so the
    /// task context is not used.
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        println!("Polling {}", self.name);
        let next = self.items.pop_front();
        if let Some(item) = next {
            println!("{} returning {}", self.name, item);
        }
        Poll::Ready(next)
    }
}
/// Pulls three items from two merged `LoggingStream`s so the poll order shows
/// up in the log output.
#[tokio::main]
async fn poll_mechanics() {
    let stream1 = LoggingStream::new("stream1", vec![1, 2]);
    let stream2 = LoggingStream::new("stream2", vec![10, 20]);

    // `select` is a free function in futures 0.3; the fully qualified path is
    // used because only `Stream` and `StreamExt` are imported here.
    let mut combined = futures::stream::select(stream1, stream2);

    println!("Getting first item:");
    let _ = combined.next().await;
    println!("\nGetting second item:");
    let _ = combined.next().await;
    println!("\nGetting third item:");
    let _ = combined.next().await;
    // `select` alternates which side it polls first. Both streams here are
    // always ready, so the log is expected to show stream1, then stream2,
    // then stream1 again.
}
Understanding poll order helps predict which stream yields first.
use futures::stream::{self, StreamExt};
use tokio::sync::mpsc;
#[tokio::main]
async fn buffering_for_fairness() {
// Use channels to decouple production from consumption
let (tx1, rx1) = mpsc::channel::<i32>(10);
let (tx2, rx2) = mpsc::channel::<i32>(10);
// Spawn producers
tokio::spawn(async move {
for i in 0..5 {
tx1.send(i).await.unwrap();
tokio::time::sleep(Duration::from_millis(1)).await;
}
});
tokio::spawn(async move {
for i in 100..105 {
tx2.send(i).await.unwrap();
tokio::time::sleep(Duration::from_millis(1)).await;
}
});
// Convert receivers to streams and select
let stream1 = rx1.map(|x| (1, x));
let stream2 = rx2.map(|x| (2, x));
let mut combined = stream1.select(stream2);
let mut from_1 = 0;
let mut from_2 = 0;
while let Some((source, _)) = combined.next().await {
if source == 1 {
from_1 += 1;
} else {
from_2 += 1;
}
}
println!("From stream 1: {}, from stream 2: {}", from_1, from_2);
}
use tokio::time::Duration;Channels with buffering provide natural fairness as producers compete.
use futures::stream::{self, StreamExt};
use tokio::time::{interval, Duration};
/// Merges three event sources with different periods and counts how many
/// events each contributes during roughly 50 ms.
#[tokio::main]
async fn when_fairness_matters() {
    // Scenario: multiple event sources with different priorities.
    // `unfold` expects `Some((item, next_state))`, so each item is the pair
    // `(label, count)` and the state is the next count.

    // High-frequency events (every 1ms)
    let high_freq = stream::unfold(0, |count| async move {
        tokio::time::sleep(Duration::from_millis(1)).await;
        Some((("high", count), count + 1))
    });
    // Low-frequency events (every 10ms)
    let low_freq = stream::unfold(0, |count| async move {
        tokio::time::sleep(Duration::from_millis(10)).await;
        Some((("low", count), count + 1))
    });
    // Critical events (every 5ms, but should be prioritized)
    let critical = stream::unfold(0, |count| async move {
        tokio::time::sleep(Duration::from_millis(5)).await;
        Some((("critical", count), count + 1))
    });

    // Plain `select` has no notion of priority: it only yields what is ready.
    // `select` is a free function in futures 0.3, and the `unfold` streams
    // are `!Unpin`, so the merged stream is boxed and pinned for `next()`.
    let mut combined = Box::pin(stream::select(
        high_freq,
        stream::select(low_freq, critical),
    ));

    // Process for a bounded time to demonstrate
    let start = tokio::time::Instant::now();
    let mut counts = std::collections::HashMap::new();
    while start.elapsed() < Duration::from_millis(50) {
        if let Some((source, _)) = combined.next().await {
            *counts.entry(source).or_insert(0) += 1;
        }
    }

    println!("Event counts: {:?}", counts);
}
High-frequency streams can dominate in simple select scenarios.
use futures::stream::{self, StreamExt};
use futures::select_biased;
use tokio::time::{interval, Duration};
/// Drains three fused streams in strict priority order using
/// `select_biased!` and prints each item tagged with its priority level.
#[tokio::main]
async fn priority_based_selection() {
    let mut urgent = stream::iter(vec!["urgent1", "urgent2"]).fuse();
    let mut normal = stream::iter(vec!["normal1", "normal2", "normal3"]).fuse();
    let mut background = stream::iter(vec!["low1", "low2", "low3", "low4"]).fuse();
    let mut results = vec![];

    loop {
        // Branches are polled in the order written. A `None` from a stream
        // records nothing and the loop simply goes around again.
        select_biased! {
            item = urgent.next() => {
                if let Some(x) = item {
                    results.push(("high", x));
                }
            }
            item = normal.next() => {
                if let Some(x) = item {
                    results.push(("normal", x));
                }
            }
            item = background.next() => {
                if let Some(x) = item {
                    results.push(("low", x));
                }
            }
            complete => break,
        }
    }

    println!("Priority-ordered results: {:?}", results);
    // High priority items come first when available
}
select_biased! enables explicit priority ordering.
| Strategy | Fairness | Priority Control | Complexity |
|----------|----------|------------------|------------|
| select | Round-robin among ready streams | No | Low |
| SelectAll | First-ready biased | No | Low |
| select_biased! | Explicit order | Yes | Medium |
| Manual round-robin | Round-robin | No | High |
| Channels + select | Natural timing | Partial | Medium |
futures::stream::select does not guarantee fairness:
How select works:
Fairness implications:
Achieving fairness:
select_biased! for explicit control
Key insight: select is optimized for simplicity and efficiency. It alternates between its two inputs when both are ready, so neither argument is starved, but it offers no priority control and a stream that is ready more often still contributes more items. For scenarios that need a specific policy, use explicit strategies such as select_with_strategy, select_biased! with a fixed or alternating branch order, or manual round-robin queuing. The choice depends on whether you need priority (high-priority events first) or fairness (equal opportunity for ready streams).