What is the purpose of futures::stream::Peekable for lookahead in stream processing?

The futures::stream::Peekable wrapper provides lookahead capability for streams, allowing examination of the next item without consuming it. Unlike a regular stream where calling next removes items permanently, a Peekable stream caches the peeked item internally, returning it on the subsequent next call. This enables parsing patterns where you need to look ahead to determine how to process the current context—essential for implementing parsers that need to distinguish between token sequences, protocols where message boundaries depend on upcoming data, or any streaming algorithm where decisions require visibility into future items without committing to consumption.

Basic Stream Without Lookahead

use futures::stream::{self, Stream};
 
async fn stream_without_peek() {
    let mut stream = stream::iter(vec![1, 2, 3, 4, 5]);
    
    // Each next() consumes an item
    assert_eq!(stream.next().await, Some(1));
    assert_eq!(stream.next().await, Some(2));
    
    // No way to examine items without consuming them
    // Once consumed, items are gone
}

Regular streams only move forward; items cannot be examined without consumption.

Creating a Peekable Stream

use futures::stream::{self, Stream, StreamExt};
 
async fn creating_peekable() {
    let stream = stream::iter(vec![1, 2, 3, 4, 5]);
    
    // Wrap stream in Peekable for lookahead capability
    let mut peekable = stream.peekable();
    
    // peek() returns reference to next item without consuming
    let peeked = peekable.peek().await;
    assert_eq!(peeked, Some(&1));
    
    // Item is still available for next()
    assert_eq!(peekable.next().await, Some(1));
    
    // Can peek again
    assert_eq!(peekable.peek().await, Some(&2));
}

peekable() wraps any stream, adding the peek method for lookahead.

peek vs next Behavior

use futures::stream::{self, StreamExt};
 
async fn peek_vs_next() {
    let stream = stream::iter(vec!["a", "b", "c"]);
    let mut peekable = stream.peekable();
    
    // peek returns Option<&Item> - reference to cached item
    let peeked: Option<&&str> = peekable.peek().await;
    assert_eq!(peeked, Some(&"a"));
    
    // peek again returns same cached item
    let peeked_again: Option<&&str> = peekable.peek().await;
    assert_eq!(peeked_again, Some(&"a"));
    
    // next consumes and returns the cached item
    let consumed: Option<&str> = peekable.next().await;
    assert_eq!(consumed, Some("a"));
    
    // Now peek returns next item
    let peeked: Option<&&str> = peekable.peek().await;
    assert_eq!(peeked, Some(&"b"));
}

Multiple peeks return references to the same cached item until next consumes it.

Peekable Internal State

use futures::stream::{self, StreamExt};
 
async fn internal_state() {
    let stream = stream::iter(vec![1, 2, 3]);
    let mut peekable = stream.peekable();
    
    // Initially, Peekable has no cached item
    // First peek triggers next() on underlying stream
    
    let first_peek = peekable.peek().await;
    // Internal state: cached = Some(1)
    
    let second_peek = peekable.peek().await;
    // Internal state: cached = Some(1) (unchanged)
    
    let _ = peekable.next().await;
    // Internal state: cached = None (cleared after next)
    
    // Next peek fetches new item
    let third_peek = peekable.peek().await;
    // Internal state: cached = Some(2)
    assert_eq!(third_peek, Some(&2));
}

Peekable maintains an internal cache of one item for lookahead.

Parsing with Lookahead

use futures::stream::{self, StreamExt};
 
#[derive(Debug, PartialEq)]
enum Token {
    Number(i32),
    Plus,
    Minus,
    Equals,
    EqualsEquals, // ==
}
 
async fn parse_tokens() {
    let tokens = vec![
        Token::Number(1),
        Token::Equals,
        Token::Equals,     // Should become EqualsEquals
        Token::Number(2),
    ];
    
    let stream = stream::iter(tokens);
    let mut peekable = stream.peekable();
    
    let mut parsed = Vec::new();
    
    while let Some(token) = peekable.next().await {
        match token {
            Token::Equals => {
                // Look ahead to see if next is also Equals
                if let Some(&Token::Equals) = peekable.peek().await {
                    // Consume the second Equals
                    peekable.next().await;
                    parsed.push(Token::EqualsEquals);
                } else {
                    parsed.push(Token::Equals);
                }
            }
            other => parsed.push(other),
        }
    }
    
    assert_eq!(parsed, vec![
        Token::Number(1),
        Token::EqualsEquals,
        Token::Number(2),
    ]);
}

Peek enables merging consecutive tokens without committing to consumption.

Conditional Consumption Based on Peek

use futures::stream::{self, StreamExt};
 
