Loading pageâŚ
Rust walkthroughs
Loading pageâŚ
futures::stream::Peekable for lookahead in stream processing?The futures::stream::Peekable wrapper provides lookahead capability for streams, allowing examination of the next item without consuming it. Unlike a regular stream where calling next removes items permanently, a Peekable stream caches the peeked item internally, returning it on the subsequent next call. This enables parsing patterns where you need to look ahead to determine how to process the current contextâessential for implementing parsers that need to distinguish between token sequences, protocols where message boundaries depend on upcoming data, or any streaming algorithm where decisions require visibility into future items without committing to consumption.
use futures::stream::{self, Stream};
async fn stream_without_peek() {
let mut stream = stream::iter(vec![1, 2, 3, 4, 5]);
// Each next() consumes an item
assert_eq!(stream.next().await, Some(1));
assert_eq!(stream.next().await, Some(2));
// No way to examine items without consuming them
// Once consumed, items are gone
}Regular streams only move forward; items cannot be examined without consumption.
use futures::stream::{self, Stream, StreamExt};
async fn creating_peekable() {
let stream = stream::iter(vec![1, 2, 3, 4, 5]);
// Wrap stream in Peekable for lookahead capability
let mut peekable = stream.peekable();
// peek() returns reference to next item without consuming
let peeked = peekable.peek().await;
assert_eq!(peeked, Some(&1));
// Item is still available for next()
assert_eq!(peekable.next().await, Some(1));
// Can peek again
assert_eq!(peekable.peek().await, Some(&2));
}peekable() wraps any stream, adding the peek method for lookahead.
use futures::stream::{self, StreamExt};
async fn peek_vs_next() {
let stream = stream::iter(vec!["a", "b", "c"]);
let mut peekable = stream.peekable();
// peek returns Option<&Item> - reference to cached item
let peeked: Option<&&str> = peekable.peek().await;
assert_eq!(peeked, Some(&"a"));
// peek again returns same cached item
let peeked_again: Option<&&str> = peekable.peek().await;
assert_eq!(peeked_again, Some(&"a"));
// next consumes and returns the cached item
let consumed: Option<&str> = peekable.next().await;
assert_eq!(consumed, Some("a"));
// Now peek returns next item
let peeked: Option<&&str> = peekable.peek().await;
assert_eq!(peeked, Some(&"b"));
}Multiple peeks return references to the same cached item until next consumes it.
use futures::stream::{self, StreamExt};
async fn internal_state() {
let stream = stream::iter(vec![1, 2, 3]);
let mut peekable = stream.peekable();
// Initially, Peekable has no cached item
// First peek triggers next() on underlying stream
let first_peek = peekable.peek().await;
// Internal state: cached = Some(1)
let second_peek = peekable.peek().await;
// Internal state: cached = Some(1) (unchanged)
let _ = peekable.next().await;
// Internal state: cached = None (cleared after next)
// Next peek fetches new item
let third_peek = peekable.peek().await;
// Internal state: cached = Some(2)
assert_eq!(third_peek, Some(&2));
}Peekable maintains an internal cache of one item for lookahead.
use futures::stream::{self, StreamExt};
#[derive(Debug, PartialEq)]
enum Token {
Number(i32),
Plus,
Minus,
Equals,
EqualsEquals, // ==
}
async fn parse_tokens() {
let tokens = vec![
Token::Number(1),
Token::Equals,
Token::Equals, // Should become EqualsEquals
Token::Number(2),
];
let stream = stream::iter(tokens);
let mut peekable = stream.peekable();
let mut parsed = Vec::new();
while let Some(token) = peekable.next().await {
match token {
Token::Equals => {
// Look ahead to see if next is also Equals
if let Some(&Token::Equals) = peekable.peek().await {
// Consume the second Equals
peekable.next().await;
parsed.push(Token::EqualsEquals);
} else {
parsed.push(Token::Equals);
}
}
other => parsed.push(other),
}
}
assert_eq!(parsed, vec![
Token::Number(1),
Token::EqualsEquals,
Token::Number(2),
]);
}Peek enables merging consecutive tokens without committing to consumption.
use futures::stream::{self, StreamExt};
async fn conditional_consumption() {
let numbers = stream::iter(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
let mut peekable = numbers.peekable();
let mut consumed = Vec::new();
// Consume items until we see a number divisible by 5
while let Some(&n) = peekable.peek().await {
if n % 5 == 0 {
// Stop before the divisible-by-5 number
break;
}
consumed.push(peekable.next().await.unwrap());
}
assert_eq!(consumed, vec![1, 2, 3, 4]);
// The 5 is still available
assert_eq!(peekable.peek().await, Some(&5));
assert_eq!(peekable.next().await, Some(5));
}Peek allows stopping based on upcoming items without consuming them.
use futures::stream::{self, StreamExt};
async fn peek_mut() {
let stream = stream::iter(vec![1, 2, 3, 4, 5]);
let mut peekable = stream.peekable();
// peek_mut returns mutable reference
if let Some(value) = peekable.peek_mut().await {
*value *= 10; // Modify the cached item
}
// The modified value is returned by next()
assert_eq!(peekable.next().await, Some(10));
assert_eq!(peekable.next().await, Some(2));
}peek_mut allows modifying the cached item before consumption.
use futures::stream::{self, StreamExt};
async fn multi_token_lookahead() {
// Peekable only caches one item, but we can consume and track
let stream = stream::iter(vec![1, 2, 3, 4, 5]);
let mut peekable = stream.peekable();
// For multi-item lookahead, manually track consumed items
let mut lookahead_buffer: Vec<i32> = Vec::new();
// Pre-fill buffer with items we might need to put back
for _ in 0..3 {
if let Some(item) = peekable.next().await {
lookahead_buffer.push(item);
}
}
// Now we have 3 items of lookahead in buffer
println!("Lookahead buffer: {:?}", lookahead_buffer);
assert_eq!(lookahead_buffer, vec![1, 2, 3]);
// Process items, potentially putting some back conceptually
// (though Peekable doesn't support push_back)
}For multi-item lookahead, combine Peekable with manual buffering.
use futures::stream::{self, StreamExt};
#[derive(Debug, Clone, PartialEq)]
enum ParseEvent {
StartElement(String),
EndElement(String),
Text(String),
}
async fn parse_xml_like() {
let events = vec![
ParseEvent::StartElement("div".to_string()),
ParseEvent::Text("hello".to_string()),
ParseEvent::EndElement("div".to_string()),
ParseEvent::StartElement("span".to_string()),
ParseEvent::EndElement("span".to_string()),
];
let stream = stream::iter(events);
let mut peekable = stream.peekable();
let mut depth = 0;
let mut processed = Vec::new();
while let Some(event) = peekable.next().await {
match event {
ParseEvent::StartElement(name) => {
depth += 1;
// Peek to see if next is EndElement (empty element optimization)
if let Some(ParseEvent::EndElement(end_name)) = peekable.peek().await {
if end_name == &name {
// Empty element, consume end tag too
peekable.next().await;
processed.push(format!("Empty element: {}", name));
depth -= 1;
continue;
}
}
processed.push(format!("Start: {}", name));
}
ParseEvent::EndElement(name) => {
depth -= 1;
processed.push(format!("End: {}", name));
}
ParseEvent::Text(text) => {
processed.push(format!("Text: {}", text));
}
}
}
println!("Processed: {:?}", processed);
}Peek enables context-sensitive parsing based on upcoming tokens.
use futures::stream::{self, StreamExt};
async fn detect_message_boundary() {
// Protocol: messages end with newline followed by digit
let bytes: Vec<u8> = b"message1\n1message2\n2\nother".to_vec();
let stream = stream::iter(bytes);
let mut peekable = stream.peekable();
let mut messages: Vec<String> = Vec::new();
let mut current = String::new();
while let Some(&byte) = peekable.peek().await {
peekable.next().await;
if byte == b'\n' {
// Peek to check if next is digit (message boundary)
if let Some(&next) = peekable.peek().await {
if next.is_ascii_digit() {
// Message complete
messages.push(current.clone());
current.clear();
continue;
}
}
}
current.push(byte as char);
}
// Handle remaining
if !current.is_empty() {
messages.push(current);
}
// Note: This is simplified - actual protocol parsing would be more complex
}Peek allows detecting message boundaries based on content patterns.
use futures::stream::{self, StreamExt};
async fn peek_with_take_while() {
let stream = stream::iter(vec![1, 2, 3, 4, 5, 6, 1, 2, 3]);
let mut peekable = stream.peekable();
// Take while numbers are ascending
// But peek allows checking without consuming
let mut ascending = Vec::new();
let mut prev = 0;
while let Some(¤t) = peekable.peek().await {
if current > prev {
ascending.push(peekable.next().await.unwrap());
prev = current;
} else {
break;
}
}
assert_eq!(ascending, vec![1, 2, 3, 4, 5, 6]);
// Stream continues at 1
assert_eq!(peekable.next().await, Some(1));
}Peek enables conditional continuation without consumption side effects.
use futures::stream::{self, StreamExt};
#[derive(Debug)]
enum ParseError {
UnexpectedToken(i32),
EndOfStream,
}
async fn parse_with_recovery() -> Result<Vec<i32>, ParseError> {
let stream = stream::iter(vec![1, -1, 2, 3, -1, 4, 5]);
let mut peekable = stream.peekable();
let mut result = Vec::new();
loop {
match peekable.peek().await {
Some(&n) if *n > 0 => {
result.push(peekable.next().await.unwrap());
}
Some(&-1) => {
// Skip error marker
peekable.next().await;
println!("Skipped error marker");
}
Some(&n) if *n < 0 => {
// Unexpected negative number
peekable.next().await; // Skip it
println!("Recovered from unexpected: {}", n);
}
None => break,
_ => {}
}
}
Ok(result)
}
#[tokio::main]
async fn main() {
let result = parse_with_recovery().await;
assert_eq!(result, Ok(vec![1, 2, 3, 4, 5]));
}Peek allows examining tokens for error recovery without committing.
use futures::stream::{self, StreamExt};
use tokio_stream::wrappers::ReceiverStream;
async fn async_peekable() {
// Create an async channel
let (tx, rx) = tokio::sync::mpsc::channel::<i32>(10);
// Convert receiver to stream, then to peekable
let stream = ReceiverStream::new(rx);
let mut peekable = stream.peekable();
// Send some values
tx.send(1).await.unwrap();
tx.send(2).await.unwrap();
tx.send(3).await.unwrap();
drop(tx); // Close sender
// Can peek async values
assert_eq!(peekable.peek().await, Some(&1));
assert_eq!(peekable.next().await, Some(1));
assert_eq!(peekable.peek().await, Some(&2));
}Peekable works with any stream, including async channel streams.
use futures::stream::{self, StreamExt};
async fn performance_characteristics() {
let stream = stream::iter(0..1_000_000);
let mut peekable = stream.peekable();
// Peekable adds minimal overhead:
// - One item buffer (cached value)
// - Boolean flag for whether cache is populated
// - No heap allocation per peek
// Peeking the same item repeatedly is cheap
for _ in 0..10 {
let _ = peekable.peek().await;
}
// All 10 peeks return reference to same cached item
// Only one underlying stream fetch occurred
assert_eq!(peekable.next().await, Some(0));
// The peek is O(1) - just checking the cache
// If cache empty, triggers one next() on underlying stream
}Peekable is lightweight, buffering at most one item.
use futures::stream::{self, StreamExt};
async fn when_not_to_use() {
// Don't use Peekable when:
// 1. You always consume items anyway
let stream = stream::iter(vec![1, 2, 3]);
let mut peekable = stream.peekable();
while let Some(n) = peekable.next().await {
// If you never peek, Peekable adds overhead for no benefit
println!("{}", n);
}
// 2. You need more than one item of lookahead
// Peekable only caches one item; use manual buffering for multi-item
// 3. You need to push items back
// Peekable can't put items back onto stream
// 4. Random access is needed
// Peekable is still sequential; use Vec if random access needed
}Use Peekable when you need single-item lookahead, not as a default wrapper.
use futures::stream::{self, StreamExt};
use std::io::{BufRead, BufReader, Cursor};
async fn line_protocol_parser() {
// Protocol: lines with continuation starting with whitespace
let input = "START\n continuation\n more\nEND\nSTART2\nEND2\n";
let lines: Vec<&str> = input.lines().collect();
let stream = stream::iter(lines);
let mut peekable = stream.peekable();
let mut messages: Vec<Vec<String>> = Vec::new();
let mut current_message: Vec<String> = Vec::new();
while let Some(line) = peekable.next().await {
if line.starts_with("START") {
current_message.push(line.to_string());
// Peek ahead for continuation lines
while let Some(&next_line) = peekable.peek().await {
if next_line.starts_with(" ") {
// Continuation line
current_message.push(peekable.next().await.unwrap().to_string());
} else {
break;
}
}
messages.push(current_message.clone());
current_message.clear();
}
}
// messages[0] = ["START", " continuation", " more"]
// messages[1] = ["START2"]
}Peek allows distinguishing continuation lines from new messages.
use futures::stream::{self, StreamExt};
#[derive(Debug, Clone, PartialEq)]
enum Token {
Num(i64),
Plus,
Minus,
Star,
Slash,
LParen,
RParen,
}
async fn parse_expression() {
// Parse: 1 + 2 * (3 - 4)
let tokens = vec![
Token::Num(1),
Token::Plus,
Token::Num(2),
Token::Star,
Token::LParen,
Token::Num(3),
Token::Minus,
Token::Num(4),
Token::RParen,
];
let stream = stream::iter(tokens);
let mut peekable = stream.peekable();
// Parser needs lookahead for operator precedence
// e.g., after seeing Num(2), need to peek to see if next is * or +
fn parse_term(peekable: &mut futures::stream::Peekable<impl StreamExt<Item = Token> + Unpin>) -> Option<i64> {
match peekable.peek().await? {
Token::Num(n) => {
let n = *n;
peekable.next().await;
Some(n)
}
Token::LParen => {
peekable.next().await;
let result = parse_expression_internal(peekable)?;
if peekable.peek().await == Some(&Token::RParen) {
peekable.next().await;
}
Some(result)
}
_ => None,
}
}
fn parse_expression_internal(peekable: &mut futures::stream::Peekable<impl StreamExt<Item = Token> + Unpin>) -> Option<i64> {
let mut left = parse_term(peekable)?;
while let Some(&op) = peekable.peek().await {
match op {
Token::Plus => {
peekable.next().await;
let right = parse_term(peekable)?;
left += right;
}
Token::Minus => {
peekable.next().await;
let right = parse_term(peekable)?;
left -= right;
}
Token::Star => {
peekable.next().await;
let right = parse_term(peekable)?;
left *= right;
}
Token::Slash => {
peekable.next().await;
let right = parse_term(peekable)?;
left /= right;
}
Token::RParen => break,
_ => break,
}
}
Some(left)
}
// Simplified parsing demonstration
}Expression parsing requires lookahead to handle operator precedence.
Peekable methods:
| Method | Returns | Behavior |
|--------|---------|----------|
| peek() | Option<&Item> | Returns reference without consuming |
| peek_mut() | Option<&mut Item> | Returns mutable reference |
| next() | Option<Item> | Consumes and returns cached item |
Peek state machine:
| State | peek() | next() |
|-------|----------|----------|
| Cache empty | Fetch from stream, cache, return ref | Fetch from stream, return item |
| Cache populated | Return ref to cached | Return cached, clear cache |
| Stream ended | Return None | Return None |
Common patterns:
| Pattern | Description | |---------|-------------| | Multi-character token | Peek to combine consecutive tokens | | Conditional break | Stop before matching item | | Error recovery | Examine next token, decide to skip or handle | | State machine | Peek to determine next state transition |
Key insight: Peekable solves the fundamental limitation of streamsâyou cannot examine future items without consuming them. This is essential for parsing algorithms where the current token's meaning depends on what follows: distinguishing = from ==, detecting continuation lines in protocols, implementing operator precedence parsing where you need to see the operator before committing to consume operands. The implementation is lightweight, caching at most one item, making it suitable for high-performance streaming contexts. The peek_mut variant extends this to allow in-place modification of the cached item, useful for transforming items before they're processed downstream. While Peekable only provides single-item lookahead, it composes naturally with manual buffering for multi-item lookahead requirements. Use it when your streaming algorithm requires seeing the next item to decide how to process the current one, but avoid it when you always consume items regardlessâthere's no benefit to the extra caching in those cases.