When using futures::stream::select, how does fairness work among multiple streams?

futures::stream::select combines two streams into a single stream that yields items from either source as they become available. When both streams are ready, select is fair: it polls them in round-robin fashion, so after one stream has had the first poll, the other gets it on the next call. Once one stream finishes, the remaining one is polled exclusively. select accepts exactly two streams; select_all and SelectAll handle any number. futures::stream::select_with_strategy lets you replace round-robin with your own policy, such as always preferring one side, and the select_biased! macro gives strict priority inside a hand-written loop by polling its branches in the order they are written.

Basic select Usage

use futures::stream::{self, StreamExt, select};
use tokio::time::Duration;
 
#[tokio::main]
async fn basic_select() {
    // Create two streams with different intervals
    let stream1 = stream::repeat(1)
        .then(|x| async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            x
        });
    
    let stream2 = stream::repeat(2)
        .then(|x| async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            x
        });
    
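    // select is a free function that merges the two streams; pin the result
    // because the async blocks inside `then` make the merged stream !Unpin
    let mut combined = std::pin::pin!(select(stream1, stream2));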
    
    // Take first 10 items
    let mut results = vec![];
    for _ in 0..10 {
        if let Some(item) = combined.next().await {
            results.push(item);
        }
    }
    
    println!("Results: {:?}", results);
    // Both streams become ready at about the same time, so round-robin
    // polling tends to alternate the two values
}

select combines two streams, yielding items as either becomes ready.

Polling Order and Priority

use futures::stream::{self, StreamExt};
 
#[tokio::main]
async fn polling_order() {
    // Two streams that are always ready
    let stream1 = stream::iter(vec![1, 2, 3, 4, 5]);
    let stream2 = stream::iter(vec![10, 20, 30, 40, 50]);
    
    let mut combined = stream::select(stream1, stream2);
    
    let mut results = vec![];
    while let Some(item) = combined.next().await {
        results.push(item);
    }
    
    println!("Results: {:?}", results);
    // When both are ready, select alternates, starting with stream1
    // Output: [1, 10, 2, 20, 3, 30, 4, 40, 5, 50]
}

When both streams are always ready, select alternates between them, starting with the first argument.
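To make the alternation concrete without an async runtime, the following is a std-only model rather than the futures implementation. The function round_robin_merge is hypothetical, and each Vec stands in for a stream whose next item is always ready. It also shows what happens when one side runs out: the other side is drained on its own.

```rust
/// Hypothetical std-only model of round-robin merging. Each Vec stands in
/// for a stream that always has its next item ready.
fn round_robin_merge<T>(left: Vec<T>, right: Vec<T>) -> Vec<T> {
    let mut left = left.into_iter();
    let mut right = right.into_iter();
    let mut out = Vec::new();
    // Which side gets tried first on the next turn.
    let mut left_first = true;

    loop {
        // Try the preferred side, then fall back to the other side.
        let item = if left_first {
            left.next().or_else(|| right.next())
        } else {
            right.next().or_else(|| left.next())
        };
        match item {
            Some(x) => out.push(x),
            None => break, // both sides are exhausted
        }
        // Flip the preference every turn.
        left_first = !left_first;
    }
    out
}
```

Calling round_robin_merge(vec![1, 2, 3], vec![10, 20, 30, 40, 50]) interleaves the two inputs until the shorter one ends and then appends the rest of the longer one.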

select_all for Multiple Streams

use futures::stream::{self, StreamExt, SelectAll};
 
#[tokio::main]
async fn select_all_example() {
    let stream1 = stream::iter(vec![1, 2, 3]);
    let stream2 = stream::iter(vec![10, 20, 30]);
    let stream3 = stream::iter(vec![100, 200, 300]);
    
    // SelectAll can combine any number of streams
    let mut combined: SelectAll<_> = SelectAll::new();
    combined.push(stream1);
    combined.push(stream2);
    combined.push(stream3);
    
    let mut results = vec![];
    while let Some(item) = combined.next().await {
        results.push(item);
    }
    
    println!("Results: {:?}", results);
    // Items come from whichever stream is ready
}

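SelectAll handles any number of streams. It polls only the streams that have signalled readiness and yields their items as they arrive; unlike select, its documentation does not promise a particular order among streams that are ready at the same time.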

Observing Round-Robin Behavior

use futures::stream::{self, StreamExt};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
 
#[tokio::main]
async fn observe_unfairness() {
    let counter1 = Arc::new(AtomicUsize::new(0));
    let counter2 = Arc::new(AtomicUsize::new(0));
    
    // Both streams are immediately ready
    let c1 = counter1.clone();
    let stream1 = stream::repeat_with(move || {
        c1.fetch_add(1, Ordering::SeqCst);
        1
    }).take(100);
    
    let c2 = counter2.clone();
    let stream2 = stream::repeat_with(move || {
        c2.fetch_add(1, Ordering::SeqCst);
        2
    }).take(100);
    
    let mut combined = stream::select(stream1, stream2);
    
    let mut from_stream1 = 0;
    let mut from_stream2 = 0;
    
    for _ in 0..100 {
        if let Some(item) = combined.next().await {
            if item == 1 {
                from_stream1 += 1;
            } else {
                from_stream2 += 1;
            }
        }
    }
    
    println!("From stream1: {}", from_stream1);
    println!("From stream2: {}", from_stream2);
    // 50 from each: with both streams always ready, select alternates
}

When both streams are always ready, each one yields half of the items.

Using select_with_strategy

use futures::select_biased;
use futures::stream::{self, select_with_strategy, PollNext, StreamExt};

#[tokio::main]
async fn select_with_strategy_example() {
    // The strategy runs before each poll and names the side to try first.
    // Always answering Left gives the first stream strict priority.
    fn prefer_left(_: &mut ()) -> PollNext {
        PollNext::Left
    }

    let left_biased: Vec<i32> = select_with_strategy(
        stream::iter(vec![1, 2, 3, 4, 5]),
        stream::iter(vec![10, 20, 30, 40, 50]),
        prefer_left,
    )
    .collect()
    .await;
    println!("Left-biased results: {:?}", left_biased);
    // Output: [1, 2, 3, 4, 5, 10, 20, 30, 40, 50]

    // The same kind of control in a hand-written loop: alternate which
    // stream select_biased! polls first
    let mut stream1 = stream::iter(vec![1, 2, 3, 4, 5]).fuse();
    let mut stream2 = stream::iter(vec![10, 20, 30, 40, 50]).fuse();
    let mut results = vec![];
    let mut prefer_first = true;

    loop {
        let item = if prefer_first {
            select_biased! {
                item = stream1.next() => item,
                item = stream2.next() => item,
                complete => break,
            }
        } else {
            select_biased! {
                item = stream2.next() => item,
                item = stream1.next() => item,
                complete => break,
            }
        };

        if let Some(x) = item {
            results.push(x);
        }
        prefer_first = !prefer_first;
    }

    println!("Alternating results: {:?}", results);
    // Output: [1, 10, 2, 20, 3, 30, 4, 40, 5, 50]
}

select_with_strategy sets the polling policy for a merged stream, while select_biased! controls poll order explicitly inside a hand-written loop.
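A strategy can also carry state between turns. The sketch below is a std-only model of that idea, not the futures API: Side, merge_with_strategy and two_to_one are hypothetical names, and the Vec inputs stand in for streams that are always ready. The policy shown gives the left side two turns for every turn of the right side.

```rust
/// Which side to try first; a stand-in for a PollNext-style choice.
#[derive(Clone, Copy)]
enum Side {
    Left,
    Right,
}

/// Hypothetical std-only model: merge two always-ready sources, asking
/// `strategy` before every turn which side to try first.
fn merge_with_strategy<T, S: Default>(
    left: Vec<T>,
    right: Vec<T>,
    mut strategy: impl FnMut(&mut S) -> Side,
) -> Vec<T> {
    let mut left = left.into_iter();
    let mut right = right.into_iter();
    // The strategy's state starts from its Default value.
    let mut state = S::default();
    let mut out = Vec::new();
    loop {
        let item = match strategy(&mut state) {
            Side::Left => left.next().or_else(|| right.next()),
            Side::Right => right.next().or_else(|| left.next()),
        };
        match item {
            Some(x) => out.push(x),
            None => return out, // both sources are exhausted
        }
    }
}

/// Example policy: two turns for the left side, then one for the right.
fn two_to_one(turn: &mut u32) -> Side {
    let side = if *turn % 3 == 2 { Side::Right } else { Side::Left };
    *turn += 1;
    side
}
```

Passing a stateless policy such as |_: &mut ()| Side::Left reproduces strict left priority, while two_to_one produces a weighted interleaving.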

select_biased Macro

use futures::stream::{self, StreamExt};
use futures::select_biased;
 
#[tokio::main]
async fn select_biased_example() {
    let mut stream1 = stream::iter(vec![1, 2, 3]).fuse();
    let mut stream2 = stream::iter(vec![10, 20, 30]).fuse();
    let mut stream3 = stream::iter(vec![100, 200, 300]).fuse();
    
    let mut results = vec![];
    
    loop {
        select_biased! {
            item = stream1.next() => {
                if let Some(x) = item {
                    results.push(("stream1", x));
                }
            }
            item = stream2.next() => {
                if let Some(x) = item {
                    results.push(("stream2", x));
                }
            }
            item = stream3.next() => {
                if let Some(x) = item {
                    results.push(("stream3", x));
                }
            }
            complete => break,
        }
    }
    
    println!("Results: {:?}", results);
    // stream1 has highest priority, then stream2, then stream3
}

select_biased! gives explicit control over polling priority.

Fairness with Timing

use futures::stream::{self, StreamExt};
use tokio::time::Duration;

#[tokio::main]
async fn fairness_with_timing() {
    // Stream1 produces every 10ms; each item is tagged with its source
    let stream1 = stream::unfold(0, |count| async move {
        tokio::time::sleep(Duration::from_millis(10)).await;
        Some((("stream1", count), count + 1))
    });

    // Stream2 produces every 30ms
    let stream2 = stream::unfold(0, |count| async move {
        tokio::time::sleep(Duration::from_millis(30)).await;
        Some((("stream2", count), count + 1))
    });

    // unfold over an async block is !Unpin, so pin the merged stream
    let mut combined = std::pin::pin!(stream::select(stream1, stream2));

    // Stream1 will produce more items due to faster timing
    let mut stream1_count = 0;
    let mut stream2_count = 0;

    for _ in 0..20 {
        if let Some((source, _)) = combined.next().await {
            if source == "stream1" {
                stream1_count += 1;
            } else {
                stream2_count += 1;
            }
        }
    }

    println!("stream1: {}, stream2: {}", stream1_count, stream2_count);
}

When streams become ready at different times, the item counts follow each stream's production rate; polling order only matters at the moments when both are ready at once.
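The expected split can be estimated without running any timers. The function below is a hypothetical, idealized model: it assumes each source emits exactly on multiples of its period, ignores scheduling delay, and lets the two sides take turns when they tie. Real runs will drift from these numbers.

```rust
/// Hypothetical model: two sources that emit every `period_a` and
/// `period_b` milliseconds on an idealized clock with no scheduling delay.
/// Returns how many of the first `n` merged items come from each source.
fn simulate_counts(period_a: u64, period_b: u64, n: usize) -> (usize, usize) {
    // Time at which each source has its next item ready.
    let (mut next_a, mut next_b) = (period_a, period_b);
    let (mut count_a, mut count_b) = (0, 0);
    // Tie-breaker: which side wins when both are ready at the same time.
    let mut a_first = true;

    for _ in 0..n {
        let take_a = if next_a == next_b { a_first } else { next_a < next_b };
        if take_a {
            count_a += 1;
            next_a += period_a;
        } else {
            count_b += 1;
            next_b += period_b;
        }
        // The side that just yielded loses the next tie.
        a_first = !take_a;
    }
    (count_a, count_b)
}
```

With periods of 10 and 30 and 20 items, the model attributes three quarters of the items to the faster source, which is the production-rate ratio rather than an effect of polling order.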

Practical Fairness Pattern

use futures::stream::{self, StreamExt};
use std::collections::VecDeque;
 
#[tokio::main]
async fn practical_fairness() {
    // Round-robin across multiple ready streams
    let streams = vec![
        stream::iter(vec![1, 2, 3]).fuse(),
        stream::iter(vec![10, 20, 30]).fuse(),
        stream::iter(vec![100, 200, 300]).fuse(),
    ];
    
    let mut results = vec![];
    let mut queue: VecDeque<_> = streams.into_iter().collect();
    
    // pop_front plus push_back already rotates the queue, so no extra
    // rotation is needed
    while let Some(mut current) = queue.pop_front() {
        // Awaiting here waits on this one stream only, which is fine
        // because these streams are always ready
        
        if let Some(item) = current.next().await {
            results.push(item);
            // Put back if not exhausted
            queue.push_back(current);
        }
        // If exhausted, don't put back
    }
    
    println!("Round-robin results: {:?}", results);
}

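Manual round-robin takes one item from each stream in turn. Because it awaits a single stream at a time, a pending stream would hold up the others, so this pattern suits streams that are always ready.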

Understanding Poll Mechanics

use futures::stream::{Stream, StreamExt};
use std::pin::Pin;
use std::task::{Context, Poll};
 
// A stream that logs when it's polled
struct LoggingStream {
    name: &'static str,
    items: Vec<i32>,
}
 
impl LoggingStream {
    fn new(name: &'static str, items: Vec<i32>) -> Self {
        LoggingStream { name, items }
    }
}
 
impl Stream for LoggingStream {
    type Item = i32;
    
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        println!("Polling {}", self.name);
        if self.items.is_empty() {
            Poll::Ready(None)
        } else {
            let item = self.items.remove(0);
            println!("{} returning {}", self.name, item);
            Poll::Ready(Some(item))
        }
    }
}
 
#[tokio::main]
async fn poll_mechanics() {
    let stream1 = LoggingStream::new("stream1", vec![1, 2]);
    let stream2 = LoggingStream::new("stream2", vec![10, 20]);
    
    let mut combined = futures::stream::select(stream1, stream2);
    
    println!("Getting first item:");
    let _ = combined.next().await;
    
    println!("\nGetting second item:");
    let _ = combined.next().await;
    
    println!("\nGetting third item:");
    let _ = combined.next().await;
    
    // select alternates which side it polls first, so the log shows
    // stream1, then stream2, then stream1 again
}

Understanding poll order helps predict which stream yields first.

Buffering for Fairness

use futures::stream::{self, StreamExt};
use tokio::sync::mpsc;
use tokio::time::Duration;
// ReceiverStream comes from the separate tokio-stream crate
use tokio_stream::wrappers::ReceiverStream;

#[tokio::main]
async fn buffering_for_fairness() {
    // Use channels to decouple production from consumption
    let (tx1, rx1) = mpsc::channel::<i32>(10);
    let (tx2, rx2) = mpsc::channel::<i32>(10);

    // Spawn producers
    tokio::spawn(async move {
        for i in 0..5 {
            tx1.send(i).await.unwrap();
            tokio::time::sleep(Duration::from_millis(1)).await;
        }
    });

    tokio::spawn(async move {
        for i in 100..105 {
            tx2.send(i).await.unwrap();
            tokio::time::sleep(Duration::from_millis(1)).await;
        }
    });

    // A tokio Receiver is not a Stream by itself; wrap it, then select
    let stream1 = ReceiverStream::new(rx1).map(|x| (1, x));
    let stream2 = ReceiverStream::new(rx2).map(|x| (2, x));

    let mut combined = stream::select(stream1, stream2);

    let mut from_1 = 0;
    let mut from_2 = 0;

    // The loop ends once both senders are dropped and both channels drain
    while let Some((source, _)) = combined.next().await {
        if source == 1 {
            from_1 += 1;
        } else {
            from_2 += 1;
        }
    }

    println!("From stream 1: {}, from stream 2: {}", from_1, from_2);
}

Channels decouple producers from the consumer; select then alternates between the two receivers whenever both have buffered items.

When Fairness Matters

use futures::stream::{self, StreamExt};
use tokio::time::Duration;

#[tokio::main]
async fn when_fairness_matters() {
    // Scenario: multiple event sources with different rates.
    // unfold yields (item, next_state); each item here is (source, count)

    // High-frequency events (every 1ms)
    let high_freq = stream::unfold(0, |count| async move {
        tokio::time::sleep(Duration::from_millis(1)).await;
        Some((("high", count), count + 1))
    });

    // Low-frequency events (every 10ms)
    let low_freq = stream::unfold(0, |count| async move {
        tokio::time::sleep(Duration::from_millis(10)).await;
        Some((("low", count), count + 1))
    });

    // Critical events (every 5ms, but should be prioritized)
    let critical = stream::unfold(0, |count| async move {
        tokio::time::sleep(Duration::from_millis(5)).await;
        Some((("critical", count), count + 1))
    });

    // Nested select: when all three are ready, high_freq gets every other
    // turn, and low_freq and critical share the remaining turns.
    // Nothing here gives critical any priority.
    let mut combined =
        std::pin::pin!(stream::select(high_freq, stream::select(low_freq, critical)));

    // Process for a fixed window to compare counts
    let start = tokio::time::Instant::now();
    let mut counts = std::collections::HashMap::new();

    while start.elapsed() < Duration::from_millis(50) {
        if let Some((source, _)) = combined.next().await {
            *counts.entry(source).or_insert(0) += 1;
        }
    }

    println!("Event counts: {:?}", counts);
}

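A high-frequency stream yields more items simply because it is ready more often. Nesting select also weights the outer stream more heavily than the two inner ones, and neither effect gives critical events any priority.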
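The weighting that nesting introduces can be seen in a small std-only model. The function nested_counts is hypothetical and assumes all three sources always have an item ready, which is the worst case for uneven shares; it tracks one round-robin toggle for the outer select and one for the inner select.

```rust
/// Hypothetical model of `select(high, select(low, critical))` when all
/// three sources always have an item ready. Returns the (high, low,
/// critical) counts for the first `n` items.
fn nested_counts(n: usize) -> (usize, usize, usize) {
    let (mut high, mut low, mut critical) = (0, 0, 0);
    let mut outer_left = true; // outer select: high vs. the inner select
    let mut inner_left = true; // inner select: low vs. critical

    for _ in 0..n {
        if outer_left {
            high += 1;
        } else {
            if inner_left {
                low += 1;
            } else {
                critical += 1;
            }
            // The inner toggle advances only when the inner select is polled.
            inner_left = !inner_left;
        }
        // The outer toggle advances on every turn.
        outer_left = !outer_left;
    }
    (high, low, critical)
}
```

Under these assumptions the outer stream receives half of the turns and each inner stream a quarter, so three streams that need equal shares should not be combined by nesting two selects.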

Priority-Based Selection

use futures::stream::{self, StreamExt};
use futures::select_biased;
 
#[tokio::main]
async fn priority_based_selection() {
    let mut high_priority = stream::iter(vec!["urgent1", "urgent2"]).fuse();
    let mut normal_priority = stream::iter(vec!["normal1", "normal2", "normal3"]).fuse();
    let mut low_priority = stream::iter(vec!["low1", "low2", "low3", "low4"]).fuse();
    
    let mut results = vec![];
    
    loop {
        // Poll in priority order
        select_biased! {
            item = high_priority.next() => {
                match item {
                    Some(x) => results.push(("high", x)),
                    None => continue,
                }
            }
            item = normal_priority.next() => {
                match item {
                    Some(x) => results.push(("normal", x)),
                    None => continue,
                }
            }
            item = low_priority.next() => {
                match item {
                    Some(x) => results.push(("low", x)),
                    None => continue,
                }
            }
            complete => break,
        }
    }
    
    println!("Priority-ordered results: {:?}", results);
    // High priority items come first when available
}

select_biased! enables explicit priority ordering.

Comparing Selection Strategies

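Strategy             | Fairness                             | Priority Control | Complexity
---------------------+--------------------------------------+------------------+-----------
select               | Round-robin between two streams      | No               | Low
SelectAll            | Readiness-driven, order unspecified  | No               | Low
select_with_strategy | Defined by the strategy function     | Yes              | Medium
select_biased!       | Explicit order                       | Yes              | Medium
Manual round-robin   | Round-robin                          | No               | High
Channels + select    | Round-robin over buffered items      | Partial          | Medium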

Synthesis

futures::stream::select is fair between its two inputs, but only between those two:

How select works:

  • Takes exactly two streams with the same item type
  • Alternates which stream it polls first (round-robin)
  • If both are ready, they take turns, starting with the first argument
  • Once one stream ends, only the other is polled

Fairness implications:

  • Two always-ready streams each yield half of the items
  • Nested selects weight the outer stream more than the inner ones
  • A stream that is ready more often yields more items
  • Polling order matters only when several streams are ready at once

Choosing a policy:

  • Use select when round-robin between two streams is enough
  • Use select_with_strategy to supply a custom policy
  • Use select_biased! for strict priority in a hand-written loop
  • Implement manual round-robin with a queue for many always-ready streams
  • Use channels to decouple producers from the consumer

Key insight: select already gives two ready streams equal turns, so one input cannot starve the other. What it does not provide is priority, or equal shares across more than two streams when selects are nested. For priority (high-priority events first), use select_biased! or select_with_strategy with a fixed preference; that choice can starve the lower-priority stream whenever the preferred one is always ready. The choice depends on whether you need priority or fairness (equal opportunity for ready streams).