async fn conditional_consumption() {
    let numbers = stream::iter(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
    let mut peekable = numbers.peekable();
    
    let mut consumed = Vec::new();
    
    // Consume items until we see a number divisible by 5
    while let Some(&n) = peekable.peek().await {
        if n % 5 == 0 {
            // Stop before the divisible-by-5 number
            break;
        }
        consumed.push(peekable.next().await.unwrap());
    }
    
    assert_eq!(consumed, vec![1, 2, 3, 4]);
    
    // The 5 is still available
    assert_eq!(peekable.peek().await, Some(&5));
    assert_eq!(peekable.next().await, Some(5));
}

Peek allows stopping based on upcoming items without consuming them.

Peek Mut for In-Place Modification

use futures::stream::{self, StreamExt};
 
async fn peek_mut() {
    let stream = stream::iter(vec![1, 2, 3, 4, 5]);
    let mut peekable = stream.peekable();
    
    // peek_mut returns mutable reference
    if let Some(value) = peekable.peek_mut().await {
        *value *= 10; // Modify the cached item
    }
    
    // The modified value is returned by next()
    assert_eq!(peekable.next().await, Some(10));
    assert_eq!(peekable.next().await, Some(2));
}

peek_mut allows modifying the cached item before consumption.

Multi-Token Lookahead Pattern

use futures::stream::{self, StreamExt};
 
async fn multi_token_lookahead() {
    // Peekable only caches one item, but we can consume and track
    let stream = stream::iter(vec![1, 2, 3, 4, 5]);
    let mut peekable = stream.peekable();
    
    // For multi-item lookahead, manually track consumed items
    let mut lookahead_buffer: Vec<i32> = Vec::new();
    
    // Pre-fill buffer with items we might need to put back
    for _ in 0..3 {
        if let Some(item) = peekable.next().await {
            lookahead_buffer.push(item);
        }
    }
    
    // Now we have 3 items of lookahead in buffer
    println!("Lookahead buffer: {:?}", lookahead_buffer);
    assert_eq!(lookahead_buffer, vec![1, 2, 3]);
    
    // Process items, potentially putting some back conceptually
    // (though Peekable doesn't support push_back)
}

For multi-item lookahead, combine Peekable with manual buffering.

Parser State Machine Pattern

use futures::stream::{self, StreamExt};
 
#[derive(Debug, Clone, PartialEq)]
enum ParseEvent {
    StartElement(String),
    EndElement(String),
    Text(String),
}
 
async fn parse_xml_like() {
    let events = vec![
        ParseEvent::StartElement("div".to_string()),
        ParseEvent::Text("hello".to_string()),
        ParseEvent::EndElement("div".to_string()),
        ParseEvent::StartElement("span".to_string()),
        ParseEvent::EndElement("span".to_string()),
    ];
    
    let stream = stream::iter(events);
    let mut peekable = stream.peekable();
    
    let mut depth = 0;
    let mut processed = Vec::new();
    
    while let Some(event) = peekable.next().await {
        match event {
            ParseEvent::StartElement(name) => {
                depth += 1;
                
                // Peek to see if next is EndElement (empty element optimization)
                if let Some(ParseEvent::EndElement(end_name)) = peekable.peek().await {
                    if end_name == &name {
                        // Empty element, consume end tag too
                        peekable.next().await;
                        processed.push(format!("Empty element: {}", name));
                        depth -= 1;
                        continue;
                    }
                }
                
                processed.push(format!("Start: {}", name));
            }
            ParseEvent::EndElement(name) => {
                depth -= 1;
                processed.push(format!("End: {}", name));
            }
            ParseEvent::Text(text) => {
                processed.push(format!("Text: {}", text));
            }
        }
    }
    
    println!("Processed: {:?}", processed);
}

Peek enables context-sensitive parsing based on upcoming tokens.

Protocol Delimiter Detection

use futures::stream::{self, StreamExt};
 
async fn detect_message_boundary() {
    // Protocol: messages end with newline followed by digit
    let bytes: Vec<u8> = b"message1\n1message2\n2\nother".to_vec();
    let stream = stream::iter(bytes);
    let mut peekable = stream.peekable();
    
    let mut messages: Vec<String> = Vec::new();
    let mut current = String::new();
    
    while let Some(&byte) = peekable.peek().await {
        peekable.next().await;
        
        if byte == b'\n' {
            // Peek to check if next is digit (message boundary)
            if let Some(&next) = peekable.peek().await {
                if next.is_ascii_digit() {
                    // Message complete
                    messages.push(current.clone());
                    current.clear();
                    continue;
                }
            }
        }
        
        current.push(byte as char);
    }
    
    // Handle remaining
    if !current.is_empty() {
        messages.push(current);
    }
    
    // Note: This is simplified - actual protocol parsing would be more complex
}

Peek allows detecting message boundaries based on content patterns.

Combining Peek with take_while

use futures::stream::{self, StreamExt};
 
async fn peek_with_take_while() {
    let stream = stream::iter(vec![1, 2, 3, 4, 5, 6, 1, 2, 3]);
    let mut peekable = stream.peekable();
    
    // Take while numbers are ascending
    // But peek allows checking without consuming
    
    let mut ascending = Vec::new();
    let mut prev = 0;
    
    while let Some(&current) = peekable.peek().await {
        if current > prev {
            ascending.push(peekable.next().await.unwrap());
            prev = current;
        } else {
            break;
        }
    }
    
    assert_eq!(ascending, vec![1, 2, 3, 4, 5, 6]);
    
    // Stream continues at 1
    assert_eq!(peekable.next().await, Some(1));
}

Peek enables conditional continuation without consumption side effects.

Error Recovery with Peek

use futures::stream::{self, StreamExt};
 
#[derive(Debug)]
enum ParseError {
    UnexpectedToken(i32),
    EndOfStream,
}
 
async fn parse_with_recovery() -> Result<Vec<i32>, ParseError> {
    let stream = stream::iter(vec![1, -1, 2, 3, -1, 4, 5]);
    let mut peekable = stream.peekable();
    
    let mut result = Vec::new();
    
    loop {
        match peekable.peek().await {
            Some(&n) if *n > 0 => {
                result.push(peekable.next().await.unwrap());
            }
            Some(&-1) => {
                // Skip error marker
                peekable.next().await;
                println!("Skipped error marker");
            }
            Some(&n) if *n < 0 => {
                // Unexpected negative number
                peekable.next().await; // Skip it
                println!("Recovered from unexpected: {}", n);
            }
            None => break,
            _ => {}
        }
    }
    
    Ok(result)
}
 
#[tokio::main]
async fn main() {
    let result = parse_with_recovery().await;
    assert_eq!(result, Ok(vec![1, 2, 3, 4, 5]));
}

Peek allows examining tokens for error recovery without committing.

Peekable with Async Stream

use futures::stream::{self, StreamExt};
use tokio_stream::wrappers::ReceiverStream;
 
async fn async_peekable() {
    // Create an async channel
    let (tx, rx) = tokio::sync::mpsc::channel::<i32>(10);
    
    // Convert receiver to stream, then to peekable
    let stream = ReceiverStream::new(rx);
    let mut peekable = stream.peekable();
    
    // Send some values
    tx.send(1).await.unwrap();
    tx.send(2).await.unwrap();
    tx.send(3).await.unwrap();
    drop(tx); // Close sender
    
    // Can peek async values
    assert_eq!(peekable.peek().await, Some(&1));
    assert_eq!(peekable.next().await, Some(1));
    assert_eq!(peekable.peek().await, Some(&2));
}

Peekable works with any stream, including async channel streams.

Memory and Performance Considerations

use futures::stream::{self, StreamExt};
 
async fn performance_characteristics() {
    let stream = stream::iter(0..1_000_000);
    let mut peekable = stream.peekable();
    
    // Peekable adds minimal overhead:
    // - One item buffer (cached value)
    // - Boolean flag for whether cache is populated
    // - No heap allocation per peek
    
    // Peeking the same item repeatedly is cheap
    for _ in 0..10 {
        let _ = peekable.peek().await;
    }
    // All 10 peeks return reference to same cached item
    
    // Only one underlying stream fetch occurred
    assert_eq!(peekable.next().await, Some(0));
    
    // The peek is O(1) - just checking the cache
    // If cache empty, triggers one next() on underlying stream
}

Peekable is lightweight, buffering at most one item.

When Not to Use Peekable

use futures::stream::{self, StreamExt};
 
async fn when_not_to_use() {
    // Don't use Peekable when:
    
    // 1. You always consume items anyway
    let stream = stream::iter(vec![1, 2, 3]);
    let mut peekable = stream.peekable();
    while let Some(n) = peekable.next().await {
        // If you never peek, Peekable adds overhead for no benefit
        println!("{}", n);
    }
    
    // 2. You need more than one item of lookahead
    // Peekable only caches one item; use manual buffering for multi-item
    
    // 3. You need to push items back
    // Peekable can't put items back onto stream
    
    // 4. Random access is needed
    // Peekable is still sequential; use Vec if random access needed
}

Use Peekable when you need single-item lookahead, not as a default wrapper.

Real-World Example: Line-Based Protocol

use futures::stream::{self, StreamExt};
use std::io::{BufRead, BufReader, Cursor};
 
async fn line_protocol_parser() {
    // Protocol: lines with continuation starting with whitespace
    let input = "START\n  continuation\n  more\nEND\nSTART2\nEND2\n";
    let lines: Vec<&str> = input.lines().collect();
    
    let stream = stream::iter(lines);
    let mut peekable = stream.peekable();
    
    let mut messages: Vec<Vec<String>> = Vec::new();
    let mut current_message: Vec<String> = Vec::new();
    
    while let Some(line) = peekable.next().await {
        if line.starts_with("START") {
            current_message.push(line.to_string());
            
            // Peek ahead for continuation lines
            while let Some(&next_line) = peekable.peek().await {
                if next_line.starts_with("  ") {
                    // Continuation line
                    current_message.push(peekable.next().await.unwrap().to_string());
                } else {
                    break;
                }
            }
            
            messages.push(current_message.clone());
            current_message.clear();
        }
    }
    
    // messages[0] = ["START", "  continuation", "  more"]
    // messages[1] = ["START2"]
}

Peek allows distinguishing continuation lines from new messages.

Real-World Example: Expression Parser

use futures::stream::{self, StreamExt};
 
#[derive(Debug, Clone, PartialEq)]
enum Token {
    Num(i64),
    Plus,
    Minus,
    Star,
    Slash,
    LParen,
    RParen,
}
 
async fn parse_expression() {
    // Parse: 1 + 2 * (3 - 4)
    let tokens = vec![
        Token::Num(1),
        Token::Plus,
        Token::Num(2),
        Token::Star,
        Token::LParen,
        Token::Num(3),
        Token::Minus,
        Token::Num(4),
        Token::RParen,
    ];
    
    let stream = stream::iter(tokens);
    let mut peekable = stream.peekable();
    
    // Parser needs lookahead for operator precedence
    // e.g., after seeing Num(2), need to peek to see if next is * or +
    
    fn parse_term(peekable: &mut futures::stream::Peekable<impl StreamExt<Item = Token> + Unpin>) -> Option<i64> {
        match peekable.peek().await? {
            Token::Num(n) => {
                let n = *n;
                peekable.next().await;
                Some(n)
            }
            Token::LParen => {
                peekable.next().await;
                let result = parse_expression_internal(peekable)?;
                if peekable.peek().await == Some(&Token::RParen) {
                    peekable.next().await;
                }
                Some(result)
            }
            _ => None,
        }
    }
    
    fn parse_expression_internal(peekable: &mut futures::stream::Peekable<impl StreamExt<Item = Token> + Unpin>) -> Option<i64> {
        let mut left = parse_term(peekable)?;
        
        while let Some(&op) = peekable.peek().await {
            match op {
                Token::Plus => {
                    peekable.next().await;
                    let right = parse_term(peekable)?;
                    left += right;
                }
                Token::Minus => {
                    peekable.next().await;
                    let right = parse_term(peekable)?;
                    left -= right;
                }
                Token::Star => {
                    peekable.next().await;
                    let right = parse_term(peekable)?;
                    left *= right;
                }
                Token::Slash => {
                    peekable.next().await;
                    let right = parse_term(peekable)?;
                    left /= right;
                }
                Token::RParen => break,
                _ => break,
            }
        }
        
        Some(left)
    }
    
    // Simplified parsing demonstration
}

Expression parsing requires lookahead to handle operator precedence.

Synthesis

Peekable methods:

Method Returns Behavior
peek() Option<&Item> Returns reference without consuming
peek_mut() Option<&mut Item> Returns mutable reference
next() Option<Item> Consumes and returns cached item

Peek state machine:

State peek() next()
Cache empty Fetch from stream, cache, return ref Fetch from stream, return item
Cache populated Return ref to cached Return cached, clear cache
Stream ended Return None Return None

Common patterns:

Pattern Description
Multi-character token Peek to combine consecutive tokens
Conditional break Stop before matching item
Error recovery Examine next token, decide to skip or handle
State machine Peek to determine next state transition

Key insight: Peekable solves the fundamental limitation of streams—you cannot examine future items without consuming them. This is essential for parsing algorithms where the current token's meaning depends on what follows: distinguishing = from ==, detecting continuation lines in protocols, implementing operator precedence parsing where you need to see the operator before committing to consume operands. The implementation is lightweight, caching at most one item, making it suitable for high-performance streaming contexts. The peek_mut variant extends this to allow in-place modification of the cached item, useful for transforming items before they're processed downstream. While Peekable only provides single-item lookahead, it composes naturally with manual buffering for multi-item lookahead requirements. Use it when your streaming algorithm requires seeing the next item to decide how to process the current one, but avoid it when you always consume items regardless—there's no benefit to the extra caching in those cases.